1# 2# Copyright 2020 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16from datetime import datetime, timedelta 17 18from bluetooth_packets_python3 import RawBuilder 19from cert.matchers import L2capMatchers 20from cert.truth import assertThat 21from cert.performance_test_logger import PerformanceTestLogger 22from l2cap.classic.cert.cert_l2cap import CertL2cap 23from l2cap.classic.cert.l2cap_test import L2capTestBase 24from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode 25from bluetooth_packets_python3.l2cap_packets import FcsType 26from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction 27 28 29class L2capPerformanceTest(L2capTestBase): 30 31 def setup_test(self): 32 super().setup_test() 33 self.performance_test_logger = PerformanceTestLogger() 34 35 def teardown_test(self): 36 super().teardown_test() 37 38 def _basic_mode_tx(self, mtu, packets): 39 """ 40 Send the specified number of packets and return the time interval in ms. 41 """ 42 self._setup_link_from_cert() 43 44 (dut_channel, cert_channel) = self._open_channel_from_cert() 45 self.performance_test_logger.start_interval("TX") 46 for _ in range(packets): 47 dut_channel.send(b'a' * mtu) 48 assertThat(cert_channel).emits( 49 L2capMatchers.Data(b'a' * mtu), at_least_times=packets, timeout=timedelta(seconds=60)) 50 self.performance_test_logger.end_interval("TX") 51 52 duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] 53 self.log.info("Duration: %s" % str(duration)) 54 55 return duration 56 57 def _basic_mode_tx_fixed_interval(self, mtu, interval=timedelta(seconds=10), batch_size=20): 58 """ 59 Send packets as much as possible over a certain interval, and return the 60 number of packets sent 61 """ 62 self._setup_link_from_cert() 63 64 (dut_channel, cert_channel) = self._open_channel_from_cert() 65 start_time = datetime.now() 66 end_time = start_time + interval 67 packets_sent = 0 68 while datetime.now() < end_time: 69 for _ in range(batch_size): 70 dut_channel.send(b'a' * mtu) 71 packets_sent += batch_size 72 assertThat(cert_channel).emits(L2capMatchers.Data(b'a' * mtu), at_least_times=batch_size) 73 74 return packets_sent 75 76 def _basic_mode_rx(self, mtu, packets): 77 self._setup_link_from_cert() 78 79 (dut_channel, cert_channel) = self._open_channel_from_cert() 80 self.performance_test_logger.start_interval("RX") 81 data = b"a" * mtu 82 data_packet = RawBuilder([x for x in data]) 83 for _ in range(packets): 84 cert_channel.send(data_packet) 85 assertThat(dut_channel).emits( 86 L2capMatchers.PacketPayloadRawData(data), at_least_times=packets, timeout=timedelta(seconds=60)) 87 self.performance_test_logger.end_interval("RX") 88 89 duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] 90 self.log.info("Duration: %s" % str(duration)) 91 92 def _ertm_mode_tx(self, mtu, packets, tx_window_size=10): 93 """ 94 Send the specified number of packets and return the time interval in ms. 95 """ 96 # Make sure that number of packets is a multiple of tx_window_size 97 packets = packets // tx_window_size * tx_window_size 98 # For ERTM TX test, we have to do it sequentially because cert needs to ack 99 self._setup_link_from_cert() 100 101 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size) 102 103 (dut_channel, cert_channel) = self._open_channel_from_cert( 104 mode=RetransmissionFlowControlMode.ERTM, 105 fcs=FcsType.NO_FCS, 106 req_config_options=config, 107 rsp_config_options=config) 108 109 self.performance_test_logger.start_interval("TX") 110 for i in range(packets): 111 dut_channel.send(b'a' * mtu) 112 if i % tx_window_size == tx_window_size - 1: 113 assertThat(cert_channel).emits(L2capMatchers.IFrame(payload=b'a' * mtu), at_least_times=tx_window_size) 114 cert_channel.send_s_frame(req_seq=(i + 1) % 64, s=SupervisoryFunction.RECEIVER_READY) 115 116 self.performance_test_logger.end_interval("TX") 117 118 duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] 119 self.log.info("Duration: %s" % str(duration)) 120 121 return duration 122 123 def _ertm_mode_rx(self, mtu, packets, tx_window_size=10): 124 # Make sure that number of packets is a multiple of tx_window_size 125 packets = packets // tx_window_size * tx_window_size 126 127 self._setup_link_from_cert() 128 129 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size) 130 131 (dut_channel, cert_channel) = self._open_channel_from_cert( 132 mode=RetransmissionFlowControlMode.ERTM, 133 fcs=FcsType.NO_FCS, 134 req_config_options=config, 135 rsp_config_options=config) 136 137 data = b"a" * mtu 138 data_packet = RawBuilder([x for x in data]) 139 self.performance_test_logger.start_interval("RX") 140 for i in range(packets): 141 cert_channel.send_i_frame(tx_seq=i % 64, req_seq=0, payload=data_packet) 142 if i % tx_window_size == (tx_window_size - 1): 143 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=(i + 1) % 64)) 144 self.performance_test_logger.end_interval("RX") 145 146 duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] 147 self.log.info("Duration: %s" % str(duration)) 148 149 def test_basic_mode_tx_672_100(self): 150 duration = self._basic_mode_tx(672, 100) 151 assertThat(duration).isWithin(timedelta(seconds=2)) 152 153 def test_basic_mode_tx_100_100(self): 154 duration = self._basic_mode_tx(100, 100) 155 assertThat(duration).isWithin(timedelta(seconds=2)) 156 157 def test_ertm_mode_tx_672_100(self): 158 duration = self._ertm_mode_tx(672, 100) 159 assertThat(duration).isWithin(timedelta(seconds=5)) 160 161 def test_basic_mode_rx_672_100(self): 162 self._basic_mode_rx(672, 100) 163 164 def test_ertm_mode_rx_672_100(self): 165 self._ertm_mode_rx(672, 100) 166 167 def test_basic_mode_end_to_end_latency(self): 168 self._setup_link_from_cert() 169 170 (dut_channel, cert_channel) = self._open_channel_from_cert() 171 172 data = b"a" * 100 173 data_packet = RawBuilder([x for x in data]) 174 for i in range(100): 175 self.performance_test_logger.start_interval("RX") 176 cert_channel.send(data_packet) 177 assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(data)) 178 self.performance_test_logger.end_interval("RX") 179 duration = self.performance_test_logger.get_duration_of_intervals("RX") 180 mean = sum(duration, timedelta()) / len(duration) 181 self.log.info("Mean: %s" % str(mean)) 182 183 def test_basic_mode_number_of_packets_10_seconds_672(self): 184 number_packets = self._basic_mode_tx_fixed_interval(672) 185 # Requiring that 500 packets (20ms period on average) are sent 186 self.log.info("Packets sent: %d" % number_packets) 187 assertThat(number_packets > 500).isTrue() 188