#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from datetime import timedelta
import logging

from cert.captures import HalCaptures, HciCaptures
from cert.gd_base_test import GdBaseTestClass
from cert.matchers import HciMatchers
from cert.py_hal import PyHal
from cert.py_hci import PyHci
from cert.truth import assertThat
from hci.facade import hci_facade_pb2 as hci_facade
from facade import common_pb2 as common
from bluetooth_packets_python3.hci_packets import EventCode
from bluetooth_packets_python3.hci_packets import LoopbackMode
from bluetooth_packets_python3.hci_packets import WriteLoopbackModeBuilder
from bluetooth_packets_python3.hci_packets import ReadLocalNameBuilder
from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder
from bluetooth_packets_python3.hci_packets import ScanEnable
from bluetooth_packets_python3.hci_packets import InquiryBuilder
from bluetooth_packets_python3.hci_packets import SubeventCode
from bluetooth_packets_python3.hci_packets import LeSetRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import PhyScanParameters
from bluetooth_packets_python3.hci_packets import LeScanType
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanParametersBuilder
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FilterDuplicates
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import LeCreateConnPhyScanParameters
from bluetooth_packets_python3.hci_packets import LeExtendedCreateConnectionBuilder
from bluetooth_packets_python3.hci_packets import InitiatorFilterPolicy
from bluetooth_packets_python3.hci_packets import AddressType
from bluetooth_packets_python3.hci_packets import BroadcastFlag
from bluetooth_packets_python3.hci_packets import ConnectListAddressType
from bluetooth_packets_python3.hci_packets import LeAddDeviceToConnectListBuilder
from bluetooth_packets_python3.hci_packets import LeSetRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import LeReadRemoteFeaturesBuilder
from bluetooth_packets_python3.hci_packets import WritePageTimeoutBuilder
from bluetooth_packets_python3.hci_packets import ReadBdAddrBuilder
from bluetooth_packets_python3.hci_packets import CreateConnectionBuilder
from bluetooth_packets_python3.hci_packets import PageScanRepetitionMode
from bluetooth_packets_python3.hci_packets import ClockOffsetValid
from bluetooth_packets_python3.hci_packets import CreateConnectionRoleSwitch
from bluetooth_packets_python3.hci_packets import AcceptConnectionRequestBuilder
from bluetooth_packets_python3.hci_packets import AcceptConnectionRequestRole
from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
from bluetooth_packets_python3.hci_packets import ResetBuilder
from bluetooth_packets_python3.hci_packets import Lap
from bluetooth_packets_python3.hci_packets import OpCode
from bluetooth_packets_python3.hci_packets import AclBuilder
from bluetooth_packets_python3 import RawBuilder


