1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::io::SeekFrom;
15 use std::pin::Pin;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::Arc;
18 use std::task::{Context, Poll};
19
20 use ylong_http_client::async_impl::{DownloadOperator, Downloader, Response};
21 use ylong_http_client::{ErrorKind, HttpClientError, SpeedLimit, Timeout};
22 use ylong_runtime::io::AsyncSeekExt;
23
24 use super::operator::TaskOperator;
25 use super::reason::Reason;
26 use super::request_task::{TaskError, TaskPhase};
27 use crate::manage::database::RequestDb;
28 use crate::task::info::State;
29 use crate::task::request_task::RequestTask;
30 #[cfg(feature = "oh")]
31 use crate::trace::Trace;
32
33 const SECONDS_IN_ONE_WEEK: u64 = 7 * 24 * 60 * 60;
34
35 const LOW_SPEED_TIME: u64 = 60;
36 const LOW_SPEED_LIMIT: u64 = 1;
37
38 impl DownloadOperator for TaskOperator {
poll_download( self: Pin<&mut Self>, cx: &mut Context<'_>, data: &[u8], ) -> Poll<Result<usize, HttpClientError>>39 fn poll_download(
40 self: Pin<&mut Self>,
41 cx: &mut Context<'_>,
42 data: &[u8],
43 ) -> Poll<Result<usize, HttpClientError>> {
44 self.poll_write_file(cx, data, 0)
45 }
46
poll_progress( mut self: Pin<&mut Self>, cx: &mut Context<'_>, _downloaded: u64, _total: Option<u64>, ) -> Poll<Result<(), HttpClientError>>47 fn poll_progress(
48 mut self: Pin<&mut Self>,
49 cx: &mut Context<'_>,
50 _downloaded: u64,
51 _total: Option<u64>,
52 ) -> Poll<Result<(), HttpClientError>> {
53 self.poll_progress_common(cx)
54 }
55 }
56
build_downloader( task: Arc<RequestTask>, response: Response, abort_flag: Arc<AtomicBool>, ) -> Downloader<TaskOperator>57 pub(crate) fn build_downloader(
58 task: Arc<RequestTask>,
59 response: Response,
60 abort_flag: Arc<AtomicBool>,
61 ) -> Downloader<TaskOperator> {
62 let task_operator = TaskOperator::new(task, abort_flag);
63
64 Downloader::builder()
65 .body(response)
66 .operator(task_operator)
67 .timeout(Timeout::from_secs(SECONDS_IN_ONE_WEEK))
68 .speed_limit(SpeedLimit::new().min_speed(LOW_SPEED_LIMIT, LOW_SPEED_TIME))
69 .build()
70 }
71
download(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>)72 pub(crate) async fn download(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) {
73 task.tries.store(0, Ordering::SeqCst);
74 loop {
75 if let Err(e) = download_inner(task.clone(), abort_flag.clone()).await {
76 match e {
77 TaskError::Waiting(phase) => match phase {
78 TaskPhase::NeedRetry => {
79 continue;
80 }
81 TaskPhase::UserAbort => {}
82 TaskPhase::NetworkOffline => {
83 *task.running_result.lock().unwrap() = Some(Err(Reason::NetworkOffline));
84 }
85 },
86 TaskError::Failed(reason) => {
87 *task.running_result.lock().unwrap() = Some(Err(reason));
88 }
89 }
90 } else {
91 *task.running_result.lock().unwrap() = Some(Ok(()));
92 }
93 break;
94 }
95 }
96
97 impl RequestTask {
prepare_download(&self) -> Result<(), TaskError>98 async fn prepare_download(&self) -> Result<(), TaskError> {
99 let file = self.files.get_mut(0).unwrap();
100 file.seek(SeekFrom::End(0)).await?;
101 let downloaded = file.metadata().await?.len() as usize;
102 let mut progress = self.progress.lock().unwrap();
103 progress.common_data.index = 0;
104 progress.common_data.total_processed = downloaded;
105 progress.common_data.state = State::Running.repr;
106 progress.processed = vec![downloaded];
107 progress.sizes = vec![-1];
108 Ok(())
109 }
110 }
111
download_inner( task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>, ) -> Result<(), TaskError>112 pub(crate) async fn download_inner(
113 task: Arc<RequestTask>,
114 abort_flag: Arc<AtomicBool>,
115 ) -> Result<(), TaskError> {
116 // Ensures `_trace` can only be freed when this function exits.
117 #[cfg(feature = "oh")]
118 let _trace = Trace::new("download file");
119
120 task.prepare_download().await?;
121
122 info!("download task {} running", task.task_id());
123
124 let request = task.build_download_request().await?;
125
126 let response = task.client.request(request).await;
127 match response.as_ref() {
128 Ok(response) => {
129 let status_code = response.status();
130 #[cfg(feature = "oh")]
131 task.notify_response(response);
132 info!(
133 "task {} get response {}",
134 task.conf.common_data.task_id, status_code
135 );
136 if status_code.is_server_error()
137 || (status_code.as_u16() != 408 && status_code.is_client_error())
138 || status_code.is_redirection()
139 {
140 return Err(TaskError::Failed(Reason::ProtocolError));
141 }
142 if status_code.as_u16() == 408 {
143 if task.timeout_tries.load(Ordering::SeqCst) < 2 {
144 task.timeout_tries.fetch_add(1, Ordering::SeqCst);
145 return Err(TaskError::Waiting(TaskPhase::NeedRetry));
146 } else {
147 return Err(TaskError::Failed(Reason::ProtocolError));
148 }
149 } else {
150 task.timeout_tries.store(0, Ordering::SeqCst);
151 }
152 if status_code.as_u16() == 200 {
153 if task.require_range() {
154 info!("task {} server not support range", task.task_id());
155 return Err(TaskError::Failed(Reason::UnsupportedRangeRequest));
156 }
157 let file = task.files.get(0).unwrap();
158 let has_downloaded = file.metadata().await?.len() > 0;
159 if has_downloaded {
160 error!("task {} file not cleared", task.task_id());
161 task.clear_downloaded_file().await?;
162 }
163 }
164 }
165 Err(e) => {
166 error!("Task {} {:?}", task.task_id(), e);
167
168 match e.error_kind() {
169 ErrorKind::Timeout => return Err(TaskError::Failed(Reason::ContinuousTaskTimeout)),
170 ErrorKind::Request => return Err(TaskError::Failed(Reason::RequestError)),
171 ErrorKind::Redirect => return Err(TaskError::Failed(Reason::RedirectError)),
172 ErrorKind::Connect | ErrorKind::ConnectionUpgrade => {
173 task.network_retry().await?;
174 if e.is_dns_error() {
175 return Err(TaskError::Failed(Reason::Dns));
176 } else if e.is_tls_error() {
177 return Err(TaskError::Failed(Reason::Ssl));
178 } else {
179 return Err(TaskError::Failed(Reason::Tcp));
180 }
181 }
182 ErrorKind::BodyTransfer => {
183 task.network_retry().await?;
184 return Err(TaskError::Failed(Reason::OthersError));
185 }
186 _ => {
187 if format!("{}", e).contains("No space left on device") {
188 return Err(TaskError::Failed(Reason::InsufficientSpace));
189 } else {
190 return Err(TaskError::Failed(Reason::OthersError));
191 }
192 }
193 };
194 }
195 };
196
197 let response = response.unwrap();
198 {
199 let mut guard = task.progress.lock().unwrap();
200 guard.extras.clear();
201 for (k, v) in response.headers() {
202 if let Ok(value) = v.to_string() {
203 guard.extras.insert(k.to_string().to_lowercase(), value);
204 }
205 }
206 }
207 task.get_file_info(&response)?;
208 task.update_progress_in_database();
209 RequestDb::get_instance()
210 .update_task_sizes(task.task_id(), &task.progress.lock().unwrap().sizes);
211
212 #[cfg(feature = "oh")]
213 let _trace = Trace::new(&format!(
214 "download file tid:{} size:{}",
215 task.task_id(),
216 task.progress.lock().unwrap().sizes[0]
217 ));
218 let mut downloader = build_downloader(task.clone(), response, abort_flag);
219
220 if let Err(e) = downloader.download().await {
221 return task.handle_download_error(e).await;
222 }
223 let file = task.files.get_mut(0).unwrap();
224 file.sync_all().await?;
225
226 #[cfg(not(test))]
227 check_file_exist(&task)?;
228 {
229 let mut guard = task.progress.lock().unwrap();
230 guard.sizes = vec![guard.processed[0] as i64];
231 }
232
233 info!("task {} download ok", task.task_id());
234 Ok(())
235 }
236
237 #[cfg(not(test))]
check_file_exist(task: &Arc<RequestTask>) -> Result<(), TaskError>238 fn check_file_exist(task: &Arc<RequestTask>) -> Result<(), TaskError> {
239 use crate::task::files::{convert_bundle_name, convert_path};
240
241 let config = task.config();
242 let bundle_name = convert_bundle_name(config);
243 let real_path = convert_path(
244 config.common_data.uid,
245 &bundle_name,
246 &config.file_specs[0].path,
247 );
248 // Cannot compare because file_total_size will be changed when resume task.
249 match std::fs::metadata(real_path) {
250 Ok(metadata) => {
251 if !metadata.is_file() {
252 error!("task {} check local not file", task.task_id());
253 return Err(TaskError::Failed(Reason::IoError));
254 }
255 }
256 Err(e) => {
257 // Skip this situation when we loss some permission.
258 if e.kind() == std::io::ErrorKind::NotFound {
259 error!("task {} check local not exist", task.task_id());
260 return Err(TaskError::Failed(Reason::IoError));
261 }
262 }
263 }
264 Ok(())
265 }
266
/// Network-backed unit tests for `download_inner`.
///
/// These tests fetch real files from public servers, so they need network
/// access. Each test uses its own file under `test_files/` because the test
/// harness runs tests in parallel.
#[cfg(not(feature = "oh"))]
#[cfg(test)]
mod test {
    use core::time;
    use std::fs::File;
    use std::io::{SeekFrom, Write};
    use std::sync::atomic::AtomicBool;
    use std::sync::Arc;

