1 //
2 // Copyright (C) 2021 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 //! ProfCollect tracing scheduler.
18 
19 use std::fs;
20 use std::path::Path;
21 use std::sync::mpsc::{sync_channel, SyncSender};
22 use std::sync::Arc;
23 use std::sync::Mutex;
24 use std::thread;
25 
26 use crate::config::{Config, PROFILE_OUTPUT_DIR, TRACE_OUTPUT_DIR};
27 use crate::trace_provider::{self, TraceProvider};
28 use anyhow::{anyhow, ensure, Context, Result};
29 
30 pub struct Scheduler {
31     /// Signal to terminate the periodic collection worker thread, None if periodic collection is
32     /// not scheduled.
33     termination_ch: Option<SyncSender<()>>,
34     /// The preferred trace provider for the system.
35     trace_provider: Arc<Mutex<dyn TraceProvider + Send>>,
36 }
37 
38 impl Scheduler {
new() -> Result<Self>39     pub fn new() -> Result<Self> {
40         let p = trace_provider::get_trace_provider()?;
41         Ok(Scheduler { termination_ch: None, trace_provider: p })
42     }
43 
is_scheduled(&self) -> bool44     fn is_scheduled(&self) -> bool {
45         self.termination_ch.is_some()
46     }
47 
schedule_periodic(&mut self, config: &Config) -> Result<()>48     pub fn schedule_periodic(&mut self, config: &Config) -> Result<()> {
49         ensure!(!self.is_scheduled(), "Already scheduled.");
50 
51         let (sender, receiver) = sync_channel(1);
52         self.termination_ch = Some(sender);
53 
54         // Clone config and trace_provider ARC for the worker thread.
55         let config = config.clone();
56         let trace_provider = self.trace_provider.clone();
57 
58         thread::spawn(move || {
59             loop {
60                 match receiver.recv_timeout(config.collection_interval) {
61                     Ok(_) => break,
62                     Err(_) => {
63                         // Did not receive a termination signal, initiate trace event.
64                         if check_space_limit(*TRACE_OUTPUT_DIR, &config).unwrap() {
65                             trace_provider.lock().unwrap().trace(
66                                 &TRACE_OUTPUT_DIR,
67                                 "periodic",
68                                 &config.sampling_period,
69                             );
70                         }
71                     }
72                 }
73             }
74         });
75         Ok(())
76     }
77 
terminate_periodic(&mut self) -> Result<()>78     pub fn terminate_periodic(&mut self) -> Result<()> {
79         self.termination_ch
80             .as_ref()
81             .ok_or_else(|| anyhow!("Not scheduled"))?
82             .send(())
83             .context("Scheduler worker disappeared.")?;
84         self.termination_ch = None;
85         Ok(())
86     }
87 
one_shot(&self, config: &Config, tag: &str) -> Result<()>88     pub fn one_shot(&self, config: &Config, tag: &str) -> Result<()> {
89         let trace_provider = self.trace_provider.clone();
90         if check_space_limit(*TRACE_OUTPUT_DIR, config)? {
91             trace_provider.lock().unwrap().trace(&TRACE_OUTPUT_DIR, tag, &config.sampling_period);
92         }
93         Ok(())
94     }
95 
process(&self, blocking: bool) -> Result<()>96     pub fn process(&self, blocking: bool) -> Result<()> {
97         let trace_provider = self.trace_provider.clone();
98         let handle = thread::spawn(move || {
99             trace_provider
100                 .lock()
101                 .unwrap()
102                 .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR)
103                 .expect("Failed to process profiles.");
104         });
105         if blocking {
106             handle.join().map_err(|_| anyhow!("Profile process thread panicked."))?;
107         }
108         Ok(())
109     }
110 
get_trace_provider_name(&self) -> &'static str111     pub fn get_trace_provider_name(&self) -> &'static str {
112         self.trace_provider.lock().unwrap().get_name()
113     }
114 }
115 
116 /// Run if space usage is under limit.
check_space_limit(path: &Path, config: &Config) -> Result<bool>117 fn check_space_limit(path: &Path, config: &Config) -> Result<bool> {
118     let ret = dir_size(path)? <= config.max_trace_limit;
119     if !ret {
120         log::error!("trace storage exhausted.");
121     }
122     Ok(ret)
123 }
124 
125 /// Returns the size of a directory, non-recursive.
dir_size(path: &Path) -> Result<u64>126 fn dir_size(path: &Path) -> Result<u64> {
127     fs::read_dir(path)?.try_fold(0, |acc, file| {
128         let metadata = file?.metadata()?;
129         let size = if metadata.is_file() { metadata.len() } else { 0 };
130         Ok(acc + size)
131     })
132 }
133