1 //! ACL core dispatch shared between LE and classic
2
3 use crate::acl::fragment::{fragmenting_stream, Reassembler};
4 use bt_common::Bluetooth::{self, Classic, Le};
5 use bt_hal::AclHal;
6 use bt_hci::{ControllerExports, EventRegistry};
7 use bt_packets::hci::EventChild::{DisconnectionComplete, NumberOfCompletedPackets};
8 use bt_packets::hci::{AclPacket, EventCode, EventPacket};
9 use bytes::Bytes;
10 use futures::stream::{SelectAll, StreamExt};
11 use gddi::{module, provides, Stoppable};
12 use log::info;
13 use std::collections::HashMap;
14 use std::sync::Arc;
15 use tokio::runtime::Runtime;
16 use tokio::select;
17 use tokio::sync::mpsc::{channel, Receiver, Sender};
18 use tokio::sync::{oneshot, Mutex};
19 use tokio_stream::wrappers::ReceiverStream;
20
21 module! {
22 core_module,
23 providers {
24 AclDispatch => provide_acl_dispatch,
25 },
26 }
27
28 /// A basic ACL connection
29 #[derive(Debug)]
30 pub struct Connection {
31 pub rx: Option<Receiver<Bytes>>,
32 pub tx: Option<Sender<Bytes>>,
33 handle: u16,
34 requests: Sender<Request>,
35 pub evt_rx: Receiver<EventPacket>,
36 pub evt_tx: Sender<EventPacket>,
37 }
38
39 struct ConnectionInternal {
40 reassembler: Reassembler,
41 bt: Bluetooth,
42 close_tx: oneshot::Sender<()>,
43 evt_tx: Sender<EventPacket>,
44 }
45
46 /// Manages rx and tx for open ACL connections
47 #[derive(Clone, Stoppable)]
48 pub struct AclDispatch {
49 requests: Sender<Request>,
50 }
51
52 impl AclDispatch {
53 /// Register the provided connection with the ACL dispatch
54 #[allow(dead_code)]
register(&mut self, handle: u16, bt: Bluetooth) -> Connection55 pub async fn register(&mut self, handle: u16, bt: Bluetooth) -> Connection {
56 let (tx, rx) = oneshot::channel();
57 self.requests.send(Request::Register { handle, bt, fut: tx }).await.unwrap();
58 rx.await.unwrap()
59 }
60 }
61
62 #[derive(Debug)]
63 enum Request {
64 Register { handle: u16, bt: Bluetooth, fut: oneshot::Sender<Connection> },
65 }
66
67 const QCOM_DEBUG_HANDLE: u16 = 0xedc;
68
69 #[provides]
provide_acl_dispatch( acl: AclHal, controller: Arc<ControllerExports>, mut events: EventRegistry, rt: Arc<Runtime>, ) -> AclDispatch70 async fn provide_acl_dispatch(
71 acl: AclHal,
72 controller: Arc<ControllerExports>,
73 mut events: EventRegistry,
74 rt: Arc<Runtime>,
75 ) -> AclDispatch {
76 let (req_tx, mut req_rx) = channel::<Request>(10);
77 let req_tx_clone = req_tx.clone();
78
79 rt.spawn(async move {
80 let mut connections: HashMap<u16, ConnectionInternal> = HashMap::new();
81 let mut classic_outbound = SelectAll::new();
82 let mut classic_credits = controller.acl_buffers;
83 let mut le_outbound = SelectAll::new();
84 let mut le_credits: u16 = controller.le_buffers.into();
85
86 let (evt_tx, mut evt_rx) = channel(3);
87 events.register(EventCode::NumberOfCompletedPackets, evt_tx.clone()).await;
88 events.register(EventCode::DisconnectionComplete, evt_tx).await;
89
90 loop {
91 select! {
92 Some(req) = req_rx.recv() => {
93 match req {
94 Request::Register { handle, bt, fut } => {
95 let (out_tx, out_rx) = channel(10);
96 let (in_tx, in_rx) = channel(10);
97 let (evt_tx, evt_rx) = channel(3);
98 let (close_tx, close_rx) = oneshot::channel();
99
100 assert!(connections.insert(
101 handle,
102 ConnectionInternal {
103 reassembler: Reassembler::new(out_tx),
104 bt,
105 close_tx,
106 evt_tx: evt_tx.clone(),
107 }).is_none());
108
109 match bt {
110 Classic => {
111 classic_outbound.push(fragmenting_stream(
112 ReceiverStream::new(in_rx), controller.acl_buffer_length.into(), handle, bt, close_rx));
113 },
114 Le => {
115 le_outbound.push(fragmenting_stream(
116 ReceiverStream::new(in_rx), controller.le_buffer_length.into(), handle, bt, close_rx));
117 },
118 }
119
120 fut.send(Connection {
121 rx: Some(out_rx),
122 tx: Some(in_tx),
123 handle,
124 requests: req_tx_clone.clone(),
125 evt_rx,
126 evt_tx,
127 }).unwrap();
128 },
129 }
130 },
131 Some(p) = consume(&acl.rx) => {
132 match connections.get_mut(&p.get_handle()) {
133 Some(c) => c.reassembler.on_packet(p).await,
134 None if p.get_handle() == QCOM_DEBUG_HANDLE => {},
135 None => info!("no acl for {}", p.get_handle()),
136 }
137 },
138 Some(p) = classic_outbound.next(), if classic_credits > 0 => {
139 acl.tx.send(p).await.unwrap();
140 classic_credits -= 1;
141 },
142 Some(p) = le_outbound.next(), if le_credits > 0 => {
143 acl.tx.send(p).await.unwrap();
144 le_credits -= 1;
145 },
146 Some(evt) = evt_rx.recv() => {
147 match evt.specialize() {
148 NumberOfCompletedPackets(evt) => {
149 for entry in evt.get_completed_packets() {
150 match connections.get(&entry.connection_handle) {
151 Some(conn) => {
152 let credits = entry.host_num_of_completed_packets;
153 match conn.bt {
154 Classic => classic_credits += credits,
155 Le => le_credits += credits,
156 }
157 assert!(classic_credits <= controller.acl_buffers);
158 assert!(le_credits <= controller.le_buffers.into());
159 },
160 None => info!("dropping credits for unknown connection {}", entry.connection_handle),
161 }
162 }
163 },
164 DisconnectionComplete(evt) => {
165 if let Some(c) = connections.remove(&evt.get_connection_handle()) {
166 c.close_tx.send(()).unwrap();
167 c.evt_tx.send(evt.into()).await.unwrap();
168 }
169 },
170 _ => unimplemented!(),
171 }
172 },
173 }
174 }
175 });
176
177 AclDispatch { requests: req_tx }
178 }
179
consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket>180 async fn consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket> {
181 rx.lock().await.recv().await
182 }
183