#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from bluetooth_packets_python3 import hci_packets
from cert.capture import Capture
from cert.captures import SecurityCaptures
from cert.closable import Closable
from cert.closable import safeClose
from cert.event_stream import EventStream, IEventStream
from cert.event_stream import FilteringEventStream
from cert.matchers import IsoMatchers
from cert.truth import assertThat
from datetime import timedelta
from facade import common_pb2 as common
from google.protobuf import empty_pb2 as empty_proto
from iso import facade_pb2 as iso_facade_pb2


class CisTestParameters:
    """
    Plain value holder for one CIS entry passed to
    PyLeIso.le_set_cig_parameters_test() via its cis_configs argument.

    Every constructor argument is stored unchanged under an attribute of the
    same name.
    """

    def __init__(self, cis_id, nse, max_sdu_m_to_s, max_sdu_s_to_m, max_pdu_m_to_s, max_pdu_s_to_m, phy_m_to_s,
                 phy_s_to_m, bn_m_to_s, bn_s_to_m):
        self.cis_id = cis_id
        self.nse = nse
        self.max_sdu_m_to_s = max_sdu_m_to_s
        self.max_sdu_s_to_m = max_sdu_s_to_m
        self.max_pdu_m_to_s = max_pdu_m_to_s
        self.max_pdu_s_to_m = max_pdu_s_to_m
        self.phy_m_to_s = phy_m_to_s
        self.phy_s_to_m = phy_s_to_m
        self.bn_m_to_s = bn_m_to_s
        self.bn_s_to_m = bn_s_to_m


class PyLeIsoStream(IEventStream):
    """
    Event stream for a single CIS handle.

    Wraps the shared ISO data stream in a FilteringEventStream that uses
    IsoMatchers.PacketPayloadWithMatchingCisHandle(cis_handle) as its filter,
    and sends outgoing payloads on the same handle.
    """

    def __init__(self, device, cis_handle, iso_data_stream):
        """
        :param device: device object whose `iso` stub is used for sending
        :param cis_handle: CIS handle this stream is bound to
        :param iso_data_stream: shared ISO data event stream to filter
        """
        self._device = device
        self._cis_handle = cis_handle
        self._le_iso_data_stream = iso_data_stream
        self._our_le_iso_cis_view = FilteringEventStream(
            self._le_iso_data_stream, IsoMatchers.PacketPayloadWithMatchingCisHandle(self._cis_handle))

    def get_event_queue(self):
        """Return the event queue of the filtered per-CIS view."""
        return self._our_le_iso_cis_view.get_event_queue()

    def send(self, payload):
        """Send `payload` as an IsoPacket addressed to this stream's CIS handle."""
        self._device.iso.SendIsoPacket(iso_facade_pb2.IsoPacket(handle=self._cis_handle, payload=payload))