    use once_cell::sync::Lazy;
    use ylong_runtime::io::AsyncSeekExt;

    use crate::config::{Action, ConfigBuilder, Mode, TaskConfig};
    use crate::manage::network::Network;
    use crate::service::client::{ClientManager, ClientManagerEntry};
    use crate::task::download::{download_inner, TaskPhase};
    use crate::task::info::State;
    use crate::task::reason::Reason;
    use crate::task::request_task::{check_config, RequestTask, TaskError};

    /// Length in bytes of the file served at `GITEE_URL`.
    const GITEE_FILE_LEN: u64 = 1042003;
    /// Length in bytes of the file served at `FEISHU_URL`.
    const FS_FILE_LEN: u64 = 274619168;

    const GITEE_URL: &str =
        "https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt";
    const FEISHU_URL: &str =
        "https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe";

    /// Builds a task from `config` and marks it `Initialized`.
    fn build_task(config: TaskConfig) -> Arc<RequestTask> {
        static CLIENT: Lazy<ClientManagerEntry> = Lazy::new(|| ClientManager::init());
        static NETWORK: Lazy<Network> = Lazy::new(|| Network::new());

        let (files, client) = check_config(&config).unwrap();

        let task = Arc::new(RequestTask::new(
            config,
            files,
            client,
            CLIENT.clone(),
            NETWORK.clone(),
        ));
        task.status.lock().unwrap().state = State::Initialized;
        task
    }

