1#
2#   Copyright 2020 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16from cert.gd_base_test import GdBaseTestClass
17from cert.truth import assertThat
18from cert.py_l2cap import PyLeL2cap
19from cert.matchers import L2capMatchers
20from cert.metadata import metadata
21from facade import common_pb2 as common
22from google.protobuf import empty_pb2 as empty_proto
23from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade
24from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade
25from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade
26import bluetooth_packets_python3 as bt_packets
27from bluetooth_packets_python3 import hci_packets, l2cap_packets
28from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
29from l2cap.le.cert.cert_le_l2cap import CertLeL2cap
30from l2cap.le.facade_pb2 import SecurityLevel
31
# Sample raw packet with the four payload bytes 0x19 0x26 0x08 0x17. The tests
# send it from the cert side and expect the DUT to report the same bytes.
SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17])
34
35
36class LeL2capTest(GdBaseTestClass):
37
38    def setup_class(self):
39        super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES')
40
41    def setup_test(self):
42        super().setup_test()
43
44        self.dut_l2cap = PyLeL2cap(self.dut)
45        self.cert_l2cap = CertLeL2cap(self.cert)
46        self.dut_address = common.BluetoothAddressWithType(
47            address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS)
48        self.cert_address = common.BluetoothAddressWithType(
49            address=common.BluetoothAddress(address=bytes(b'C0:11:FF:AA:33:22')), type=common.RANDOM_DEVICE_ADDRESS)
50        dut_privacy_policy = le_initiator_address_facade.PrivacyPolicy(
51            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
52            address_with_type=self.dut_address,
53            rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
54            minimum_rotation_time=0,
55            maximum_rotation_time=0)
56        self.dut_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(dut_privacy_policy)
57        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
58            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
59            address_with_type=self.cert_address,
60            rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
61            minimum_rotation_time=0,
62            maximum_rotation_time=0)
63        self.cert_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
64
65    def teardown_test(self):
66        self.cert_l2cap.close()
67        self.dut_l2cap.close()
68        super().teardown_test()
69
70    def _setup_link_from_cert(self):
71        # DUT Advertises
72        gap_name = hci_packets.GapData()
73        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
74        gap_name.data = list(bytes(b'Im_The_DUT'))
75        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
76        config = le_advertising_facade.AdvertisingConfig(
77            advertisement=[gap_data],
78            interval_min=512,
79            interval_max=768,
80            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
81            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
82            channel_map=7,
83            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
84        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
85        create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request)
86        self.cert_l2cap.connect_le_acl(self.dut_address)
87
88    def _set_link_from_dut_and_open_channel(self,
89                                            signal_id=1,
90                                            scid=0x0101,
91                                            psm=0x33,
92                                            mtu=1000,
93                                            mps=100,
94                                            initial_credit=6):
95        # Cert Advertises
96        gap_name = hci_packets.GapData()
97        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
98        gap_name.data = list(bytes(b'Im_The_DUT'))
99        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
100        config = le_advertising_facade.AdvertisingConfig(
101            advertisement=[gap_data],
102            interval_min=512,
103            interval_max=768,
104            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
105            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
106            channel_map=7,
107            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
108        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
109        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
110        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm)
111        self.cert_l2cap.wait_for_connection()
112        # TODO: Currently we can only connect by using Dynamic channel API. Use fixed channel instead.
113        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(psm)
114        dut_channel = response_future.get_channel()
115        return (dut_channel, cert_channel)
116
117    def _open_channel_from_cert(self, signal_id=1, scid=0x0101, psm=0x33, mtu=1000, mps=100, initial_credit=6):
118
119        dut_channel = self.dut_l2cap.register_coc(self.cert_address, psm)
120        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, mtu, mps, initial_credit)
121
122        return (dut_channel, cert_channel)
123
124    def _open_channel_from_dut(self, psm=0x33):
125        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm)
126        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(psm)
127        dut_channel = response_future.get_channel()
128        return (dut_channel, cert_channel)
129
130    def _open_fixed_channel(self, cid=4):
131        dut_channel = self.dut_l2cap.get_fixed_channel(cid)
132        cert_channel = self.cert_l2cap.open_fixed_channel(cid)
133        return (dut_channel, cert_channel)
134
135    def test_fixed_channel_send(self):
136        self.dut_l2cap.enable_fixed_channel(4)
137        self._setup_link_from_cert()
138        (dut_channel, cert_channel) = self._open_fixed_channel(4)
139        dut_channel.send(b'hello' * 40)
140        assertThat(cert_channel).emits(L2capMatchers.Data(b'hello' * 40))
141
142    def test_fixed_channel_receive(self):
143        self.dut_l2cap.enable_fixed_channel(4)
144        self._setup_link_from_cert()
145        (dut_channel, cert_channel) = self._open_fixed_channel(4)
146        cert_channel.send(SAMPLE_PACKET)
147        assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))
148
149    def test_connect_from_dut_and_open_dynamic_channel(self):
150        """
151        Internal test for GD stack only
152        """
153        self._set_link_from_dut_and_open_channel()
154
155    @metadata(pts_test_id="L2CAP/LE/CPU/BV-01-C", pts_test_name="Send Connection Parameter Update Request")
156    def test_send_connection_parameter_update_request(self):
157        """
158        Verify that the IUT is able to send the connection parameter update Request to Lower Tester when acting as a peripheral device.
159        NOTE: This is an optional feature. Also if both LL central and peripheral supports 4.1+ connection parameter update, this should happen in LL only, not L2CAP
160        NOTE: Currently we need to establish at least one dynamic channel to allow update.
161        """
162        self._setup_link_from_cert()
163        self._open_channel_from_dut()
164        self.dut_l2cap.update_connection_parameter()
165        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.LeConnectionParameterUpdateRequest())
166
167    @metadata(pts_test_id="L2CAP/LE/CPU/BV-02-C", pts_test_name="Accept Connection Parameter Update Request")
168    def test_accept_connection_parameter_update_request(self):
169        """
170        Verify that the IUT is able to receive and handle a request for connection parameter update when acting as a central device.
171        NOTE: Currently we need to establish at least one dynamic channel to allow update.
172        """
173        self._set_link_from_dut_and_open_channel()
174        self.cert_l2cap.get_control_channel().send(
175            l2cap_packets.ConnectionParameterUpdateRequestBuilder(2, 0x10, 0x10, 0x0a, 0x64))
176        assertThat(self.cert_l2cap.get_control_channel()).emits(
177            L2capMatchers.LeConnectionParameterUpdateResponse(
178                l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED))
179
180    @metadata(pts_test_id="L2CAP/LE/CPU/BI-01-C", pts_test_name="Reject Connection Parameter Update Parameters")
181    def test_reject_connection_parameter_update_parameters(self):
182        """
183        Verify that the IUT is able to reject a request for connection parameter update with illegal parameters.
184        NOTE: Currently we need to establish at least one dynamic channel to allow update.
185        """
186        self._set_link_from_dut_and_open_channel()
187        self.cert_l2cap.get_control_channel().send(
188            l2cap_packets.ConnectionParameterUpdateRequestBuilder(2, 0x10, 0x10, 512, 0x64))
189        assertThat(self.cert_l2cap.get_control_channel()).emits(
190            L2capMatchers.LeConnectionParameterUpdateResponse(
191                l2cap_packets.ConnectionParameterUpdateResponseResult.REJECTED))
192
193    @metadata(pts_test_id="L2CAP/LE/CPU/BI-02-C", pts_test_name="Reject Connection Parameter Update Request")
194    def test_reject_connection_parameter_update_request(self):
195        """
196        Verify that the IUT is able to reject a request for connection parameter update in peripheral mode.
197        """
198        self._setup_link_from_cert()
199        self.cert_l2cap.get_control_channel().send(
200            l2cap_packets.ConnectionParameterUpdateRequestBuilder(2, 0x10, 0x10, 0x0a, 0x64))
201        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.LeCommandReject())
202
203    @metadata(pts_test_id="L2CAP/COS/CFC/BV-01-C", pts_test_name="Segmentation")
204    def test_segmentation(self):
205        """
206        Verify that the IUT can send data segments which are larger than the LE frame size.
207        """
208        self._setup_link_from_cert()
209        (dut_channel, cert_channel) = self._open_channel_from_cert(mtu=1000, mps=102)
210        dut_channel.send(b'hello' * 20 + b'world')
211        # The first LeInformation packet contains 2 bytes of SDU size.
212        # The packet is divided into first 100 bytes from 'hellohello....'
213        # and remaining 5 bytes 'world'
214        assertThat(cert_channel).emits(
215            L2capMatchers.FirstLeIFrame(b'hello' * 20, sdu_size=105), L2capMatchers.Data(b'world')).inOrder()
216
217    @metadata(pts_test_id="L2CAP/COS/CFC/BV-02-C", pts_test_name="No Segmentation")
218    def test_no_segmentation(self):
219        """
220        Verify that the IUT can send data segments which do not require segmentation.
221        """
222        self._setup_link_from_cert()
223        (dut_channel, cert_channel) = self._open_channel_from_cert(mtu=1000, mps=202)
224        dut_channel.send(b'hello' * 40)
225        assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello' * 40, sdu_size=200))
226
227    def test_no_segmentation_dut_is_central(self):
228        """
229        L2CAP/COS/CFC/BV-02-C
230        """
231        (dut_channel, cert_channel) = self._set_link_from_dut_and_open_channel()
232        dut_channel.send(b'hello' * 40)
233        assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello' * 40, sdu_size=200))
234
235    @metadata(pts_test_id="L2CAP/COS/CFC/BV-03-C", pts_test_name="Reassembling")
236    def test_reassembling(self):
237        """
238        Verify that the IUT can correctly reassemble data received from the Lower Tester which is greater than the IUT LE-frame size.
239        """
240        self._setup_link_from_cert()
241        (dut_channel, cert_channel) = self._open_channel_from_cert()
242        sdu_size_for_two_sample_packet = 8
243        cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet, SAMPLE_PACKET)
244        cert_channel.send(SAMPLE_PACKET)
245        assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17' * 2))
246
247    @metadata(pts_test_id="L2CAP/COS/CFC/BV-04-C", pts_test_name="Data Receiving")
248    def test_data_receiving(self):
249        """
250        Verify that the IUT can receive unsegmented data correctly.
251        """
252        self._setup_link_from_cert()
253        (dut_channel, cert_channel) = self._open_channel_from_cert()
254        cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
255        assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))
256
257    def test_data_receiving_dut_is_central(self):
258        """
259        L2CAP/COS/CFC/BV-04-C
260        """
261        (dut_channel, cert_channel) = self._set_link_from_dut_and_open_channel()
262        cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
263        assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))
264
265    @metadata(pts_test_id="L2CAP/COS/CFC/BV-05-C", pts_test_name="Multiple Channels with Interleaved Data Streams")
266    def test_multiple_channels_with_interleaved_data_streams(self):
267        """
268        Verify that an IUT can create multiple channels and receives data streams on the channels when the streams are interleaved.
269        """
270        self._setup_link_from_cert()
271        (dut_channel_x, cert_channel_x) = self._open_channel_from_cert(signal_id=1, scid=0x0103, psm=0x33)
272        (dut_channel_y, cert_channel_y) = self._open_channel_from_cert(signal_id=2, scid=0x0105, psm=0x35)
273        (dut_channel_z, cert_channel_z) = self._open_channel_from_cert(signal_id=3, scid=0x0107, psm=0x37)
274        cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET)
275        cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET)
276        cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET)
277        cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET)
278        cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET)
279        # TODO: We should assert two events in order, but it got stuck
280        assertThat(dut_channel_y).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'), at_least_times=3)
281        assertThat(dut_channel_z).emits(
282            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'),
283            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')).inOrder()
284        cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET)
285        assertThat(dut_channel_z).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))
286
287    @metadata(pts_test_id="L2CAP/LE/REJ/BI-01-C", pts_test_name="Reject Unknown Command in LE Signaling Channel")
288    def test_reject_unknown_command_in_le_sigling_channel(self):
289        """
290        Verify that the IUT is able to reject unknown command.
291        """
292        self._setup_link_from_cert()
293        self.cert_l2cap.get_control_channel().send(
294            l2cap_packets.InformationRequestBuilder(
295                2, l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED))
296        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.LeCommandReject())
297
298    @metadata(pts_test_id="L2CAP/LE/REJ/BI-02-C", pts_test_name="Command Reject – Reserved PDU Codes")
299    def test_command_reject_reserved_pdu_codes(self):
300        """
301        Verify that an IUT receiving a PDU with a reserved command code sends a command reject.
302        """
303        self._setup_link_from_cert()
304        self.cert_l2cap.get_control_channel().send(l2cap_packets.MoveChannelRequestBuilder(2, 0, 0))
305        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.LeCommandReject())
306
307    @metadata(pts_test_id="L2CAP/LE/CFC/BV-01-C", pts_test_name="LE Credit Based Connection Request - Legacy Peer")
308    def test_le_credit_based_connection_request_legacy_peer(self):
309        """
310        Verify that an IUT sending an LE Credit Based Connection Request to a legacy peer and receiving a Command Reject does not establish the channel.
311        """
312        self._setup_link_from_cert()
313        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
314        self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33)
315        assertThat(response_future.get_status()).isNotEqualTo(LeCreditBasedConnectionResponseResult.SUCCESS)
316
317    @metadata(
318        pts_test_id="L2CAP/LE/CFC/BV-02-C", pts_test_name="LE Credit Based Connection Request on Supported LE_PSM")
319    def test_le_credit_based_connection_request_on_supported_le_psm(self):
320        """
321        Verify that an IUT sending an LE Credit Based Connection Request to a peer will establish the channel upon receiving the LE Credit Based Connection Response.
322        """
323        self._setup_link_from_cert()
324        (dut_channel, cert_channel) = self._open_channel_from_dut()
325        cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
326        assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))
327
328    @metadata(
329        pts_test_id="L2CAP/LE/CFC/BV-03-C", pts_test_name="LE Credit Based Connection Response on Supported LE_PSM")
330    def test_credit_based_connection_response_on_supported_le_psm(self):
331        """
332        Verify that an IUT receiving a valid LE Credit Based Connection Request from a peer will send an LE Credit Based Connection Response and establish the channel.
333        """
334        self._setup_link_from_cert()
335        (dut_channel, cert_channel) = self._open_channel_from_cert()
336        dut_channel.send(b'hello')
337        assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
338
339    @metadata(
340        pts_test_id="L2CAP/LE/CFC/BV-04-C", pts_test_name="LE Credit Based Connection Request on an Unsupported LE_PSM")
341    def test_credit_based_connection_request_on_an_unsupported_le_psm(self):
342        """
343        Verify that an IUT sending an LE Credit Based Connection Request on an unsupported LE_PSM will not establish a channel upon receiving an LE Credit Based Connection Response refusing the connection.
344        """
345        self._setup_link_from_cert()
346        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
347        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
348            psm=0x33, result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
349        assertThat(response_future.get_status()).isEqualTo(LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
350
351    @metadata(
352        pts_test_id="L2CAP/LE/CFC/BV-05-C", pts_test_name="LE Credit Based Connection Request - unsupported LE_PSM")
353    def test_credit_based_connection_request_unsupported_le_psm(self):
354        """
355        Verify that an IUT receiving an LE Credit Based Connection Request on an unsupported LE_PSM will respond with an LE Credit Based Connection Response refusing the connection.
356        """
357        self._setup_link_from_cert()
358        self.cert_l2cap.get_control_channel().send(
359            l2cap_packets.LeCreditBasedConnectionRequestBuilder(1, 0x34, 0x0101, 2000, 1000, 1000))
360        assertThat(self.cert_l2cap.get_control_channel()).emits(
361            L2capMatchers.CreditBasedConnectionResponse(
362                result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED))
363
364    @metadata(pts_test_id="L2CAP/LE/CFC/BV-06-C", pts_test_name="Credit Exchange – Receiving Incremental Credits")
365    def test_credit_exchange_receiving_incremental_credits(self):
366        """
367        Verify the IUT handles flow control correctly, by handling the LE Flow Control Credit sent by the peer.
368        """
369        self._setup_link_from_cert()
370        (dut_channel, cert_channel) = self._open_channel_from_cert(initial_credit=0)
371        for _ in range(4):
372            dut_channel.send(b'hello')
373        cert_channel.send_credits(1)
374        assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
375        cert_channel.send_credits(1)
376        assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
377        cert_channel.send_credits(2)
378        assertThat(cert_channel).emits(
379            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5), L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
380
381    @metadata(pts_test_id="L2CAP/LE/CFC/BV-07-C", pts_test_name="Credit Exchange – Sending Credits")
382    def test_credit_exchange_sending_credits(self):
383        """
384        Verify that the IUT sends LE Flow Control Credit to the peer.
385        """
386        self._setup_link_from_cert()
387        (dut_channel, cert_channel) = self._open_channel_from_cert()
388        credits = cert_channel.credits_left()
389        # Note: DUT only needs to send credit when ALL credits are consumed.
390        # Here we enforce that DUT sends credit after receiving 3 packets, to
391        # test without sending too many packets (may take too long).
392        # This behavior is not expected for all Bluetooth stacks.
393        for _ in range(min(credits + 1, 3)):
394            cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
395        self.cert_l2cap.verify_le_flow_control_credit(cert_channel)
396
397    @metadata(pts_test_id="L2CAP/LE/CFC/BV-08-C", pts_test_name="Disconnection Request")
398    def test_disconnection_request(self):
399        """
400        Verify that the IUT can disconnect the channel.
401        """
402        self._setup_link_from_cert()
403        (dut_channel, cert_channel) = self._open_channel_from_cert()
404        dut_channel.close_channel()
405        cert_channel.verify_disconnect_request()
406
407    @metadata(pts_test_id="L2CAP/LE/CFC/BV-09-C", pts_test_name="Disconnection Response")
408    def test_disconnection_response(self):
409        """
410        Verify that the IUT responds correctly to reception of a Disconnection Request.
411        """
412        self._setup_link_from_cert()
413        (dut_channel, cert_channel) = self._open_channel_from_cert()
414        cert_channel.disconnect_and_verify()
415
416    @metadata(pts_test_id="L2CAP/LE/CFC/BV-10-C", pts_test_name="Security - Insufficient Authentication – Initiator")
417    def test_security_insufficient_authentication_initiator(self):
418        """
419        Verify that the IUT does not establish the channel upon receipt of an LE Credit Based Connection Response indicating the connection was refused with Result “0x0005 – Connection Refused – Insufficient Authentication".
420        """
421        self._setup_link_from_cert()
422        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
423        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
424            psm=0x33, result=LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION)
425        assertThat(response_future.get_status()).isEqualTo(
426            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION)
427
428    @metadata(pts_test_id="L2CAP/LE/CFC/BV-11-C", pts_test_name="Security - Insufficient Authentication – Responder")
429    def test_security_insufficient_authentication_responder(self):
430        """
431        Verify that an IUT refuses to create a connection upon reception of an LE Credit Based Connection
432Request which fails to satisfy authentication requirements.
433        """
434        self._setup_link_from_cert()
435        psm = 0x33
436        self.dut_l2cap.register_coc(self.cert_address, psm, SecurityLevel.AUTHENTICATED_PAIRING_WITH_ENCRYPTION)
437        self.cert_l2cap.open_channel_with_expected_result(
438            psm, LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION)
439
440    @metadata(pts_test_id="L2CAP/LE/CFC/BV-12-C", pts_test_name="Security - Insufficient Authorization – Initiator")
441    def test_security_insufficient_authorization_initiator(self):
442        """
443        Verify that the IUT does not establish the channel upon receipt of an LE Credit Based Connection Response indicating the connection was refused with Result “0x0006 – Connection Refused – Insufficient Authorization”.
444        """
445        self._setup_link_from_cert()
446        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
447        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
448            psm=0x33, result=LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION)
449        assertThat(response_future.get_status()).isEqualTo(
450            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION)
451
452    @metadata(pts_test_id="L2CAP/LE/CFC/BV-13-C", pts_test_name="Security - Insufficient Authorization – Responder")
453    def test_security_insufficient_authorization_responder(self):
454        """
455        Verify that an IUT refuses to create a connection upon reception of an LE Credit Based Connection
456        Request which fails to satisfy authentication requirements.
457        """
458        self._setup_link_from_cert()
459        psm = 0x33
460        self.dut_l2cap.register_coc(self.cert_address, psm, SecurityLevel.AUTHORIZATION)
461        self.cert_l2cap.open_channel_with_expected_result(
462            psm, LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION)
463
464    @metadata(pts_test_id="L2CAP/LE/CFC/BV-14-C", pts_test_name="Security - Insufficient Key Size – Initiator")
465    def test_security_insufficient_key_size_initiator(self):
466        """
467        Verify that the IUT does not establish the channel upon receipt of an
468        LE Credit Based Connection Response indicating the connection was
469        refused with Result "0x0007 – Connection Refused – Insufficient
470        Encryption Key Size".
471        """
472        self._setup_link_from_cert()
473        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
474        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
475            psm=0x33, result=LeCreditBasedConnectionResponseResult.INSUFFICIENT_ENCRYPTION_KEY_SIZE)
476        assertThat(response_future.get_status()).isEqualTo(
477            LeCreditBasedConnectionResponseResult.INSUFFICIENT_ENCRYPTION_KEY_SIZE)
478
479    @metadata(
480        pts_test_id="L2CAP/LE/CFC/BV-15-C", pts_test_name="Security - Insufficient Encryption Key Size – Responder")
481    def test_security_insufficient_encryption_key_size_responder(self):
482        """
483        Verify that an IUT refuses to create a connection upon receipt of an LE Credit Based Connection
484        Request which fails to satisfy Encryption Key Size requirements.
485        """
486        self._setup_link_from_cert()
487        psm = 0x33
488        self.dut_l2cap.register_coc(self.cert_address, psm, SecurityLevel.AUTHENTICATED_PAIRING_WITH_128_BIT_KEY)
489        self.cert_l2cap.open_channel_with_expected_result(
490            psm, LeCreditBasedConnectionResponseResult.INSUFFICIENT_ENCRYPTION_KEY_SIZE)
491
492    @metadata(
493        pts_test_id="L2CAP/LE/CFC/BV-16-C",
494        pts_test_name="LE Credit Based Connection Request - refuse due to insufficient resources - Initiator")
495    def test_le_connection_request_insufficient_resources_initiator(self):
496        """
497        Verify that an IUT sending an LE Credit Based Connection Request does
498        not establish the channel upon receiving an LE Credit Based Connection
499        Response refusing the connection with result "0x0004 – Connection
500        refused – no resources available".
501        """
502        self._setup_link_from_cert()
503        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
504        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
505            psm=0x33, result=LeCreditBasedConnectionResponseResult.NO_RESOURCES_AVAILABLE)
506        assertThat(response_future.get_status()).isEqualTo(LeCreditBasedConnectionResponseResult.NO_RESOURCES_AVAILABLE)
507
508    @metadata(
509        pts_test_id="L2CAP/LE/CFC/BV-18-C",
510        pts_test_name="LE Credit Based Connection Request - refused due to Invalid Source CID - Initiator")
511    def test_request_refused_due_to_invalid_source_cid_initiator(self):
512        """
513        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x0009 – Connection refused – Invalid Source CID".
514        """
515        self._setup_link_from_cert()
516        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
517        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
518            psm=0x33, result=LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID)
519        assertThat(response_future.get_status()).isEqualTo(LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID)
520
521    @metadata(
522        pts_test_id="L2CAP/LE/CFC/BV-19-C",
523        pts_test_name="LE Credit Based Connection Request - refused due to source CID already allocated - Initiator")
524    def test_request_refused_due_to_source_cid_already_allocated_initiator(self):
525        """
526        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x000A – Connection refused – Source CID already allocated".
527        """
528        self._setup_link_from_cert()
529        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
530        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
531            psm=0x33, result=LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED)
532        assertThat(response_future.get_status()).isEqualTo(
533            LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED)
534
535    @metadata(
536        pts_test_id="L2CAP/LE/CFC/BV-20-C",
537        pts_test_name="LE Credit Based Connection Response - refused due to Source CID already allocated - Responder")
538    def test_request_refused_due_to_source_cid_already_allocated_responder(self):
539        """
540        Verify that an IUT receiving an LE Credit Based Connection Request for a second channel will refuse the connection with result "0x000A - Connection refused – Source CID already allocated" if it receives a Source CID which is already in use.
541        """
542        self._setup_link_from_cert()
543        (dut_channel, cert_channel) = self._open_channel_from_cert(psm=0x33, scid=0x0101)
544        self.dut_l2cap.register_coc(self.cert_address, psm=0x35)
545        self.cert_l2cap.get_control_channel().send(
546            l2cap_packets.LeCreditBasedConnectionRequestBuilder(2, 0x35, 0x0101, 1000, 1000, 1000))
547        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.CreditBasedConnectionResponseUsedCid())
548
549    @metadata(
550        pts_test_id="L2CAP/LE/CFC/BV-21-C",
551        pts_test_name="LE Credit Based Connection Request - refused due to Unacceptable Parameters - Initiator")
552    def test_request_refused_due_to_unacceptable_parameters_initiator(self):
553        """
554        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x000B – Connection refused – Unacceptable Parameters".
555        """
556        self._setup_link_from_cert()
557        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
558        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
559            psm=0x33, result=LeCreditBasedConnectionResponseResult.UNACCEPTABLE_PARAMETERS)
560        assertThat(response_future.get_status()).isEqualTo(
561            LeCreditBasedConnectionResponseResult.UNACCEPTABLE_PARAMETERS)
562
563    @metadata(pts_test_id="L2CAP/LE/CFC/BI-01-C", pts_test_name="Credit Exchange – Exceed Initial Credits")
564    def test_credit_exchange_exceed_initial_credits(self):
565        """
566        Verify that the IUT disconnects the LE Data Channel when the credit count exceeds 65535.
567        """
568        self._setup_link_from_cert()
569        (dut_channel, cert_channel) = self._open_channel_from_cert()
570        cert_channel.send_credits(65535)
571        cert_channel.verify_disconnect_request()
572