1 //! Handles fragmentation & reassembly of ACL packets into whole L2CAP payloads
2 
3 use bt_common::Bluetooth;
4 use bt_packets::hci::PacketBoundaryFlag::{
5     ContinuingFragment, FirstAutomaticallyFlushable, FirstNonAutomaticallyFlushable,
6 };
7 use bt_packets::hci::{AclBuilder, AclChild, AclPacket, BroadcastFlag};
8 use bytes::{Buf, Bytes, BytesMut};
9 use futures::stream::{self, StreamExt};
10 use log::{error, info, warn};
11 use tokio::sync::mpsc::Sender;
12 use tokio::sync::oneshot;
13 use tokio_stream::wrappers::ReceiverStream;
14 
15 const L2CAP_BASIC_FRAME_HEADER_LEN: usize = 4;
16 
17 pub struct Reassembler {
18     buffer: Option<BytesMut>,
19     remaining: usize,
20     out: Sender<Bytes>,
21 }
22 
23 impl Reassembler {
24     /// Create a new reassembler
new(out: Sender<Bytes>) -> Self25     pub fn new(out: Sender<Bytes>) -> Self {
26         Self { buffer: None, remaining: 0, out }
27     }
28 
29     /// Injest the packet and send out if fully reassembled
on_packet(&mut self, packet: AclPacket)30     pub async fn on_packet(&mut self, packet: AclPacket) {
31         let payload = match packet.specialize() {
32             AclChild::Payload(payload) => payload,
33             AclChild::None => {
34                 info!("dropping ACL packet with empty payload");
35                 return;
36             }
37         };
38 
39         if let BroadcastFlag::ActivePeripheralBroadcast = packet.get_broadcast_flag() {
40             // we do not accept broadcast packets
41             return;
42         }
43 
44         match packet.get_packet_boundary_flag() {
45             FirstNonAutomaticallyFlushable => error!("not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"),
46             FirstAutomaticallyFlushable => {
47                 if self.buffer.take().is_some() {
48                     error!("got a start packet without finishing previous reassembly - dropping previous");
49                 }
50 
51                 let full_size = get_l2cap_pdu_size(&payload);
52                 self.remaining = full_size - (payload.len() - L2CAP_BASIC_FRAME_HEADER_LEN);
53                 if self.remaining > 0 {
54                     let mut buffer = BytesMut::with_capacity(full_size);
55                     buffer.extend_from_slice(&payload[..]);
56                     self.buffer = Some(buffer);
57                 } else {
58                     self.out.send(payload).await.unwrap();
59                 }
60             },
61             ContinuingFragment => {
62                 match self.buffer.take() {
63                     None => warn!("got continuation packet without pending reassembly"),
64                     Some(_) if self.remaining < payload.len() => warn!("remote sent unexpected L2CAP PDU - dropping entire packet"),
65                     Some(mut buffer) => {
66                         self.remaining -= payload.len();
67                         buffer.extend_from_slice(&payload[..]);
68                         if self.remaining == 0 {
69                             self.out.send(buffer.freeze()).await.unwrap();
70                         } else {
71                             self.buffer = Some(buffer);
72                         }
73                     }
74                 }
75             },
76         }
77     }
78 }
79 
get_l2cap_pdu_size(first_packet: &Bytes) -> usize80 fn get_l2cap_pdu_size(first_packet: &Bytes) -> usize {
81     if first_packet.len() <= L2CAP_BASIC_FRAME_HEADER_LEN {
82         error!("invalid l2cap starting packet");
83 
84         0
85     } else {
86         (&first_packet[..]).get_u16_le() as usize
87     }
88 }
89 
fragmenting_stream( rx: ReceiverStream<Bytes>, mtu: usize, handle: u16, bt: Bluetooth, close_rx: oneshot::Receiver<()>, ) -> std::pin::Pin< std::boxed::Box<dyn futures::Stream<Item = bt_packets::hci::AclPacket> + std::marker::Send>, >90 pub fn fragmenting_stream(
91     rx: ReceiverStream<Bytes>,
92     mtu: usize,
93     handle: u16,
94     bt: Bluetooth,
95     close_rx: oneshot::Receiver<()>,
96 ) -> std::pin::Pin<
97     std::boxed::Box<dyn futures::Stream<Item = bt_packets::hci::AclPacket> + std::marker::Send>,
98 > {
99     rx.flat_map(move |data| {
100         stream::iter(
101             data.chunks(mtu)
102                 .enumerate()
103                 .map(move |(i, chunk)| {
104                     AclBuilder {
105                         handle,
106                         packet_boundary_flag: match bt {
107                             Bluetooth::Classic if i == 0 => FirstAutomaticallyFlushable,
108                             Bluetooth::Le if i == 0 => FirstNonAutomaticallyFlushable,
109                             _ => ContinuingFragment,
110                         },
111                         broadcast_flag: BroadcastFlag::PointToPoint,
112                         payload: Some(Bytes::copy_from_slice(chunk)),
113                     }
114                     .build()
115                 })
116                 .collect::<Vec<AclPacket>>(),
117         )
118     })
119     .take_until(close_rx)
120     .boxed()
121 }
122