#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from datetime import timedelta

from cert.gd_base_test import GdBaseTestClass
from cert.event_stream import EventStream
from cert.truth import assertThat
from cert.py_hal import PyHal
from cert.matchers import HciMatchers
from cert.captures import HciCaptures
from google.protobuf import empty_pb2
from facade import rootservice_pb2 as facade_rootservice_pb2
from hal import hal_facade_pb2 as hal_facade_pb2
from bluetooth_packets_python3 import hci_packets
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3.hci_packets import AclBuilder
from bluetooth_packets_python3 import RawBuilder

_GRPC_TIMEOUT = 10


class SimpleHalTest(GdBaseTestClass):
    """HAL-level tests run against a DUT and a cert device.

    Both devices are brought up with the 'HAL' module (see setup_class) and
    are driven through PyHal wrappers created fresh for every test.
    """

    def setup_class(self):
        """Start both the DUT and the cert device with the 'HAL' module."""
        super().setup_class(dut_module='HAL', cert_module='HAL')

    def setup_test(self):
        """Wrap the DUT and cert devices in PyHal and reset both before each test."""
        super().setup_test()

        self.dut_hal = PyHal(self.dut)
        self.cert_hal = PyHal(self.cert)

        self.dut_hal.reset()
        self.cert_hal.reset()

    def teardown_test(self):
        """Close both PyHal wrappers, then run the base-class teardown."""
        self.dut_hal.close()
        self.cert_hal.close()
        super().teardown_test()

    def test_stream_events(self):
        """Send an LE Add Device To Connect List command and expect its completion event on the DUT."""
        self.dut_hal.send_hci_command(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))

        # NOTE(review): the leading 1 is presumably the num_hci_command_packets field - confirm.
        assertThat(self.dut_hal.get_hci_event_stream()).emits(
            HciMatchers.Exactly(hci_packets.LeAddDeviceToConnectListCompleteBuilder(1, hci_packets.ErrorCode.SUCCESS)))

    def test_loopback_hci_command(self):
        """Enable local loopback on the DUT and expect a sent command to be looped back as an event."""
        self.dut_hal.send_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL))

        command = hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
                                                              '0C:05:04:03:02:01')
        self.dut_hal.send_hci_command(command)

        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LoopbackOf(command))

    def test_inquiry_from_dut(self):
        """Make the cert device scannable, start an inquiry from the DUT and expect a matching event."""
        self.cert_hal.send_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

        lap = hci_packets.Lap()
        lap.lap = 0x33
        self.dut_hal.send_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff))

        # Expecting an HCI Event (code 0x02, length 0x0f)
        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'\x02\x0f' in packet.payload)

    def test_le_ad_scan_cert_advertises(self):
        """Scan on the DUT while the cert device advertises, and expect the advertised data in a DUT event."""
        self.dut_hal.set_random_le_address('0D:05:04:03:02:01')

        self.dut_hal.set_scan_parameters()
        self.dut_hal.start_scanning()

        advertisement = self.cert_hal.create_advertisement(
            0,
            '0C:05:04:03:02:01',
            min_interval=512,
            max_interval=768,
            peer_address='A6:A5:A4:A3:A2:A1',
            tx_power=0x7f,
            sid=1)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)

        advertisement.stop()

        self.dut_hal.stop_scanning()

    def test_le_connection_dut_advertises(self):
        """Connect from the cert device to an advertising DUT and exchange ACL data in both directions."""
        self.cert_hal.set_random_le_address('0C:05:04:03:02:01')
        self.cert_hal.initiate_le_connection('0D:05:04:03:02:01')

        # DUT Advertises
        advertisement = self.dut_hal.create_advertisement(0, '0D:05:04:03:02:01')
        advertisement.set_data(b'Im_The_DUT')
        advertisement.set_scan_response(b'Im_The_D')
        advertisement.start()

        cert_acl = self.cert_hal.complete_le_connection()
        dut_acl = self.dut_hal.complete_le_connection()

        dut_acl.send_first(b'Just SomeAclData')
        cert_acl.send_first(b'Just SomeMoreAclData')

        # Each side should observe the payload sent by the other side.
        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)

    def test_le_connect_list_connection_cert_advertises(self):
        """Connect from the DUT via its connect list to an advertising cert device.

        The cert address is added to the DUT connect list before the DUT
        initiates a connection by connect list; both event streams are then
        expected to emit an LE Connection Complete event.
        """
        self.dut_hal.set_random_le_address('0D:05:04:03:02:01')
        self.dut_hal.add_to_connect_list('0C:05:04:03:02:01')
        # NOTE(review): this address differs from the cert's advertising address; presumably it is
        # ignored when connecting by connect list - confirm against PyHal.
        self.dut_hal.initiate_le_connection_by_connect_list('BA:D5:A4:A3:A2:A1')

        advertisement = self.cert_hal.create_advertisement(
            1,
            '0C:05:04:03:02:01',
            min_interval=512,
            max_interval=768,
            peer_address='A6:A5:A4:A3:A2:A1',
            tx_power=0x7F,
            sid=0)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        assertThat(self.cert_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())
        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())