1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from datetime import timedelta
18
19from cert.gd_base_test import GdBaseTestClass
20from cert.matchers import HciMatchers, NeighborMatchers
21from cert.py_hci import PyHci
22from cert.truth import assertThat
23from neighbor.cert.py_neighbor import PyNeighbor
24from neighbor.facade import facade_pb2 as neighbor_facade
25from bluetooth_packets_python3 import hci_packets
26from bluetooth_packets_python3.hci_packets import OpCode
27
28
class NeighborTest(GdBaseTestClass):
    """Cert tests that drive the DUT's neighbor facade against a cert HCI device.

    The DUT is started with the ``HCI_INTERFACES`` module and the cert device
    with the ``HCI`` module (see ``setup_class``). Each test makes the cert
    device inquiry/page scannable and then checks what the DUT's neighbor
    facade reports for the cert's address.
    """

    # Size, in bytes, that the cert's local name is zero-padded to before it is
    # handed to WriteLocalNameBuilder.
    _LOCAL_NAME_LENGTH = 248

    # How long a test waits for the expected inquiry result from the DUT.
    _INQUIRY_RESULT_TIMEOUT = timedelta(seconds=10)

    def setup_class(self):
        """Bring up the DUT with HCI_INTERFACES and the cert with HCI."""
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')

    def setup_test(self):
        """Create the cert HCI wrapper, enable scanning and derive the cert name.

        The cert name is ``Im_A_Cert@<cert address>`` encoded as bytes.
        """
        super().setup_test()
        self.cert_hci = PyHci(self.cert, acl_streaming=True)
        self.cert_hci.send_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        self.cert_name = b'Im_A_Cert'
        self.cert_address = self.cert_hci.read_own_address()
        self.cert_name += b'@' + self.cert_address.encode('utf8')
        self.dut_neighbor = PyNeighbor(self.dut)

    def teardown_test(self):
        """Close the cert HCI wrapper, then run the base-class teardown.

        The base teardown runs even if closing the cert HCI wrapper raises, so
        a failure here does not skip the framework's own cleanup.
        """
        try:
            self.cert_hci.close()
        finally:
            super().teardown_test()

    def _set_name(self):
        """Write the zero-padded cert name to the cert device and wait for completion."""
        # ljust leaves the name untouched if it is already at least the target length,
        # matching the behavior of padding one byte at a time.
        padded_name = self.cert_name.ljust(self._LOCAL_NAME_LENGTH, b'\0')
        self.cert_hci.send_command(hci_packets.WriteLocalNameBuilder(padded_name))

        assertThat(self.cert_hci.get_event_stream()).emits(HciMatchers.CommandComplete(OpCode.WRITE_LOCAL_NAME))

    def _start_inquiry(self, result_mode, length_1_28s):
        """Start a general inquiry on the DUT and (re-)enable scanning on the cert.

        Args:
            result_mode: a ``neighbor_facade.ResultMode`` value for the inquiry.
            length_1_28s: value passed through as ``InquiryMsg.length_1_28s``.

        Returns:
            The session object returned by ``PyNeighbor.set_inquiry_mode``.
        """
        inquiry_msg = neighbor_facade.InquiryMsg(
            inquiry_mode=neighbor_facade.DiscoverabilityMode.GENERAL,
            result_mode=result_mode,
            length_1_28s=length_1_28s,
            max_results=0)
        session = self.dut_neighbor.set_inquiry_mode(inquiry_msg)
        self.cert_hci.send_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        return session

    def test_inquiry_from_dut(self):
        """A STANDARD-mode inquiry from the DUT reports the cert's address."""
        session = self._start_inquiry(neighbor_facade.ResultMode.STANDARD, 3)
        assertThat(session).emits(
            NeighborMatchers.InquiryResult(self.cert_address), timeout=self._INQUIRY_RESULT_TIMEOUT)

    def test_inquiry_rssi_from_dut(self):
        """An RSSI-mode inquiry from the DUT reports the cert's address."""
        session = self._start_inquiry(neighbor_facade.ResultMode.RSSI, 6)
        assertThat(session).emits(
            NeighborMatchers.InquiryResultwithRssi(self.cert_address), timeout=self._INQUIRY_RESULT_TIMEOUT)

    def test_inquiry_extended_from_dut(self):
        """An EXTENDED-mode inquiry from the DUT reports the cert's address.

        The cert first writes its local name and an extended inquiry response
        carrying that name as COMPLETE_LOCAL_NAME GAP data.
        """
        self._set_name()
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        # cert_name is already bytes, so iterating it yields the integer byte values.
        gap_name.data = list(self.cert_name)
        gap_data = [gap_name]

        self.cert_hci.send_command(
            hci_packets.WriteExtendedInquiryResponseBuilder(hci_packets.FecRequired.NOT_REQUIRED, gap_data))
        session = self._start_inquiry(neighbor_facade.ResultMode.EXTENDED, 8)
        assertThat(session).emits(
            NeighborMatchers.ExtendedInquiryResult(self.cert_address), timeout=self._INQUIRY_RESULT_TIMEOUT)

    def test_remote_name(self):
        """The DUT's remote-name request for the cert yields the cert's name."""
        self._set_name()
        session = self.dut_neighbor.get_remote_name(self.cert_address)
        session.verify_name(self.cert_name)
99