1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io::SeekFrom;
15 use std::pin::Pin;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::Arc;
18 use std::task::{Context, Poll};
19 
20 use ylong_http_client::async_impl::{DownloadOperator, Downloader, Response};
21 use ylong_http_client::{ErrorKind, HttpClientError, SpeedLimit, Timeout};
22 use ylong_runtime::io::AsyncSeekExt;
23 
24 use super::operator::TaskOperator;
25 use super::reason::Reason;
26 use super::request_task::{TaskError, TaskPhase};
27 use crate::manage::database::RequestDb;
28 use crate::task::info::State;
29 use crate::task::request_task::RequestTask;
30 #[cfg(feature = "oh")]
31 use crate::trace::Trace;
32 
/// Number of seconds in seven days. Passed to `Timeout::from_secs` as the
/// overall timeout of the downloader created by `build_downloader`.
const SECONDS_IN_ONE_WEEK: u64 = 7 * 24 * 60 * 60;

/// Second argument of `SpeedLimit::min_speed` in `build_downloader`.
/// NOTE(review): presumably the observation window in seconds — confirm
/// against the `ylong_http_client` documentation.
const LOW_SPEED_TIME: u64 = 60;
/// First argument of `SpeedLimit::min_speed` in `build_downloader`.
/// NOTE(review): presumably the minimum acceptable transfer speed — confirm
/// the unit against the `ylong_http_client` documentation.
const LOW_SPEED_LIMIT: u64 = 1;
37 
/// Lets a `TaskOperator` be plugged into a `Downloader` as its operator.
///
/// Both callbacks only delegate to methods that `TaskOperator` defines
/// elsewhere.
impl DownloadOperator for TaskOperator {
    /// Hands the received chunk `data` to `poll_write_file` and returns its
    /// result unchanged.
    fn poll_download(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        data: &[u8],
    ) -> Poll<Result<usize, HttpClientError>> {
        // NOTE(review): the meaning of the trailing `0` is defined by
        // `poll_write_file`, which is not visible here — confirm.
        self.poll_write_file(cx, data, 0)
    }

