1 use bt_common::time::Alarm;
2 use std::collections::VecDeque;
3 use std::process::{Child, Command, Stdio};
4 use std::sync::Arc;
5 use std::time::Duration;
6 use tokio::io::unix::AsyncFd;
7 use tokio::sync::{mpsc, Mutex};
8 use tokio::sync::mpsc::error::SendError;
9
10 #[derive(Debug, PartialEq, Copy, Clone)]
11 pub enum State {
12 Off, // Bluetooth is not running
13 TurningOn, // We are not notified that the Bluetooth is running
14 On, // Bluetooth is running
15 TurningOff, // We are not notified that the Bluetooth is stopped
16 }
17
18 #[derive(Debug)]
19 pub enum StateMachineActions {
20 StartBluetooth(i32),
21 StopBluetooth(i32),
22 BluetoothStarted(i32, i32), // PID and HCI
23 BluetoothStopped(),
24 }
25
26 pub struct StateMachineContext<PM> {
27 tx: mpsc::Sender<StateMachineActions>,
28 rx: mpsc::Receiver<StateMachineActions>,
29 state_machine: ManagerStateMachine<PM>,
30 }
31
32 impl<PM> StateMachineContext<PM> {
new(state_machine: ManagerStateMachine<PM>) -> StateMachineContext<PM> where PM: ProcessManager + Send,33 fn new(state_machine: ManagerStateMachine<PM>) -> StateMachineContext<PM>
34 where
35 PM: ProcessManager + Send,
36 {
37 let (tx, rx) = mpsc::channel::<StateMachineActions>(1);
38 StateMachineContext { tx: tx, rx: rx, state_machine: state_machine }
39 }
40
get_proxy(&self) -> StateMachineProxy41 pub fn get_proxy(&self) -> StateMachineProxy {
42 StateMachineProxy {
43 tx: self.tx.clone(),
44 state: self.state_machine.state.clone(),
45 state_change_observers: self.state_machine.state_change_observers.clone(),
46 }
47 }
48 }
49
start_new_state_machine_context() -> StateMachineContext<NativeSubprocess>50 pub fn start_new_state_machine_context() -> StateMachineContext<NativeSubprocess> {
51 StateMachineContext::new(ManagerStateMachine::new_native())
52 }
53
54 #[derive(Clone)]
55 pub struct StateMachineProxy {
56 tx: mpsc::Sender<StateMachineActions>,
57 state: Arc<Mutex<State>>,
58 state_change_observers: Arc<Mutex<Vec<String>>>,
59 }
60
61 impl StateMachineProxy {
start_bluetooth( &self, hci_interface: i32, ) -> Result<(), SendError<StateMachineActions>>62 pub async fn start_bluetooth(
63 &self,
64 hci_interface: i32,
65 ) -> Result<(), SendError<StateMachineActions>> {
66 self.tx.send(StateMachineActions::StartBluetooth(hci_interface)).await
67 }
68
stop_bluetooth(&self, hci_interface: i32,) -> Result<(), SendError<StateMachineActions>>69 pub async fn stop_bluetooth(&self, hci_interface: i32,) -> Result<(), SendError<StateMachineActions>> {
70 self.tx.send(StateMachineActions::StopBluetooth(hci_interface)).await
71 }
72
get_state(&self) -> State73 pub async fn get_state(&self) -> State {
74 *self.state.lock().await
75 }
76
register_state_change_observer( &self, object_path: String, ) -> Result<(), SendError<StateMachineActions>>77 pub async fn register_state_change_observer(
78 &self,
79 object_path: String,
80 ) -> Result<(), SendError<StateMachineActions>> {
81 self.state_change_observers.lock().await.push(object_path);
82 Ok(())
83 }
84
unregister_state_change_observer( &self, object_path: String, ) -> Result<(), SendError<StateMachineActions>>85 pub async fn unregister_state_change_observer(
86 &self,
87 object_path: String,
88 ) -> Result<(), SendError<StateMachineActions>> {
89 let mut observers = self.state_change_observers.lock().await;
90 let index = observers.iter().position(|x| *x == object_path).unwrap();
91 observers.remove(index);
92 Ok(())
93 }
94 }
95
mainloop<PM>(mut context: StateMachineContext<PM>) where PM: ProcessManager + Send,96 pub async fn mainloop<PM>(mut context: StateMachineContext<PM>)
97 where
98 PM: ProcessManager + Send,
99 {
100 let mut command_timeout = Alarm::new();
101 let mut pid_detector = inotify::Inotify::init().expect("cannot use inotify");
102 pid_detector
103 .add_watch("/var/run", inotify::WatchMask::CREATE | inotify::WatchMask::DELETE)
104 .expect("failed to add watch");
105 let mut pid_async_fd = AsyncFd::new(pid_detector).expect("failed to add async fd");
106 // let mut async_fd = pid_async_fd.readable_mut();
107 // tokio::pin!(async_fd);
108 let command_timeout_duration = Duration::from_secs(2);
109 loop {
110 tokio::select! {
111 Some(action) = context.rx.recv() => {
112 match action {
113 StateMachineActions::StartBluetooth(i) => {
114 match context.state_machine.action_start_bluetooth(i) {
115 true => {
116 command_timeout.reset(command_timeout_duration);
117 },
118 false => (),
119 }
120 },
121 StateMachineActions::StopBluetooth(i) => {
122 match context.state_machine.action_stop_bluetooth(i) {
123 true => command_timeout.reset(command_timeout_duration),
124 false => (),
125 }
126 },
127 StateMachineActions::BluetoothStarted(pid, hci) => {
128 match context.state_machine.action_on_bluetooth_started(pid, hci) {
129 true => command_timeout.cancel(),
130 false => println!("unexpected BluetoothStarted pid{} hci{}", pid, hci),
131 }
132 },
133 StateMachineActions::BluetoothStopped() => {
134 match context.state_machine.action_on_bluetooth_stopped() {
135 true => command_timeout.cancel(),
136 false => {
137 println!("BluetoothStopped");
138 command_timeout.reset(command_timeout_duration);
139 }
140 }
141 },
142 }
143 },
144 _ = command_timeout.expired() => {
145 println!("expired {:?}", *context.state_machine.state.lock().await);
146 let timeout_action = context.state_machine.action_on_command_timeout();
147 match timeout_action {
148 StateMachineTimeoutActions::Noop => (),
149 _ => command_timeout.reset(command_timeout_duration),
150 }
151 },
152 r = pid_async_fd.readable_mut() => {
153 let mut fd_ready = r.unwrap();
154 let mut buffer: [u8; 1024] = [0; 1024];
155 match fd_ready.try_io(|inner| inner.get_mut().read_events(&mut buffer)) {
156 Ok(Ok(events)) => {
157 for event in events {
158 match event.mask {
159 inotify::EventMask::CREATE => {
160 if event.name == Some(std::ffi::OsStr::new("bluetooth.pid")) {
161 let read_result = tokio::fs::read("/var/run/bluetooth.pid").await;
162 match read_result {
163 Ok(v) => {
164 let file_string = String::from_utf8(v).expect("invalid pid file");
165 let mut iter = file_string.split_ascii_whitespace();
166 let pid = match iter.next() {
167 Some(s) => s.parse::<i32>().unwrap(),
168 None => 0
169 };
170 let hci = match iter.next() {
171 Some(s) => s.parse::<i32>().unwrap(),
172 None => 0
173 };
174 context.tx.send(StateMachineActions::BluetoothStarted(pid, hci)).await;
175 },
176 Err(e) => println!("{}", e)
177 }
178 }
179 },
180 inotify::EventMask::DELETE => {
181 if event.name == Some(std::ffi::OsStr::new("bluetooth.pid")) {
182 context.tx.send(StateMachineActions::BluetoothStopped()).await;
183 }
184 },
185 _ => println!("Ignored event {:?}", event.mask)
186 }
187 }
188 }
189 Err(_) | Ok(Err(_)) => panic!("why can't we read while the asyncfd is ready?"),
190 }
191 fd_ready.clear_ready();
192 drop(fd_ready);
193 },
194 }
195 }
196 }
197
198 pub trait ProcessManager {
start(&mut self, hci_interface: String)199 fn start(&mut self, hci_interface: String);
stop(&mut self, hci_interface: String)200 fn stop(&mut self, hci_interface: String);
201 }
202
203 pub struct NativeSubprocess {
204 process_container: Option<Child>,
205 }
206
207 impl NativeSubprocess {
new() -> NativeSubprocess208 pub fn new() -> NativeSubprocess {
209 NativeSubprocess { process_container: None }
210 }
211 }
212
213 impl ProcessManager for NativeSubprocess {
start(&mut self, hci_interface: String)214 fn start(&mut self, hci_interface: String) {
215 self.process_container = Some(
216 Command::new("/usr/bin/touch")
217 .arg("/var/run/bluetooth.pid")
218 .stdout(Stdio::piped())
219 .spawn()
220 .expect("cannot open"),
221 );
222 }
stop(&mut self, hci_interface: String)223 fn stop(&mut self, hci_interface: String) {
224 match self.process_container {
225 Some(ref mut p) => {
226 // TODO: Maybe just SIGINT first, not kill
227 p.kill();
228 self.process_container = None;
229 }
230 None => {
231 println!("Process doesn't exist");
232 }
233 }
234 }
235 }
236
237 pub struct UpstartInvoker {
238 // Upstart version not implemented
239 }
240
241 impl UpstartInvoker {
new() -> UpstartInvoker242 pub fn new() -> UpstartInvoker {
243 UpstartInvoker {}
244 }
245 }
246
247 impl ProcessManager for UpstartInvoker {
start(&mut self, hci_interface: String)248 fn start(&mut self, hci_interface: String) {
249 Command::new("initctl")
250 .arg("start")
251 .arg("bluetooth")
252 .arg(format!("HCI={}", hci_interface))
253 .output()
254 .expect("failed to start bluetooth");
255 }
256
stop(&mut self, hci_interface: String)257 fn stop(&mut self, hci_interface: String) {
258 Command::new("initctl")
259 .arg("stop")
260 .arg("bluetooth")
261 .arg(format!("HCI={}", hci_interface))
262 .output()
263 .expect("failed to stop bluetooth");
264 }
265 }
266
267 struct ManagerStateMachine<PM> {
268 state: Arc<Mutex<State>>,
269 process_manager: PM,
270 state_change_observers: Arc<Mutex<Vec<String>>>,
271 hci_interface: i32,
272 bluetooth_pid: i32,
273 }
274
275 impl ManagerStateMachine<NativeSubprocess> {
new_native() -> ManagerStateMachine<NativeSubprocess>276 pub fn new_native() -> ManagerStateMachine<NativeSubprocess> {
277 ManagerStateMachine::new(NativeSubprocess::new())
278 }
279 }
280
281 #[derive(Debug, PartialEq)]
282 enum StateMachineTimeoutActions {
283 RetryStart,
284 RetryStop,
285 Killed,
286 Noop,
287 }
288
289 impl<PM> ManagerStateMachine<PM>
290 where
291 PM: ProcessManager + Send,
292 {
new(process_manager: PM) -> ManagerStateMachine<PM>293 pub fn new(process_manager: PM) -> ManagerStateMachine<PM> {
294 ManagerStateMachine {
295 state: Arc::new(Mutex::new(State::Off)),
296 process_manager: process_manager,
297 state_change_observers: Arc::new(Mutex::new(Vec::new())),
298 hci_interface: 0,
299 bluetooth_pid: 0,
300 }
301 }
302
303 /// Returns true if we are starting bluetooth process.
action_start_bluetooth(&mut self, hci_interface: i32) -> bool304 pub fn action_start_bluetooth(&mut self, hci_interface: i32) -> bool {
305 let mut state = self.state.try_lock().unwrap(); // TODO hsz: fix me
306 match *state {
307 State::Off => {
308 *state = State::TurningOn;
309 self.hci_interface = hci_interface;
310 self.process_manager.start(format!("hci{}", hci_interface));
311 true
312 }
313 // Otherwise no op
314 _ => false,
315 }
316 }
317
318 /// Returns true if we are stopping bluetooth process.
action_stop_bluetooth(&mut self, hci_interface: i32) -> bool319 pub fn action_stop_bluetooth(&mut self, hci_interface: i32) -> bool {
320 if self.hci_interface != hci_interface {
321 println!("We are running hci{} but attempting to stop hci{}", self.hci_interface, hci_interface);
322 return false
323 }
324
325 let mut state = self.state.try_lock().unwrap(); // TODO hsz: fix me
326 match *state {
327 State::On | State::TurningOn => {
328 *state = State::TurningOff;
329 self.process_manager.stop(self.hci_interface.to_string());
330 true
331 }
332 // Otherwise no op
333 _ => false,
334 }
335 }
336
337 /// Returns true if the event is expected.
action_on_bluetooth_started(&mut self, pid: i32, hci_interface: i32) -> bool338 pub fn action_on_bluetooth_started(&mut self, pid: i32, hci_interface: i32) -> bool {
339 let mut state = self.state.try_lock().unwrap(); // TODO hsz: fix me
340 if self.hci_interface != hci_interface {
341 println!("We should start hci{} but hci{} is started; capturing that process", self.hci_interface, hci_interface);
342 self.hci_interface = hci_interface;
343 }
344 if *state != State::TurningOn {
345 println!("Unexpected Bluetooth started");
346 }
347 *state = State::On;
348 self.bluetooth_pid = pid;
349 true
350 }
351
352 /// Returns true if the event is expected.
353 /// If unexpected, Bluetooth probably crashed;
354 /// start the timer for restart timeout
action_on_bluetooth_stopped(&mut self) -> bool355 pub fn action_on_bluetooth_stopped(&mut self) -> bool {
356 // Need to check if file exists
357 let mut state = self.state.try_lock().unwrap(); // TODO hsz: fix me
358
359 match *state {
360 State::TurningOff => {
361 *state = State::Off;
362 true
363 }
364 State::On => {
365 println!("Bluetooth stopped unexpectedly, try restarting");
366 *state = State::TurningOn;
367 self.process_manager.start(format!("hci{}", self.hci_interface));
368 false
369 }
370 State::TurningOn | State::Off => {
371 // Unexpected
372 panic!("unexpected bluetooth shutdown");
373 }
374 }
375 }
376
377 /// Triggered on Bluetooth start/stop timeout. Return the actions that the
378 /// state machine has taken, for the external context to reset the timer.
action_on_command_timeout(&mut self) -> StateMachineTimeoutActions379 pub fn action_on_command_timeout(&mut self) -> StateMachineTimeoutActions {
380 let mut state = self.state.try_lock().unwrap(); // TODO hsz: fix me
381 match *state {
382 State::TurningOn => {
383 println!("Restarting bluetooth");
384 *state = State::TurningOn;
385 self.process_manager.start(format! {"hci{}", self.hci_interface});
386 StateMachineTimeoutActions::RetryStart
387 }
388 State::TurningOff => {
389 println!("Killing bluetooth");
390
391 *state = State::Off;
392 StateMachineTimeoutActions::RetryStop
393 // kill bluetooth
394 // tx.try_send(StateMachineActions::StopBluetooth());
395 }
396 _ => panic!("Unexpected timeout on {:?}", *state),
397 }
398 }
399 }
400
401 #[cfg(test)]
402 mod tests {
403 use super::*;
404
405 #[derive(Debug, PartialEq)]
406 enum ExecutedCommand {
407 Start,
408 Stop,
409 }
410
411 struct MockProcessManager {
412 last_command: VecDeque<ExecutedCommand>,
413 }
414
415 impl MockProcessManager {
new() -> MockProcessManager416 fn new() -> MockProcessManager {
417 MockProcessManager { last_command: VecDeque::new() }
418 }
419
expect_start(&mut self)420 fn expect_start(&mut self) {
421 self.last_command.push_back(ExecutedCommand::Start);
422 }
423
expect_stop(&mut self)424 fn expect_stop(&mut self) {
425 self.last_command.push_back(ExecutedCommand::Stop);
426 }
427 }
428
429 impl ProcessManager for MockProcessManager {
start(&mut self, hci_interface: String)430 fn start(&mut self, hci_interface: String) {
431 let start = self.last_command.pop_front().expect("Should expect start event");
432 assert_eq!(start, ExecutedCommand::Start);
433 }
434
stop(&mut self, hci_interface: String)435 fn stop(&mut self, hci_interface: String) {
436 let stop = self.last_command.pop_front().expect("Should expect stop event");
437 assert_eq!(stop, ExecutedCommand::Stop);
438 }
439 }
440
441 impl Drop for MockProcessManager {
drop(&mut self)442 fn drop(&mut self) {
443 assert_eq!(self.last_command.len(), 0);
444 }
445 }
446
447 #[test]
initial_state_is_off()448 fn initial_state_is_off() {
449 let process_manager = MockProcessManager::new();
450 let state_machine = ManagerStateMachine::new(process_manager);
451 assert_eq!(*state_machine.state.try_lock().unwrap(), State::Off);
452 }
453
454 #[test]
off_turnoff_should_noop()455 fn off_turnoff_should_noop() {
456 let process_manager = MockProcessManager::new();
457 let mut state_machine = ManagerStateMachine::new(process_manager);
458 state_machine.action_stop_bluetooth(0);
459 assert_eq!(*state_machine.state.try_lock().unwrap(), State::Off);
460 }
461
462 #[test]
off_turnon_should_turningon()463 fn off_turnon_should_turningon() {
464 let mut process_manager = MockProcessManager::new();
465 // Expect to send start command
466 process_manager.expect_start();
467 let mut state_machine = ManagerStateMachine::new(process_manager);
468 state_machine.action_start_bluetooth(0);
469 assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOn);
470 }
471
472 #[test]
turningon_turnon_again_noop()473 fn turningon_turnon_again_noop() {
474 let mut process_manager = MockProcessManager::new();
475 // Expect to send start command just once
476 process_manager.expect_start();
477 let mut state_machine = ManagerStateMachine::new(process_manager);
478 state_machine.action_start_bluetooth(0);
479 assert_eq!(state_machine.action_start_bluetooth(0), false);
480 }
481
482 #[test]
turningon_bluetooth_started()483 fn turningon_bluetooth_started() {
484 let mut process_manager = MockProcessManager::new();
485 process_manager.expect_start();
486 let mut state_machine = ManagerStateMachine::new(process_manager);
487 state_machine.action_start_bluetooth(0);
488 state_machine.action_on_bluetooth_started(0, 0);
489 assert_eq!(*state_machine.state.try_lock().unwrap(), State::On);
490 }
491
492 #[test]
turningon_timeout()493 fn turningon_timeout() {
494 let mut process_manager = MockProcessManager::new();
495 process_manager.expect_start();
496 process_manager.expect_start(); // start bluetooth again
497 let mut state_machine = ManagerStateMachine::new(process_manager);
498 state_machine.action_start_bluetooth(0);
499 assert_eq!(
500 state_machine.action_on_command_timeout(),
501 StateMachineTimeoutActions::RetryStart
502 );
503 assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOn);
504 }
505
506 #[test]
turningon_turnoff_should_turningoff_and_send_command()507 fn turningon_turnoff_should_turningoff_and_send_command() {
508 let mut process_manager = MockProcessManager::new();
509 process_manager.expect_start();
510 // Expect to send stop command
511 process_manager.expect_stop();
512 let mut state_machine = ManagerStateMachine::new(process_manager);
513 state_machine.action_start_bluetooth(0);
514 state_machine.action_stop_bluetooth(0);
515 assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOff);
516 }
517
518 #[test]
on_turnoff_should_turningoff_and_send_command()519 fn on_turnoff_should_turningoff_and_send_command() {
520 let mut process_manager = MockProcessManager::new();
521 process_manager.expect_start();
522 // Expect to send stop command
523 process_manager.expect_stop();
524 let mut state_machine = ManagerStateMachine::new(process_manager);
525 state_machine.action_start_bluetooth(0);
526 state_machine.action_on_bluetooth_started(0, 0);
527 state_machine.action_stop_bluetooth(0);
528 assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOff);
529 }
530
531 #[test]
on_bluetooth_stopped()532 fn on_bluetooth_stopped() {
533 let mut process_manager = MockProcessManager::new();
534 process_manager.expect_start();
535 // Expect to start again
536 process_manager.expect_start();
537 let mut state_machine = ManagerStateMachine::new(process_manager);
538 state_machine.action_start_bluetooth(0);
539 state_machine.action_on_bluetooth_started(0, 0);
540 assert_eq!(state_machine.action_on_bluetooth_stopped(), false);
541 assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOn);
542 }
543
544 #[test]
turningoff_bluetooth_down_should_off()545 fn turningoff_bluetooth_down_should_off() {
546 let mut process_manager = MockProcessManager::new();
547 process_manager.expect_start();
548 process_manager.expect_stop();
549 let mut state_machine = ManagerStateMachine::new(process_manager);
550 state_machine.action_start_bluetooth(0);
551 state_machine.action_on_bluetooth_started(0, 0);
552 state_machine.action_stop_bluetooth(0);
553 state_machine.action_on_bluetooth_stopped();
554 assert_eq!(*state_machine.state.try_lock().unwrap(), State::Off);
555 }
556
557 #[test]
restart_bluetooth()558 fn restart_bluetooth() {
559 let mut process_manager = MockProcessManager::new();
560 process_manager.expect_start();
561 process_manager.expect_stop();
562 process_manager.expect_start();
563 let mut state_machine = ManagerStateMachine::new(process_manager);
564 state_machine.action_start_bluetooth(0);
565 state_machine.action_on_bluetooth_started(0, 0);
566 state_machine.action_stop_bluetooth(0);
567 state_machine.action_on_bluetooth_stopped();
568 state_machine.action_start_bluetooth(0);
569 state_machine.action_on_bluetooth_started(0, 0);
570 assert_eq!(*state_machine.state.try_lock().unwrap(), State::On);
571 }
572 }
573