1 use bt_common::time::Alarm;
2 use std::collections::VecDeque;
3 use std::process::{Child, Command, Stdio};
4 use std::sync::Arc;
5 use std::time::Duration;
6 use tokio::io::unix::AsyncFd;
7 use tokio::sync::{mpsc, Mutex};
8 use tokio::sync::mpsc::error::SendError;
9 
/// Lifecycle of the Bluetooth process as tracked by the manager state machine.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum State {
    /// Bluetooth is not running.
    Off,
    /// A start was requested, but we have not yet been notified that Bluetooth is running.
    TurningOn,
    /// Bluetooth is running.
    On,
    /// A stop was requested, but we have not yet been notified that Bluetooth has stopped.
    TurningOff,
}
17 
/// Messages carried over the action channel and consumed by `mainloop`.
#[derive(Debug)]
pub enum StateMachineActions {
    /// Request to start Bluetooth; the payload is the HCI interface index.
    StartBluetooth(i32),
    /// Request to stop Bluetooth; the payload is the HCI interface index.
    StopBluetooth(i32),
    /// Notification that Bluetooth is running; the payload is the PID and the HCI index.
    BluetoothStarted(i32, i32),
    /// Notification that Bluetooth is no longer running.
    BluetoothStopped(),
}
25 
/// Bundles the state machine with the channel that feeds it actions.
///
/// The context is consumed by `mainloop`; other tasks interact with it through
/// the `StateMachineProxy` returned by `get_proxy`.
pub struct StateMachineContext<PM> {
    /// Sending half of the action channel; cloned into every proxy.
    tx: mpsc::Sender<StateMachineActions>,
    /// Receiving half of the action channel, drained by `mainloop`.
    rx: mpsc::Receiver<StateMachineActions>,
    /// The state machine the received actions are applied to.
    state_machine: ManagerStateMachine<PM>,
}
31 
32 impl<PM> StateMachineContext<PM> {
new(state_machine: ManagerStateMachine<PM>) -> StateMachineContext<PM> where PM: ProcessManager + Send,33     fn new(state_machine: ManagerStateMachine<PM>) -> StateMachineContext<PM>
34     where
35         PM: ProcessManager + Send,
36     {
37         let (tx, rx) = mpsc::channel::<StateMachineActions>(1);
38         StateMachineContext { tx: tx, rx: rx, state_machine: state_machine }
39     }
40 
get_proxy(&self) -> StateMachineProxy41     pub fn get_proxy(&self) -> StateMachineProxy {
42         StateMachineProxy {
43             tx: self.tx.clone(),
44             state: self.state_machine.state.clone(),
45             state_change_observers: self.state_machine.state_change_observers.clone(),
46         }
47     }
48 }
49 
start_new_state_machine_context() -> StateMachineContext<NativeSubprocess>50 pub fn start_new_state_machine_context() -> StateMachineContext<NativeSubprocess> {
51     StateMachineContext::new(ManagerStateMachine::new_native())
52 }
53 
/// Cloneable handle used to send requests to, and read state from, a
/// `StateMachineContext`.
#[derive(Clone)]
pub struct StateMachineProxy {
    /// Sender used to queue actions for the state machine.
    tx: mpsc::Sender<StateMachineActions>,
    /// Current state, shared with the state machine.
    state: Arc<Mutex<State>>,
    /// Object paths of the registered state-change observers, shared with the state machine.
    state_change_observers: Arc<Mutex<Vec<String>>>,
}
60 
61 impl StateMachineProxy {
start_bluetooth( &self, hci_interface: i32, ) -> Result<(), SendError<StateMachineActions>>62     pub async fn start_bluetooth(
63         &self,
64         hci_interface: i32,
65     ) -> Result<(), SendError<StateMachineActions>> {
66         self.tx.send(StateMachineActions::StartBluetooth(hci_interface)).await
67     }
68 
stop_bluetooth(&self, hci_interface: i32,) -> Result<(), SendError<StateMachineActions>>69     pub async fn stop_bluetooth(&self, hci_interface: i32,) -> Result<(), SendError<StateMachineActions>> {
70         self.tx.send(StateMachineActions::StopBluetooth(hci_interface)).await
71     }
72 
get_state(&self) -> State73     pub async fn get_state(&self) -> State {
74         *self.state.lock().await
75     }
76 
register_state_change_observer( &self, object_path: String, ) -> Result<(), SendError<StateMachineActions>>77     pub async fn register_state_change_observer(
78         &self,
79         object_path: String,
80     ) -> Result<(), SendError<StateMachineActions>> {
81         self.state_change_observers.lock().await.push(object_path);
82         Ok(())
83     }
84 
unregister_state_change_observer( &self, object_path: String, ) -> Result<(), SendError<StateMachineActions>>85     pub async fn unregister_state_change_observer(
86         &self,
87         object_path: String,
88     ) -> Result<(), SendError<StateMachineActions>> {
89         let mut observers = self.state_change_observers.lock().await;
90         let index = observers.iter().position(|x| *x == object_path).unwrap();
91         observers.remove(index);
92         Ok(())
93     }
94 }
95 
mainloop<PM>(mut context: StateMachineContext<PM>) where PM: ProcessManager + Send,96 pub async fn mainloop<PM>(mut context: StateMachineContext<PM>)
97 where
98     PM: ProcessManager + Send,
99 {
100     let mut command_timeout = Alarm::new();
101     let mut pid_detector = inotify::Inotify::init().expect("cannot use inotify");
102     pid_detector
103         .add_watch("/var/run", inotify::WatchMask::CREATE | inotify::WatchMask::DELETE)
104         .expect("failed to add watch");
105     let mut pid_async_fd = AsyncFd::new(pid_detector).expect("failed to add async fd");
106     // let mut async_fd = pid_async_fd.readable_mut();
107     // tokio::pin!(async_fd);
108     let command_timeout_duration = Duration::from_secs(2);
109     loop {
110         tokio::select! {
111             Some(action) = context.rx.recv() => {
112               match action {
113                 StateMachineActions::StartBluetooth(i) => {
114                     match context.state_machine.action_start_bluetooth(i) {
115                         true => {
116                             command_timeout.reset(command_timeout_duration);
117                         },
118                         false => (),
119                     }
120                 },
121                 StateMachineActions::StopBluetooth(i) => {
122                   match context.state_machine.action_stop_bluetooth(i) {
123                       true => command_timeout.reset(command_timeout_duration),
124                       false => (),
125                   }
126                 },
127                 StateMachineActions::BluetoothStarted(pid, hci) => {
128                   match context.state_machine.action_on_bluetooth_started(pid, hci) {
129                       true => command_timeout.cancel(),
130                       false => println!("unexpected BluetoothStarted pid{} hci{}", pid, hci),
131                   }
132                 },
133                 StateMachineActions::BluetoothStopped() => {
134                   match context.state_machine.action_on_bluetooth_stopped() {
135                       true => command_timeout.cancel(),
136                       false => {
137                         println!("BluetoothStopped");
138                           command_timeout.reset(command_timeout_duration);
139                       }
140                   }
141                 },
142               }
143             },
144             _ = command_timeout.expired() => {
145                 println!("expired {:?}", *context.state_machine.state.lock().await);
146                 let timeout_action = context.state_machine.action_on_command_timeout();
147                 match timeout_action {
148                     StateMachineTimeoutActions::Noop => (),
149                     _ => command_timeout.reset(command_timeout_duration),
150                 }
151             },
152             r = pid_async_fd.readable_mut() => {
153                 let mut fd_ready = r.unwrap();
154                 let mut buffer: [u8; 1024] = [0; 1024];
155                 match fd_ready.try_io(|inner| inner.get_mut().read_events(&mut buffer)) {
156                     Ok(Ok(events)) => {
157                         for event in events {
158                             match event.mask {
159                                 inotify::EventMask::CREATE => {
160                                     if event.name == Some(std::ffi::OsStr::new("bluetooth.pid")) {
161                                         let read_result = tokio::fs::read("/var/run/bluetooth.pid").await;
162                                         match read_result {
163                                             Ok(v) => {
164                                                 let file_string = String::from_utf8(v).expect("invalid pid file");
165                                                 let mut iter = file_string.split_ascii_whitespace();
166                                                 let pid = match iter.next() {
167                                                     Some(s) => s.parse::<i32>().unwrap(),
168                                                     None => 0
169                                                 };
170                                                 let hci = match iter.next() {
171                                                     Some(s) => s.parse::<i32>().unwrap(),
172                                                     None => 0
173                                                 };
174                                                 context.tx.send(StateMachineActions::BluetoothStarted(pid, hci)).await;
175                                             },
176                                             Err(e) => println!("{}", e)
177                                         }
178                                     }
179                                 },
180                                 inotify::EventMask::DELETE => {
181                                     if event.name == Some(std::ffi::OsStr::new("bluetooth.pid")) {
182                                         context.tx.send(StateMachineActions::BluetoothStopped()).await;
183                                       }
184                                   },
185                                 _ => println!("Ignored event {:?}", event.mask)
186                             }
187                         }
188                     }
189                     Err(_) | Ok(Err(_)) => panic!("why can't we read while the asyncfd is ready?"),
190                 }
191                 fd_ready.clear_ready();
192                 drop(fd_ready);
193             },
194         }
195     }
196 }
197 
/// Abstraction over the mechanism used to launch and terminate the Bluetooth process.
pub trait ProcessManager {
    /// Requests that Bluetooth be started; `hci_interface` identifies the
    /// interface and is formatted by the caller.
    fn start(&mut self, hci_interface: String);
    /// Requests that Bluetooth be stopped; `hci_interface` identifies the
    /// interface and is formatted by the caller.
    fn stop(&mut self, hci_interface: String);
}
202 
/// Process manager that runs its command as a direct child process.
pub struct NativeSubprocess {
    /// Handle of the most recently spawned child, or `None` if nothing has been
    /// spawned or the child was stopped.
    process_container: Option<Child>,
}
206 
207 impl NativeSubprocess {
new() -> NativeSubprocess208     pub fn new() -> NativeSubprocess {
209         NativeSubprocess { process_container: None }
210     }
211 }
212 
213 impl ProcessManager for NativeSubprocess {
start(&mut self, hci_interface: String)214     fn start(&mut self, hci_interface: String) {
215         self.process_container = Some(
216             Command::new("/usr/bin/touch")
217                 .arg("/var/run/bluetooth.pid")
218                 .stdout(Stdio::piped())
219                 .spawn()
220                 .expect("cannot open"),
221         );
222     }
stop(&mut self, hci_interface: String)223     fn stop(&mut self, hci_interface: String) {
224         match self.process_container {
225             Some(ref mut p) => {
226                 // TODO: Maybe just SIGINT first, not kill
227                 p.kill();
228                 self.process_container = None;
229             }
230             None => {
231                 println!("Process doesn't exist");
232             }
233         }
234     }
235 }
236 
/// Process manager that controls Bluetooth by invoking `initctl`.
pub struct UpstartInvoker {
    // No state is kept; each operation runs a fresh `initctl` command.
}
240 
241 impl UpstartInvoker {
new() -> UpstartInvoker242     pub fn new() -> UpstartInvoker {
243         UpstartInvoker {}
244     }
245 }
246 
247 impl ProcessManager for UpstartInvoker {
start(&mut self, hci_interface: String)248     fn start(&mut self, hci_interface: String) {
249         Command::new("initctl")
250             .arg("start")
251             .arg("bluetooth")
252             .arg(format!("HCI={}", hci_interface))
253             .output()
254             .expect("failed to start bluetooth");
255     }
256 
stop(&mut self, hci_interface: String)257     fn stop(&mut self, hci_interface: String) {
258         Command::new("initctl")
259             .arg("stop")
260             .arg("bluetooth")
261             .arg(format!("HCI={}", hci_interface))
262             .output()
263             .expect("failed to stop bluetooth");
264     }
265 }
266 
/// State machine that tracks the Bluetooth process and issues start/stop
/// commands through a `ProcessManager`.
struct ManagerStateMachine<PM> {
    /// Current state; the same `Arc` is handed out to proxies.
    state: Arc<Mutex<State>>,
    /// Backend used to start and stop the Bluetooth process.
    process_manager: PM,
    /// Object paths of registered state-change observers; shared with proxies.
    state_change_observers: Arc<Mutex<Vec<String>>>,
    /// Index of the HCI interface most recently started or reported as started.
    hci_interface: i32,
    /// PID reported by the most recent "Bluetooth started" notification.
    bluetooth_pid: i32,
}
274 
275 impl ManagerStateMachine<NativeSubprocess> {
new_native() -> ManagerStateMachine<NativeSubprocess>276     pub fn new_native() -> ManagerStateMachine<NativeSubprocess> {
277         ManagerStateMachine::new(NativeSubprocess::new())
278     }
279 }
280 
/// What the state machine did in response to a command timeout.
#[derive(Debug, PartialEq)]
enum StateMachineTimeoutActions {
    /// The timeout hit while turning on and the start command was issued again.
    RetryStart,
    /// The timeout hit while turning off.
    RetryStop,
    /// Not currently returned by the state machine in this file.
    Killed,
    /// Nothing was done; the main loop does not re-arm the timer for this value.
    Noop,
}
288 
289 impl<PM> ManagerStateMachine<PM>
290 where
291     PM: ProcessManager + Send,
292 {
new(process_manager: PM) -> ManagerStateMachine<PM>293     pub fn new(process_manager: PM) -> ManagerStateMachine<PM> {
294         ManagerStateMachine {
295             state: Arc::new(Mutex::new(State::Off)),
296             process_manager: process_manager,
297             state_change_observers: Arc::new(Mutex::new(Vec::new())),
298             hci_interface: 0,
299             bluetooth_pid: 0,
300         }
301     }
302 
303     /// Returns true if we are starting bluetooth process.
action_start_bluetooth(&mut self, hci_interface: i32) -> bool304     pub fn action_start_bluetooth(&mut self, hci_interface: i32) -> bool {
305         let mut state = self.state.try_lock().unwrap();  // TODO hsz: fix me
306         match *state {
307             State::Off => {
308                 *state = State::TurningOn;
309                 self.hci_interface = hci_interface;
310                 self.process_manager.start(format!("hci{}", hci_interface));
311                 true
312             }
313             // Otherwise no op
314             _ => false,
315         }
316     }
317 
318     /// Returns true if we are stopping bluetooth process.
action_stop_bluetooth(&mut self, hci_interface: i32) -> bool319     pub fn action_stop_bluetooth(&mut self, hci_interface: i32) -> bool {
320         if self.hci_interface != hci_interface {
321             println!("We are running hci{} but attempting to stop hci{}", self.hci_interface, hci_interface);
322             return false
323         }
324 
325         let mut state = self.state.try_lock().unwrap();  // TODO hsz: fix me
326         match *state {
327             State::On | State::TurningOn => {
328                 *state = State::TurningOff;
329                 self.process_manager.stop(self.hci_interface.to_string());
330                 true
331             }
332             // Otherwise no op
333             _ => false,
334         }
335     }
336 
337     /// Returns true if the event is expected.
action_on_bluetooth_started(&mut self, pid: i32, hci_interface: i32) -> bool338     pub fn action_on_bluetooth_started(&mut self, pid: i32, hci_interface: i32) -> bool {
339         let mut state = self.state.try_lock().unwrap();  // TODO hsz: fix me
340         if self.hci_interface != hci_interface {
341             println!("We should start hci{} but hci{} is started; capturing that process", self.hci_interface, hci_interface);
342             self.hci_interface = hci_interface;
343         }
344         if *state != State::TurningOn {
345             println!("Unexpected Bluetooth started");
346         }
347         *state = State::On;
348         self.bluetooth_pid = pid;
349         true
350     }
351 
352     /// Returns true if the event is expected.
353     /// If unexpected, Bluetooth probably crashed;
354     /// start the timer for restart timeout
action_on_bluetooth_stopped(&mut self) -> bool355     pub fn action_on_bluetooth_stopped(&mut self) -> bool {
356         // Need to check if file exists
357         let mut state = self.state.try_lock().unwrap();  // TODO hsz: fix me
358 
359         match *state {
360             State::TurningOff => {
361                 *state = State::Off;
362                 true
363             }
364             State::On => {
365                 println!("Bluetooth stopped unexpectedly, try restarting");
366                 *state = State::TurningOn;
367                 self.process_manager.start(format!("hci{}", self.hci_interface));
368                 false
369             }
370             State::TurningOn | State::Off => {
371                 // Unexpected
372                 panic!("unexpected bluetooth shutdown");
373             }
374         }
375     }
376 
377     /// Triggered on Bluetooth start/stop timeout.  Return the actions that the
378     /// state machine has taken, for the external context to reset the timer.
action_on_command_timeout(&mut self) -> StateMachineTimeoutActions379     pub fn action_on_command_timeout(&mut self) -> StateMachineTimeoutActions {
380         let mut state = self.state.try_lock().unwrap();  // TODO hsz: fix me
381         match *state {
382             State::TurningOn => {
383                 println!("Restarting bluetooth");
384                 *state = State::TurningOn;
385                 self.process_manager.start(format! {"hci{}", self.hci_interface});
386                 StateMachineTimeoutActions::RetryStart
387             }
388             State::TurningOff => {
389                 println!("Killing bluetooth");
390 
391                 *state = State::Off;
392                 StateMachineTimeoutActions::RetryStop
393                 // kill bluetooth
394                 // tx.try_send(StateMachineActions::StopBluetooth());
395             }
396             _ => panic!("Unexpected timeout on {:?}", *state),
397         }
398     }
399 }
400 
#[cfg(test)]
mod tests {
    use super::*;

