1 // Copyright (C) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::future::Future; 15 use std::pin::Pin; 16 use std::sync::atomic::{AtomicBool, Ordering}; 17 use std::sync::Arc; 18 use std::task::{Context, Poll}; 19 use std::time::Duration; 20 21 use ylong_http_client::HttpClientError; 22 use ylong_runtime::io::AsyncWrite; 23 use ylong_runtime::time::{sleep, Sleep}; 24 25 use crate::manage::notifier::Notifier; 26 use crate::task::config::Version; 27 use crate::task::request_task::RequestTask; 28 use crate::utils::get_current_timestamp; 29 30 const SPEED_LIMIT_INTERVAL: u64 = 1000; 31 const FRONT_NOTIFY_INTERVAL: u64 = 1000; 32 const BACKGROUND_NOTIFY_INTERVAL: u64 = 3000; 33 34 pub(crate) struct TaskOperator { 35 pub(crate) sleep: Option<Pin<Box<Sleep>>>, 36 pub(crate) task: Arc<RequestTask>, 37 pub(crate) last_time: u64, 38 pub(crate) last_size: u64, 39 pub(crate) more_sleep_time: u64, 40 pub(crate) abort_flag: Arc<AtomicBool>, 41 } 42 43 impl TaskOperator { new(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) -> Self44 pub(crate) fn new(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) -> Self { 45 Self { 46 sleep: None, 47 task, 48 last_time: 0, 49 last_size: 0, 50 more_sleep_time: 0, 51 abort_flag, 52 } 53 } 54 poll_progress_common( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), HttpClientError>>55 pub(crate) fn poll_progress_common( 56 &mut self, 57 cx: &mut Context<'_>, 58 ) -> Poll<Result<(), HttpClientError>> { 59 if self.abort_flag.load(Ordering::Acquire) { 60 return Poll::Ready(Err(HttpClientError::user_aborted())); 61 } 62 let current = get_current_timestamp(); 63 64 if current >= self.task.last_notify.load(Ordering::SeqCst) + FRONT_NOTIFY_INTERVAL { 65 let notify_data = self.task.build_notify_data(); 66 self.task.last_notify.store(current, Ordering::SeqCst); 67 Notifier::progress(&self.task.client_manager, notify_data); 68 } 69 70 let gauge = self.task.conf.common_data.gauge; 71 72 if self.task.conf.version == Version::API9 || gauge { 73 let last_background_notify_time = 74 self.task.background_notify_time.load(Ordering::SeqCst); 75 if get_current_timestamp() - last_background_notify_time >= BACKGROUND_NOTIFY_INTERVAL { 76 self.task.background_notify(); 77 } 78 } 79 80 let total_processed = self 81 .task 82 .progress 83 .lock() 84 .unwrap() 85 .common_data 86 .total_processed as u64; 87 88 self.sleep = None; 89 let speed_limit = self.task.rate_limiting.load(Ordering::SeqCst); 90 if speed_limit != 0 { 91 if self.more_sleep_time != 0 { 92 // wake up for notify, sleep until speed limit conditions are met 93 self.sleep = Some(Box::pin(sleep(Duration::from_millis(self.more_sleep_time)))); 94 self.more_sleep_time = 0; 95 } else if self.last_time == 0 { 96 // get the init time and size, for speed caculate 97 self.last_time = current; 98 self.last_size = total_processed; 99 } else if current - self.last_time < SPEED_LIMIT_INTERVAL 100 && ((total_processed - self.last_size) > speed_limit * SPEED_LIMIT_INTERVAL) 101 { 102 // sleep until notification is required or speed limit conditions are met 103 let limit_time = 104 (total_processed - self.last_size) / speed_limit - (current - self.last_time); 105 let notify_time = FRONT_NOTIFY_INTERVAL 106 - (current - self.task.last_notify.load(Ordering::SeqCst)); 107 let sleep_time = if limit_time > notify_time { 108 self.more_sleep_time = limit_time - notify_time; 109 notify_time 110 } else { 111 limit_time 112 }; 113 self.sleep = Some(Box::pin(sleep(Duration::from_millis(sleep_time)))); 114 } else if current - self.last_time >= SPEED_LIMIT_INTERVAL { 115 // last caculate window has meet speed limit, update last_time and last_size, 116 // for next poll's speed compare 117 self.last_time = current; 118 self.last_size = total_processed; 119 } 120 } 121 122 if self.sleep.is_some() { 123 match Pin::new(self.sleep.as_mut().unwrap()).poll(cx) { 124 Poll::Ready(_) => return Poll::Ready(Ok(())), 125 Poll::Pending => return Poll::Pending, 126 } 127 } 128 Poll::Ready(Ok(())) 129 } 130 poll_write_file( &self, cx: &mut Context<'_>, data: &[u8], skip_size: usize, ) -> Poll<Result<usize, HttpClientError>>131 pub(crate) fn poll_write_file( 132 &self, 133 cx: &mut Context<'_>, 134 data: &[u8], 135 skip_size: usize, 136 ) -> Poll<Result<usize, HttpClientError>> { 137 let file = self.task.files.get_mut(0).unwrap(); 138 let mut progress_guard = self.task.progress.lock().unwrap(); 139 match Pin::new(file).poll_write(cx, data) { 140 Poll::Ready(Ok(size)) => { 141 progress_guard.processed[0] += size; 142 progress_guard.common_data.total_processed += size; 143 Poll::Ready(Ok(size + skip_size)) 144 } 145 Poll::Pending => Poll::Pending, 146 Poll::Ready(Err(e)) => Poll::Ready(Err(HttpClientError::other(e))), 147 } 148 } 149 } 150