1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::{hash_map, HashMap};
15 
16 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
17 use ylong_runtime::sync::oneshot::Sender;
18 
19 use super::{Client, ClientEvent};
20 
21 cfg_oh! {
22     use crate::ability::PANIC_INFO;
23 }
24 use crate::error::ErrorCode;
25 use crate::utils::runtime_spawn;
26 
/// Cloneable handle holding the sending half of a `ClientEvent` channel.
///
/// `ClientManager::init` returns one of these wrapping the sender whose
/// receiving half is owned by the spawned `ClientManager`.
#[derive(Clone)]
pub(crate) struct ClientManagerEntry {
    // Sender that `send_event` pushes events into.
    tx: UnboundedSender<ClientEvent>,
}
31 
32 impl ClientManagerEntry {
new(tx: UnboundedSender<ClientEvent>) -> Self33     pub(crate) fn new(tx: UnboundedSender<ClientEvent>) -> Self {
34         Self { tx }
35     }
36 
send_event(&self, event: ClientEvent) -> bool37     pub(crate) fn send_event(&self, event: ClientEvent) -> bool {
38         if self.tx.send(event).is_err() {
39             #[cfg(feature = "oh")]
40             unsafe {
41                 if let Some(e) = PANIC_INFO.as_ref() {
42                     error!("Sends ClientManager event failed {}", e);
43                 } else {
44                     info!("ClientManager is unloading");
45                 }
46             }
47             return false;
48         }
49         true
50     }
51 }
/// State owned by the client-manager event loop (`ClientManager::run`).
pub(crate) struct ClientManager {
    // Map from pid to that client's event sender and the fd that is handed
    // back to `OpenChannel` requesters.
    clients: HashMap<u64, (UnboundedSender<ClientEvent>, i32)>,
    // Map from task id (tid) to the pid recorded for it by `Subscribe`.
    pid_map: HashMap<u32, u64>,
    // Receiving half of the channel created in `init`.
    rx: UnboundedReceiver<ClientEvent>,
}
58 
59 impl ClientManager {
init() -> ClientManagerEntry60     pub(crate) fn init() -> ClientManagerEntry {
61         debug!("ClientManager init");
62         let (tx, rx) = unbounded_channel();
63         let client_manager = ClientManager {
64             clients: HashMap::new(),
65             pid_map: HashMap::new(),
66             rx,
67         };
68         runtime_spawn(client_manager.run());
69         ClientManagerEntry::new(tx)
70     }
71 
run(mut self)72     async fn run(mut self) {
73         loop {
74             let recv = match self.rx.recv().await {
75                 Ok(message) => message,
76                 Err(e) => {
77                     error!("ClientManager recv error {:?}", e);
78                     continue;
79                 }
80             };
81 
82             match recv {
83                 ClientEvent::OpenChannel(pid, tx) => self.handle_open_channel(pid, tx),
84                 ClientEvent::Subscribe(tid, pid, uid, token_id, tx) => {
85                     self.handle_subscribe(tid, pid, uid, token_id, tx)
86                 }
87                 ClientEvent::Unsubscribe(tid, tx) => self.handle_unsubscribe(tid, tx),
88                 ClientEvent::TaskFinished(tid) => self.handle_task_finished(tid),
89                 ClientEvent::Terminate(pid, tx) => self.handle_process_terminated(pid, tx),
90                 ClientEvent::SendResponse(tid, version, status_code, reason, headers) => {
91                     if let Some(&pid) = self.pid_map.get(&tid) {
92                         if let Some((tx, _fd)) = self.clients.get_mut(&pid) {
93                             if let Err(err) = tx.send(ClientEvent::SendResponse(
94                                 tid,
95                                 version,
96                                 status_code,
97                                 reason,
98                                 headers,
99                             )) {
100                                 error!("send response error, {}", err);
101                             }
102                         } else {
103                             debug!("response client not found");
104                         }
105                     } else {
106                         debug!("response pid not found");
107                     }
108                 }
109                 ClientEvent::SendNotifyData(subscribe_type, notify_data) => {
110                     if let Some(&pid) = self.pid_map.get(&(notify_data.task_id)) {
111                         if let Some((tx, _fd)) = self.clients.get_mut(&pid) {
112                             if let Err(err) =
113                                 tx.send(ClientEvent::SendNotifyData(subscribe_type, notify_data))
114                             {
115                                 error!("send notify data error, {}", err);
116                             }
117                         } else {
118                             debug!("response client not found");
119                         }
120                     } else {
121                         debug!("notify data pid not found");
122                     }
123                 }
124                 _ => {}
125             }
126 
127             debug!("ClientManager handle message done");
128         }
129     }
130 
handle_open_channel(&mut self, pid: u64, tx: Sender<Result<i32, ErrorCode>>)131     fn handle_open_channel(&mut self, pid: u64, tx: Sender<Result<i32, ErrorCode>>) {
132         match self.clients.entry(pid) {
133             hash_map::Entry::Occupied(o) => {
134                 let (_, fd) = o.get();
135                 let _ = tx.send(Ok(*fd));
136             }
137             hash_map::Entry::Vacant(v) => match Client::constructor(pid) {
138                 Some((client, fd)) => {
139                     let _ = tx.send(Ok(fd));
140                     v.insert((client, fd));
141                 }
142                 None => {
143                     let _ = tx.send(Err(ErrorCode::Other));
144                 }
145             },
146         }
147     }
148 
handle_subscribe( &mut self, tid: u32, pid: u64, _uid: u64, _token_id: u64, tx: Sender<ErrorCode>, )149     fn handle_subscribe(
150         &mut self,
151         tid: u32,
152         pid: u64,
153         _uid: u64,
154         _token_id: u64,
155         tx: Sender<ErrorCode>,
156     ) {
157         if let Some(_client) = self.clients.get_mut(&pid) {
158             self.pid_map.insert(tid, pid);
159             let _ = tx.send(ErrorCode::ErrOk);
160         } else {
161             info!("channel not open, pid {}", pid);
162             let _ = tx.send(ErrorCode::ChannelNotOpen);
163         }
164     }
165 
handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>)166     fn handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>) {
167         if let Some(&pid) = self.pid_map.get(&tid) {
168             self.pid_map.remove(&tid);
169             if let Some(_client) = self.clients.get_mut(&pid) {
170                 let _ = tx.send(ErrorCode::ErrOk);
171                 return;
172             } else {
173                 debug!("client not found");
174             }
175         } else {
176             debug!("unsubscribe tid not found");
177         }
178         let _ = tx.send(ErrorCode::Other);
179     }
180 
handle_task_finished(&mut self, tid: u32)181     fn handle_task_finished(&mut self, tid: u32) {
182         if self.pid_map.contains_key(&tid) {
183             self.pid_map.remove(&tid);
184             debug!("unsubscribe tid {:?}", tid);
185         } else {
186             debug!("unsubscribe tid not found");
187         }
188     }
189 
handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>)190     fn handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>) {
191         if let Some((tx, _)) = self.clients.get_mut(&pid) {
192             let _ = tx.send(ClientEvent::Shutdown);
193             self.clients.remove(&pid);
194         } else {
195             debug!("terminate pid not found");
196         }
197         let _ = tx.send(ErrorCode::ErrOk);
198     }
199 }
200