    /// Commands the mock process manager can be told to expect.
    #[derive(Debug, PartialEq)]
    enum ExecutedCommand {
        Start,
        Stop,
    }

    /// Strict mock: every `start`/`stop` call must have been announced with
    /// `expect_start`/`expect_stop`, in order, and every announced call must
    /// have happened by the time the mock is dropped.
    struct MockProcessManager {
        last_command: VecDeque<ExecutedCommand>,
    }

    impl MockProcessManager {
        fn new() -> MockProcessManager {
            MockProcessManager { last_command: VecDeque::new() }
        }

        fn expect_start(&mut self) {
            self.last_command.push_back(ExecutedCommand::Start);
        }

        fn expect_stop(&mut self) {
            self.last_command.push_back(ExecutedCommand::Stop);
        }
    }

    impl ProcessManager for MockProcessManager {
        fn start(&mut self, _hci_interface: String) {
            let start = self.last_command.pop_front().expect("Should expect start event");
            assert_eq!(start, ExecutedCommand::Start);
        }

        fn stop(&mut self, _hci_interface: String) {
            let stop = self.last_command.pop_front().expect("Should expect stop event");
            assert_eq!(stop, ExecutedCommand::Stop);
        }
    }

    impl Drop for MockProcessManager {
        fn drop(&mut self) {
            // Skip the check while a test is already unwinding: a second panic
            // from inside `drop` would abort the whole test process and hide
            // the original failure.
            if !std::thread::panicking() {
                assert_eq!(self.last_command.len(), 0);
            }
        }
    }

