1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::future::Future;
15 use std::pin::Pin;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::Arc;
18 use std::task::{Context, Poll};
19 use std::time::Duration;
20 
21 use ylong_http_client::HttpClientError;
22 use ylong_runtime::io::AsyncWrite;
23 use ylong_runtime::time::{sleep, Sleep};
24 
25 use crate::manage::notifier::Notifier;
26 use crate::task::config::Version;
27 use crate::task::request_task::RequestTask;
28 use crate::utils::get_current_timestamp;
29 
30 const SPEED_LIMIT_INTERVAL: u64 = 1000;
31 const FRONT_NOTIFY_INTERVAL: u64 = 1000;
32 const BACKGROUND_NOTIFY_INTERVAL: u64 = 3000;
33 
34 pub(crate) struct TaskOperator {
35     pub(crate) sleep: Option<Pin<Box<Sleep>>>,
36     pub(crate) task: Arc<RequestTask>,
37     pub(crate) last_time: u64,
38     pub(crate) last_size: u64,
39     pub(crate) more_sleep_time: u64,
40     pub(crate) abort_flag: Arc<AtomicBool>,
41 }
42 
43 impl TaskOperator {
new(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) -> Self44     pub(crate) fn new(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) -> Self {
45         Self {
46             sleep: None,
47             task,
48             last_time: 0,
49             last_size: 0,
50             more_sleep_time: 0,
51             abort_flag,
52         }
53     }
54 
poll_progress_common( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), HttpClientError>>55     pub(crate) fn poll_progress_common(
56         &mut self,
57         cx: &mut Context<'_>,
58     ) -> Poll<Result<(), HttpClientError>> {
59         if self.abort_flag.load(Ordering::Acquire) {
60             return Poll::Ready(Err(HttpClientError::user_aborted()));
61         }
62         let current = get_current_timestamp();
63 
64         if current >= self.task.last_notify.load(Ordering::SeqCst) + FRONT_NOTIFY_INTERVAL {
65             let notify_data = self.task.build_notify_data();
66             self.task.last_notify.store(current, Ordering::SeqCst);
67             Notifier::progress(&self.task.client_manager, notify_data);
68         }
69 
70         let gauge = self.task.conf.common_data.gauge;
71 
72         if self.task.conf.version == Version::API9 || gauge {
73             let last_background_notify_time =
74                 self.task.background_notify_time.load(Ordering::SeqCst);
75             if get_current_timestamp() - last_background_notify_time >= BACKGROUND_NOTIFY_INTERVAL {
76                 self.task.background_notify();
77             }
78         }
79 
80         let total_processed = self
81             .task
82             .progress
83             .lock()
84             .unwrap()
85             .common_data
86             .total_processed as u64;
87 
88         self.sleep = None;
89         let speed_limit = self.task.rate_limiting.load(Ordering::SeqCst);
90         if speed_limit != 0 {
91             if self.more_sleep_time != 0 {
92                 // wake up for notify, sleep until speed limit conditions are met
93                 self.sleep = Some(Box::pin(sleep(Duration::from_millis(self.more_sleep_time))));
94                 self.more_sleep_time = 0;
95             } else if self.last_time == 0 {
96                 // get the init time and size, for speed caculate
97                 self.last_time = current;
98                 self.last_size = total_processed;
99             } else if current - self.last_time < SPEED_LIMIT_INTERVAL
100                 && ((total_processed - self.last_size) > speed_limit * SPEED_LIMIT_INTERVAL)
101             {
102                 // sleep until notification is required or speed limit conditions are met
103                 let limit_time =
104                     (total_processed - self.last_size) / speed_limit - (current - self.last_time);
105                 let notify_time = FRONT_NOTIFY_INTERVAL
106                     - (current - self.task.last_notify.load(Ordering::SeqCst));
107                 let sleep_time = if limit_time > notify_time {
108                     self.more_sleep_time = limit_time - notify_time;
109                     notify_time
110                 } else {
111                     limit_time
112                 };
113                 self.sleep = Some(Box::pin(sleep(Duration::from_millis(sleep_time))));
114             } else if current - self.last_time >= SPEED_LIMIT_INTERVAL {
115                 // last caculate window has meet speed limit, update last_time and last_size,
116                 // for next poll's speed compare
117                 self.last_time = current;
118                 self.last_size = total_processed;
119             }
120         }
121 
122         if self.sleep.is_some() {
123             match Pin::new(self.sleep.as_mut().unwrap()).poll(cx) {
124                 Poll::Ready(_) => return Poll::Ready(Ok(())),
125                 Poll::Pending => return Poll::Pending,
126             }
127         }
128         Poll::Ready(Ok(()))
129     }
130 
poll_write_file( &self, cx: &mut Context<'_>, data: &[u8], skip_size: usize, ) -> Poll<Result<usize, HttpClientError>>131     pub(crate) fn poll_write_file(
132         &self,
133         cx: &mut Context<'_>,
134         data: &[u8],
135         skip_size: usize,
136     ) -> Poll<Result<usize, HttpClientError>> {
137         let file = self.task.files.get_mut(0).unwrap();
138         let mut progress_guard = self.task.progress.lock().unwrap();
139         match Pin::new(file).poll_write(cx, data) {
140             Poll::Ready(Ok(size)) => {
141                 progress_guard.processed[0] += size;
142                 progress_guard.common_data.total_processed += size;
143                 Poll::Ready(Ok(size + skip_size))
144             }
145             Poll::Pending => Poll::Pending,
146             Poll::Ready(Err(e)) => Poll::Ready(Err(HttpClientError::other(e))),
147         }
148     }
149 }
150