1 //! Shim out the main thread in the BT stack, to reduce threading dances at the shim boundary
2 
3 use bt_common::init_flags;
4 use std::convert::TryInto;
5 use std::sync::Arc;
6 use std::time::Duration;
7 use tokio::runtime::Runtime;
8 use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
9 
10 #[cxx::bridge(namespace = bluetooth::shim::rust)]
11 mod ffi {
12     unsafe extern "C++" {
13         include!("callbacks/callbacks.h");
14 
15         type OnceClosure;
Run(&self)16         fn Run(&self);
17     }
18 
19     extern "Rust" {
20         type MessageLoopThread;
21 
main_message_loop_thread_create() -> Box<MessageLoopThread>22         fn main_message_loop_thread_create() -> Box<MessageLoopThread>;
main_message_loop_thread_start(thread: &mut MessageLoopThread) -> i3223         fn main_message_loop_thread_start(thread: &mut MessageLoopThread) -> i32;
main_message_loop_thread_do_delayed( thread: &mut MessageLoopThread, closure: UniquePtr<OnceClosure>, delay_ms: i64, )24         fn main_message_loop_thread_do_delayed(
25             thread: &mut MessageLoopThread,
26             closure: UniquePtr<OnceClosure>,
27             delay_ms: i64,
28         );
29     }
30 }
31 
32 unsafe impl Send for ffi::OnceClosure {}
33 
34 pub struct MessageLoopThread {
35     rt: Arc<Runtime>,
36     tx: UnboundedSender<cxx::UniquePtr<ffi::OnceClosure>>,
37 }
38 
main_message_loop_thread_create() -> Box<MessageLoopThread>39 pub fn main_message_loop_thread_create() -> Box<MessageLoopThread> {
40     assert!(init_flags::gd_rust_is_enabled());
41 
42     let rt = crate::stack::RUNTIME.clone();
43     let (tx, mut rx) = unbounded_channel::<cxx::UniquePtr<ffi::OnceClosure>>();
44     rt.spawn(async move {
45         while let Some(c) = rx.recv().await {
46             c.Run();
47         }
48     });
49 
50     Box::new(MessageLoopThread { rt, tx })
51 }
52 
main_message_loop_thread_start(thread: &mut MessageLoopThread) -> i3253 pub fn main_message_loop_thread_start(thread: &mut MessageLoopThread) -> i32 {
54     assert!(init_flags::gd_rust_is_enabled());
55 
56     thread.rt.block_on(async move { nix::unistd::gettid().as_raw() })
57 }
58 
main_message_loop_thread_do_delayed( thread: &mut MessageLoopThread, closure: cxx::UniquePtr<ffi::OnceClosure>, delay_ms: i64, )59 pub fn main_message_loop_thread_do_delayed(
60     thread: &mut MessageLoopThread,
61     closure: cxx::UniquePtr<ffi::OnceClosure>,
62     delay_ms: i64,
63 ) {
64     assert!(init_flags::gd_rust_is_enabled());
65     if delay_ms == 0 {
66         if thread.tx.send(closure).is_err() {
67             log::error!("could not post task - shutting down?");
68         }
69     } else {
70         thread.rt.spawn(async move {
71             // NOTE: tokio's sleep can't wake up the system...
72             // but hey, neither could the message loop from libchrome.
73             //
74             // ...and this way we don't use timerfds arbitrarily.
75             //
76             // #yolo
77             tokio::time::sleep(Duration::from_millis(delay_ms.try_into().unwrap_or(0))).await;
78             closure.Run();
79         });
80     }
81 }
82