1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from cert.closable import safeClose 18from cert.gd_base_test import GdBaseTestClass 19from cert.event_stream import EventStream 20from cert.truth import assertThat 21from cert.py_le_acl_manager import PyLeAclManager 22from google.protobuf import empty_pb2 as empty_proto 23from facade import common_pb2 as common 24from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade 25from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade 26from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade 27from hci.facade import hci_facade_pb2 as hci_facade 28import bluetooth_packets_python3 as bt_packets 29from bluetooth_packets_python3 import hci_packets 30from bluetooth_packets_python3 import RawBuilder 31 32 33class LeAclManagerTest(GdBaseTestClass): 34 35 def setup_class(self): 36 super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') 37 38 def setup_test(self): 39 super().setup_test() 40 self.dut_le_acl_manager = PyLeAclManager(self.dut) 41 self.cert_hci_le_event_stream = EventStream(self.cert.hci.StreamLeSubevents(empty_proto.Empty())) 42 self.cert_acl_data_stream = EventStream(self.cert.hci.StreamAcl(empty_proto.Empty())) 43 44 def teardown_test(self): 45 safeClose(self.cert_hci_le_event_stream) 46 safeClose(self.cert_acl_data_stream) 47 safeClose(self.dut_le_acl_manager) 48 super().teardown_test() 49 50 def set_privacy_policy_static(self): 51 self.dut_address = b'd0:05:04:03:02:01' 52 private_policy = le_initiator_address_facade.PrivacyPolicy( 53 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 54 address_with_type=common.BluetoothAddressWithType( 55 address=common.BluetoothAddress(address=bytes(self.dut_address)), type=common.RANDOM_DEVICE_ADDRESS)) 56 self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) 57 58 def register_for_event(self, event_code): 59 msg = hci_facade.EventRequest(code=int(event_code)) 60 self.cert.hci.RequestEvent(msg) 61 62 def register_for_le_event(self, event_code): 63 msg = hci_facade.EventRequest(code=int(event_code)) 64 self.cert.hci.RequestLeSubevent(msg) 65 66 def enqueue_hci_command(self, command): 67 cmd_bytes = bytes(command.Serialize()) 68 cmd = common.Data(payload=cmd_bytes) 69 self.cert.hci.SendCommand(cmd) 70 71 def enqueue_acl_data(self, handle, pb_flag, b_flag, data): 72 acl = hci_packets.AclBuilder(handle, pb_flag, b_flag, RawBuilder(data)) 73 self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize()))) 74 75 def dut_connects(self, check_address): 76 self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) 77 self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE) 78 79 # Cert Advertises 80 advertising_handle = 0 81 self.enqueue_hci_command( 82 hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( 83 advertising_handle, 84 hci_packets.LegacyAdvertisingProperties.ADV_IND, 85 400, 86 450, 87 7, 88 hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 89 hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 90 '00:00:00:00:00:00', 91 hci_packets.AdvertisingFilterPolicy.ALL_DEVICES, 92 0xF8, 93 1, #SID 94 hci_packets.Enable.DISABLED # Scan request notification 95 )) 96 97 self.enqueue_hci_command( 98 hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01')) 99 100 gap_name = hci_packets.GapData() 101 gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME 102 gap_name.data = list(bytes(b'Im_A_Cert')) 103 104 self.enqueue_hci_command( 105 hci_packets.LeSetExtendedAdvertisingDataBuilder( 106 advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, 107 hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) 108 109 gap_short_name = hci_packets.GapData() 110 gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME 111 gap_short_name.data = list(bytes(b'Im_A_C')) 112 113 self.enqueue_hci_command( 114 hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( 115 advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, 116 hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name])) 117 118 enabled_set = hci_packets.EnabledSet() 119 enabled_set.advertising_handle = advertising_handle 120 enabled_set.duration = 0 121 enabled_set.max_extended_advertising_events = 0 122 self.enqueue_hci_command( 123 hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) 124 125 self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote( 126 remote_addr=common.BluetoothAddressWithType( 127 address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), 128 type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS))) 129 130 # Cert gets ConnectionComplete with a handle and sends ACL data 131 handle = 0xfff 132 address = hci_packets.Address() 133 134 def get_handle(packet): 135 packet_bytes = packet.payload 136 nonlocal handle 137 nonlocal address 138 if b'\x3e\x13\x01\x00' in packet_bytes: 139 cc_view = hci_packets.LeConnectionCompleteView( 140 hci_packets.LeMetaEventView( 141 hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) 142 handle = cc_view.GetConnectionHandle() 143 address = cc_view.GetPeerAddress() 144 return True 145 if b'\x3e\x13\x0A\x00' in packet_bytes: 146 cc_view = hci_packets.LeEnhancedConnectionCompleteView( 147 hci_packets.LeMetaEventView( 148 hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) 149 handle = cc_view.GetConnectionHandle() 150 address = cc_view.GetPeerResolvablePrivateAddress() 151 return True 152 return False 153 154 self.cert_hci_le_event_stream.assert_event_occurs(get_handle) 155 self.cert_handle = handle 156 dut_address_from_complete = address 157 if check_address: 158 assertThat(dut_address_from_complete).isEqualTo(self.dut_address.decode()) 159 160 def send_receive_and_check(self): 161 self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, 162 hci_packets.BroadcastFlag.POINT_TO_POINT, 163 bytes(b'\x19\x00\x07\x00SomeAclData from the Cert')) 164 165 self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT') 166 self.cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.payload) 167 assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload) 168 169 def test_dut_connects(self): 170 self.set_privacy_policy_static() 171 self.dut_connects(check_address=True) 172 self.send_receive_and_check() 173 174 def test_dut_connects_resolvable_address(self): 175 privacy_policy = le_initiator_address_facade.PrivacyPolicy( 176 address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS, 177 rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', 178 minimum_rotation_time=7 * 60 * 1000, 179 maximum_rotation_time=15 * 60 * 1000) 180 self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) 181 self.dut_connects(check_address=False) 182 self.send_receive_and_check() 183 184 def test_dut_connects_non_resolvable_address(self): 185 privacy_policy = le_initiator_address_facade.PrivacyPolicy( 186 address_policy=le_initiator_address_facade.AddressPolicy.USE_NON_RESOLVABLE_ADDRESS, 187 rotation_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f', 188 minimum_rotation_time=8 * 60 * 1000, 189 maximum_rotation_time=14 * 60 * 1000) 190 self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) 191 self.dut_connects(check_address=False) 192 self.send_receive_and_check() 193 194 def test_dut_connects_public_address(self): 195 self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( 196 le_initiator_address_facade.PrivacyPolicy( 197 address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) 198 self.dut_connects(check_address=False) 199 self.send_receive_and_check() 200 201 def test_dut_connects_public_address_cancelled(self): 202 self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( 203 le_initiator_address_facade.PrivacyPolicy( 204 address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) 205 self.dut_connects(check_address=False) 206 self.send_receive_and_check() 207 208 def test_cert_connects(self): 209 self.set_privacy_policy_static() 210 self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) 211 212 self.dut_le_acl_manager.listen_for_incoming_connections() 213 214 # DUT Advertises 215 gap_name = hci_packets.GapData() 216 gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME 217 gap_name.data = list(bytes(b'Im_The_DUT')) 218 gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) 219 config = le_advertising_facade.AdvertisingConfig( 220 advertisement=[gap_data], 221 interval_min=512, 222 interval_max=768, 223 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 224 own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, 225 peer_address_type=common.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 226 peer_address=common.BluetoothAddress(address=bytes(b'A6:A5:A4:A3:A2:A1')), 227 channel_map=7, 228 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 229 request = le_advertising_facade.CreateAdvertiserRequest(config=config) 230 231 self.dut.hci_le_advertising_manager.CreateAdvertiser(request) 232 233 # Cert Connects 234 self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) 235 phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() 236 phy_scan_params.scan_interval = 0x60 237 phy_scan_params.scan_window = 0x30 238 phy_scan_params.conn_interval_min = 0x18 239 phy_scan_params.conn_interval_max = 0x28 240 phy_scan_params.conn_latency = 0 241 phy_scan_params.supervision_timeout = 0x1f4 242 phy_scan_params.min_ce_length = 0 243 phy_scan_params.max_ce_length = 0 244 self.enqueue_hci_command( 245 hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, 246 hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 247 hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 248 self.dut_address.decode(), 1, [phy_scan_params])) 249 250 # Cert gets ConnectionComplete with a handle and sends ACL data 251 handle = 0xfff 252 253 def get_handle(packet): 254 packet_bytes = packet.payload 255 nonlocal handle 256 if b'\x3e\x13\x01\x00' in packet_bytes: 257 cc_view = hci_packets.LeConnectionCompleteView( 258 hci_packets.LeMetaEventView( 259 hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) 260 handle = cc_view.GetConnectionHandle() 261 return True 262 if b'\x3e\x13\x0A\x00' in packet_bytes: 263 cc_view = hci_packets.LeEnhancedConnectionCompleteView( 264 hci_packets.LeMetaEventView( 265 hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) 266 handle = cc_view.GetConnectionHandle() 267 return True 268 return False 269 270 self.cert_hci_le_event_stream.assert_event_occurs(get_handle) 271 self.cert_handle = handle 272 273 self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, 274 hci_packets.BroadcastFlag.POINT_TO_POINT, 275 bytes(b'\x19\x00\x07\x00SomeAclData from the Cert')) 276 277 # DUT gets a connection complete event and sends and receives 278 handle = 0xfff 279 self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection() 280 281 self.send_receive_and_check() 282 283 def test_recombination_l2cap_packet(self): 284 self.set_privacy_policy_static() 285 self.dut_connects(check_address=True) 286 287 self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, 288 hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello')) 289 self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, 290 hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) 291 292 assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload) 293