    /// Initializes logging and makes sure the scratch directory exists.
    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
        let _ = std::fs::create_dir("test_files/");
    }

    /// Returns a fresh, unset abort flag for one `download_inner` call.
    fn new_abort_flag() -> Arc<AtomicBool> {
        Arc::new(AtomicBool::new(false))
    }

    /// Creates `path` and fills it with `len` zero bytes.
    ///
    /// Uses a heap buffer and `write_all`, so large fixtures neither need a
    /// huge array nor get silently truncated by a short write.
    fn create_prefilled(path: &str, len: usize) -> File {
        let mut file = File::create(path).unwrap();
        file.write_all(&vec![0u8; len]).unwrap();
        file
    }

    #[test]
    fn ut_download_basic() {
        init();
        let file_path = "test_files/ut_download_basic.txt";

        let file = File::create(file_path).unwrap();
        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(GITEE_URL)
            .redirect(true)
            .build();

        let task = build_task(config);
        ylong_runtime::block_on(async {
            download_inner(task, new_abort_flag()).await.unwrap();
            let file = File::open(file_path).unwrap();
            assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
        });
    }

    #[test]
    fn ut_download_resume() {
        init();
        let file_path = "test_files/ut_download_resume.txt";

        let file = create_prefilled(file_path, GITEE_FILE_LEN as usize - 10000);

        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(GITEE_URL)
            .redirect(true)
            .build();
        let task = build_task(config);
        ylong_runtime::block_on(async {
            download_inner(task, new_abort_flag()).await.unwrap();
            let file = File::open(file_path).unwrap();
            assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
        });
    }

    #[test]
    fn ut_download_not_support_range() {
        init();
        let file_path = "test_files/ut_download_not_support_range.txt";

        let file = File::create(file_path).unwrap();

        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(GITEE_URL)
            .redirect(true)
            .begins(5000)
            .build();
        let task = build_task(config);
        ylong_runtime::block_on(async {
            let res = download_inner(task, new_abort_flag()).await.unwrap_err();
            assert_eq!(res, TaskError::Failed(Reason::UnsupportedRangeRequest));
            let file = File::open(file_path).unwrap();
            assert_eq!(0, file.metadata().unwrap().len());
        });
    }

    #[test]
    fn ut_download_resume_not_support_range() {
        init();
        let file_path = "test_files/ut_download_resume_not_support_range.txt";

        let file = File::create(file_path).unwrap();

        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(GITEE_URL)
            .redirect(true)
            .build();
        let task = build_task(config);
        ylong_runtime::block_on(async {
            let clone_task = task.clone();
            // NOTE(review): the abort is signalled through the task status
            // here; confirm whether `abort_flag` should be set instead now
            // that `download_inner` takes one.
            ylong_runtime::spawn(async move {
                ylong_runtime::time::sleep(time::Duration::from_secs(2)).await;
                clone_task.status.lock().unwrap().state = State::Waiting;
            });
            let err = download_inner(task.clone(), new_abort_flag())
                .await
                .unwrap_err();
            assert_eq!(err, TaskError::Waiting(TaskPhase::UserAbort));

            let file = task.files.get_mut(0).unwrap();
            file.set_len(10000).await.unwrap();
            // The seek future must be awaited, otherwise it never runs.
            file.seek(SeekFrom::End(0)).await.unwrap();

            download_inner(task.clone(), new_abort_flag())
                .await
                .unwrap();
            let file = File::open(file_path).unwrap();
            assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
        });
    }

    #[test]
    fn ut_download_not_support_range_resume() {
        init();
        let file_path = "test_files/ut_download_not_support_range_resume.txt";

        let file = create_prefilled(file_path, 1000);

        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(GITEE_URL)
            .redirect(true)
            .begins(5000)
            .build();
        let task = build_task(config);
        ylong_runtime::block_on(async {
            let res = download_inner(task, new_abort_flag()).await.unwrap_err();
            assert_eq!(res, TaskError::Failed(Reason::UnsupportedRangeRequest));
            let file = File::open(file_path).unwrap();
            assert_eq!(1000, file.metadata().unwrap().len());
        });
    }

    #[test]
    fn ut_download_range_0() {
        init();
        let file_path = "test_files/ut_download_range_0.txt";
        let file = File::create(file_path).unwrap();
        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(FEISHU_URL)
            .redirect(true)
            .begins(5000)
            .ends(10000)
            .build();
        let task = build_task(config);
        ylong_runtime::block_on(async {
            download_inner(task, new_abort_flag()).await.unwrap();
            let file = File::open(file_path).unwrap();
            // Both range ends are inclusive: 10000 - 5000 + 1 bytes.
            assert_eq!(5001, file.metadata().unwrap().len());
        });
    }

    #[test]
    fn ut_download_range_1() {
        init();
        let file_path = "test_files/ut_download_range_1.txt";

        let file = File::create(file_path).unwrap();
        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(FEISHU_URL)
            .redirect(true)
            .begins(273619168)
            .build();
        let task = build_task(config);
        ylong_runtime::block_on(async {
            download_inner(task, new_abort_flag()).await.unwrap();
            let file = File::open(file_path).unwrap();
            assert_eq!(FS_FILE_LEN - 273619168, file.metadata().unwrap().len());
        });
    }

    #[test]
    fn ut_download_range_resume_0() {
        init();
        let file_path = "test_files/ut_download_range_resume_0.txt";

        let file = create_prefilled(file_path, FS_FILE_LEN as usize - 10000);
        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(FEISHU_URL)
            .redirect(true)
            .build();
        let task = build_task(config);
        ylong_runtime::block_on(async {
            download_inner(task, new_abort_flag()).await.unwrap();
            let file = File::open(file_path).unwrap();
            assert_eq!(FS_FILE_LEN, file.metadata().unwrap().len());
        });
    }

    #[test]
    fn ut_download_range_resume_1() {
        init();
        let file_path = "test_files/ut_download_range_resume_1.txt";

        let file = File::create(file_path).unwrap();
        file.set_len(FS_FILE_LEN - 10000).unwrap();
        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(FEISHU_URL)
            .redirect(true)
            .build();
        let task = build_task(config);
        ylong_runtime::block_on(async {
            let clone_task = task.clone();
            // NOTE(review): the abort is signalled through the task status
            // here; confirm whether `abort_flag` should be set instead now
            // that `download_inner` takes one.
            ylong_runtime::spawn(async move {
                ylong_runtime::time::sleep(time::Duration::from_secs(2)).await;
                clone_task.status.lock().unwrap().state = State::Waiting;
            });
            let ret = download_inner(task.clone(), new_abort_flag())
                .await
                .unwrap_err();
            assert_eq!(ret, TaskError::Waiting(TaskPhase::UserAbort));
            let file = File::open(file_path).unwrap();
            assert!(file.metadata().unwrap().len() < FS_FILE_LEN - 20000);
            download_inner(task.clone(), new_abort_flag())
                .await
                .unwrap();
            assert_eq!(file.metadata().unwrap().len(), FS_FILE_LEN);
        });
    }

    #[test]
    fn ut_download_invalid_task() {
        init();
        // Own file: sharing a path with another test races under the
        // parallel test runner.
        let file_path = "test_files/ut_download_invalid_task.txt";

        let file = File::create(file_path).unwrap();
        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url(GITEE_URL)
            .redirect(true)
            .build();

        let task = build_task(config);
        {
            // Start from a deliberately inconsistent progress record.
            let mut progress = task.progress.lock().unwrap();
            progress.sizes = vec![0];
            progress.processed = vec![];
            progress.common_data.index = 23;
            progress.common_data.state = State::Failed.repr;
            progress.common_data.total_processed = 321223;
        }
        ylong_runtime::block_on(async {
            download_inner(task.clone(), new_abort_flag())
                .await
                .unwrap();
            let file = File::open(file_path).unwrap();
            assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());

            assert_eq!(State::Completed, task.status.lock().unwrap().state);
            assert_eq!(0, task.progress.lock().unwrap().common_data.index);
            assert_eq!(
                GITEE_FILE_LEN,
                task.progress.lock().unwrap().common_data.total_processed as u64
            );
            assert_eq!(
                GITEE_FILE_LEN,
                task.progress.lock().unwrap().processed[0] as u64
            );
            assert_eq!(
                GITEE_FILE_LEN,
                task.progress.lock().unwrap().sizes[0] as u64
            );
        });
    }

    /// For xts SUB_REQUEST_CROSSPLATFORM_DOWNDLOAD_API_TASKINFO_0002,
    /// downloadTotalBytes to be -1
    #[test]
    fn ut_download_sizes() {
        init();
        // Own file: sharing a path with another test races under the
        // parallel test runner.
        let file_path = "test_files/ut_download_sizes.txt";

        let file = File::create(file_path).unwrap();
        let config = ConfigBuilder::new()
            .action(Action::Download)
            .mode(Mode::BackGround)
            .file_spec(file)
            .url("https://gitee.com/chenzhixue/downloadTest/releases/download/v1.0/test_not_exists.apk")
            .redirect(true)
            .build();

        let task = build_task(config);
        {
            // Start from a deliberately inconsistent progress record.
            let mut progress = task.progress.lock().unwrap();
            progress.sizes = vec![0, 1, 2, 3];
            progress.processed = vec![];
            progress.common_data.index = 23;
            progress.common_data.state = State::Failed.repr;
            progress.common_data.total_processed = 321223;
        }
        ylong_runtime::block_on(async {
            let err = download_inner(task.clone(), new_abort_flag())
                .await
                .unwrap_err();
            assert_eq!(err, TaskError::Failed(Reason::ProtocolError));
            let sizes = task.progress.lock().unwrap().sizes.clone();
            assert_eq!(sizes, vec![-1]);
        });
    }
}
621