1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from datetime import timedelta
18
19from cert.gd_base_test import GdBaseTestClass
20from cert.event_stream import EventStream
21from cert.truth import assertThat
22from cert.py_hal import PyHal
23from cert.matchers import HciMatchers
24from cert.captures import HciCaptures
25from google.protobuf import empty_pb2
26from facade import rootservice_pb2 as facade_rootservice_pb2
27from hal import hal_facade_pb2 as hal_facade_pb2
28from bluetooth_packets_python3 import hci_packets
29import bluetooth_packets_python3 as bt_packets
30from bluetooth_packets_python3.hci_packets import AclBuilder
31from bluetooth_packets_python3 import RawBuilder
32
# NOTE(review): not referenced by any code visible in this file; presumably a
# gRPC timeout in seconds -- confirm the unit and whether it is still needed.
_GRPC_TIMEOUT = 10
34
35
class SimpleHalTest(GdBaseTestClass):
    """Tests that exercise the HAL facade on a DUT and a cert device.

    Both devices are started with the 'HAL' module and are driven through
    ``PyHal`` wrappers that are created fresh (and reset) for every test.
    """

    def setup_class(self):
        """Start the DUT and the cert device with the HAL module loaded."""
        super().setup_class(dut_module='HAL', cert_module='HAL')

    def setup_test(self):
        """Create a PyHal wrapper for each device and reset both of them."""
        super().setup_test()

        self.dut_hal = PyHal(self.dut)
        self.cert_hal = PyHal(self.cert)

        self.dut_hal.reset()
        self.cert_hal.reset()

    def teardown_test(self):
        """Close both PyHal wrappers, then run the base-class teardown.

        Each step runs even if an earlier one raises, so a failure while
        closing the DUT wrapper (or a missing wrapper after a failed
        ``setup_test``) does not leave the cert wrapper open or skip the
        base-class cleanup.
        """
        try:
            self.dut_hal.close()
        finally:
            try:
                self.cert_hal.close()
            finally:
                super().teardown_test()

    def test_stream_events(self):
        """Send one HCI command on the DUT and expect its complete event."""
        self.dut_hal.send_hci_command(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))

        # NOTE(review): the leading 1 is presumably the number of HCI command
        # packets the controller allows -- confirm against the packet definition.
        assertThat(self.dut_hal.get_hci_event_stream()).emits(
            HciMatchers.Exactly(hci_packets.LeAddDeviceToConnectListCompleteBuilder(1, hci_packets.ErrorCode.SUCCESS)))

    def test_loopback_hci_command(self):
        """Enable local loopback on the DUT and expect a command to be echoed."""
        self.dut_hal.send_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL))

        command = hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
                                                              '0C:05:04:03:02:01')
        self.dut_hal.send_hci_command(command)

        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LoopbackOf(command))

    def test_inquiry_from_dut(self):
        """Make the cert device scannable, inquire from the DUT, expect a result."""
        self.cert_hal.send_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

        lap = hci_packets.Lap()
        lap.lap = 0x33
        self.dut_hal.send_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff))

        # Expecting an HCI Event (code 0x02, length 0x0f)
        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'\x02\x0f' in packet.payload)

    def test_le_ad_scan_cert_advertises(self):
        """Scan on the DUT while the cert device advertises known data."""
        self.dut_hal.set_random_le_address('0D:05:04:03:02:01')

        self.dut_hal.set_scan_parameters()
        self.dut_hal.start_scanning()

        advertisement = self.cert_hal.create_advertisement(
            0,
            '0C:05:04:03:02:01',
            min_interval=512,
            max_interval=768,
            peer_address='A6:A5:A4:A3:A2:A1',
            tx_power=0x7f,
            sid=1)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        # The DUT must report an event that carries the advertised bytes.
        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)

        advertisement.stop()

        self.dut_hal.stop_scanning()

    def test_le_connection_dut_advertises(self):
        """Connect from the cert device to an advertising DUT and swap ACL data."""
        self.cert_hal.set_random_le_address('0C:05:04:03:02:01')
        self.cert_hal.initiate_le_connection('0D:05:04:03:02:01')

        # DUT Advertises
        advertisement = self.dut_hal.create_advertisement(0, '0D:05:04:03:02:01')
        advertisement.set_data(b'Im_The_DUT')
        advertisement.set_scan_response(b'Im_The_D')
        advertisement.start()

        cert_acl = self.cert_hal.complete_le_connection()
        dut_acl = self.dut_hal.complete_le_connection()

        dut_acl.send_first(b'Just SomeAclData')
        cert_acl.send_first(b'Just SomeMoreAclData')

        # Each side must receive the payload that the other side sent.
        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)

    def test_le_connect_list_connection_cert_advertises(self):
        """Connect from the DUT via its connect list to an advertising cert device."""
        self.dut_hal.set_random_le_address('0D:05:04:03:02:01')
        self.dut_hal.add_to_connect_list('0C:05:04:03:02:01')
        # NOTE(review): this address matches neither device used in the test;
        # presumably it is ignored when connecting by connect list -- confirm.
        self.dut_hal.initiate_le_connection_by_connect_list('BA:D5:A4:A3:A2:A1')

        advertisement = self.cert_hal.create_advertisement(
            1,
            '0C:05:04:03:02:01',
            min_interval=512,
            max_interval=768,
            peer_address='A6:A5:A4:A3:A2:A1',
            tx_power=0x7F,
            sid=0)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        # Both ends must report that the LE connection completed.
        assertThat(self.cert_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())
        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())
142