    /// Progress callback. The `_downloaded` and `_total` values supplied by
    /// the downloader are ignored; the call is forwarded to
    /// `poll_progress_common`.
    fn poll_progress(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        _downloaded: u64,
        _total: Option<u64>,
    ) -> Poll<Result<(), HttpClientError>> {
        self.poll_progress_common(cx)
    }
}
56 
build_downloader( task: Arc<RequestTask>, response: Response, abort_flag: Arc<AtomicBool>, ) -> Downloader<TaskOperator>57 pub(crate) fn build_downloader(
58     task: Arc<RequestTask>,
59     response: Response,
60     abort_flag: Arc<AtomicBool>,
61 ) -> Downloader<TaskOperator> {
62     let task_operator = TaskOperator::new(task, abort_flag);
63 
64     Downloader::builder()
65         .body(response)
66         .operator(task_operator)
67         .timeout(Timeout::from_secs(SECONDS_IN_ONE_WEEK))
68         .speed_limit(SpeedLimit::new().min_speed(LOW_SPEED_LIMIT, LOW_SPEED_TIME))
69         .build()
70 }
71 
download(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>)72 pub(crate) async fn download(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) {
73     task.tries.store(0, Ordering::SeqCst);
74     loop {
75         if let Err(e) = download_inner(task.clone(), abort_flag.clone()).await {
76             match e {
77                 TaskError::Waiting(phase) => match phase {
78                     TaskPhase::NeedRetry => {
79                         continue;
80                     }
81                     TaskPhase::UserAbort => {}
82                     TaskPhase::NetworkOffline => {
83                         *task.running_result.lock().unwrap() = Some(Err(Reason::NetworkOffline));
84                     }
85                 },
86                 TaskError::Failed(reason) => {
87                     *task.running_result.lock().unwrap() = Some(Err(reason));
88                 }
89             }
90         } else {
91             *task.running_result.lock().unwrap() = Some(Ok(()));
92         }
93         break;
94     }
95 }
96 
97 impl RequestTask {
prepare_download(&self) -> Result<(), TaskError>98     async fn prepare_download(&self) -> Result<(), TaskError> {
99         let file = self.files.get_mut(0).unwrap();
100         file.seek(SeekFrom::End(0)).await?;
101         let downloaded = file.metadata().await?.len() as usize;
102         let mut progress = self.progress.lock().unwrap();
103         progress.common_data.index = 0;
104         progress.common_data.total_processed = downloaded;
105         progress.common_data.state = State::Running.repr;
106         progress.processed = vec![downloaded];
107         progress.sizes = vec![-1];
108         Ok(())
109     }
110 }
111 
download_inner( task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>, ) -> Result<(), TaskError>112 pub(crate) async fn download_inner(
113     task: Arc<RequestTask>,
114     abort_flag: Arc<AtomicBool>,
115 ) -> Result<(), TaskError> {
116     // Ensures `_trace` can only be freed when this function exits.
117     #[cfg(feature = "oh")]
118     let _trace = Trace::new("download file");
119 
120     task.prepare_download().await?;
121 
122     info!("download task {} running", task.task_id());
123 
124     let request = task.build_download_request().await?;
125 
126     let response = task.client.request(request).await;
127     match response.as_ref() {
128         Ok(response) => {
129             let status_code = response.status();
130             #[cfg(feature = "oh")]
131             task.notify_response(response);
132             info!(
133                 "task {} get response {}",
134                 task.conf.common_data.task_id, status_code
135             );
136             if status_code.is_server_error()
137                 || (status_code.as_u16() != 408 && status_code.is_client_error())
138                 || status_code.is_redirection()
139             {
140                 return Err(TaskError::Failed(Reason::ProtocolError));
141             }
142             if status_code.as_u16() == 408 {
143                 if task.timeout_tries.load(Ordering::SeqCst) < 2 {
144                     task.timeout_tries.fetch_add(1, Ordering::SeqCst);
145                     return Err(TaskError::Waiting(TaskPhase::NeedRetry));
146                 } else {
147                     return Err(TaskError::Failed(Reason::ProtocolError));
148                 }
149             } else {
150                 task.timeout_tries.store(0, Ordering::SeqCst);
151             }
152             if status_code.as_u16() == 200 {
153                 if task.require_range() {
154                     info!("task {} server not support range", task.task_id());
155                     return Err(TaskError::Failed(Reason::UnsupportedRangeRequest));
156                 }
157                 let file = task.files.get(0).unwrap();
158                 let has_downloaded = file.metadata().await?.len() > 0;
159                 if has_downloaded {
160                     error!("task {} file not cleared", task.task_id());
161                     task.clear_downloaded_file().await?;
162                 }
163             }
164         }
165         Err(e) => {
166             error!("Task {} {:?}", task.task_id(), e);
167 
168             match e.error_kind() {
169                 ErrorKind::Timeout => return Err(TaskError::Failed(Reason::ContinuousTaskTimeout)),
170                 ErrorKind::Request => return Err(TaskError::Failed(Reason::RequestError)),
171                 ErrorKind::Redirect => return Err(TaskError::Failed(Reason::RedirectError)),
172                 ErrorKind::Connect | ErrorKind::ConnectionUpgrade => {
173                     task.network_retry().await?;
174                     if e.is_dns_error() {
175                         return Err(TaskError::Failed(Reason::Dns));
176                     } else if e.is_tls_error() {
177                         return Err(TaskError::Failed(Reason::Ssl));
178                     } else {
179                         return Err(TaskError::Failed(Reason::Tcp));
180                     }
181                 }
182                 ErrorKind::BodyTransfer => {
183                     task.network_retry().await?;
184                     return Err(TaskError::Failed(Reason::OthersError));
185                 }
186                 _ => {
187                     if format!("{}", e).contains("No space left on device") {
188                         return Err(TaskError::Failed(Reason::InsufficientSpace));
189                     } else {
190                         return Err(TaskError::Failed(Reason::OthersError));
191                     }
192                 }
193             };
194         }
195     };
196 
197     let response = response.unwrap();
198     {
199         let mut guard = task.progress.lock().unwrap();
200         guard.extras.clear();
201         for (k, v) in response.headers() {
202             if let Ok(value) = v.to_string() {
203                 guard.extras.insert(k.to_string().to_lowercase(), value);
204             }
205         }
206     }
207     task.get_file_info(&response)?;
208     task.update_progress_in_database();
209     RequestDb::get_instance()
210         .update_task_sizes(task.task_id(), &task.progress.lock().unwrap().sizes);
211 
212     #[cfg(feature = "oh")]
213     let _trace = Trace::new(&format!(
214         "download file tid:{} size:{}",
215         task.task_id(),
216         task.progress.lock().unwrap().sizes[0]
217     ));
218     let mut downloader = build_downloader(task.clone(), response, abort_flag);
219 
220     if let Err(e) = downloader.download().await {
221         return task.handle_download_error(e).await;
222     }
223     let file = task.files.get_mut(0).unwrap();
224     file.sync_all().await?;
225 
226     #[cfg(not(test))]
227     check_file_exist(&task)?;
228     {
229         let mut guard = task.progress.lock().unwrap();
230         guard.sizes = vec![guard.processed[0] as i64];
231     }
232 
233     info!("task {} download ok", task.task_id());
234     Ok(())
235 }
236 
237 #[cfg(not(test))]
check_file_exist(task: &Arc<RequestTask>) -> Result<(), TaskError>238 fn check_file_exist(task: &Arc<RequestTask>) -> Result<(), TaskError> {
239     use crate::task::files::{convert_bundle_name, convert_path};
240 
241     let config = task.config();
242     let bundle_name = convert_bundle_name(config);
243     let real_path = convert_path(
244         config.common_data.uid,
245         &bundle_name,
246         &config.file_specs[0].path,
247     );
248     // Cannot compare because file_total_size will be changed when resume task.
249     match std::fs::metadata(real_path) {
250         Ok(metadata) => {
251             if !metadata.is_file() {
252                 error!("task {} check local not file", task.task_id());
253                 return Err(TaskError::Failed(Reason::IoError));
254             }
255         }
256         Err(e) => {
257             // Skip this situation when we loss some permission.
258             if e.kind() == std::io::ErrorKind::NotFound {
259                 error!("task {} check local not exist", task.task_id());
260                 return Err(TaskError::Failed(Reason::IoError));
261             }
262         }
263     }
264     Ok(())
265 }
266 
267 #[cfg(not(feature = "oh"))]
268 #[cfg(test)]
269 mod test {
270     use core::time;
271     use std::fs::File;
272     use std::io::{SeekFrom, Write};
273     use std::sync::Arc;
274 
275     use once_cell::sync::Lazy;
276     use ylong_runtime::io::AsyncSeekExt;
277 
278     use crate::config::{Action, ConfigBuilder, Mode, TaskConfig};
279     use crate::info::State;
280     use crate::manage::network::Network;
281     use crate::manage::task_manager::TaskManagerTx;
282     use crate::manage::TaskManager;
283     use crate::service::client::{ClientManager, ClientManagerEntry};
284     use crate::service::run_count::{RunCountManager, RunCountManagerEntry};
285     use crate::task::download::{download_inner, TaskPhase};
286     use crate::task::reason::Reason;
287     use crate::task::request_task::{check_config, RequestTask, TaskError};
288 
289     const GITEE_FILE_LEN: u64 = 1042003;
290     const FS_FILE_LEN: u64 = 274619168;
291 
build_task(config: TaskConfig) -> Arc<RequestTask>292     fn build_task(config: TaskConfig) -> Arc<RequestTask> {
293         static CLIENT: Lazy<ClientManagerEntry> = Lazy::new(|| ClientManager::init());
294         static RUN_COUNT_MANAGER: Lazy<RunCountManagerEntry> =
295             Lazy::new(|| RunCountManager::init());
296         static NETWORK: Lazy<Network> = Lazy::new(|| Network::new());
297 
298         static TASK_MANGER: Lazy<TaskManagerTx> = Lazy::new(|| {
299             TaskManager::init(RUN_COUNT_MANAGER.clone(), CLIENT.clone(), NETWORK.clone())
300         });
301         let (files, client) = check_config(&config).unwrap();
302 
303         let task = Arc::new(RequestTask::new(
304             config,
305             files,
306             client,
307             CLIENT.clone(),
308             NETWORK.clone(),
309         ));
310         task.status.lock().unwrap().state = State::Initialized;
311         task
312     }
313 
init()314     fn init() {
315         let _ = env_logger::builder().is_test(true).try_init();
316         let _ = std::fs::create_dir("test_files/");
317     }
318 
319     #[test]
ut_download_basic()320     fn ut_download_basic() {
321         init();
322         let file_path = "test_files/ut_download_basic.txt";
323 
324         let file = File::create(file_path).unwrap();
325         let config = ConfigBuilder::new()
326         .action(Action::Download)
327         .mode(Mode::BackGround)
328         .file_spec(file)
329         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
330         .redirect(true)
331         .build();
332 
333         let task = build_task(config);
334         ylong_runtime::block_on(async {
335             download_inner(task).await.unwrap();
336             let file = File::open(file_path).unwrap();
337             assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
338         });
339     }
340 
341     #[test]
ut_download_resume()342     fn ut_download_resume() {
343         init();
344         let file_path = "test_files/ut_download_resume.txt";
345 
346         let mut file = File::create(file_path).unwrap();
347         file.write(&[0; GITEE_FILE_LEN as usize - 10000]).unwrap();
348 
349         let config = ConfigBuilder::new()
350         .action(Action::Download)
351         .mode(Mode::BackGround)
352         .file_spec(file)
353         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
354         .redirect(true)
355         .build();
356         let task = build_task(config);
357         ylong_runtime::block_on(async {
358             download_inner(task).await.unwrap();
359             let file = File::open(file_path).unwrap();
360             assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
361         });
362     }
363 
364     #[test]
ut_download_not_support_range()365     fn ut_download_not_support_range() {
366         init();
367         let file_path = "test_files/ut_download_not_support_range.txt";
368 
369         let file = File::create(file_path).unwrap();
370 
371         let config = ConfigBuilder::new()
372         .action(Action::Download)
373         .mode(Mode::BackGround)
374         .file_spec(file)
375         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
376         .redirect(true)
377         .begins(5000)
378         .build();
379         let task = build_task(config);
380         ylong_runtime::block_on(async {
381             let res = download_inner(task).await.unwrap_err();
382             assert_eq!(res, TaskError::Failed(Reason::UnsupportedRangeRequest));
383             let file = File::open(file_path).unwrap();
384             assert_eq!(0, file.metadata().unwrap().len());
385         });
386     }
387 
388     #[test]
ut_download_resume_not_support_range()389     fn ut_download_resume_not_support_range() {
390         init();
391         let file_path = "test_files/ut_download_resume_not_support_range.txt";
392 
393         let file = File::create(file_path).unwrap();
394 
395         let config = ConfigBuilder::new()
396         .action(Action::Download)
397         .mode(Mode::BackGround)
398         .file_spec(file)
399         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
400         .redirect(true)
401         .build();
402         let task = build_task(config);
403         ylong_runtime::block_on(async {
404             let clone_task = task.clone();
405             ylong_runtime::spawn(async move {
406                 ylong_runtime::time::sleep(time::Duration::from_secs(2)).await;
407                 clone_task.status.lock().unwrap().state = State::Waiting;
408             });
409             let err = download_inner(task.clone()).await.unwrap_err();
410             assert_eq!(err, TaskError::Waiting(TaskPhase::UserAbort));
411 
412             let file = task.files.get_mut(0).unwrap();
413             file.set_len(10000).await.unwrap();
414             file.seek(SeekFrom::End(0));
415 
416             download_inner(task.clone()).await.unwrap();
417             let file = File::open(file_path).unwrap();
418             assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
419         });
420     }
421 
422     #[test]
ut_download_not_support_range_resume()423     fn ut_download_not_support_range_resume() {
424         init();
425         let file_path = "test_files/ut_download_not_support_range_resume.txt";
426 
427         let mut file = File::create(file_path).unwrap();
428         file.write(&[0; 1000]).unwrap();
429 
430         let config = ConfigBuilder::new()
431         .action(Action::Download)
432         .mode(Mode::BackGround)
433         .file_spec(file)
434         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
435         .redirect(true)
436         .begins(5000)
437         .build();
438         let task = build_task(config);
439         ylong_runtime::block_on(async {
440             let res = download_inner(task).await.unwrap_err();
441             assert_eq!(res, TaskError::Failed(Reason::UnsupportedRangeRequest));
442             let file = File::open(file_path).unwrap();
443             assert_eq!(1000, file.metadata().unwrap().len());
444         });
445     }
446 
447     #[test]
ut_download_range_0()448     fn ut_download_range_0() {
449         init();
450         let file_path = "test_files/ut_download_range_0.txt";
451         let file = File::create(file_path).unwrap();
452         let config = ConfigBuilder::new()
453         .action(Action::Download)
454         .mode(Mode::BackGround)
455         .file_spec(file)
456         .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
457         .redirect(true)
458         .begins(5000)
459         .ends(10000)
460         .build();
461         let task = build_task(config);
462         ylong_runtime::block_on(async {
463             download_inner(task).await.unwrap();
464             let file = File::open(file_path).unwrap();
465             assert_eq!(5001, file.metadata().unwrap().len());
466         });
467     }
468 
469     #[test]
ut_download_range_1()470     fn ut_download_range_1() {
471         init();
472         let file_path = "test_files/ut_download_range_1.txt";
473 
474         let file = File::create(file_path).unwrap();
475         let config = ConfigBuilder::new()
476         .action(Action::Download)
477         .mode(Mode::BackGround)
478         .file_spec(file)
479         .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
480         .redirect(true)
481         .begins(273619168)
482         .build();
483         let task = build_task(config);
484         ylong_runtime::block_on(async {
485             download_inner(task).await.unwrap();
486             let file = File::open(file_path).unwrap();
487             assert_eq!(FS_FILE_LEN - 273619168, file.metadata().unwrap().len());
488         });
489     }
490 
491     #[test]
ut_download_range_resume_0()492     fn ut_download_range_resume_0() {
493         init();
494         let file_path = "test_files/ut_download_range_resume_0.txt";
495 
496         let mut file = File::create(file_path).unwrap();
497         file.write(&[0; FS_FILE_LEN as usize - 10000]).unwrap();
498         let config = ConfigBuilder::new()
499         .action(Action::Download)
500         .mode(Mode::BackGround)
501         .file_spec(file)
502         .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
503         .redirect(true)
504         .build();
505         let task = build_task(config);
506         ylong_runtime::block_on(async {
507             download_inner(task).await.unwrap();
508             let file = File::open(file_path).unwrap();
509             assert_eq!(FS_FILE_LEN, file.metadata().unwrap().len());
510         });
511     }
512 
513     #[test]
ut_download_range_resume_1()514     fn ut_download_range_resume_1() {
515         init();
516         let file_path = "test_files/ut_download_range_resume_1.txt";
517 
518         let file = File::create(file_path).unwrap();
519         file.set_len(FS_FILE_LEN - 10000).unwrap();
520         let config = ConfigBuilder::new()
521         .action(Action::Download)
522         .mode(Mode::BackGround)
523         .file_spec(file)
524         .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe")
525         .redirect(true)
526         .build();
527         let task = build_task(config);
528         ylong_runtime::block_on(async {
529             let clone_task = task.clone();
530             ylong_runtime::spawn(async move {
531                 ylong_runtime::time::sleep(time::Duration::from_secs(2)).await;
532                 clone_task.status.lock().unwrap().state = State::Waiting;
533             });
534             let ret = download_inner(task.clone()).await.unwrap_err();
535             assert_eq!(ret, TaskError::Waiting(TaskPhase::UserAbort));
536             let file = File::open(file_path).unwrap();
537             assert!(file.metadata().unwrap().len() < FS_FILE_LEN - 20000);
538             download_inner(task.clone()).await.unwrap();
539             assert_eq!(file.metadata().unwrap().len(), FS_FILE_LEN);
540         });
541     }
542 
543     #[test]
ut_download_invalid_task()544     fn ut_download_invalid_task() {
545         init();
546         let file_path = "test_files/ut_download_basic.txt";
547 
548         let file = File::create(file_path).unwrap();
549         let config = ConfigBuilder::new()
550         .action(Action::Download)
551         .mode(Mode::BackGround)
552         .file_spec(file)
553         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
554         .redirect(true)
555         .build();
556 
557         let task = build_task(config);
558         {
559             let mut progress = task.progress.lock().unwrap();
560             progress.sizes = vec![0];
561             progress.processed = vec![];
562             progress.common_data.index = 23;
563             progress.common_data.state = State::Failed.repr;
564             progress.common_data.total_processed = 321223;
565         }
566         ylong_runtime::block_on(async {
567             download_inner(task.clone()).await.unwrap();
568             let file = File::open(file_path).unwrap();
569             assert_eq!(GITEE_FILE_LEN, file.metadata().unwrap().len());
570 
571             assert_eq!(State::Completed, task.status.lock().unwrap().state);
572             assert_eq!(0, task.progress.lock().unwrap().common_data.index);
573             assert_eq!(
574                 GITEE_FILE_LEN,
575                 task.progress.lock().unwrap().common_data.total_processed as u64
576             );
577             assert_eq!(
578                 GITEE_FILE_LEN,
579                 task.progress.lock().unwrap().processed[0] as u64
580             );
581             assert_eq!(
582                 GITEE_FILE_LEN,
583                 task.progress.lock().unwrap().sizes[0] as u64
584             );
585         });
586     }
587 
588     /// For xts SUB_REQUEST_CROSSPLATFORM_DOWNDLOAD_API_TASKINFO_0002,
589     /// downloadTotalBytes to be -1
590     #[test]
ut_download_sizes()591     fn ut_download_sizes() {
592         init();
593         let file_path = "test_files/ut_download_basic.txt";
594 
595         let file = File::create(file_path).unwrap();
596         let config = ConfigBuilder::new()
597         .action(Action::Download)
598         .mode(Mode::BackGround)
599         .file_spec(file)
600         .url("https://gitee.com/chenzhixue/downloadTest/releases/download/v1.0/test_not_exists.apk")
601         .redirect(true)
602         .build();
603 
604         let task = build_task(config);
605         {
606             let mut progress = task.progress.lock().unwrap();
607             progress.sizes = vec![0, 1, 2, 3];
608             progress.processed = vec![];
609             progress.common_data.index = 23;
610             progress.common_data.state = State::Failed.repr;
611             progress.common_data.total_processed = 321223;
612         }
613         ylong_runtime::block_on(async {
614             let err = download_inner(task.clone()).await.unwrap_err();
615             assert_eq!(err, TaskError::Failed(Reason::ProtocolError));
616             let sizes = task.progress.lock().unwrap().sizes.clone();
617             assert_eq!(sizes, vec![-1]);
618         });
619     }
620 }
621