1 //! ACL core dispatch shared between LE and classic
2 
3 use crate::acl::fragment::{fragmenting_stream, Reassembler};
4 use bt_common::Bluetooth::{self, Classic, Le};
5 use bt_hal::AclHal;
6 use bt_hci::{ControllerExports, EventRegistry};
7 use bt_packets::hci::EventChild::{DisconnectionComplete, NumberOfCompletedPackets};
8 use bt_packets::hci::{AclPacket, EventCode, EventPacket};
9 use bytes::Bytes;
10 use futures::stream::{SelectAll, StreamExt};
11 use gddi::{module, provides, Stoppable};
12 use log::info;
13 use std::collections::HashMap;
14 use std::sync::Arc;
15 use tokio::runtime::Runtime;
16 use tokio::select;
17 use tokio::sync::mpsc::{channel, Receiver, Sender};
18 use tokio::sync::{oneshot, Mutex};
19 use tokio_stream::wrappers::ReceiverStream;
20 
21 module! {
22     core_module,
23     providers {
24         AclDispatch => provide_acl_dispatch,
25     },
26 }
27 
28 /// A basic ACL connection
29 #[derive(Debug)]
30 pub struct Connection {
31     pub rx: Option<Receiver<Bytes>>,
32     pub tx: Option<Sender<Bytes>>,
33     handle: u16,
34     requests: Sender<Request>,
35     pub evt_rx: Receiver<EventPacket>,
36     pub evt_tx: Sender<EventPacket>,
37 }
38 
39 struct ConnectionInternal {
40     reassembler: Reassembler,
41     bt: Bluetooth,
42     close_tx: oneshot::Sender<()>,
43     evt_tx: Sender<EventPacket>,
44 }
45 
46 /// Manages rx and tx for open ACL connections
47 #[derive(Clone, Stoppable)]
48 pub struct AclDispatch {
49     requests: Sender<Request>,
50 }
51 
52 impl AclDispatch {
53     /// Register the provided connection with the ACL dispatch
54     #[allow(dead_code)]
register(&mut self, handle: u16, bt: Bluetooth) -> Connection55     pub async fn register(&mut self, handle: u16, bt: Bluetooth) -> Connection {
56         let (tx, rx) = oneshot::channel();
57         self.requests.send(Request::Register { handle, bt, fut: tx }).await.unwrap();
58         rx.await.unwrap()
59     }
60 }
61 
62 #[derive(Debug)]
63 enum Request {
64     Register { handle: u16, bt: Bluetooth, fut: oneshot::Sender<Connection> },
65 }
66 
/// Inbound ACL packets on this handle that match no registered connection are
/// discarded without logging.
/// NOTE(review): presumably a vendor (Qualcomm) debug channel — confirm.
const QCOM_DEBUG_HANDLE: u16 = 0x0EDC;
68 
69 #[provides]
provide_acl_dispatch( acl: AclHal, controller: Arc<ControllerExports>, mut events: EventRegistry, rt: Arc<Runtime>, ) -> AclDispatch70 async fn provide_acl_dispatch(
71     acl: AclHal,
72     controller: Arc<ControllerExports>,
73     mut events: EventRegistry,
74     rt: Arc<Runtime>,
75 ) -> AclDispatch {
76     let (req_tx, mut req_rx) = channel::<Request>(10);
77     let req_tx_clone = req_tx.clone();
78 
79     rt.spawn(async move {
80         let mut connections: HashMap<u16, ConnectionInternal> = HashMap::new();
81         let mut classic_outbound = SelectAll::new();
82         let mut classic_credits = controller.acl_buffers;
83         let mut le_outbound = SelectAll::new();
84         let mut le_credits: u16 = controller.le_buffers.into();
85 
86         let (evt_tx, mut evt_rx) = channel(3);
87         events.register(EventCode::NumberOfCompletedPackets, evt_tx.clone()).await;
88         events.register(EventCode::DisconnectionComplete, evt_tx).await;
89 
90         loop {
91             select! {
92                 Some(req) = req_rx.recv() => {
93                     match req {
94                         Request::Register { handle, bt, fut } => {
95                             let (out_tx, out_rx) = channel(10);
96                             let (in_tx, in_rx) = channel(10);
97                             let (evt_tx, evt_rx) = channel(3);
98                             let (close_tx, close_rx) = oneshot::channel();
99 
100                             assert!(connections.insert(
101                                 handle,
102                                 ConnectionInternal {
103                                     reassembler: Reassembler::new(out_tx),
104                                     bt,
105                                     close_tx,
106                                     evt_tx: evt_tx.clone(),
107                                 }).is_none());
108 
109                             match bt {
110                                 Classic => {
111                                     classic_outbound.push(fragmenting_stream(
112                                         ReceiverStream::new(in_rx), controller.acl_buffer_length.into(), handle, bt, close_rx));
113                                 },
114                                 Le => {
115                                     le_outbound.push(fragmenting_stream(
116                                         ReceiverStream::new(in_rx), controller.le_buffer_length.into(), handle, bt, close_rx));
117                                 },
118                             }
119 
120                             fut.send(Connection {
121                                 rx: Some(out_rx),
122                                 tx: Some(in_tx),
123                                 handle,
124                                 requests: req_tx_clone.clone(),
125                                 evt_rx,
126                                 evt_tx,
127                             }).unwrap();
128                         },
129                     }
130                 },
131                 Some(p) = consume(&acl.rx) => {
132                     match connections.get_mut(&p.get_handle()) {
133                         Some(c) => c.reassembler.on_packet(p).await,
134                         None if p.get_handle() == QCOM_DEBUG_HANDLE => {},
135                         None => info!("no acl for {}", p.get_handle()),
136                     }
137                 },
138                 Some(p) = classic_outbound.next(), if classic_credits > 0 => {
139                     acl.tx.send(p).await.unwrap();
140                     classic_credits -= 1;
141                 },
142                 Some(p) = le_outbound.next(), if le_credits > 0 => {
143                     acl.tx.send(p).await.unwrap();
144                     le_credits -= 1;
145                 },
146                 Some(evt) = evt_rx.recv() => {
147                     match evt.specialize() {
148                         NumberOfCompletedPackets(evt) => {
149                             for entry in evt.get_completed_packets() {
150                                 match connections.get(&entry.connection_handle) {
151                                     Some(conn) => {
152                                         let credits = entry.host_num_of_completed_packets;
153                                         match conn.bt {
154                                             Classic => classic_credits += credits,
155                                             Le => le_credits += credits,
156                                         }
157                                         assert!(classic_credits <= controller.acl_buffers);
158                                         assert!(le_credits <= controller.le_buffers.into());
159                                     },
160                                     None => info!("dropping credits for unknown connection {}", entry.connection_handle),
161                                 }
162                             }
163                         },
164                         DisconnectionComplete(evt) => {
165                             if let Some(c) = connections.remove(&evt.get_connection_handle()) {
166                                 c.close_tx.send(()).unwrap();
167                                 c.evt_tx.send(evt.into()).await.unwrap();
168                             }
169                         },
170                         _ => unimplemented!(),
171                     }
172                 },
173             }
174         }
175     });
176 
177     AclDispatch { requests: req_tx }
178 }
179 
consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket>180 async fn consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket> {
181     rx.lock().await.recv().await
182 }
183