1# 2# Copyright 2021 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import time 17import logging 18 19from bluetooth_packets_python3 import hci_packets 20from cert.event_stream import EventStream 21from cert.gd_base_test import GdBaseTestClass 22from cert.matchers import HciMatchers, IsoMatchers, L2capMatchers 23from cert.metadata import metadata 24from cert.py_hci import PyHci 25from cert.py_l2cap import PyLeL2cap 26from cert.py_le_iso import PyLeIso 27from cert.py_le_iso import CisTestParameters 28from cert.truth import assertThat 29from datetime import timedelta 30from facade import common_pb2 as common 31from hci.facade import controller_facade_pb2 as controller_facade 32from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade 33from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade 34from google.protobuf import empty_pb2 as empty_proto 35from neighbor.facade import facade_pb2 as neighbor_facade 36from l2cap.le.cert.cert_le_l2cap import CertLeL2cap 37from iso.cert.cert_le_iso import CertLeIso 38 39import time 40from bluetooth_packets_python3.hci_packets import OpCode 41 42 43class LeIsoTest(GdBaseTestClass): 44 """ 45 Collection of tests that each sample results from 46 different (unique) combinations of io capabilities, authentication requirements, and oob data. 47 """ 48 49 def setup_class(self): 50 super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES') 51 52 def setup_test(self): 53 super().setup_test() 54 55 self.dut_l2cap = PyLeL2cap(self.dut) 56 self.cert_l2cap = CertLeL2cap(self.cert) 57 self.dut_address = common.BluetoothAddressWithType( 58 address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS) 59 self.cert_address = common.BluetoothAddressWithType( 60 address=common.BluetoothAddress(address=bytes(b'C0:11:FF:AA:33:22')), type=common.RANDOM_DEVICE_ADDRESS) 61 dut_privacy_policy = le_initiator_address_facade.PrivacyPolicy( 62 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 63 address_with_type=self.dut_address, 64 rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 65 minimum_rotation_time=0, 66 maximum_rotation_time=0) 67 self.dut_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(dut_privacy_policy) 68 privacy_policy = le_initiator_address_facade.PrivacyPolicy( 69 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 70 address_with_type=self.cert_address, 71 rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 72 minimum_rotation_time=0, 73 maximum_rotation_time=0) 74 self.cert_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) 75 76 self.dut_iso = PyLeIso(self.dut) 77 self.cert_iso = CertLeIso(self.cert) 78 79 def teardown_test(self): 80 self.dut_iso.close() 81 self.cert_iso.close() 82 83 self.cert_l2cap.close() 84 self.dut_l2cap.close() 85 super().teardown_test() 86 87 #cert becomes central of connection, dut peripheral 88 def _setup_link_from_cert(self): 89 # DUT Advertises 90 gap_name = hci_packets.GapData() 91 gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME 92 gap_name.data = list(bytes(b'Im_The_DUT')) 93 gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) 94 config = le_advertising_facade.AdvertisingConfig( 95 advertisement=[gap_data], 96 interval_min=512, 97 interval_max=768, 98 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 99 own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, 100 channel_map=7, 101 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 102 request = le_advertising_facade.CreateAdvertiserRequest(config=config) 103 create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) 104 self.cert_l2cap.connect_le_acl(self.dut_address) 105 106 def _setup_cis_from_cert(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval, 107 peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, 108 max_transport_latency_s_to_m, cis_configs): 109 self.cert_iso.le_set_cig_parameters_test(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, 110 iso_interval, peripherals_clock_accuracy, packing, framing, 111 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 112 cis_configs) 113 114 cis_handles = self.cert_iso.wait_le_set_cig_parameters_complete() 115 116 cis_handle = cis_handles[0] 117 118 acl_connection_handle = self.cert_l2cap._le_acl.handle 119 self.cert_iso.le_cretate_cis([(cis_handle, acl_connection_handle)]) 120 dut_cis_stream = self.dut_iso.wait_le_cis_established() 121 cert_cis_stream = self.cert_iso.wait_le_cis_established() 122 return (dut_cis_stream, cert_cis_stream) 123 124 @metadata( 125 pts_test_id="IAL/CIS/UNF/SLA/BV-01-C", 126 pts_test_name="connected isochronous stream, unframed data, peripheral role") 127 def test_iso_cis_unf_sla_bv_01_c(self): 128 """ 129 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 130 """ 131 cig_id = 0x01 132 sdu_interval_m_to_s = 0 133 sdu_interval_s_to_m = 0x186a 134 ft_m_to_s = 0 135 ft_s_to_m = 1 136 iso_interval = 0x0A 137 peripherals_clock_accuracy = 0 138 packing = 0 139 framing = 0 140 max_transport_latency_m_to_s = 0 141 max_transport_latency_s_to_m = 0 142 cis_configs = [ 143 CisTestParameters( 144 cis_id=0x01, 145 nse=2, 146 max_sdu_m_to_s=100, 147 max_sdu_s_to_m=100, 148 max_pdu_m_to_s=100, 149 max_pdu_s_to_m=100, 150 phy_m_to_s=0x02, 151 phy_s_to_m=0x00, 152 bn_m_to_s=0, 153 bn_s_to_m=2, 154 ) 155 ] 156 157 self._setup_link_from_cert() 158 (dut_cis_stream, cert_cis_stream) = self._setup_cis_from_cert( 159 cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval, 160 peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, 161 cis_configs) 162 dut_cis_stream.send(b'abcdefgh' * 10) 163 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 164 165 @metadata( 166 pts_test_id="IAL/CIS/UNF/SLA/BV-25-C", 167 pts_test_name="connected isochronous stream, unframed data, peripheral role") 168 def test_iso_cis_unf_sla_bv_25_c(self): 169 """ 170 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 171 """ 172 cig_id = 0x01 173 sdu_interval_m_to_s = 0x7530 174 sdu_interval_s_to_m = 0x7530 175 ft_m_to_s = 3 176 ft_s_to_m = 2 177 iso_interval = 0x18 178 peripherals_clock_accuracy = 0 179 packing = 0 180 framing = 0 181 max_transport_latency_m_to_s = 0 182 max_transport_latency_s_to_m = 0 183 cis_configs = [ 184 CisTestParameters( 185 cis_id=0x01, 186 nse=5, 187 max_sdu_m_to_s=100, 188 max_sdu_s_to_m=100, 189 max_pdu_m_to_s=100, 190 max_pdu_s_to_m=100, 191 phy_m_to_s=0x02, 192 phy_s_to_m=0x00, 193 bn_m_to_s=3, 194 bn_s_to_m=1, 195 ) 196 ] 197 198 self._setup_link_from_cert() 199 (dut_cis_stream, cert_cis_stream) = self._setup_cis_from_cert( 200 cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval, 201 peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, 202 cis_configs) 203 dut_cis_stream.send(b'abcdefgh' * 10) 204 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 205 206 @metadata( 207 pts_test_id="IAL/CIS/FRA/SLA/BV-03-C", 208 pts_test_name="connected isochronous stream, framed data, peripheral role") 209 def test_iso_cis_fra_sla_bv_03_c(self): 210 """ 211 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 212 """ 213 cig_id = 0x01 214 sdu_interval_m_to_s = 0x0000 215 sdu_interval_s_to_m = 0x4e30 216 ft_m_to_s = 0 217 ft_s_to_m = 2 218 iso_interval = 0x14 219 peripherals_clock_accuracy = 0 220 packing = 0 221 framing = 1 222 max_transport_latency_m_to_s = 0 223 max_transport_latency_s_to_m = 0 224 cis_configs = [ 225 CisTestParameters( 226 cis_id=0x01, 227 nse=4, 228 max_sdu_m_to_s=100, 229 max_sdu_s_to_m=100, 230 max_pdu_m_to_s=100, 231 max_pdu_s_to_m=100, 232 phy_m_to_s=0x02, 233 phy_s_to_m=0x00, 234 bn_m_to_s=0, 235 bn_s_to_m=2, 236 ) 237 ] 238 239 self._setup_link_from_cert() 240 (dut_cis_stream, cert_cis_stream) = self._setup_cis_from_cert( 241 cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval, 242 peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, 243 cis_configs) 244 dut_cis_stream.send(b'abcdefgh' * 10) 245 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 246 247 @metadata( 248 pts_test_id="IAL/CIS/FRA/SLA/BV-26-C", 249 pts_test_name="connected isochronous stream, framed data, peripheral role") 250 def test_iso_cis_fra_sla_bv_26_c(self): 251 """ 252 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 253 """ 254 cig_id = 0x01 255 sdu_interval_m_to_s = 0x14D5 256 sdu_interval_s_to_m = 0x14D5 257 ft_m_to_s = 1 258 ft_s_to_m = 1 259 iso_interval = 0x08 260 peripherals_clock_accuracy = 0 261 packing = 0 262 framing = 1 263 max_transport_latency_m_to_s = 0 264 max_transport_latency_s_to_m = 0 265 cis_configs = [ 266 CisTestParameters( 267 cis_id=0x01, 268 nse=2, 269 max_sdu_m_to_s=100, 270 max_sdu_s_to_m=100, 271 max_pdu_m_to_s=100, 272 max_pdu_s_to_m=100, 273 phy_m_to_s=0x02, 274 phy_s_to_m=0x00, 275 bn_m_to_s=1, 276 bn_s_to_m=1, 277 ) 278 ] 279 280 self._setup_link_from_cert() 281 (dut_cis_stream, cert_cis_stream) = self._setup_cis_from_cert( 282 cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval, 283 peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, 284 cis_configs) 285 dut_cis_stream.send(b'abcdefgh' * 10) 286 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 287