1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use crate::info::State;
15 use crate::service::client::ClientManagerEntry;
16 use crate::task::notify::{NotifyData, SubscribeType};
17 
18 pub(crate) struct Notifier;
19 
20 impl Notifier {
complete(client_manager: &ClientManagerEntry, notify_data: NotifyData)21     pub(crate) fn complete(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
22         #[cfg(feature = "oh")]
23         let _ = publish_state_change_event(
24             notify_data.bundle.as_str(),
25             notify_data.task_id,
26             State::Completed.repr as i32,
27             notify_data.uid,
28         );
29         client_manager.send_notify_data(SubscribeType::Complete, notify_data)
30     }
31 
fail(client_manager: &ClientManagerEntry, notify_data: NotifyData)32     pub(crate) fn fail(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
33         #[cfg(feature = "oh")]
34         let _ = publish_state_change_event(
35             notify_data.bundle.as_str(),
36             notify_data.task_id,
37             State::Failed.repr as i32,
38             notify_data.uid,
39         );
40         client_manager.send_notify_data(SubscribeType::Fail, notify_data)
41     }
42 
pause(client_manager: &ClientManagerEntry, notify_data: NotifyData)43     pub(crate) fn pause(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
44         client_manager.send_notify_data(SubscribeType::Pause, notify_data)
45     }
46 
resume(client_manager: &ClientManagerEntry, notify_data: NotifyData)47     pub(crate) fn resume(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
48         client_manager.send_notify_data(SubscribeType::Resume, notify_data)
49     }
50 
header_receive(client_manager: &ClientManagerEntry, notify_data: NotifyData)51     pub(crate) fn header_receive(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
52         client_manager.send_notify_data(SubscribeType::HeaderReceive, notify_data)
53     }
54 
progress(client_manager: &ClientManagerEntry, notify_data: NotifyData)55     pub(crate) fn progress(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
56         let total_processed = notify_data.progress.common_data.total_processed;
57         let file_total_size: i64 = notify_data.progress.sizes.iter().sum();
58         if total_processed == 0 && file_total_size < 0 {
59             return;
60         }
61         client_manager.send_notify_data(SubscribeType::Progress, notify_data)
62     }
63 
remove(client_manager: &ClientManagerEntry, notify_data: NotifyData)64     pub(crate) fn remove(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
65         let task_id = notify_data.task_id;
66         client_manager.send_notify_data(SubscribeType::Remove, notify_data);
67         client_manager.notify_task_finished(task_id);
68     }
69 }
70 
/// Publishes a task state-change event via
/// `crate::utils::PublishStateChangeEvent`.
///
/// `uid` is converted to `i32` with an `as` cast before the call. Returns
/// `Ok(())` when the underlying call yields `true` and `Err(())` otherwise.
#[cfg(feature = "oh")]
pub(crate) fn publish_state_change_event(
    bundle_name: &str,
    task_id: u32,
    state: i32,
    uid: u64,
) -> Result<(), ()> {
    let published = crate::utils::PublishStateChangeEvent(bundle_name, task_id, state, uid as i32);
    if published {
        Ok(())
    } else {
        Err(())
    }
}
83 #[allow(unused)]
84 #[cfg(test)]
85 mod test {
86     use std::fs::File;
87     use std::sync::Arc;
88     use std::time::Duration;
89 
90     use cxx::UniquePtr;
91     use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver};
92 
93     use crate::config::{Action, ConfigBuilder, Mode};
94     use crate::error::ErrorCode;
95     use crate::info::{State, TaskInfo};
96     use crate::manage::database::RequestDb;
97     use crate::manage::events::{TaskEvent, TaskManagerEvent};
98     use crate::manage::network::{Network, NetworkInfo, NetworkInner, NetworkState, NetworkType};
99     use crate::manage::network_manager::NetworkManager;
100     use crate::manage::task_manager::{TaskManagerRx, TaskManagerTx};
101     use crate::manage::TaskManager;
102     use crate::service::client::{ClientEvent, ClientManager, ClientManagerEntry};
103     use crate::service::run_count::RunCountManagerEntry;
104     use crate::task::notify::SubscribeType;
105     use crate::task::reason::Reason;
106     use crate::tests::{lock_database, test_init};
107 
    // Value of `total_processed` at which `ut_notify_progress` stops reading
    // progress events; presumably the byte length of the remote test file
    // (TODO confirm against the hosted file).
    const GITEE_FILE_LEN: usize = 1042003;
