#!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from cert.closable import Closable from cert.closable import safeClose from cert.py_le_acl_manager import PyLeAclManager from cert.truth import assertThat import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import LeCommandCode from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers from cert.captures import L2capCaptures from mobly import asserts class CertLeL2capChannel(IEventStream): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, initial_credits=0): self._device = device self._scid = scid self._dcid = dcid self._acl_stream = acl_stream self._acl = acl self._control_channel = control_channel self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrame(scid)) self._credits_left = initial_credits def get_event_queue(self): return self._our_acl_view.get_event_queue() def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) self._credits_left -= 1 def send_first_le_i_frame(self, sdu_size, packet): frame = l2cap_packets.FirstLeInformationFrameBuilder(self._dcid, sdu_size, packet) self._acl.send(frame.Serialize()) self._credits_left -= 1 def disconnect_and_verify(self): assertThat(self._scid).isNotEqualTo(1) self._control_channel.send(l2cap_packets.LeDisconnectionRequestBuilder(1, self._dcid, self._scid)) assertThat(self._control_channel).emits(L2capMatchers.LeDisconnectionResponse(self._scid, self._dcid)) def verify_disconnect_request(self): assertThat(self._control_channel).emits(L2capMatchers.LeDisconnectionRequest(self._dcid, self._scid)) def send_credits(self, num_credits): self._control_channel.send(l2cap_packets.LeFlowControlCreditBuilder(2, self._scid, num_credits)) def credits_left(self): return self._credits_left class CertLeL2cap(Closable): def __init__(self, device): self._device = device self._le_acl_manager = PyLeAclManager(device) self._le_acl = None self.control_table = { LeCommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default, LeCommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default, LeCommandCode.LE_FLOW_CONTROL_CREDIT: self._on_credit, } self._cid_to_cert_channels = {} def close(self): self._le_acl_manager.close() safeClose(self._le_acl) def connect_le_acl(self, remote_addr): self._le_acl = self._le_acl_manager.connect_to_remote(remote_addr) self.control_channel = CertLeL2capChannel( self._device, 5, 5, self._get_acl_stream(), self._le_acl, control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) def wait_for_connection(self): self._le_acl = self._le_acl_manager.wait_for_connection() self.control_channel = CertLeL2capChannel( self._device, 5, 5, self._get_acl_stream(), self._le_acl, control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) def open_fixed_channel(self, cid=4): channel = CertLeL2capChannel(self._device, cid, cid, self._get_acl_stream(), self._le_acl, None, 0) return channel def open_channel(self, signal_id, psm, scid, mtu=1000, mps=100, initial_credit=6): self.control_channel.send( l2cap_packets.LeCreditBasedConnectionRequestBuilder(signal_id, psm, scid, mtu, mps, initial_credit)) response = L2capCaptures.CreditBasedConnectionResponse() assertThat(self.control_channel).emits(response) channel = CertLeL2capChannel(self._device, scid, response.get().GetDestinationCid(), self._get_acl_stream(), self._le_acl, self.control_channel, response.get().GetInitialCredits()) self._cid_to_cert_channels[scid] = channel return channel def open_channel_with_expected_result(self, psm=0x33, result=LeCreditBasedConnectionResponseResult.SUCCESS): self.control_channel.send(l2cap_packets.LeCreditBasedConnectionRequestBuilder(1, psm, 0x40, 1000, 100, 6)) response = L2capMatchers.CreditBasedConnectionResponse(result) assertThat(self.control_channel).emits(response) def verify_and_respond_open_channel_from_remote(self, psm=0x33, result=LeCreditBasedConnectionResponseResult.SUCCESS, our_scid=None): request = L2capCaptures.CreditBasedConnectionRequest(psm) assertThat(self.control_channel).emits(request) (scid, dcid) = self._respond_connection_request_default(request.get(), result, our_scid) channel = CertLeL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._le_acl, self.control_channel, request.get().GetInitialCredits()) self._cid_to_cert_channels[scid] = channel return channel def verify_and_reject_open_channel_from_remote(self, psm=0x33): request = L2capCaptures.CreditBasedConnectionRequest(psm) assertThat(self.control_channel).emits(request) sid = request.get().GetIdentifier() reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid) self.control_channel.send(reject) def verify_le_flow_control_credit(self, channel): assertThat(self.control_channel).emits(L2capMatchers.LeFlowControlCredit(channel._dcid)) def _respond_connection_request_default(self, request, result=LeCreditBasedConnectionResponseResult.SUCCESS, our_scid=None): sid = request.GetIdentifier() their_scid = request.GetSourceCid() mtu = request.GetMtu() mps = request.GetMps() initial_credits = request.GetInitialCredits() # If our_scid is not specified, we use the same value - their scid as their scid if our_scid is None: our_scid = their_scid our_dcid = their_scid response = l2cap_packets.LeCreditBasedConnectionResponseBuilder(sid, our_scid, mtu, mps, initial_credits, result) self.control_channel.send(response) return (our_scid, our_dcid) def get_control_channel(self): return self.control_channel def _get_acl_stream(self): return self._le_acl.acl_stream def _on_disconnection_request_default(self, request): disconnection_request = l2cap_packets.LeDisconnectionRequestView(request) sid = disconnection_request.GetIdentifier() scid = disconnection_request.GetSourceCid() dcid = disconnection_request.GetDestinationCid() response = l2cap_packets.LeDisconnectionResponseBuilder(sid, dcid, scid) self.control_channel.send(response) def _on_disconnection_response_default(self, request): disconnection_response = l2cap_packets.LeDisconnectionResponseView(request) def _on_credit(self, l2cap_le_control_view): credit_view = l2cap_packets.LeFlowControlCreditView(l2cap_le_control_view) cid = credit_view.GetCid() if cid not in self._cid_to_cert_channels: return self._cid_to_cert_channels[cid]._credits_left += credit_view.GetCredits() def _handle_control_packet(self, l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 5: return request = l2cap_packets.LeControlView(l2cap_view.GetPayload()) fn = self.control_table.get(request.GetCode()) if fn is not None: fn(request) return