1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::{hash_map, HashMap}; 15 16 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; 17 use ylong_runtime::sync::oneshot::Sender; 18 19 use super::{Client, ClientEvent}; 20 21 cfg_oh! { 22 use crate::ability::PANIC_INFO; 23 } 24 use crate::error::ErrorCode; 25 use crate::utils::runtime_spawn; 26 27 #[derive(Clone)] 28 pub(crate) struct ClientManagerEntry { 29 tx: UnboundedSender<ClientEvent>, 30 } 31 32 impl ClientManagerEntry { new(tx: UnboundedSender<ClientEvent>) -> Self33 pub(crate) fn new(tx: UnboundedSender<ClientEvent>) -> Self { 34 Self { tx } 35 } 36 send_event(&self, event: ClientEvent) -> bool37 pub(crate) fn send_event(&self, event: ClientEvent) -> bool { 38 if self.tx.send(event).is_err() { 39 #[cfg(feature = "oh")] 40 unsafe { 41 if let Some(e) = PANIC_INFO.as_ref() { 42 error!("Sends ClientManager event failed {}", e); 43 } else { 44 info!("ClientManager is unloading"); 45 } 46 } 47 return false; 48 } 49 true 50 } 51 } 52 pub(crate) struct ClientManager { 53 // map from pid to client and fd 54 clients: HashMap<u64, (UnboundedSender<ClientEvent>, i32)>, 55 pid_map: HashMap<u32, u64>, 56 rx: UnboundedReceiver<ClientEvent>, 57 } 58 59 impl ClientManager { init() -> ClientManagerEntry60 pub(crate) fn init() -> ClientManagerEntry { 61 debug!("ClientManager init"); 62 let (tx, rx) = unbounded_channel(); 63 let client_manager = ClientManager { 64 clients: HashMap::new(), 65 pid_map: HashMap::new(), 66 rx, 67 }; 68 runtime_spawn(client_manager.run()); 69 ClientManagerEntry::new(tx) 70 } 71 run(mut self)72 async fn run(mut self) { 73 loop { 74 let recv = match self.rx.recv().await { 75 Ok(message) => message, 76 Err(e) => { 77 error!("ClientManager recv error {:?}", e); 78 continue; 79 } 80 }; 81 82 match recv { 83 ClientEvent::OpenChannel(pid, tx) => self.handle_open_channel(pid, tx), 84 ClientEvent::Subscribe(tid, pid, uid, token_id, tx) => { 85 self.handle_subscribe(tid, pid, uid, token_id, tx) 86 } 87 ClientEvent::Unsubscribe(tid, tx) => self.handle_unsubscribe(tid, tx), 88 ClientEvent::TaskFinished(tid) => self.handle_task_finished(tid), 89 ClientEvent::Terminate(pid, tx) => self.handle_process_terminated(pid, tx), 90 ClientEvent::SendResponse(tid, version, status_code, reason, headers) => { 91 if let Some(&pid) = self.pid_map.get(&tid) { 92 if let Some((tx, _fd)) = self.clients.get_mut(&pid) { 93 if let Err(err) = tx.send(ClientEvent::SendResponse( 94 tid, 95 version, 96 status_code, 97 reason, 98 headers, 99 )) { 100 error!("send response error, {}", err); 101 } 102 } else { 103 debug!("response client not found"); 104 } 105 } else { 106 debug!("response pid not found"); 107 } 108 } 109 ClientEvent::SendNotifyData(subscribe_type, notify_data) => { 110 if let Some(&pid) = self.pid_map.get(&(notify_data.task_id)) { 111 if let Some((tx, _fd)) = self.clients.get_mut(&pid) { 112 if let Err(err) = 113 tx.send(ClientEvent::SendNotifyData(subscribe_type, notify_data)) 114 { 115 error!("send notify data error, {}", err); 116 } 117 } else { 118 debug!("response client not found"); 119 } 120 } else { 121 debug!("notify data pid not found"); 122 } 123 } 124 _ => {} 125 } 126 127 debug!("ClientManager handle message done"); 128 } 129 } 130 handle_open_channel(&mut self, pid: u64, tx: Sender<Result<i32, ErrorCode>>)131 fn handle_open_channel(&mut self, pid: u64, tx: Sender<Result<i32, ErrorCode>>) { 132 match self.clients.entry(pid) { 133 hash_map::Entry::Occupied(o) => { 134 let (_, fd) = o.get(); 135 let _ = tx.send(Ok(*fd)); 136 } 137 hash_map::Entry::Vacant(v) => match Client::constructor(pid) { 138 Some((client, fd)) => { 139 let _ = tx.send(Ok(fd)); 140 v.insert((client, fd)); 141 } 142 None => { 143 let _ = tx.send(Err(ErrorCode::Other)); 144 } 145 }, 146 } 147 } 148 handle_subscribe( &mut self, tid: u32, pid: u64, _uid: u64, _token_id: u64, tx: Sender<ErrorCode>, )149 fn handle_subscribe( 150 &mut self, 151 tid: u32, 152 pid: u64, 153 _uid: u64, 154 _token_id: u64, 155 tx: Sender<ErrorCode>, 156 ) { 157 if let Some(_client) = self.clients.get_mut(&pid) { 158 self.pid_map.insert(tid, pid); 159 let _ = tx.send(ErrorCode::ErrOk); 160 } else { 161 info!("channel not open, pid {}", pid); 162 let _ = tx.send(ErrorCode::ChannelNotOpen); 163 } 164 } 165 handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>)166 fn handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>) { 167 if let Some(&pid) = self.pid_map.get(&tid) { 168 self.pid_map.remove(&tid); 169 if let Some(_client) = self.clients.get_mut(&pid) { 170 let _ = tx.send(ErrorCode::ErrOk); 171 return; 172 } else { 173 debug!("client not found"); 174 } 175 } else { 176 debug!("unsubscribe tid not found"); 177 } 178 let _ = tx.send(ErrorCode::Other); 179 } 180 handle_task_finished(&mut self, tid: u32)181 fn handle_task_finished(&mut self, tid: u32) { 182 if self.pid_map.contains_key(&tid) { 183 self.pid_map.remove(&tid); 184 debug!("unsubscribe tid {:?}", tid); 185 } else { 186 debug!("unsubscribe tid not found"); 187 } 188 } 189 handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>)190 fn handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>) { 191 if let Some((tx, _)) = self.clients.get_mut(&pid) { 192 let _ = tx.send(ClientEvent::Shutdown); 193 self.clients.remove(&pid); 194 } else { 195 debug!("terminate pid not found"); 196 } 197 let _ = tx.send(ErrorCode::ErrOk); 198 } 199 } 200