1#
2#   Copyright 2021 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16import time
17import logging
18
19from bluetooth_packets_python3 import hci_packets
20from cert.event_stream import EventStream
21from cert.gd_base_test import GdBaseTestClass
22from cert.matchers import HciMatchers, IsoMatchers, L2capMatchers
23from cert.metadata import metadata
24from cert.py_hci import PyHci
25from cert.py_l2cap import PyLeL2cap
26from cert.py_le_iso import PyLeIso
27from cert.py_le_iso import CisTestParameters
28from cert.truth import assertThat
29from datetime import timedelta
30from facade import common_pb2 as common
31from hci.facade import controller_facade_pb2 as controller_facade
32from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade
33from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade
34from google.protobuf import empty_pb2 as empty_proto
35from neighbor.facade import facade_pb2 as neighbor_facade
36from l2cap.le.cert.cert_le_l2cap import CertLeL2cap
37from iso.cert.cert_le_iso import CertLeIso
38
39import time
40from bluetooth_packets_python3.hci_packets import OpCode
41
42
43class LeIsoTest(GdBaseTestClass):
44    """
45        Collection of tests that each sample results from
46        different (unique) combinations of io capabilities, authentication requirements, and oob data.
47    """
48
49    def setup_class(self):
50        super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES')
51
52    def setup_test(self):
53        super().setup_test()
54
55        self.dut_l2cap = PyLeL2cap(self.dut)
56        self.cert_l2cap = CertLeL2cap(self.cert)
57        self.dut_address = common.BluetoothAddressWithType(
58            address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS)
59        self.cert_address = common.BluetoothAddressWithType(
60            address=common.BluetoothAddress(address=bytes(b'C0:11:FF:AA:33:22')), type=common.RANDOM_DEVICE_ADDRESS)
61        dut_privacy_policy = le_initiator_address_facade.PrivacyPolicy(
62            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
63            address_with_type=self.dut_address,
64            rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
65            minimum_rotation_time=0,
66            maximum_rotation_time=0)
67        self.dut_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(dut_privacy_policy)
68        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
69            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
70            address_with_type=self.cert_address,
71            rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
72            minimum_rotation_time=0,
73            maximum_rotation_time=0)
74        self.cert_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
75
76        self.dut_iso = PyLeIso(self.dut)
77        self.cert_iso = CertLeIso(self.cert)
78
79    def teardown_test(self):
80        self.dut_iso.close()
81        self.cert_iso.close()
82
83        self.cert_l2cap.close()
84        self.dut_l2cap.close()
85        super().teardown_test()
86
87    #cert becomes central of connection, dut peripheral
88    def _setup_link_from_cert(self):
89        # DUT Advertises
90        gap_name = hci_packets.GapData()
91        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
92        gap_name.data = list(bytes(b'Im_The_DUT'))
93        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
94        config = le_advertising_facade.AdvertisingConfig(
95            advertisement=[gap_data],
96            interval_min=512,
97            interval_max=768,
98            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
99            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
100            channel_map=7,
101            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
102        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
103        create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request)
104        self.cert_l2cap.connect_le_acl(self.dut_address)
105
106    def _setup_cis_from_cert(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval,
107                             peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s,
108                             max_transport_latency_s_to_m, cis_configs):
109        self.cert_iso.le_set_cig_parameters_test(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m,
110                                                 iso_interval, peripherals_clock_accuracy, packing, framing,
111                                                 max_transport_latency_m_to_s, max_transport_latency_s_to_m,
112                                                 cis_configs)
113
114        cis_handles = self.cert_iso.wait_le_set_cig_parameters_complete()
115
116        cis_handle = cis_handles[0]
117
118        acl_connection_handle = self.cert_l2cap._le_acl.handle
119        self.cert_iso.le_cretate_cis([(cis_handle, acl_connection_handle)])
120        dut_cis_stream = self.dut_iso.wait_le_cis_established()
121        cert_cis_stream = self.cert_iso.wait_le_cis_established()
122        return (dut_cis_stream, cert_cis_stream)
123
124    @metadata(
125        pts_test_id="IAL/CIS/UNF/SLA/BV-01-C",
126        pts_test_name="connected isochronous stream, unframed data, peripheral role")
127    def test_iso_cis_unf_sla_bv_01_c(self):
128        """
129            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
130        """
131        cig_id = 0x01
132        sdu_interval_m_to_s = 0
133        sdu_interval_s_to_m = 0x186a
134        ft_m_to_s = 0
135        ft_s_to_m = 1
136        iso_interval = 0x0A
137        peripherals_clock_accuracy = 0
138        packing = 0
139        framing = 0
140        max_transport_latency_m_to_s = 0
141        max_transport_latency_s_to_m = 0
142        cis_configs = [
143            CisTestParameters(
144                cis_id=0x01,
145                nse=2,
146                max_sdu_m_to_s=100,
147                max_sdu_s_to_m=100,
148                max_pdu_m_to_s=100,
149                max_pdu_s_to_m=100,
150                phy_m_to_s=0x02,
151                phy_s_to_m=0x00,
152                bn_m_to_s=0,
153                bn_s_to_m=2,
154            )
155        ]
156
157        self._setup_link_from_cert()
158        (dut_cis_stream, cert_cis_stream) = self._setup_cis_from_cert(
159            cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval,
160            peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m,
161            cis_configs)
162        dut_cis_stream.send(b'abcdefgh' * 10)
163        assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10))
164
165    @metadata(
166        pts_test_id="IAL/CIS/UNF/SLA/BV-25-C",
167        pts_test_name="connected isochronous stream, unframed data, peripheral role")
168    def test_iso_cis_unf_sla_bv_25_c(self):
169        """
170            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
171        """
172        cig_id = 0x01
173        sdu_interval_m_to_s = 0x7530
174        sdu_interval_s_to_m = 0x7530
175        ft_m_to_s = 3
176        ft_s_to_m = 2
177        iso_interval = 0x18
178        peripherals_clock_accuracy = 0
179        packing = 0
180        framing = 0
181        max_transport_latency_m_to_s = 0
182        max_transport_latency_s_to_m = 0
183        cis_configs = [
184            CisTestParameters(
185                cis_id=0x01,
186                nse=5,
187                max_sdu_m_to_s=100,
188                max_sdu_s_to_m=100,
189                max_pdu_m_to_s=100,
190                max_pdu_s_to_m=100,
191                phy_m_to_s=0x02,
192                phy_s_to_m=0x00,
193                bn_m_to_s=3,
194                bn_s_to_m=1,
195            )
196        ]
197
198        self._setup_link_from_cert()
199        (dut_cis_stream, cert_cis_stream) = self._setup_cis_from_cert(
200            cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval,
201            peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m,
202            cis_configs)
203        dut_cis_stream.send(b'abcdefgh' * 10)
204        assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10))
205
206    @metadata(
207        pts_test_id="IAL/CIS/FRA/SLA/BV-03-C",
208        pts_test_name="connected isochronous stream, framed data, peripheral role")
209    def test_iso_cis_fra_sla_bv_03_c(self):
210        """
211            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
212        """
213        cig_id = 0x01
214        sdu_interval_m_to_s = 0x0000
215        sdu_interval_s_to_m = 0x4e30
216        ft_m_to_s = 0
217        ft_s_to_m = 2
218        iso_interval = 0x14
219        peripherals_clock_accuracy = 0
220        packing = 0
221        framing = 1
222        max_transport_latency_m_to_s = 0
223        max_transport_latency_s_to_m = 0
224        cis_configs = [
225            CisTestParameters(
226                cis_id=0x01,
227                nse=4,
228                max_sdu_m_to_s=100,
229                max_sdu_s_to_m=100,
230                max_pdu_m_to_s=100,
231                max_pdu_s_to_m=100,
232                phy_m_to_s=0x02,
233                phy_s_to_m=0x00,
234                bn_m_to_s=0,
235                bn_s_to_m=2,
236            )
237        ]
238
239        self._setup_link_from_cert()
240        (dut_cis_stream, cert_cis_stream) = self._setup_cis_from_cert(
241            cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval,
242            peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m,
243            cis_configs)
244        dut_cis_stream.send(b'abcdefgh' * 10)
245        assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10))
246
247    @metadata(
248        pts_test_id="IAL/CIS/FRA/SLA/BV-26-C",
249        pts_test_name="connected isochronous stream, framed data, peripheral role")
250    def test_iso_cis_fra_sla_bv_26_c(self):
251        """
252            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
253        """
254        cig_id = 0x01
255        sdu_interval_m_to_s = 0x14D5
256        sdu_interval_s_to_m = 0x14D5
257        ft_m_to_s = 1
258        ft_s_to_m = 1
259        iso_interval = 0x08
260        peripherals_clock_accuracy = 0
261        packing = 0
262        framing = 1
263        max_transport_latency_m_to_s = 0
264        max_transport_latency_s_to_m = 0
265        cis_configs = [
266            CisTestParameters(
267                cis_id=0x01,
268                nse=2,
269                max_sdu_m_to_s=100,
270                max_sdu_s_to_m=100,
271                max_pdu_m_to_s=100,
272                max_pdu_s_to_m=100,
273                phy_m_to_s=0x02,
274                phy_s_to_m=0x00,
275                bn_m_to_s=1,
276                bn_s_to_m=1,
277            )
278        ]
279
280        self._setup_link_from_cert()
281        (dut_cis_stream, cert_cis_stream) = self._setup_cis_from_cert(
282            cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, iso_interval,
283            peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m,
284            cis_configs)
285        dut_cis_stream.send(b'abcdefgh' * 10)
286        assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10))
287