#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from google.protobuf import empty_pb2 as empty_proto

from cert.event_stream import EventStream
from cert.event_stream import IEventStream
from cert.captures import HciCaptures
from cert.closable import Closable
from cert.closable import safeClose
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade


class PyLeAclManagerAclConnection(IEventStream, Closable):
    """A single LE ACL connection, exposing its ACL data as an event stream."""

    def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream):
        """
        An abstract representation for an LE ACL connection in GD certification test
        :param le_acl_manager: The LeAclManager from this GD device
        :param address: The local device address
        :param remote_addr: Remote device address
        :param handle: Connection handle
        :param event_stream: The connection event stream for this connection
        """
        self.le_acl_manager = le_acl_manager
        # todo enable filtering after sorting out handles
        # self.our_acl_stream = FilteringEventStream(acl_stream, None)
        self.handle = handle
        self.connection_event_stream = event_stream
        # ACL data for this handle is fetched through its own stream, separate
        # from the connection event stream handed in by the caller.
        self.acl_stream = EventStream(
            self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle)))
        self.remote_address = remote_addr
        self.own_address = address
        # Filled in by wait_for_disconnection_complete().
        self.disconnect_reason = None

    def close(self):
        """Close both the connection event stream and the ACL data stream."""
        safeClose(self.connection_event_stream)
        safeClose(self.acl_stream)

    def wait_for_disconnection_complete(self):
        """
        Assert that the connection event stream emits a disconnection complete
        event and store its reason in self.disconnect_reason
        """
        disconnection_complete = HciCaptures.DisconnectionCompleteCapture()
        assertThat(self.connection_event_stream).emits(disconnection_complete)
        self.disconnect_reason = disconnection_complete.get().GetReason()

    def send(self, data):
        """
        Send ACL data on this connection's handle
        :param data: The payload; it is converted with bytes() before sending
        """
        self.le_acl_manager.SendAclData(le_acl_manager_facade.LeAclData(handle=self.handle, payload=bytes(data)))

    def get_event_queue(self):
        """
        :return: The event queue of the ACL data stream
        """
        return self.acl_stream.get_event_queue()


class PyLeAclManager(Closable):
    """Creates, tracks and closes LE ACL connections for one GD device."""

    def __init__(self, device):
        """
        LE ACL Manager for GD Certification test
        :param device: The GD device
        """
        self.le_acl_manager = device.hci_le_acl_manager

        self.incoming_connection_event_stream = None
        # Maps token -> (EventStream, remote_addr) for connections that were
        # initiated but neither completed nor cancelled yet.
        self.outgoing_connection_event_streams = {}
        self.active_connections = []
        self.next_token = 1

    def close(self):
        """Close the pending incoming/outgoing streams and every active connection."""
        safeClose(self.incoming_connection_event_stream)
        for v in self.outgoing_connection_event_streams.values():
            safeClose(v[0])
        for connection in self.active_connections:
            safeClose(connection)

    def listen_for_incoming_connections(self):
        """
        Start fetching incoming connection events.
        Asserts that no incoming connection stream is already pending.
        """
        assertThat(self.incoming_connection_event_stream).isNone()
        self.incoming_connection_event_stream = EventStream(
            self.le_acl_manager.FetchIncomingConnection(empty_proto.Empty()))

    def connect_to_remote(self, remote_addr):
        """
        Initiate an outgoing connection and complete it
        :param remote_addr: Remote device address, passed to CreateConnection
        :return: The resulting PyLeAclManagerAclConnection
        """
        token = self.initiate_connection(remote_addr)
        return self.complete_outgoing_connection(token)

    def wait_for_connection(self):
        """
        Listen for an incoming connection and complete it
        :return: The resulting PyLeAclManagerAclConnection
        """
        self.listen_for_incoming_connections()
        return self.complete_incoming_connection()

    def cancel_connection(self, token):
        """
        Cancel a pending outgoing connection
        :param token: Token returned by initiate_connection
        """
        assertThat(token in self.outgoing_connection_event_streams).isTrue()
        pair = self.outgoing_connection_event_streams.pop(token)
        safeClose(pair[0])
        self.le_acl_manager.CancelConnection(pair[1])

    def initiate_connection(self, remote_addr):
        """
        Start an outgoing connection without completing it
        :param remote_addr: Remote device address, passed to CreateConnection
        :return: A token identifying the pending connection, for use with
                 complete_outgoing_connection or cancel_connection
        """
        assertThat(self.next_token in self.outgoing_connection_event_streams).isFalse()
        # The address is stored next to the stream because cancel_connection
        # passes it to CancelConnection.
        self.outgoing_connection_event_streams[self.next_token] = (EventStream(
            self.le_acl_manager.CreateConnection(remote_addr)), remote_addr)
        token = self.next_token
        self.next_token += 1
        return token

    def complete_connection(self, event_stream):
        """
        Assert that an LE connection complete event is emitted and wrap it
        :param event_stream: The connection event stream to read from; it is
                             handed over to the returned connection
        :return: A PyLeAclManagerAclConnection, also added to active_connections
        """
        connection_complete = HciCaptures.LeConnectionCompleteCapture()
        assertThat(event_stream).emits(connection_complete)
        complete = connection_complete.get()
        handle = complete.GetConnectionHandle()
        remote = complete.GetPeerAddress()
        # The local address is read only from the enhanced subevent; for any
        # other subevent it is left as None.
        if complete.GetSubeventCode() == hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE:
            address = complete.GetLocalResolvablePrivateAddress()
        else:
            address = None
        connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream)
        self.active_connections.append(connection)
        return connection

    def complete_incoming_connection(self):
        """
        Complete the pending incoming connection.
        Asserts that listen_for_incoming_connections was called first.
        :return: The resulting PyLeAclManagerAclConnection
        """
        assertThat(self.incoming_connection_event_stream).isNotNone()
        event_stream = self.incoming_connection_event_stream
        # Ownership of the stream moves to the connection, so forget it here.
        self.incoming_connection_event_stream = None
        return self.complete_connection(event_stream)

    def complete_outgoing_connection(self, token):
        """
        Complete a pending outgoing connection
        :param token: Token returned by initiate_connection
        :return: The resulting PyLeAclManagerAclConnection
        """
        # Check membership rather than subscripting: stored values are always
        # tuples, so an isNotNone() check on the value could never fail, and an
        # unknown token would raise a bare KeyError before the assertion ran.
        assertThat(token in self.outgoing_connection_event_streams).isTrue()
        event_stream = self.outgoing_connection_event_streams.pop(token)[0]
        return self.complete_connection(event_stream)