1 //! Handles fragmentation & reassembly of ACL packets into whole L2CAP payloads
2
3 use bt_common::Bluetooth;
4 use bt_packets::hci::PacketBoundaryFlag::{
5 ContinuingFragment, FirstAutomaticallyFlushable, FirstNonAutomaticallyFlushable,
6 };
7 use bt_packets::hci::{AclBuilder, AclChild, AclPacket, BroadcastFlag};
8 use bytes::{Buf, Bytes, BytesMut};
9 use futures::stream::{self, StreamExt};
10 use log::{error, info, warn};
11 use tokio::sync::mpsc::Sender;
12 use tokio::sync::oneshot;
13 use tokio_stream::wrappers::ReceiverStream;
14
15 const L2CAP_BASIC_FRAME_HEADER_LEN: usize = 4;
16
17 pub struct Reassembler {
18 buffer: Option<BytesMut>,
19 remaining: usize,
20 out: Sender<Bytes>,
21 }
22
23 impl Reassembler {
24 /// Create a new reassembler
new(out: Sender<Bytes>) -> Self25 pub fn new(out: Sender<Bytes>) -> Self {
26 Self { buffer: None, remaining: 0, out }
27 }
28
29 /// Injest the packet and send out if fully reassembled
on_packet(&mut self, packet: AclPacket)30 pub async fn on_packet(&mut self, packet: AclPacket) {
31 let payload = match packet.specialize() {
32 AclChild::Payload(payload) => payload,
33 AclChild::None => {
34 info!("dropping ACL packet with empty payload");
35 return;
36 }
37 };
38
39 if let BroadcastFlag::ActivePeripheralBroadcast = packet.get_broadcast_flag() {
40 // we do not accept broadcast packets
41 return;
42 }
43
44 match packet.get_packet_boundary_flag() {
45 FirstNonAutomaticallyFlushable => error!("not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"),
46 FirstAutomaticallyFlushable => {
47 if self.buffer.take().is_some() {
48 error!("got a start packet without finishing previous reassembly - dropping previous");
49 }
50
51 let full_size = get_l2cap_pdu_size(&payload);
52 self.remaining = full_size - (payload.len() - L2CAP_BASIC_FRAME_HEADER_LEN);
53 if self.remaining > 0 {
54 let mut buffer = BytesMut::with_capacity(full_size);
55 buffer.extend_from_slice(&payload[..]);
56 self.buffer = Some(buffer);
57 } else {
58 self.out.send(payload).await.unwrap();
59 }
60 },
61 ContinuingFragment => {
62 match self.buffer.take() {
63 None => warn!("got continuation packet without pending reassembly"),
64 Some(_) if self.remaining < payload.len() => warn!("remote sent unexpected L2CAP PDU - dropping entire packet"),
65 Some(mut buffer) => {
66 self.remaining -= payload.len();
67 buffer.extend_from_slice(&payload[..]);
68 if self.remaining == 0 {
69 self.out.send(buffer.freeze()).await.unwrap();
70 } else {
71 self.buffer = Some(buffer);
72 }
73 }
74 }
75 },
76 }
77 }
78 }
79
get_l2cap_pdu_size(first_packet: &Bytes) -> usize80 fn get_l2cap_pdu_size(first_packet: &Bytes) -> usize {
81 if first_packet.len() <= L2CAP_BASIC_FRAME_HEADER_LEN {
82 error!("invalid l2cap starting packet");
83
84 0
85 } else {
86 (&first_packet[..]).get_u16_le() as usize
87 }
88 }
89
fragmenting_stream( rx: ReceiverStream<Bytes>, mtu: usize, handle: u16, bt: Bluetooth, close_rx: oneshot::Receiver<()>, ) -> std::pin::Pin< std::boxed::Box<dyn futures::Stream<Item = bt_packets::hci::AclPacket> + std::marker::Send>, >90 pub fn fragmenting_stream(
91 rx: ReceiverStream<Bytes>,
92 mtu: usize,
93 handle: u16,
94 bt: Bluetooth,
95 close_rx: oneshot::Receiver<()>,
96 ) -> std::pin::Pin<
97 std::boxed::Box<dyn futures::Stream<Item = bt_packets::hci::AclPacket> + std::marker::Send>,
98 > {
99 rx.flat_map(move |data| {
100 stream::iter(
101 data.chunks(mtu)
102 .enumerate()
103 .map(move |(i, chunk)| {
104 AclBuilder {
105 handle,
106 packet_boundary_flag: match bt {
107 Bluetooth::Classic if i == 0 => FirstAutomaticallyFlushable,
108 Bluetooth::Le if i == 0 => FirstNonAutomaticallyFlushable,
109 _ => ContinuingFragment,
110 },
111 broadcast_flag: BroadcastFlag::PointToPoint,
112 payload: Some(Bytes::copy_from_slice(chunk)),
113 }
114 .build()
115 })
116 .collect::<Vec<AclPacket>>(),
117 )
118 })
119 .take_until(close_rx)
120 .boxed()
121 }
122