1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from cert.closable import safeClose
18from cert.gd_base_test import GdBaseTestClass
19from cert.event_stream import EventStream
20from cert.truth import assertThat
21from cert.py_le_acl_manager import PyLeAclManager
22from google.protobuf import empty_pb2 as empty_proto
23from facade import common_pb2 as common
24from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade
25from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade
26from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade
27from hci.facade import hci_facade_pb2 as hci_facade
28import bluetooth_packets_python3 as bt_packets
29from bluetooth_packets_python3 import hci_packets
30from bluetooth_packets_python3 import RawBuilder
31
32
33class LeAclManagerTest(GdBaseTestClass):
34
35    def setup_class(self):
36        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')
37
38    def setup_test(self):
39        super().setup_test()
40        self.dut_le_acl_manager = PyLeAclManager(self.dut)
41        self.cert_hci_le_event_stream = EventStream(self.cert.hci.StreamLeSubevents(empty_proto.Empty()))
42        self.cert_acl_data_stream = EventStream(self.cert.hci.StreamAcl(empty_proto.Empty()))
43
44    def teardown_test(self):
45        safeClose(self.cert_hci_le_event_stream)
46        safeClose(self.cert_acl_data_stream)
47        safeClose(self.dut_le_acl_manager)
48        super().teardown_test()
49
50    def set_privacy_policy_static(self):
51        self.dut_address = b'd0:05:04:03:02:01'
52        private_policy = le_initiator_address_facade.PrivacyPolicy(
53            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
54            address_with_type=common.BluetoothAddressWithType(
55                address=common.BluetoothAddress(address=bytes(self.dut_address)), type=common.RANDOM_DEVICE_ADDRESS))
56        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
57
58    def register_for_event(self, event_code):
59        msg = hci_facade.EventRequest(code=int(event_code))
60        self.cert.hci.RequestEvent(msg)
61
62    def register_for_le_event(self, event_code):
63        msg = hci_facade.EventRequest(code=int(event_code))
64        self.cert.hci.RequestLeSubevent(msg)
65
66    def enqueue_hci_command(self, command):
67        cmd_bytes = bytes(command.Serialize())
68        cmd = common.Data(payload=cmd_bytes)
69        self.cert.hci.SendCommand(cmd)
70
71    def enqueue_acl_data(self, handle, pb_flag, b_flag, data):
72        acl = hci_packets.AclBuilder(handle, pb_flag, b_flag, RawBuilder(data))
73        self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))
74
75    def dut_connects(self, check_address):
76        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
77        self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
78
79        # Cert Advertises
80        advertising_handle = 0
81        self.enqueue_hci_command(
82            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
83                advertising_handle,
84                hci_packets.LegacyAdvertisingProperties.ADV_IND,
85                400,
86                450,
87                7,
88                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
89                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
90                '00:00:00:00:00:00',
91                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
92                0xF8,
93                1,  #SID
94                hci_packets.Enable.DISABLED  # Scan request notification
95            ))
96
97        self.enqueue_hci_command(
98            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
99
100        gap_name = hci_packets.GapData()
101        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
102        gap_name.data = list(bytes(b'Im_A_Cert'))
103
104        self.enqueue_hci_command(
105            hci_packets.LeSetExtendedAdvertisingDataBuilder(
106                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
107                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
108
109        gap_short_name = hci_packets.GapData()
110        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
111        gap_short_name.data = list(bytes(b'Im_A_C'))
112
113        self.enqueue_hci_command(
114            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
115                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
116                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
117
118        enabled_set = hci_packets.EnabledSet()
119        enabled_set.advertising_handle = advertising_handle
120        enabled_set.duration = 0
121        enabled_set.max_extended_advertising_events = 0
122        self.enqueue_hci_command(
123            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
124
125        self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
126            remote_addr=common.BluetoothAddressWithType(
127                address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')),
128                type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))
129
130        # Cert gets ConnectionComplete with a handle and sends ACL data
131        handle = 0xfff
132        address = hci_packets.Address()
133
134        def get_handle(packet):
135            packet_bytes = packet.payload
136            nonlocal handle
137            nonlocal address
138            if b'\x3e\x13\x01\x00' in packet_bytes:
139                cc_view = hci_packets.LeConnectionCompleteView(
140                    hci_packets.LeMetaEventView(
141                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
142                handle = cc_view.GetConnectionHandle()
143                address = cc_view.GetPeerAddress()
144                return True
145            if b'\x3e\x13\x0A\x00' in packet_bytes:
146                cc_view = hci_packets.LeEnhancedConnectionCompleteView(
147                    hci_packets.LeMetaEventView(
148                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
149                handle = cc_view.GetConnectionHandle()
150                address = cc_view.GetPeerResolvablePrivateAddress()
151                return True
152            return False
153
154        self.cert_hci_le_event_stream.assert_event_occurs(get_handle)
155        self.cert_handle = handle
156        dut_address_from_complete = address
157        if check_address:
158            assertThat(dut_address_from_complete).isEqualTo(self.dut_address.decode())
159
160    def send_receive_and_check(self):
161        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
162                              hci_packets.BroadcastFlag.POINT_TO_POINT,
163                              bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))
164
165        self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')
166        self.cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.payload)
167        assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
168
169    def test_dut_connects(self):
170        self.set_privacy_policy_static()
171        self.dut_connects(check_address=True)
172        self.send_receive_and_check()
173
174    def test_dut_connects_resolvable_address(self):
175        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
176            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
177            rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
178            minimum_rotation_time=7 * 60 * 1000,
179            maximum_rotation_time=15 * 60 * 1000)
180        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
181        self.dut_connects(check_address=False)
182        self.send_receive_and_check()
183
184    def test_dut_connects_non_resolvable_address(self):
185        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
186            address_policy=le_initiator_address_facade.AddressPolicy.USE_NON_RESOLVABLE_ADDRESS,
187            rotation_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f',
188            minimum_rotation_time=8 * 60 * 1000,
189            maximum_rotation_time=14 * 60 * 1000)
190        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
191        self.dut_connects(check_address=False)
192        self.send_receive_and_check()
193
194    def test_dut_connects_public_address(self):
195        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
196            le_initiator_address_facade.PrivacyPolicy(
197                address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS))
198        self.dut_connects(check_address=False)
199        self.send_receive_and_check()
200
201    def test_dut_connects_public_address_cancelled(self):
202        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
203            le_initiator_address_facade.PrivacyPolicy(
204                address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS))
205        self.dut_connects(check_address=False)
206        self.send_receive_and_check()
207
208    def test_cert_connects(self):
209        self.set_privacy_policy_static()
210        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
211
212        self.dut_le_acl_manager.listen_for_incoming_connections()
213
214        # DUT Advertises
215        gap_name = hci_packets.GapData()
216        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
217        gap_name.data = list(bytes(b'Im_The_DUT'))
218        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
219        config = le_advertising_facade.AdvertisingConfig(
220            advertisement=[gap_data],
221            interval_min=512,
222            interval_max=768,
223            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
224            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
225            peer_address_type=common.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
226            peer_address=common.BluetoothAddress(address=bytes(b'A6:A5:A4:A3:A2:A1')),
227            channel_map=7,
228            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
229        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
230
231        self.dut.hci_le_advertising_manager.CreateAdvertiser(request)
232
233        # Cert Connects
234        self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
235        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
236        phy_scan_params.scan_interval = 0x60
237        phy_scan_params.scan_window = 0x30
238        phy_scan_params.conn_interval_min = 0x18
239        phy_scan_params.conn_interval_max = 0x28
240        phy_scan_params.conn_latency = 0
241        phy_scan_params.supervision_timeout = 0x1f4
242        phy_scan_params.min_ce_length = 0
243        phy_scan_params.max_ce_length = 0
244        self.enqueue_hci_command(
245            hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
246                                                          hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
247                                                          hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
248                                                          self.dut_address.decode(), 1, [phy_scan_params]))
249
250        # Cert gets ConnectionComplete with a handle and sends ACL data
251        handle = 0xfff
252
253        def get_handle(packet):
254            packet_bytes = packet.payload
255            nonlocal handle
256            if b'\x3e\x13\x01\x00' in packet_bytes:
257                cc_view = hci_packets.LeConnectionCompleteView(
258                    hci_packets.LeMetaEventView(
259                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
260                handle = cc_view.GetConnectionHandle()
261                return True
262            if b'\x3e\x13\x0A\x00' in packet_bytes:
263                cc_view = hci_packets.LeEnhancedConnectionCompleteView(
264                    hci_packets.LeMetaEventView(
265                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
266                handle = cc_view.GetConnectionHandle()
267                return True
268            return False
269
270        self.cert_hci_le_event_stream.assert_event_occurs(get_handle)
271        self.cert_handle = handle
272
273        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
274                              hci_packets.BroadcastFlag.POINT_TO_POINT,
275                              bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))
276
277        # DUT gets a connection complete event and sends and receives
278        handle = 0xfff
279        self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection()
280
281        self.send_receive_and_check()
282
283    def test_recombination_l2cap_packet(self):
284        self.set_privacy_policy_static()
285        self.dut_connects(check_address=True)
286
287        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
288                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello'))
289        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
290                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))
291
292        assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)
293