class PyLeIso(Closable):
    """
    Abstraction for iso tasks and GRPC calls
    """

    # Class-level defaults keep close() safe on an instance whose __init__
    # raised before the corresponding stream was created.
    _iso_event_stream = None
    _iso_data_stream = None

    # Default time to wait for an ISO event in the wait_* helpers.
    _DEFAULT_EVENT_TIMEOUT = timedelta(seconds=5)

    def __init__(self, device):
        """
        Wait for the device channel and open the ISO event and data streams.

        :param device: device object exposing wait_channel_ready() and an `iso` stub
        """
        logging.info("DUT: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._iso_event_stream = EventStream(self._device.iso.FetchIsoEvents(empty_proto.Empty()))
        self._iso_data_stream = EventStream(self._device.iso.FetchIsoData(empty_proto.Empty()))

    def close(self):
        """Close both ISO streams; a stream that was never opened is only logged."""
        if self._iso_event_stream is not None:
            safeClose(self._iso_event_stream)
        else:
            logging.info("DUT: ISO Event Stream is None!")
        if self._iso_data_stream is not None:
            safeClose(self._iso_data_stream)
        else:
            logging.info("DUT: ISO Data Stream is None!")

        logging.info("DUT: close")

    def le_set_cig_parameters(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, peripherals_clock_accuracy,
                              packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_id,
                              max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, rtn_m_to_s, rtn_s_to_m):
        """
        Issue the LeSetCigParameters RPC.

        Each argument is forwarded unchanged into the same-named field of a
        LeSetCigParametersRequest. The RPC response is discarded; use
        wait_le_set_cig_parameters_complete() to observe the outcome.
        """
        self._device.iso.LeSetCigParameters(
            iso_facade_pb2.LeSetCigParametersRequest(
                cig_id=cig_id,
                sdu_interval_m_to_s=sdu_interval_m_to_s,
                sdu_interval_s_to_m=sdu_interval_s_to_m,
                peripherals_clock_accuracy=peripherals_clock_accuracy,
                packing=packing,
                framing=framing,
                max_transport_latency_m_to_s=max_transport_latency_m_to_s,
                max_transport_latency_s_to_m=max_transport_latency_s_to_m,
                cis_id=cis_id,
                max_sdu_m_to_s=max_sdu_m_to_s,
                max_sdu_s_to_m=max_sdu_s_to_m,
                phy_m_to_s=phy_m_to_s,
                phy_s_to_m=phy_s_to_m,
                rtn_m_to_s=rtn_m_to_s,
                rtn_s_to_m=rtn_s_to_m))

    def le_set_cig_parameters_test(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m,
                                   iso_interval, peripherals_clock_accuracy, packing, framing,
                                   max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_configs):
        """
        Send a LeSetCigParametersTestRequest built from the arguments.

        :param cis_configs: iterable of objects exposing the attributes of
                            CisTestParameters; each one becomes a
                            LeCisParametersTestConfig entry of the request
        All other arguments are forwarded unchanged into the same-named
        request fields. The RPC response is discarded.
        """
        test_config_cls = iso_facade_pb2.LeSetCigParametersTestRequest.LeCisParametersTestConfig
        configs = [
            test_config_cls(
                cis_id=cc.cis_id,
                nse=cc.nse,
                max_sdu_m_to_s=cc.max_sdu_m_to_s,
                max_sdu_s_to_m=cc.max_sdu_s_to_m,
                max_pdu_m_to_s=cc.max_pdu_m_to_s,
                max_pdu_s_to_m=cc.max_pdu_s_to_m,
                phy_m_to_s=cc.phy_m_to_s,
                phy_s_to_m=cc.phy_s_to_m,
                bn_m_to_s=cc.bn_m_to_s,
                bn_s_to_m=cc.bn_s_to_m,
            ) for cc in cis_configs
        ]

        # NOTE(review): a *Test* request is sent through the non-test
        # LeSetCigParameters RPC. This looks like it should target a dedicated
        # test RPC - confirm against the iso facade service definition before
        # changing; the call is left as it was.
        self._device.iso.LeSetCigParameters(
            iso_facade_pb2.LeSetCigParametersTestRequest(
                cig_id=cig_id,
                sdu_interval_m_to_s=sdu_interval_m_to_s,
                sdu_interval_s_to_m=sdu_interval_s_to_m,
                ft_m_to_s=ft_m_to_s,
                ft_s_to_m=ft_s_to_m,
                iso_interval=iso_interval,
                peripherals_clock_accuracy=peripherals_clock_accuracy,
                packing=packing,
                framing=framing,
                max_transport_latency_m_to_s=max_transport_latency_m_to_s,
                max_transport_latency_s_to_m=max_transport_latency_s_to_m,
                cis_configs=configs))

    def wait_le_set_cig_parameters_complete(self, timeout=_DEFAULT_EVENT_TIMEOUT):
        """
        Assert that an ISO_PARAMETERS_SET_COMPLETE event is emitted.

        :param timeout: how long to wait for the event (default 5 seconds)
        :return: the `cis_handle` field of the captured event
        """
        set_cig_params_complete_capture = PyLeIso.IsoCigComplete(iso_facade_pb2.IsoMsgType.ISO_PARAMETERS_SET_COMPLETE)

        assertThat(self._iso_event_stream).emits(set_cig_params_complete_capture, timeout=timeout)
        return set_cig_params_complete_capture.get()

    @staticmethod
    def _message_type_capture(message_type):
        """Build a Capture matching events of `message_type` and extracting their cis_handle."""
        return Capture(lambda event: event.message_type == message_type, PyLeIso._extract_cis_handles)

    @staticmethod
    def IsoCigComplete(type=None):
        """Return a Capture for events whose message_type equals `type`."""
        # `type` shadows the builtin but is kept: it is part of the public signature.
        return PyLeIso._message_type_capture(type)

    @staticmethod
    def _extract_cis_handles(event):
        """Return `event.cis_handle`, or None when `event` is None."""
        if event is None:
            return None
        return event.cis_handle

    def le_create_cis(self, cis_and_acl_handle_array):
        """
        Issue the LeCreateCis RPC.

        :param cis_and_acl_handle_array: iterable of indexable pairs where
                                         item [0] is the CIS handle and item
                                         [1] is the ACL handle
        """
        handles_pairs = [
            iso_facade_pb2.LeCreateCisRequest.HandlePair(cis_handle=pair[0], acl_handle=pair[1])
            for pair in cis_and_acl_handle_array
        ]

        self._device.iso.LeCreateCis(iso_facade_pb2.LeCreateCisRequest(handle_pair=handles_pairs))

    def wait_le_cis_established(self, timeout=_DEFAULT_EVENT_TIMEOUT):
        """
        Assert that an ISO_CIS_ESTABLISHED event is emitted.

        :param timeout: how long to wait for the event (default 5 seconds)
        :return: a PyLeIsoStream bound to the first entry of the captured
                 event's `cis_handle` field
        """
        cis_established_capture = PyLeIso.IsoCigEstablished(iso_facade_pb2.IsoMsgType.ISO_CIS_ESTABLISHED)
        assertThat(self._iso_event_stream).emits(cis_established_capture, timeout=timeout)
        cis_handle = cis_established_capture.get()[0]
        return PyLeIsoStream(self._device, cis_handle, self._iso_data_stream)

    @staticmethod
    def IsoCigEstablished(type):
        """Return a Capture for events whose message_type equals `type`."""
        # `type` shadows the builtin but is kept: it is part of the public signature.
        return PyLeIso._message_type_capture(type)