    #[test]
    fn initial_state_is_off() {
        let process_manager = MockProcessManager::new();
        let state_machine = ManagerStateMachine::new(process_manager);
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::Off);
    }

    #[test]
    fn off_turnoff_should_noop() {
        let process_manager = MockProcessManager::new();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_stop_bluetooth(0);
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::Off);
    }

    #[test]
    fn off_turnon_should_turningon() {
        let mut process_manager = MockProcessManager::new();
        // Expect to send start command
        process_manager.expect_start();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOn);
    }

    #[test]
    fn turningon_turnon_again_noop() {
        let mut process_manager = MockProcessManager::new();
        // Expect to send start command just once
        process_manager.expect_start();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        assert_eq!(state_machine.action_start_bluetooth(0), false);
    }

    #[test]
    fn turningon_bluetooth_started() {
        let mut process_manager = MockProcessManager::new();
        process_manager.expect_start();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        state_machine.action_on_bluetooth_started(0, 0);
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::On);
    }

    #[test]
    fn turningon_timeout() {
        let mut process_manager = MockProcessManager::new();
        process_manager.expect_start();
        process_manager.expect_start(); // start bluetooth again
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        assert_eq!(
            state_machine.action_on_command_timeout(),
            StateMachineTimeoutActions::RetryStart
        );
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOn);
    }

    #[test]
    fn turningon_turnoff_should_turningoff_and_send_command() {
        let mut process_manager = MockProcessManager::new();
        process_manager.expect_start();
        // Expect to send stop command
        process_manager.expect_stop();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        state_machine.action_stop_bluetooth(0);
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOff);
    }

    #[test]
    fn on_turnoff_should_turningoff_and_send_command() {
        let mut process_manager = MockProcessManager::new();
        process_manager.expect_start();
        // Expect to send stop command
        process_manager.expect_stop();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        state_machine.action_on_bluetooth_started(0, 0);
        state_machine.action_stop_bluetooth(0);
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOff);
    }

    #[test]
    fn on_bluetooth_stopped() {
        let mut process_manager = MockProcessManager::new();
        process_manager.expect_start();
        // Expect to start again
        process_manager.expect_start();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        state_machine.action_on_bluetooth_started(0, 0);
        assert_eq!(state_machine.action_on_bluetooth_stopped(), false);
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::TurningOn);
    }

    #[test]
    fn turningoff_bluetooth_down_should_off() {
        let mut process_manager = MockProcessManager::new();
        process_manager.expect_start();
        process_manager.expect_stop();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        state_machine.action_on_bluetooth_started(0, 0);
        state_machine.action_stop_bluetooth(0);
        state_machine.action_on_bluetooth_stopped();
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::Off);
    }

    #[test]
    fn restart_bluetooth() {
        let mut process_manager = MockProcessManager::new();
        process_manager.expect_start();
        process_manager.expect_stop();
        process_manager.expect_start();
        let mut state_machine = ManagerStateMachine::new(process_manager);
        state_machine.action_start_bluetooth(0);
        state_machine.action_on_bluetooth_started(0, 0);
        state_machine.action_stop_bluetooth(0);
        state_machine.action_on_bluetooth_stopped();
        state_machine.action_start_bluetooth(0);
        state_machine.action_on_bluetooth_started(0, 0);
        assert_eq!(*state_machine.state.try_lock().unwrap(), State::On);
    }
}
573