1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from bluetooth_packets_python3 import hci_packets
20from cert.capture import Capture
21from cert.captures import SecurityCaptures
22from cert.closable import Closable
23from cert.closable import safeClose
24from cert.event_stream import EventStream, IEventStream
25from cert.event_stream import FilteringEventStream
26from cert.matchers import IsoMatchers
27from cert.truth import assertThat
28from datetime import timedelta
29from facade import common_pb2 as common
30from google.protobuf import empty_pb2 as empty_proto
31from iso import facade_pb2 as iso_facade_pb2
32
33
34class CisTestParameters():
35
36    def __init__(self, cis_id, nse, max_sdu_m_to_s, max_sdu_s_to_m, max_pdu_m_to_s, max_pdu_s_to_m, phy_m_to_s,
37                 phy_s_to_m, bn_m_to_s, bn_s_to_m):
38        self.cis_id = cis_id
39        self.nse = nse
40        self.max_sdu_m_to_s = max_sdu_m_to_s
41        self.max_sdu_s_to_m = max_sdu_s_to_m
42        self.max_pdu_m_to_s = max_pdu_m_to_s
43        self.max_pdu_s_to_m = max_pdu_s_to_m
44        self.phy_m_to_s = phy_m_to_s
45        self.phy_s_to_m = phy_s_to_m
46        self.bn_m_to_s = bn_m_to_s
47        self.bn_s_to_m = bn_s_to_m
48
49
50class PyLeIsoStream(IEventStream):
51
52    def __init__(self, device, cis_handle, iso_data_stream):
53        self._device = device
54        self._cis_handle = cis_handle
55        self._le_iso_data_stream = iso_data_stream
56        self._our_le_iso_cis_view = FilteringEventStream(
57            self._le_iso_data_stream, IsoMatchers.PacketPayloadWithMatchingCisHandle(self._cis_handle))
58
59    def get_event_queue(self):
60        return self._our_le_iso_cis_view.get_event_queue()
61
62    def send(self, payload):
63        self._device.iso.SendIsoPacket(iso_facade_pb2.IsoPacket(handle=self._cis_handle, payload=payload))
64
65
66class PyLeIso(Closable):
67    """
68        Abstraction for iso tasks and GRPC calls
69    """
70
71    _iso_event_stream = None
72
73    def __init__(self, device):
74        logging.info("DUT: Init")
75        self._device = device
76        self._device.wait_channel_ready()
77        self._iso_event_stream = EventStream(self._device.iso.FetchIsoEvents(empty_proto.Empty()))
78        self._iso_data_stream = EventStream(self._device.iso.FetchIsoData(empty_proto.Empty()))
79
80    def close(self):
81        if self._iso_event_stream is not None:
82            safeClose(self._iso_event_stream)
83        else:
84            logging.info("DUT: ISO Event Stream is None!")
85        if self._iso_data_stream is not None:
86            safeClose(self._iso_data_stream)
87        else:
88            logging.info("DUT: ISO Data Stream is None!")
89
90        logging.info("DUT: close")
91
92    def le_set_cig_parameters(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, peripherals_clock_accuracy,
93                              packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_id,
94                              max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, rtn_m_to_s, rtn_s_to_m):
95
96        resp = self._device.iso.LeSetCigParameters(
97            iso_facade_pb2.LeSetCigParametersRequest(
98                cig_id=cig_id,
99                sdu_interval_m_to_s=sdu_interval_m_to_s,
100                sdu_interval_s_to_m=sdu_interval_s_to_m,
101                peripherals_clock_accuracy=peripherals_clock_accuracy,
102                packing=packing,
103                framing=framing,
104                max_transport_latency_m_to_s=max_transport_latency_m_to_s,
105                max_transport_latency_s_to_m=max_transport_latency_s_to_m,
106                cis_id=cis_id,
107                max_sdu_m_to_s=max_sdu_m_to_s,
108                max_sdu_s_to_m=max_sdu_s_to_m,
109                phy_m_to_s=phy_m_to_s,
110                phy_s_to_m=phy_s_to_m,
111                rtn_m_to_s=rtn_m_to_s,
112                rtn_s_to_m=rtn_s_to_m))
113
114    def le_set_cig_parameters_test(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m,
115                                   iso_interval, peripherals_clock_accuracy, packing, framing,
116                                   max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_configs):
117        configs = []
118        for cc in cis_configs:
119            configs.append(
120                iso_facade_pb2.LeSetCigParametersTestRequest.LeCisParametersTestConfig(
121                    cis_id=cc.cis_id,
122                    nse=cc.nse,
123                    max_sdu_m_to_s=cc.max_sdu_m_to_s,
124                    max_sdu_s_to_m=cc.max_sdu_s_to_m,
125                    max_pdu_m_to_s=cc.max_pdu_m_to_s,
126                    max_pdu_s_to_m=cc.max_pdu_s_to_m,
127                    phy_m_to_s=cc.phy_m_to_s,
128                    phy_s_to_m=cc.phy_s_to_m,
129                    bn_m_to_s=cc.bn_m_to_s,
130                    bn_s_to_m=cc.bn_s_to_m,
131                ))
132
133        resp = self._device.iso.LeSetCigParameters(
134            iso_facade_pb2.LeSetCigParametersTestRequest(
135                cig_id=cig_id,
136                sdu_interval_m_to_s=sdu_interval_m_to_s,
137                sdu_interval_s_to_m=sdu_interval_s_to_m,
138                ft_m_to_s=ft_m_to_s,
139                ft_s_to_m=ft_s_to_m,
140                iso_interval=iso_interval,
141                peripherals_clock_accuracy=peripherals_clock_accuracy,
142                packing=packing,
143                framing=framing,
144                max_transport_latency_m_to_s=max_transport_latency_m_to_s,
145                max_transport_latency_s_to_m=max_transport_latency_s_to_m,
146                cis_configs=configs))
147
148    def wait_le_set_cig_parameters_complete(self):
149        set_cig_params_complete_capture = PyLeIso.IsoCigComplete(iso_facade_pb2.IsoMsgType.ISO_PARAMETERS_SET_COMPLETE)
150
151        assertThat(self._iso_event_stream).emits(set_cig_params_complete_capture, timeout=timedelta(seconds=5))
152        return set_cig_params_complete_capture.get()
153
154    @staticmethod
155    def IsoCigComplete(type=None):
156        return Capture(lambda event: True if event.message_type == type else False, PyLeIso._extract_cis_handles)
157
158    @staticmethod
159    def _extract_cis_handles(event):
160        if event is None:
161            return None
162        return event.cis_handle
163
164    def le_create_cis(self, cis_and_acl_handle_array):
165        handles_pairs = []
166        for hp_tmp in cis_and_acl_handle_array:
167            handles_pairs.append(
168                iso_facade_pb2.LeCreateCisRequest.HandlePair(cis_handle=hp_tmp[0], acl_handle=hp_tmp[1]))
169
170        self._device.iso.LeCreateCis(iso_facade_pb2.LeCreateCisRequest(handle_pair=handles_pairs))
171
172    def wait_le_cis_established(self):
173        cis_establshed_capture = PyLeIso.IsoCigEstablished(iso_facade_pb2.IsoMsgType.ISO_CIS_ESTABLISHED)
174        assertThat(self._iso_event_stream).emits(cis_establshed_capture, timeout=timedelta(seconds=5))
175        cis_handle = cis_establshed_capture.get()[0]
176        return PyLeIsoStream(self._device, cis_handle, self._iso_data_stream)
177
178    @staticmethod
179    def IsoCigEstablished(type):
180        return Capture(lambda event: True if event.message_type == type else False, PyLeIso._extract_cis_handles)
181