1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import logging
19
20from google.protobuf import empty_pb2 as empty_proto
21
22from bluetooth_packets_python3 import hci_packets
23from cert.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys
24from cert.ble_lib import generate_ble_scan_objects
25from cert.gd_sl4a_base_test import GdSl4aBaseTestClass
26from hci.facade import \
27  le_advertising_manager_facade_pb2 as le_advertising_facade
28from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade
29from facade import common_pb2 as common
30
31
32class LeAdvancedScanningTest(GdSl4aBaseTestClass):
33
34    def setup_class(self):
35        super().setup_class(cert_module='HCI_INTERFACES')
36        self.default_timeout = 10  # seconds
37
38    def setup_test(self):
39        super().setup_test()
40
41    def teardown_test(self):
42        super().teardown_test()
43
44    def set_cert_privacy_policy_with_random_address(self, random_address):
45        private_policy = le_initiator_address_facade.PrivacyPolicy(
46            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
47            address_with_type=common.BluetoothAddressWithType(
48                address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')),
49                type=common.RANDOM_DEVICE_ADDRESS))
50        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
51
52    def set_cert_privacy_policy_with_public_address(self):
53        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
54        private_policy = le_initiator_address_facade.PrivacyPolicy(
55            address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS,
56            address_with_type=common.BluetoothAddressWithType(
57                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS))
58        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
59        # Bluetooth MAC address must be upper case
60        return public_address_bytes.decode('utf-8').upper()
61
62    def test_scan_filter_device_name_legacy_pdu(self):
63        # Use public address on cert side
64        logging.info("Setting public address")
65        DEVICE_NAME = 'Im_The_CERT!'
66        public_address = self.set_cert_privacy_policy_with_public_address()
67        logging.info("Set public address")
68
69        # Setup cert side to advertise
70        gap_name = hci_packets.GapData()
71        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
72        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
73        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
74        config = le_advertising_facade.AdvertisingConfig(
75            advertisement=[gap_data],
76            interval_min=512,
77            interval_max=768,
78            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
79            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
80            channel_map=7,
81            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES,
82            tx_power=20)
83        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
84        logging.info("Creating advertiser")
85        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
86        logging.info("Created advertiser")
87
88        # Setup SL4A DUT side to scan
89        logging.info("Start scanning with public address %s" % public_address)
90        self.dut.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
91        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.droid)
92        expected_event_name = scan_result.format(scan_callback)
93
94        # Setup SL4A DUT filter
95        self.dut.droid.bleSetScanFilterDeviceName(DEVICE_NAME)
96        self.dut.droid.bleBuildScanFilter(filter_list)
97
98        # Start scanning on SL4A DUT side
99        self.dut.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
100        logging.info("Started scanning")
101        try:
102            # Verify if there is scan result
103            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
104        except queue.Empty as error:
105            self.log.error("Could not find initial advertisement.")
106            return False
107        # Print out scan result
108        mac_address = event_info['data']['Result']['deviceInfo']['address']
109        self.log.info("Filter advertisement with address {}".format(mac_address))
110
111        # Stop scanning
112        logging.info("Stop scanning")
113        self.dut.droid.bleStopBleScan(scan_callback)
114        logging.info("Stopped scanning")
115
116        # Stop advertising
117        logging.info("Stop advertising")
118        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id)
119        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
120        logging.info("Stopped advertising")
121
122        return True
123
124    def test_scan_filter_device_random_address_legacy_pdu(self):
125        # Use random address on cert side
126        logging.info("Setting random address")
127        RANDOM_ADDRESS = 'D0:05:04:03:02:01'
128        DEVICE_NAME = 'Im_The_CERT!'
129        self.set_cert_privacy_policy_with_random_address(RANDOM_ADDRESS)
130        logging.info("Set random address")
131
132        # Setup cert side to advertise
133        gap_name = hci_packets.GapData()
134        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
135        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
136        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
137        config = le_advertising_facade.AdvertisingConfig(
138            advertisement=[gap_data],
139            interval_min=512,
140            interval_max=768,
141            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
142            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
143            channel_map=7,
144            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
145        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
146        logging.info("Creating advertiser")
147        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
148        logging.info("Created advertiser")
149
150        # Setup SL4A DUT side to scan
151        addr_type = ble_address_types["random"]
152        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d" % (RANDOM_ADDRESS, addr_type))
153        self.dut.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
154        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.droid)
155        expected_event_name = scan_result.format(scan_callback)
156
157        # Setup SL4A DUT filter
158        self.dut.droid.bleSetScanFilterDeviceAddressAndType(RANDOM_ADDRESS, int(addr_type))
159        self.dut.droid.bleBuildScanFilter(filter_list)
160
161        # Start scanning on SL4A DUT side
162        self.dut.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
163        logging.info("Started scanning")
164        try:
165            # Verify if there is scan result
166            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
167        except queue.Empty as error:
168            self.log.error("Could not find initial advertisement.")
169            return False
170        # Print out scan result
171        mac_address = event_info['data']['Result']['deviceInfo']['address']
172        self.log.info("Filter advertisement with address {}".format(mac_address))
173
174        # Stop scanning
175        logging.info("Stop scanning")
176        self.dut.droid.bleStopBleScan(scan_callback)
177        logging.info("Stopped scanning")
178
179        # Stop advertising
180        logging.info("Stop advertising")
181        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id)
182        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
183        logging.info("Stopped advertising")
184
185        return True
186
187    def test_scan_filter_device_public_address_extended_pdu(self):
188        # Use public address on cert side
189        logging.info("Setting public address")
190        DEVICE_NAME = 'Im_The_CERT!'
191        public_address = self.set_cert_privacy_policy_with_public_address()
192        logging.info("Set public address")
193
194        # Setup cert side to advertise
195        gap_name = hci_packets.GapData()
196        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
197        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
198        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
199        config = le_advertising_facade.AdvertisingConfig(
200            advertisement=[gap_data],
201            interval_min=512,
202            interval_max=768,
203            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
204            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
205            channel_map=7,
206            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
207        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
208            advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"])
209        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
210        logging.info("Creating advertiser")
211        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
212        logging.info("Created advertiser")
213
214        # Setup SL4A DUT side to scan
215        addr_type = ble_address_types["public"]
216        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d" % (public_address, addr_type))
217        self.dut.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
218        self.dut.droid.bleSetScanSettingsLegacy(False)
219        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.droid)
220        expected_event_name = scan_result.format(scan_callback)
221
222        # Setup SL4A DUT filter
223        self.dut.droid.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type))
224        self.dut.droid.bleBuildScanFilter(filter_list)
225
226        # Start scanning on SL4A DUT side
227        self.dut.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
228        logging.info("Started scanning")
229        try:
230            # Verify if there is scan result
231            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
232        except queue.Empty as error:
233            self.log.error("Could not find initial advertisement.")
234            return False
235        # Print out scan result
236        mac_address = event_info['data']['Result']['deviceInfo']['address']
237        self.log.info("Filter advertisement with address {}".format(mac_address))
238
239        # Stop scanning
240        logging.info("Stop scanning")
241        self.dut.droid.bleStopBleScan(scan_callback)
242        logging.info("Stopped scanning")
243
244        # Stop advertising
245        logging.info("Stop advertising")
246        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id)
247        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
248        logging.info("Stopped advertising")
249
250        return True
251