1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use crate::info::State;
15 use crate::service::client::ClientManagerEntry;
16 use crate::task::notify::{NotifyData, SubscribeType};
17
18 pub(crate) struct Notifier;
19
20 impl Notifier {
complete(client_manager: &ClientManagerEntry, notify_data: NotifyData)21 pub(crate) fn complete(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
22 #[cfg(feature = "oh")]
23 let _ = publish_state_change_event(
24 notify_data.bundle.as_str(),
25 notify_data.task_id,
26 State::Completed.repr as i32,
27 notify_data.uid,
28 );
29 client_manager.send_notify_data(SubscribeType::Complete, notify_data)
30 }
31
fail(client_manager: &ClientManagerEntry, notify_data: NotifyData)32 pub(crate) fn fail(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
33 #[cfg(feature = "oh")]
34 let _ = publish_state_change_event(
35 notify_data.bundle.as_str(),
36 notify_data.task_id,
37 State::Failed.repr as i32,
38 notify_data.uid,
39 );
40 client_manager.send_notify_data(SubscribeType::Fail, notify_data)
41 }
42
pause(client_manager: &ClientManagerEntry, notify_data: NotifyData)43 pub(crate) fn pause(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
44 client_manager.send_notify_data(SubscribeType::Pause, notify_data)
45 }
46
resume(client_manager: &ClientManagerEntry, notify_data: NotifyData)47 pub(crate) fn resume(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
48 client_manager.send_notify_data(SubscribeType::Resume, notify_data)
49 }
50
header_receive(client_manager: &ClientManagerEntry, notify_data: NotifyData)51 pub(crate) fn header_receive(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
52 client_manager.send_notify_data(SubscribeType::HeaderReceive, notify_data)
53 }
54
progress(client_manager: &ClientManagerEntry, notify_data: NotifyData)55 pub(crate) fn progress(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
56 let total_processed = notify_data.progress.common_data.total_processed;
57 let file_total_size: i64 = notify_data.progress.sizes.iter().sum();
58 if total_processed == 0 && file_total_size < 0 {
59 return;
60 }
61 client_manager.send_notify_data(SubscribeType::Progress, notify_data)
62 }
63
remove(client_manager: &ClientManagerEntry, notify_data: NotifyData)64 pub(crate) fn remove(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
65 let task_id = notify_data.task_id;
66 client_manager.send_notify_data(SubscribeType::Remove, notify_data);
67 client_manager.notify_task_finished(task_id);
68 }
69 }
70
/// Publishes a task state-change event through
/// `crate::utils::PublishStateChangeEvent`.
///
/// Returns `Ok(())` when that call reports `true`, and `Err(())` otherwise.
#[cfg(feature = "oh")]
pub(crate) fn publish_state_change_event(
    bundle_name: &str,
    task_id: u32,
    state: i32,
    uid: u64,
) -> Result<(), ()> {
    // NOTE(review): `uid as i32` keeps only the low 32 bits of `uid`;
    // presumably real uids always fit - confirm with the callee's contract.
    let published = crate::utils::PublishStateChangeEvent(bundle_name, task_id, state, uid as i32);
    if published {
        Ok(())
    } else {
        Err(())
    }
}
83 #[allow(unused)]
84 #[cfg(test)]
85 mod test {
86 use std::fs::File;
87 use std::sync::Arc;
88 use std::time::Duration;
89
90 use cxx::UniquePtr;
91 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver};
92
93 use crate::config::{Action, ConfigBuilder, Mode};
94 use crate::error::ErrorCode;
95 use crate::info::{State, TaskInfo};
96 use crate::manage::database::RequestDb;
97 use crate::manage::events::{TaskEvent, TaskManagerEvent};
98 use crate::manage::network::{Network, NetworkInfo, NetworkInner, NetworkState, NetworkType};
99 use crate::manage::network_manager::NetworkManager;
100 use crate::manage::task_manager::{TaskManagerRx, TaskManagerTx};
101 use crate::manage::TaskManager;
102 use crate::service::client::{ClientEvent, ClientManager, ClientManagerEntry};
103 use crate::service::run_count::RunCountManagerEntry;
104 use crate::task::notify::SubscribeType;
105 use crate::task::reason::Reason;
106 use crate::tests::{lock_database, test_init};
107
/// Value of `total_processed` at which `ut_notify_progress` stops reading
/// progress events (presumably the byte length of the downloaded test file).
const GITEE_FILE_LEN: usize = 1_042_003;
109
init_manager() -> (TaskManager, UnboundedReceiver<ClientEvent>)110 fn init_manager() -> (TaskManager, UnboundedReceiver<ClientEvent>) {
111 let (tx, rx) = unbounded_channel();
112 let task_manager_tx = TaskManagerTx::new(tx);
113 let rx = TaskManagerRx::new(rx);
114 {
115 let network_manager = NetworkManager::get_instance().lock().unwrap();
116 let notifier = network_manager.network.inner.clone();
117 notifier.notify_online(NetworkInfo {
118 network_type: NetworkType::Wifi,
119 is_metered: false,
120 is_roaming: false,
121 });
122 }
123 let (tx, _rx) = unbounded_channel();
124 let run_count = RunCountManagerEntry::new(tx);
125 let (tx, client_rx) = unbounded_channel();
126 let client = ClientManagerEntry::new(tx);
127 (
128 TaskManager::new(task_manager_tx, rx, run_count, client),
129 client_rx,
130 )
131 }
132
/// Checks that online/offline reports made through the shared notifier are
/// reflected by `NetworkManager::is_online` and `NetworkManager::query_network`.
#[cfg(feature = "oh")]
#[test]
fn ut_network() {
    test_init();
    // The lock guard is a temporary and is released once the handle is cloned.
    let notifier = NetworkManager::get_instance()
        .lock()
        .unwrap()
        .network
        .inner
        .clone();

    // Closures build a fresh value on every use, so `NetworkInfo` does not
    // need to be cloneable.
    let wifi = || NetworkInfo {
        network_type: NetworkType::Wifi,
        is_metered: false,
        is_roaming: false,
    };
    let cellular = || NetworkInfo {
        network_type: NetworkType::Cellular,
        is_metered: true,
        is_roaming: true,
    };

    notifier.notify_online(wifi());
    assert!(NetworkManager::is_online());
    assert_eq!(NetworkManager::query_network(), NetworkState::Online(wifi()));

    notifier.notify_offline();
    assert!(!NetworkManager::is_online());

    notifier.notify_online(cellular());
    assert!(NetworkManager::is_online());
    assert_eq!(
        NetworkManager::query_network(),
        NetworkState::Online(cellular())
    );
}
174
/// Checks the return value of `notify_online`: `true` for the first report
/// after going offline and for any report that differs from the previous one,
/// `false` when the same information is reported twice in a row.
#[cfg(feature = "oh")]
#[test]
fn ut_network_notify() {
    test_init();
    let notifier = NetworkInner::new();
    notifier.notify_offline();

    // Each entry is a report together with the expected return value.
    let reports = [
        (
            NetworkInfo {
                network_type: NetworkType::Wifi,
                is_metered: true,
                is_roaming: true,
            },
            true,
        ),
        (
            NetworkInfo {
                network_type: NetworkType::Wifi,
                is_metered: true,
                is_roaming: true,
            },
            false,
        ),
        (
            NetworkInfo {
                network_type: NetworkType::Wifi,
                is_metered: false,
                is_roaming: true,
            },
            true,
        ),
        (
            NetworkInfo {
                network_type: NetworkType::Cellular,
                is_metered: false,
                is_roaming: true,
            },
            true,
        ),
    ];
    for (report, expected) in reports {
        assert_eq!(notifier.notify_online(report), expected);
    }
}
202
/// Starts a background download and checks the events delivered to the
/// client: first an HTTP response (`HTTP/1.1`, 200, `OK`, non-empty headers),
/// then a stream of `Progress` notifications whose `total_processed` never
/// decreases, until it reaches `GITEE_FILE_LEN`.
#[test]
fn ut_notify_progress() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let file_path = "test_files/ut_notify_completed.txt";

    let file = File::create(file_path).unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(file)
        .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
        .redirect(true)
        .build();
    let uid = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(uid, task_id);
    manager.scheduler.reschedule();
    ylong_runtime::block_on(async {
        let info = client_rx.recv().await.unwrap();
        let ClientEvent::SendResponse(tid, version, status_code, reason, headers) = info else {
            panic!("unexpected event: {:?}", info);
        };
        assert_eq!(tid, task_id);
        assert_eq!(version, "HTTP/1.1");
        assert_eq!(status_code, 200);
        assert_eq!(reason, "OK");
        assert!(!headers.is_empty());

        // Declared outside the loop so the value carries over from one event
        // to the next; resetting it per iteration would make the
        // monotonicity assertion below impossible to fail.
        let mut previous = 0;
        loop {
            let info = client_rx.recv().await.unwrap();
            let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
                panic!("unexpected event: {:?}", info);
            };
            assert_eq!(subscribe_type, SubscribeType::Progress);
            assert_eq!(data.task_id, task_id);
            assert!(!data.progress.extras.is_empty());
            assert_eq!(data.progress.common_data.state, State::Running.repr);
            assert_eq!(data.progress.common_data.index, 0);
            assert_eq!(
                data.progress.processed[0],
                data.progress.common_data.total_processed
            );

            assert!(data.progress.common_data.total_processed >= previous);
            previous = data.progress.common_data.total_processed;
            if data.progress.common_data.total_processed == GITEE_FILE_LEN {
                break;
            }
        }
    })
}
258
/// Starts, pauses and resumes a task, then checks that the first two client
/// events are a `Pause` notification in state `Paused` followed by a `Resume`
/// notification in state `Waiting`, both with empty extras, index 0 and zero
/// processed bytes.
#[test]
fn ut_notify_pause_resume() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let target = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(target)
        .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
        .redirect(true)
        .build();
    let owner = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(owner, task_id);
    manager.pause(owner, task_id);
    manager.resume(owner, task_id);

    ylong_runtime::block_on(async {
        // Expected events, in the order they must arrive.
        let expectations = [
            (SubscribeType::Pause, State::Paused),
            (SubscribeType::Resume, State::Waiting),
        ];
        for (expected_type, expected_state) in expectations {
            let (subscribe_type, data) = match client_rx.recv().await.unwrap() {
                ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
                other => panic!("unexpected event: {:?}", other),
            };
            let progress = &data.progress;
            assert_eq!(subscribe_type, expected_type);
            assert!(progress.extras.is_empty());
            assert_eq!(progress.common_data.state, expected_state.repr);
            assert_eq!(progress.common_data.index, 0);
            assert_eq!(progress.processed[0], progress.common_data.total_processed);
            assert_eq!(progress.common_data.total_processed, 0);
        }
    })
}
310
/// Removes a freshly created task and checks that the first client event is a
/// `Remove` notification in state `Removed`, with empty extras, index 0 and
/// zero processed bytes.
#[test]
fn ut_notify_remove() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let target = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(target)
        .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
        .redirect(true)
        .build();
    let owner = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.remove(owner, task_id);

    ylong_runtime::block_on(async {
        let (subscribe_type, data) = match client_rx.recv().await.unwrap() {
            ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
            other => panic!("unexpected event: {:?}", other),
        };
        let progress = &data.progress;
        assert_eq!(subscribe_type, SubscribeType::Remove);
        assert!(progress.extras.is_empty());
        assert_eq!(progress.common_data.state, State::Removed.repr);
        assert_eq!(progress.common_data.index, 0);
        assert_eq!(progress.processed[0], progress.common_data.total_processed);
        assert_eq!(progress.common_data.total_processed, 0);
    })
}
347
/// Starts a task, marks it completed through the scheduler and checks that
/// the first client event is a `Complete` notification in state `Completed`,
/// with empty extras, index 0 and zero processed bytes.
#[test]
fn ut_notify_completed() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let target = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(target)
        .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
        .redirect(true)
        .build();
    let owner = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(owner, task_id);
    manager.scheduler.task_completed(owner, task_id);

    ylong_runtime::block_on(async {
        let (subscribe_type, data) = match client_rx.recv().await.unwrap() {
            ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
            other => panic!("unexpected event: {:?}", other),
        };
        let progress = &data.progress;
        assert_eq!(subscribe_type, SubscribeType::Complete);
        assert!(progress.extras.is_empty());
        assert_eq!(progress.common_data.state, State::Completed.repr);
        assert_eq!(progress.common_data.index, 0);
        assert_eq!(progress.processed[0], progress.common_data.total_processed);
        assert_eq!(progress.common_data.total_processed, 0);
    })
}
385
/// Starts a task, fails it through the scheduler with `Reason::IoError` and
/// checks that the first client event is a `Fail` notification in state
/// `Failed`, with empty extras, index 0 and zero processed bytes.
#[test]
fn ut_notify_failed() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let target = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(target)
        .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
        .redirect(true)
        .build();
    let owner = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(owner, task_id);
    manager.scheduler.task_failed(owner, task_id, Reason::IoError);

    ylong_runtime::block_on(async {
        let (subscribe_type, data) = match client_rx.recv().await.unwrap() {
            ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
            other => panic!("unexpected event: {:?}", other),
        };
        let progress = &data.progress;
        assert_eq!(subscribe_type, SubscribeType::Fail);
        assert!(progress.extras.is_empty());
        assert_eq!(progress.common_data.state, State::Failed.repr);
        assert_eq!(progress.common_data.index, 0);
        assert_eq!(progress.processed[0], progress.common_data.total_processed);
        assert_eq!(progress.common_data.total_processed, 0);
    })
}
423
/// Starts and pauses a task, reports completion through the scheduler while
/// it is paused, then resumes it. The client must receive a `Pause` and then
/// a `Resume` notification, after which the channel must be empty.
#[test]
fn ut_notify_pause_resume_completed() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let target = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(target)
        .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
        .redirect(true)
        .build();
    let owner = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(owner, task_id);
    manager.pause(owner, task_id);
    manager.scheduler.task_completed(owner, task_id);
    manager.resume(owner, task_id);

    ylong_runtime::block_on(async {
        for expected_type in [SubscribeType::Pause, SubscribeType::Resume] {
            let subscribe_type = match client_rx.recv().await.unwrap() {
                ClientEvent::SendNotifyData(subscribe_type, _) => subscribe_type,
                other => panic!("unexpected event: {:?}", other),
            };
            assert_eq!(subscribe_type, expected_type);
        }
        // No completion notification may have been queued.
        assert!(client_rx.is_empty());
    })
}
461
/// Starts and pauses a task, reports a failure (`Reason::IoError`) through
/// the scheduler while it is paused, then resumes it. The client must receive
/// a `Pause` and then a `Resume` notification, after which the channel must
/// be empty.
#[test]
fn ut_notify_pause_resume_failed() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let target = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(target)
        .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
        .redirect(true)
        .build();
    let owner = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(owner, task_id);
    manager.pause(owner, task_id);
    manager.scheduler.task_failed(owner, task_id, Reason::IoError);
    manager.resume(owner, task_id);

    ylong_runtime::block_on(async {
        for expected_type in [SubscribeType::Pause, SubscribeType::Resume] {
            let subscribe_type = match client_rx.recv().await.unwrap() {
                ClientEvent::SendNotifyData(subscribe_type, _) => subscribe_type,
                other => panic!("unexpected event: {:?}", other),
            };
            assert_eq!(subscribe_type, expected_type);
        }
        // No failure notification may have been queued.
        assert!(client_rx.is_empty());
    })
}
499 }
500