#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue
import logging

from google.protobuf import empty_pb2 as empty_proto

from bluetooth_packets_python3 import hci_packets
from cert.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys
from cert.ble_lib import generate_ble_scan_objects
from cert.gd_sl4a_base_test import GdSl4aBaseTestClass
from hci.facade import \
    le_advertising_manager_facade_pb2 as le_advertising_facade
from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade
from facade import common_pb2 as common


class LeAdvancedScanningTest(GdSl4aBaseTestClass):
    """Scan-filter tests: the cert device advertises, the SL4A DUT scans with a filter.

    Each test configures the cert side's address policy, starts an advertiser
    carrying a complete-local-name GAP entry, starts a filtered scan on the
    DUT, waits for one scan-result event, and then tears both sides down.
    """

    def setup_class(self):
        """Initialise the base class with the HCI_INTERFACES cert module and set the event timeout."""
        super().setup_class(cert_module='HCI_INTERFACES')
        self.default_timeout = 10  # seconds

    def setup_test(self):
        """Delegate per-test setup to the base class."""
        super().setup_test()

    def teardown_test(self):
        """Delegate per-test teardown to the base class."""
        super().teardown_test()

    def set_cert_privacy_policy_with_random_address(self, random_address):
        """Set the cert initiator-address policy to USE_STATIC_ADDRESS with the given address.

        Args:
            random_address: address string; it is UTF-8 encoded into the
                BluetoothAddress message and tagged RANDOM_DEVICE_ADDRESS.
        """
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')),
                type=common.RANDOM_DEVICE_ADDRESS))
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)

    def set_cert_privacy_policy_with_public_address(self):
        """Set the cert initiator-address policy to USE_PUBLIC_ADDRESS.

        The address is read from the cert controller via GetMacAddress.

        Returns:
            The cert address decoded as UTF-8 and upper-cased.
        """
        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS))
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
        # Bluetooth MAC address must be upper case
        return public_address_bytes.decode('utf-8').upper()

    def _build_name_advertising_config(self, device_name, own_address_type, **config_overrides):
        """Build an ADV_IND AdvertisingConfig whose payload is a complete-local-name entry.

        Args:
            device_name: name placed in the COMPLETE_LOCAL_NAME GAP entry
                (UTF-8 encoded).
            own_address_type: value for the config's own_address_type field.
            **config_overrides: extra AdvertisingConfig fields (for example
                tx_power) passed through unchanged.

        Returns:
            The populated le_advertising_facade.AdvertisingConfig.
        """
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(device_name, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        return le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=own_address_type,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES,
            **config_overrides)

    def _create_legacy_advertiser(self, config):
        """Start an advertiser on the cert side via CreateAdvertiser and return the response."""
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        logging.info("Creating advertiser")
        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
        logging.info("Created advertiser")
        return create_response

    def _create_extended_advertiser(self, config):
        """Start an advertiser on the cert side via ExtendedCreateAdvertiser and return the response.

        The given config is wrapped in an ExtendedAdvertisingConfig whose
        secondary_advertising_phy is ble_scan_settings_phys["1m"].
        """
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        logging.info("Creating advertiser")
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        logging.info("Created advertiser")
        return create_response

    def _scan_then_remove_advertiser(self, create_response, set_scan_filter, legacy_scan=True):
        """Run a filtered low-latency scan on the DUT, wait for one result, then clean up.

        The scan is stopped and the advertiser is removed on every exit path,
        including the timeout path and any exception raised while scanning, so
        a failing test does not leave the scan or the advertiser running.

        Args:
            create_response: response from the advertiser-creation call; its
                advertiser_id is used to remove the advertiser.
            set_scan_filter: zero-argument callable that sets the filter
                criteria on self.dut.droid; it is invoked just before
                bleBuildScanFilter.
            legacy_scan: when False, bleSetScanSettingsLegacy(False) is called
                before the scan objects are generated.

        Returns:
            True if a scan-result event arrived within self.default_timeout
            seconds, False if waiting for it raised queue.Empty.
        """
        try:
            self.dut.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
            if not legacy_scan:
                self.dut.droid.bleSetScanSettingsLegacy(False)
            filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.droid)
            expected_event_name = scan_result.format(scan_callback)

            # Setup SL4A DUT filter
            set_scan_filter()
            self.dut.droid.bleBuildScanFilter(filter_list)

            # Start scanning on SL4A DUT side
            self.dut.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
            logging.info("Started scanning")
            try:
                try:
                    # Verify if there is scan result
                    event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
                except queue.Empty:
                    self.log.error("Could not find initial advertisement.")
                    return False
                # Print out scan result
                mac_address = event_info['data']['Result']['deviceInfo']['address']
                self.log.info("Filter advertisement with address {}".format(mac_address))
                return True
            finally:
                # Stop scanning (runs before the advertiser is removed, as in the success flow)
                logging.info("Stop scanning")
                self.dut.droid.bleStopBleScan(scan_callback)
                logging.info("Stopped scanning")
        finally:
            # Stop advertising
            logging.info("Stop advertising")
            remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id)
            self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
            logging.info("Stopped advertising")

    def test_scan_filter_device_name_legacy_pdu(self):
        """Legacy advertising from the cert public address; DUT filters on device name.

        Returns:
            True if the DUT reported a matching scan result, False on timeout.
        """
        # Use public address on cert side
        logging.info("Setting public address")
        DEVICE_NAME = 'Im_The_CERT!'
        public_address = self.set_cert_privacy_policy_with_public_address()
        logging.info("Set public address")

        # Setup cert side to advertise
        config = self._build_name_advertising_config(DEVICE_NAME, common.USE_PUBLIC_DEVICE_ADDRESS, tx_power=20)
        create_response = self._create_legacy_advertiser(config)

        # Setup SL4A DUT side to scan
        logging.info("Start scanning with public address %s", public_address)
        return self._scan_then_remove_advertiser(create_response,
                                                 lambda: self.dut.droid.bleSetScanFilterDeviceName(DEVICE_NAME))

    def test_scan_filter_device_random_address_legacy_pdu(self):
        """Legacy advertising from a static random address; DUT filters on address and type.

        Returns:
            True if the DUT reported a matching scan result, False on timeout.
        """
        # Use random address on cert side
        logging.info("Setting random address")
        RANDOM_ADDRESS = 'D0:05:04:03:02:01'
        DEVICE_NAME = 'Im_The_CERT!'
        self.set_cert_privacy_policy_with_random_address(RANDOM_ADDRESS)
        logging.info("Set random address")

        # Setup cert side to advertise
        config = self._build_name_advertising_config(DEVICE_NAME, common.USE_RANDOM_DEVICE_ADDRESS)
        create_response = self._create_legacy_advertiser(config)

        # Setup SL4A DUT side to scan
        addr_type = ble_address_types["random"]
        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d", RANDOM_ADDRESS, addr_type)
        return self._scan_then_remove_advertiser(
            create_response,
            lambda: self.dut.droid.bleSetScanFilterDeviceAddressAndType(RANDOM_ADDRESS, int(addr_type)))

    def test_scan_filter_device_public_address_extended_pdu(self):
        """Extended advertising from the cert public address; DUT filters on address and type.

        The DUT scan is configured with bleSetScanSettingsLegacy(False).

        Returns:
            True if the DUT reported a matching scan result, False on timeout.
        """
        # Use public address on cert side
        logging.info("Setting public address")
        DEVICE_NAME = 'Im_The_CERT!'
        public_address = self.set_cert_privacy_policy_with_public_address()
        logging.info("Set public address")

        # Setup cert side to advertise
        config = self._build_name_advertising_config(DEVICE_NAME, common.USE_PUBLIC_DEVICE_ADDRESS)
        create_response = self._create_extended_advertiser(config)

        # Setup SL4A DUT side to scan
        addr_type = ble_address_types["public"]
        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d", public_address, addr_type)
        return self._scan_then_remove_advertiser(
            create_response,
            lambda: self.dut.droid.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type)),
            legacy_scan=False)