class DirectHciTest(GdBaseTestClass):
    """Cert tests that drive the DUT through its HCI module and the cert device through its HAL module.

    Each test sends HCI commands to one or both devices and asserts on the
    events / ACL packets observed on the corresponding streams.
    """

    def setup_class(self):
        """Start the base test class with the DUT running 'HCI' and the cert device running 'HAL'."""
        super().setup_class(dut_module='HCI', cert_module='HAL')

    def setup_test(self):
        """Create the per-test PyHci / PyHal wrappers and send an HCI Reset to the cert device."""
        super().setup_test()
        self.dut_hci = PyHci(self.dut, acl_streaming=True)
        self.cert_hal = PyHal(self.cert)
        self.cert_hal.send_hci_command(ResetBuilder())

    def teardown_test(self):
        """Close both device wrappers, then run the base-class teardown.

        The steps are chained with try/finally so that a failure while closing
        one wrapper does not skip closing the other one or the base-class
        teardown. Any exception raised along the way still propagates.
        """
        try:
            self.dut_hci.close()
        finally:
            try:
                self.cert_hal.close()
            finally:
                super().teardown_test()

    def enqueue_acl_data(self, handle, pb_flag, b_flag, data):
        """Build an ACL packet around `data` and send it through the DUT's HCI facade.

        Args:
            handle: connection handle passed to AclBuilder.
            pb_flag: packet boundary flag passed to AclBuilder.
            b_flag: broadcast flag passed to AclBuilder.
            data: raw payload, wrapped in a RawBuilder.
        """
        acl = AclBuilder(handle, pb_flag, b_flag, RawBuilder(data))
        self.dut.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def test_local_hci_cmd_and_event(self):
        """Enable local loopback on the DUT and expect a sent command to be looped back."""
        # Loopback mode responds with ACL and SCO connection complete
        self.dut_hci.register_for_events(EventCode.LOOPBACK_COMMAND)
        self.dut_hci.send_command(WriteLoopbackModeBuilder(LoopbackMode.ENABLE_LOCAL))

        self.dut_hci.send_command(ReadLocalNameBuilder())
        assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.LoopbackOf(ReadLocalNameBuilder()))

    def test_inquiry_from_dut(self):
        """Start an inquiry on the DUT while the cert device is scannable and expect an INQUIRY_RESULT event."""
        self.dut_hci.register_for_events(EventCode.INQUIRY_RESULT)

        self.cert_hal.enable_inquiry_and_page_scan()
        lap = Lap()
        lap.lap = 0x33
        self.dut_hci.send_command(InquiryBuilder(lap, 0x30, 0xff))
        assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.EventWithCode(EventCode.INQUIRY_RESULT))

    def test_le_ad_scan_cert_advertises(self):
        """DUT runs an extended LE scan while the cert device advertises; expect the cert's name in a report."""
        self.dut_hci.register_for_le_events(SubeventCode.EXTENDED_ADVERTISING_REPORT, SubeventCode.ADVERTISING_REPORT)

        # DUT Scans
        self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
        phy_scan_params = PhyScanParameters()
        phy_scan_params.le_scan_interval = 6553
        phy_scan_params.le_scan_window = 6553
        phy_scan_params.le_scan_type = LeScanType.ACTIVE

        self.dut_hci.send_command(
            LeSetExtendedScanParametersBuilder(OwnAddressType.RANDOM_DEVICE_ADDRESS, LeScanningFilterPolicy.ACCEPT_ALL,
                                               1, [phy_scan_params]))
        self.dut_hci.send_command(LeSetExtendedScanEnableBuilder(Enable.ENABLED, FilterDuplicates.DISABLED, 0, 0))

        # CERT Advertises
        advertising_handle = 0
        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                LegacyAdvertisingProperties.ADV_IND,
                512,
                768,
                7,
                OwnAddressType.RANDOM_DEVICE_ADDRESS,
                PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                'A6:A5:A4:A3:A2:A1',
                AdvertisingFilterPolicy.ALL_DEVICES,
                0xF7,
                1,  # SID
                Enable.DISABLED  # Scan request notification
            ))

        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
        gap_name = GapData()
        gap_name.data_type = GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(b'Im_A_Cert')

        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))

        gap_short_name = GapData()
        gap_short_name.data_type = GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(b'Im_A_C')

        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingScanResponseBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))

        enabled_set = EnabledSet()
        # Same advertising set that was configured above.
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.cert_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))

        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)

        # Stop advertising on the cert device and scanning on the DUT.
        self.cert_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set]))
        self.dut_hci.send_command(LeSetExtendedScanEnableBuilder(Enable.DISABLED, FilterDuplicates.DISABLED, 0, 0))

    def _verify_le_connection_complete(self):
        """Wait for an LE connection-complete event on the cert device, then on the DUT.

        Returns:
            A `(dut_handle, cert_handle)` tuple with the connection handle
            reported by each side.
        """
        cert_conn_complete_capture = HalCaptures.LeConnectionCompleteCapture()
        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_conn_complete_capture)
        cert_handle = cert_conn_complete_capture.get().GetConnectionHandle()

        dut_conn_complete_capture = HciCaptures.LeConnectionCompleteCapture()
        assertThat(self.dut_hci.get_le_event_stream()).emits(dut_conn_complete_capture)
        dut_handle = dut_conn_complete_capture.get().GetConnectionHandle()

        return (dut_handle, cert_handle)

    @staticmethod
    def _create_phy_scan_params():
        """Return a LeCreateConnPhyScanParameters populated with the fixed values shared by the LE connection tests."""
        phy_scan_params = LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        return phy_scan_params

    def test_le_connection_dut_advertises(self):
        """Cert initiates an LE connection to the advertising DUT, then both sides exchange ACL data."""
        self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ADVERTISING_SET_TERMINATED,
                                            SubeventCode.ENHANCED_CONNECTION_COMPLETE,
                                            SubeventCode.READ_REMOTE_FEATURES_COMPLETE)
        # Cert Connects
        self.cert_hal.send_hci_command(LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
        phy_scan_params = DirectHciTest._create_phy_scan_params()
        self.cert_hal.send_hci_command(
            LeExtendedCreateConnectionBuilder(InitiatorFilterPolicy.USE_PEER_ADDRESS,
                                              OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                              '0D:05:04:03:02:01', 1, [phy_scan_params]))

        advertisement = self.dut_hci.create_advertisement(0, '0D:05:04:03:02:01')
        advertisement.set_data(b'Im_The_DUT')
        advertisement.set_scan_response(b'Im_The_D')
        advertisement.start()

        dut_handle, cert_handle = self._verify_le_connection_complete()

        self.dut_hci.send_command(LeReadRemoteFeaturesBuilder(dut_handle))
        # Match on the raw payload: byte 0 is compared with the LE meta event
        # code and byte 2 with the expected subevent code.
        assertThat(self.dut_hci.get_le_event_stream()).emits(
            lambda packet: (packet.payload[0] == int(EventCode.LE_META_EVENT) and
                            packet.payload[2] == int(SubeventCode.READ_REMOTE_FEATURES_COMPLETE)))

        # Send ACL Data
        self.enqueue_acl_data(dut_handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              BroadcastFlag.POINT_TO_POINT, b'Just SomeAclData')
        self.cert_hal.send_acl_first(cert_handle, b'Just SomeMoreAclData')

        # logging.debug() returns None, so `or` falls through to the actual
        # match while still logging every payload that is inspected.
        assertThat(self.cert_hal.get_acl_stream()).emits(
            lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(
            lambda packet: logging.debug(packet.payload) or b'SomeMoreAclData' in packet.payload)

    def test_le_connect_list_connection_cert_advertises(self):
        """DUT initiates an LE connection using its connect list while the cert device advertises."""
        self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ENHANCED_CONNECTION_COMPLETE)
        # DUT Connects
        self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
        self.dut_hci.send_command(LeAddDeviceToConnectListBuilder(ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))
        phy_scan_params = DirectHciTest._create_phy_scan_params()
        self.dut_hci.send_command(
            LeExtendedCreateConnectionBuilder(InitiatorFilterPolicy.USE_CONNECT_LIST,
                                              OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                              'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))

        advertisement = self.cert_hal.create_advertisement(
            1,
            '0C:05:04:03:02:01',
            min_interval=512,
            max_interval=768,
            peer_address='A6:A5:A4:A3:A2:A1',
            tx_power=0x7f,
            sid=0)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        # LeConnectionComplete
        self._verify_le_connection_complete()

    def test_connection_dut_connects(self):
        """DUT initiates a connection to the cert device's own address; both sides then exchange ACL data."""
        self.dut_hci.send_command(WritePageTimeoutBuilder(0x4000))

        self.cert_hal.enable_inquiry_and_page_scan()
        address = self.cert_hal.read_own_address()

        self.dut_hci.initiate_connection(address)
        cert_acl = self.cert_hal.accept_connection()
        dut_acl = self.dut_hci.complete_connection()

        # Send ACL Data
        dut_acl.send_first(b'Just SomeAclData')
        cert_acl.send_first(b'Just SomeMoreAclData')

        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)

    def test_connection_cert_connects(self):
        """Cert device initiates a connection to the DUT's own address; both sides then exchange ACL data."""
        self.cert_hal.send_hci_command(WritePageTimeoutBuilder(0x4000))

        self.dut_hci.enable_inquiry_and_page_scan()
        address = self.dut_hci.read_own_address()

        self.cert_hal.initiate_connection(address)
        dut_acl = self.dut_hci.accept_connection()
        cert_acl = self.cert_hal.complete_connection()

        # Send ACL Data
        dut_acl.send_first(b'This is just SomeAclData')
        cert_acl.send_first(b'This is just SomeMoreAclData')

        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)