109 
init_manager() -> (TaskManager, UnboundedReceiver<ClientEvent>)110     fn init_manager() -> (TaskManager, UnboundedReceiver<ClientEvent>) {
111         let (tx, rx) = unbounded_channel();
112         let task_manager_tx = TaskManagerTx::new(tx);
113         let rx = TaskManagerRx::new(rx);
114         {
115             let network_manager = NetworkManager::get_instance().lock().unwrap();
116             let notifier = network_manager.network.inner.clone();
117             notifier.notify_online(NetworkInfo {
118                 network_type: NetworkType::Wifi,
119                 is_metered: false,
120                 is_roaming: false,
121             });
122         }
123         let (tx, _rx) = unbounded_channel();
124         let run_count = RunCountManagerEntry::new(tx);
125         let (tx, client_rx) = unbounded_channel();
126         let client = ClientManagerEntry::new(tx);
127         (
128             TaskManager::new(task_manager_tx, rx, run_count, client),
129             client_rx,
130         )
131     }
132 
133     #[cfg(feature = "oh")]
134     #[test]
ut_network()135     fn ut_network() {
136         test_init();
137         let notifier;
138         {
139             let network_manager = NetworkManager::get_instance().lock().unwrap();
140             notifier = network_manager.network.inner.clone();
141         }
142 
143         notifier.notify_online(NetworkInfo {
144             network_type: NetworkType::Wifi,
145             is_metered: false,
146             is_roaming: false,
147         });
148         assert!(NetworkManager::is_online());
149         assert_eq!(
150             NetworkManager::query_network(),
151             NetworkState::Online(NetworkInfo {
152                 network_type: NetworkType::Wifi,
153                 is_metered: false,
154                 is_roaming: false,
155             })
156         );
157         notifier.notify_offline();
158         assert!(!NetworkManager::is_online());
159         notifier.notify_online(NetworkInfo {
160             network_type: NetworkType::Cellular,
161             is_metered: true,
162             is_roaming: true,
163         });
164         assert!(NetworkManager::is_online());
165         assert_eq!(
166             NetworkManager::query_network(),
167             NetworkState::Online(NetworkInfo {
168                 network_type: NetworkType::Cellular,
169                 is_metered: true,
170                 is_roaming: true,
171             })
172         );
173     }
174 
175     #[cfg(feature = "oh")]
176     #[test]
ut_network_notify()177     fn ut_network_notify() {
178         test_init();
179         let notifier = NetworkInner::new();
180         notifier.notify_offline();
181         assert!(notifier.notify_online(NetworkInfo {
182             network_type: NetworkType::Wifi,
183             is_metered: true,
184             is_roaming: true,
185         }));
186         assert!(!notifier.notify_online(NetworkInfo {
187             network_type: NetworkType::Wifi,
188             is_metered: true,
189             is_roaming: true,
190         }));
191         assert!(notifier.notify_online(NetworkInfo {
192             network_type: NetworkType::Wifi,
193             is_metered: false,
194             is_roaming: true,
195         }));
196         assert!(notifier.notify_online(NetworkInfo {
197             network_type: NetworkType::Cellular,
198             is_metered: false,
199             is_roaming: true,
200         }));
201     }
202 
203     #[test]
ut_notify_progress()204     fn ut_notify_progress() {
205         test_init();
206         let _lock = lock_database();
207         let (mut manager, mut client_rx) = init_manager();
208 
209         let file_path = "test_files/ut_notify_completed.txt";
210 
211         let file = File::create(file_path).unwrap();
212         let config = ConfigBuilder::new()
213         .action(Action::Download)
214         .retry(true)
215         .mode(Mode::BackGround)
216         .file_spec(file)
217         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
218         .redirect(true)
219         .build();
220         let uid = config.common_data.uid;
221         let task_id = manager.create(config).unwrap();
222         manager.start(uid, task_id);
223         manager.scheduler.reschedule();
224         ylong_runtime::block_on(async {
225             let info = client_rx.recv().await.unwrap();
226             let ClientEvent::SendResponse(tid, version, status_code, reason, headers) = info else {
227                 panic!("unexpected event: {:?}", info);
228             };
229             assert_eq!(tid, task_id);
230             assert_eq!(version, "HTTP/1.1");
231             assert_eq!(status_code, 200);
232             assert_eq!(reason, "OK");
233             assert!(!headers.is_empty());
234             loop {
235                 let info = client_rx.recv().await.unwrap();
236                 let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
237                     panic!("unexpected event: {:?}", info);
238                 };
239                 let mut previous = 0;
240                 assert_eq!(subscribe_type, SubscribeType::Progress);
241                 assert_eq!(data.task_id, task_id);
242                 assert!(!data.progress.extras.is_empty());
243                 assert_eq!(data.progress.common_data.state, State::Running.repr);
244                 assert_eq!(data.progress.common_data.index, 0);
245                 assert_eq!(
246                     data.progress.processed[0],
247                     data.progress.common_data.total_processed
248                 );
249 
250                 assert!(data.progress.common_data.total_processed >= previous);
251                 previous = data.progress.common_data.total_processed;
252                 if data.progress.common_data.total_processed == GITEE_FILE_LEN {
253                     break;
254                 }
255             }
256         })
257     }
258 
259     #[test]
ut_notify_pause_resume()260     fn ut_notify_pause_resume() {
261         test_init();
262         let _lock = lock_database();
263         let (mut manager, mut client_rx) = init_manager();
264 
265         let file_path = "test_files/ut_notify";
266 
267         let file = File::create(file_path).unwrap();
268         let config = ConfigBuilder::new()
269         .action(Action::Download)
270         .retry(true)
271         .mode(Mode::BackGround)
272         .file_spec(file)
273         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
274         .redirect(true)
275         .build();
276         let uid = config.common_data.uid;
277         let task_id = manager.create(config).unwrap();
278         manager.start(uid, task_id);
279         manager.pause(uid, task_id);
280         manager.resume(uid, task_id);
281         ylong_runtime::block_on(async {
282             let info = client_rx.recv().await.unwrap();
283             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
284                 panic!("unexpected event: {:?}", info);
285             };
286             assert_eq!(subscribe_type, SubscribeType::Pause);
287             assert!(data.progress.extras.is_empty());
288             assert_eq!(data.progress.common_data.state, State::Paused.repr);
289             assert_eq!(data.progress.common_data.index, 0);
290             assert_eq!(
291                 data.progress.processed[0],
292                 data.progress.common_data.total_processed
293             );
294             assert_eq!(data.progress.common_data.total_processed, 0);
295             let info = client_rx.recv().await.unwrap();
296             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
297                 panic!("unexpected event: {:?}", info);
298             };
299             assert_eq!(subscribe_type, SubscribeType::Resume);
300             assert!(data.progress.extras.is_empty());
301             assert_eq!(data.progress.common_data.state, State::Waiting.repr);
302             assert_eq!(data.progress.common_data.index, 0);
303             assert_eq!(
304                 data.progress.processed[0],
305                 data.progress.common_data.total_processed
306             );
307             assert_eq!(data.progress.common_data.total_processed, 0);
308         })
309     }
310 
311     #[test]
ut_notify_remove()312     fn ut_notify_remove() {
313         test_init();
314         let _lock = lock_database();
315         let (mut manager, mut client_rx) = init_manager();
316 
317         let file_path = "test_files/ut_notify";
318 
319         let file = File::create(file_path).unwrap();
320         let config = ConfigBuilder::new()
321         .action(Action::Download)
322         .retry(true)
323         .mode(Mode::BackGround)
324         .file_spec(file)
325         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
326         .redirect(true)
327         .build();
328         let uid = config.common_data.uid;
329         let task_id = manager.create(config).unwrap();
330         manager.remove(uid, task_id);
331         ylong_runtime::block_on(async {
332             let info = client_rx.recv().await.unwrap();
333             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
334                 panic!("unexpected event: {:?}", info);
335             };
336             assert_eq!(subscribe_type, SubscribeType::Remove);
337             assert!(data.progress.extras.is_empty());
338             assert_eq!(data.progress.common_data.state, State::Removed.repr);
339             assert_eq!(data.progress.common_data.index, 0);
340             assert_eq!(
341                 data.progress.processed[0],
342                 data.progress.common_data.total_processed
343             );
344             assert_eq!(data.progress.common_data.total_processed, 0);
345         })
346     }
347 
348     #[test]
ut_notify_completed()349     fn ut_notify_completed() {
350         test_init();
351         let _lock = lock_database();
352         let (mut manager, mut client_rx) = init_manager();
353 
354         let file_path = "test_files/ut_notify";
355 
356         let file = File::create(file_path).unwrap();
357         let config = ConfigBuilder::new()
358         .action(Action::Download)
359         .retry(true)
360         .mode(Mode::BackGround)
361         .file_spec(file)
362         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
363         .redirect(true)
364         .build();
365         let uid = config.common_data.uid;
366         let task_id = manager.create(config).unwrap();
367         manager.start(uid, task_id);
368         manager.scheduler.task_completed(uid, task_id);
369         ylong_runtime::block_on(async {
370             let info = client_rx.recv().await.unwrap();
371             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
372                 panic!("unexpected event: {:?}", info);
373             };
374             assert_eq!(subscribe_type, SubscribeType::Complete);
375             assert!(data.progress.extras.is_empty());
376             assert_eq!(data.progress.common_data.state, State::Completed.repr);
377             assert_eq!(data.progress.common_data.index, 0);
378             assert_eq!(
379                 data.progress.processed[0],
380                 data.progress.common_data.total_processed
381             );
382             assert_eq!(data.progress.common_data.total_processed, 0);
383         })
384     }
385 
386     #[test]
ut_notify_failed()387     fn ut_notify_failed() {
388         test_init();
389         let _lock = lock_database();
390         let (mut manager, mut client_rx) = init_manager();
391 
392         let file_path = "test_files/ut_notify";
393 
394         let file = File::create(file_path).unwrap();
395         let config = ConfigBuilder::new()
396         .action(Action::Download)
397         .retry(true)
398         .mode(Mode::BackGround)
399         .file_spec(file)
400         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
401         .redirect(true)
402         .build();
403         let uid = config.common_data.uid;
404         let task_id = manager.create(config).unwrap();
405         manager.start(uid, task_id);
406         manager.scheduler.task_failed(uid, task_id, Reason::IoError);
407         ylong_runtime::block_on(async {
408             let info = client_rx.recv().await.unwrap();
409             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
410                 panic!("unexpected event: {:?}", info);
411             };
412             assert_eq!(subscribe_type, SubscribeType::Fail);
413             assert!(data.progress.extras.is_empty());
414             assert_eq!(data.progress.common_data.state, State::Failed.repr);
415             assert_eq!(data.progress.common_data.index, 0);
416             assert_eq!(
417                 data.progress.processed[0],
418                 data.progress.common_data.total_processed
419             );
420             assert_eq!(data.progress.common_data.total_processed, 0);
421         })
422     }
423 
424     #[test]
ut_notify_pause_resume_completed()425     fn ut_notify_pause_resume_completed() {
426         test_init();
427         let _lock = lock_database();
428         let (mut manager, mut client_rx) = init_manager();
429 
430         let file_path = "test_files/ut_notify";
431 
432         let file = File::create(file_path).unwrap();
433         let config = ConfigBuilder::new()
434         .action(Action::Download)
435         .retry(true)
436         .mode(Mode::BackGround)
437         .file_spec(file)
438         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
439         .redirect(true)
440         .build();
441         let uid = config.common_data.uid;
442         let task_id = manager.create(config).unwrap();
443         manager.start(uid, task_id);
444         manager.pause(uid, task_id);
445         manager.scheduler.task_completed(uid, task_id);
446         manager.resume(uid, task_id);
447         ylong_runtime::block_on(async {
448             let info = client_rx.recv().await.unwrap();
449             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
450                 panic!("unexpected event: {:?}", info);
451             };
452             assert_eq!(subscribe_type, SubscribeType::Pause);
453             let info = client_rx.recv().await.unwrap();
454             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
455                 panic!("unexpected event: {:?}", info);
456             };
457             assert_eq!(subscribe_type, SubscribeType::Resume);
458             assert!(client_rx.is_empty());
459         })
460     }
461 
462     #[test]
ut_notify_pause_resume_failed()463     fn ut_notify_pause_resume_failed() {
464         test_init();
465         let _lock = lock_database();
466         let (mut manager, mut client_rx) = init_manager();
467 
468         let file_path = "test_files/ut_notify";
469 
470         let file = File::create(file_path).unwrap();
471         let config = ConfigBuilder::new()
472         .action(Action::Download)
473         .retry(true)
474         .mode(Mode::BackGround)
475         .file_spec(file)
476         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
477         .redirect(true)
478         .build();
479         let uid = config.common_data.uid;
480         let task_id = manager.create(config).unwrap();
481         manager.start(uid, task_id);
482         manager.pause(uid, task_id);
483         manager.scheduler.task_failed(uid, task_id, Reason::IoError);
484         manager.resume(uid, task_id);
485         ylong_runtime::block_on(async {
486             let info = client_rx.recv().await.unwrap();
487             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
488                 panic!("unexpected event: {:?}", info);
489             };
490             assert_eq!(subscribe_type, SubscribeType::Pause);
491             let info = client_rx.recv().await.unwrap();
492             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
493                 panic!("unexpected event: {:?}", info);
494             };
495             assert_eq!(subscribe_type, SubscribeType::Resume);
496             assert!(client_rx.is_empty());
497         })
498     }
499 }
500