1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::fmt::Error;
15 use std::mem;
16 use std::mem::MaybeUninit;
17 use std::ptr::{addr_of_mut, NonNull};
18 use std::sync::atomic::AtomicBool;
19 use std::sync::atomic::Ordering::Relaxed;
20 use std::task::Waker;
21 use std::time::Duration;
22 
23 use crate::util::linked_list::{Link, LinkedList, Node};
24 
// Number of slots held by each level of the wheel.
const SLOTS_NUM: usize = 64;

// Number of levels in the wheel.
const LEVELS_NUM: usize = 6;

// Maximum sleep duration the wheel can address directly. Each level consumes
// 6 bits of the timestamp (2^6 = 64 slots), so the largest representable
// value is 2^(6 * LEVELS_NUM) - 1.
pub(crate) const MAX_DURATION: u64 = (1 << (6 * LEVELS_NUM)) - 1;
33 
// Struct for timing and waking up corresponding tasks on the timing wheel.
// A Clock is linked into the wheel's lists through its embedded `node`.
pub(crate) struct Clock {
    // Expected expiration time.
    expiration: u64,

    // The level into which the clock was (or will be) inserted.
    // Written by `Wheel::insert` / `Wheel::process_expiration` and read back
    // by `Wheel::cancel` to find the owning level.
    level: usize,

    // Value that was used to choose the slot inside the level.
    // `Level::insert` stores the expiration here and `Level::cancel` reads it
    // back to recompute the slot the clock lives in.
    duration: u64,

    // Result flag that is read when the corresponding Sleep structure is
    // woken up; it can be used to determine if the Future completed
    // correctly.
    result: AtomicBool,

    // Corresponding waker,
    // which is used to wake up the sleeping coroutine.
    waker: Option<Waker>,

    // Linked-list node that lets the clock be stored in a `LinkedList<Clock>`.
    node: Node<Clock>,
}
56 
57 impl Clock {
58     // Creates a default Clock structure.
new() -> Self59     pub(crate) fn new() -> Self {
60         Self {
61             expiration: 0,
62             level: 0,
63             duration: 0,
64             result: AtomicBool::new(false),
65             waker: None,
66             node: Node::new(),
67         }
68     }
69 
70     // Returns the expected expiration time.
expiration(&self) -> u6471     pub(crate) fn expiration(&self) -> u64 {
72         self.expiration
73     }
74 
75     // Sets the expected expiration time
set_expiration(&mut self, expiration: u64)76     pub(crate) fn set_expiration(&mut self, expiration: u64) {
77         self.expiration = expiration;
78     }
79 
80     // Returns the level to which the clock will be inserted.
level(&self) -> usize81     pub(crate) fn level(&self) -> usize {
82         self.level
83     }
84 
85     // Sets the level to which the clock will be inserted.
set_level(&mut self, level: usize)86     pub(crate) fn set_level(&mut self, level: usize) {
87         self.level = level;
88     }
89 
duration(&self) -> u6490     pub(crate) fn duration(&self) -> u64 {
91         self.duration
92     }
93 
set_duration(&mut self, duration: u64)94     pub(crate) fn set_duration(&mut self, duration: u64) {
95         self.duration = duration;
96     }
97 
98     // Returns the corresponding waker.
take_waker(&mut self) -> Option<Waker>99     pub(crate) fn take_waker(&mut self) -> Option<Waker> {
100         self.waker.take()
101     }
102 
103     // Sets the corresponding waker.
set_waker(&mut self, waker: Waker)104     pub(crate) fn set_waker(&mut self, waker: Waker) {
105         self.waker = Some(waker);
106     }
107 
108     // Returns the result.
result(&self) -> bool109     pub(crate) fn result(&self) -> bool {
110         self.result.load(Relaxed)
111     }
112 
113     // Sets the result.
set_result(&mut self, result: bool)114     pub(crate) fn set_result(&mut self, result: bool) {
115         self.result.store(result, Relaxed);
116     }
117 }
118 
119 impl Default for Clock {
default() -> Self120     fn default() -> Self {
121         Clock::new()
122     }
123 }
124 
125 unsafe impl Link for Clock {
node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>> where Self: Sized,126     unsafe fn node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>>
127     where
128         Self: Sized,
129     {
130         let node_ptr = addr_of_mut!(ptr.as_mut().node);
131         NonNull::new_unchecked(node_ptr)
132     }
133 }
134 
// Outcome of `Wheel::poll`.
pub(crate) enum TimeOut {
    // A clock popped from the wheel's trigger list, i.e. one that has expired.
    ClockEntry(NonNull<Clock>),
    // Nothing is due yet; carries the time left until the next deadline
    // (`deadline - now`, built with `Duration::from_millis`).
    Duration(Duration),
    // Nothing is due and no level reported an upcoming expiration.
    None,
}
140 
// Location and deadline of the next occupied slot, as produced by
// `Level::next_expiration`.
pub(crate) struct Expiration {
    // Index of the level that owns the slot.
    level: usize,
    // Index of the slot inside that level.
    slot: usize,
    // Time at which the slot becomes due, on the same scale as the wheel's
    // `elapsed` counter.
    deadline: u64,
}
146 
// Multi-level timing wheel. Clocks are filed into one of the `levels`
// depending on their expiration, and are moved to `trigger` once the slot
// they live in is processed and their expiration has been reached.
pub(crate) struct Wheel {
    // Since the wheel started,
    // the number of milliseconds elapsed.
    elapsed: u64,

    // The time wheel levels are similar to a multi-layered dial.
    //
    // levels:
    //
    // 1  ms slots == 64 ms range
    // 64 ms slots ~= 4 sec range
    // 4 sec slots ~= 4 min range
    // 4 min slots ~= 4 hr range
    // 4 hr slots ~= 12 day range
    // 12 day slots ~= 2 yr range
    levels: Vec<Level>,

    // Clocks whose timers have expired
    // and which are ready to be triggered; drained by `poll`.
    trigger: LinkedList<Clock>,
}
168 
169 impl Wheel {
170     // Creates a new timing wheel.
new() -> Self171     pub(crate) fn new() -> Self {
172         let levels = (0..LEVELS_NUM).map(Level::new).collect();
173 
174         Self {
175             elapsed: 0,
176             levels,
177             trigger: Default::default(),
178         }
179     }
180 
181     // Return the elapsed.
elapsed(&self) -> u64182     pub(crate) fn elapsed(&self) -> u64 {
183         self.elapsed
184     }
185 
186     // Set the elapsed.
set_elapsed(&mut self, elapsed: u64)187     pub(crate) fn set_elapsed(&mut self, elapsed: u64) {
188         self.elapsed = elapsed;
189     }
190 
191     // Compare the timing wheel elapsed with the expiration,
192     // from which to decide which level to insert.
find_level(expiration: u64, elapsed: u64) -> usize193     pub(crate) fn find_level(expiration: u64, elapsed: u64) -> usize {
194         // 0011 1111
195         const SLOT_MASK: u64 = (1 << 6) - 1;
196 
197         // Use the time difference value to find at which level.
198         // Use XOR to determine the insertion of inspiration currently need to be
199         // inserted into the level, using binary operations to determine which bit has
200         // changed, and further determine which level. If don't use XOR, it will lead to
201         // the re-insertion of the level of the calculation error, and insert to an
202         // incorrect level.
203         let mut masked = (expiration ^ elapsed) | SLOT_MASK;
204         // 1111 1111 1111 1111 1111 1111 1111 1111 1111
205         if masked >= MAX_DURATION {
206             masked = MAX_DURATION - 1;
207         }
208 
209         let leading_zeros = masked.leading_zeros() as usize;
210         // Calculate how many valid bits there are.
211         let significant = 63 - leading_zeros;
212 
213         // One level per 6 bit,
214         // one slots has 2^6 slots.
215         significant / 6
216     }
217 
218     // Insert the corresponding TimerHandle into the specified position in the
219     // timing wheel.
insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error>220     pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error> {
221         let expiration = unsafe { clock_entry.as_ref().expiration() };
222 
223         if expiration <= self.elapsed() {
224             // This means that the timeout period has passed,
225             // and the time should be triggered immediately.
226             return Err(Error);
227         }
228 
229         let level = Self::find_level(expiration, self.elapsed());
230         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
231         // `Sleep` here does not go into `Ready`.
232         unsafe { clock_entry.as_mut().set_level(level) };
233         self.levels[level].insert(clock_entry);
234         Ok(expiration)
235     }
236 
cancel(&mut self, clock_entry: NonNull<Clock>)237     pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
238         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
239         // `Sleep` here does not go into `Ready`.
240         let level = unsafe { clock_entry.as_ref().level() };
241         self.levels[level].cancel(clock_entry);
242     }
243 
244     // Return where the next expiration is located, and its deadline.
next_expiration(&self) -> Option<Expiration>245     pub(crate) fn next_expiration(&self) -> Option<Expiration> {
246         for level in 0..LEVELS_NUM {
247             if let Some(expiration) = self.levels[level].next_expiration(self.elapsed()) {
248                 return Some(expiration);
249             }
250         }
251 
252         None
253     }
254 
255     // Retrieve the corresponding expired TimerHandle.
process_expiration(&mut self, expiration: &Expiration)256     pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
257         let mut handles = self.levels[expiration.level].take_slot(expiration.slot);
258         while let Some(mut item) = handles.pop_back() {
259             let expected_expiration = unsafe { item.as_ref().expiration() };
260             if expected_expiration > expiration.deadline {
261                 let level = Self::find_level(expected_expiration, expiration.deadline);
262 
263                 unsafe { item.as_mut().set_level(level) };
264 
265                 self.levels[level].insert(item);
266             } else {
267                 self.trigger.push_front(item);
268             }
269         }
270     }
271 
272     // Determine which timers have timed out at the current time.
poll(&mut self, now: u64) -> TimeOut273     pub(crate) fn poll(&mut self, now: u64) -> TimeOut {
274         loop {
275             if let Some(handle) = self.trigger.pop_back() {
276                 return TimeOut::ClockEntry(handle);
277             }
278 
279             let expiration = self.next_expiration();
280 
281             match expiration {
282                 Some(ref expiration) if expiration.deadline > now => {
283                     return TimeOut::Duration(Duration::from_millis(expiration.deadline - now))
284                 }
285                 Some(ref expiration) => {
286                     self.process_expiration(expiration);
287                     self.set_elapsed(expiration.deadline);
288                 }
289                 None => {
290                     self.set_elapsed(now);
291                     break;
292                 }
293             }
294         }
295 
296         match self.trigger.pop_back() {
297             None => TimeOut::None,
298             Some(handle) => TimeOut::ClockEntry(handle),
299         }
300     }
301 }
302 
// One level of the wheel.
// Every level contains SLOTS_NUM (64) slots.
pub struct Level {
    // Index of this level inside the wheel.
    level: usize,

    // Bitmap of the slots that contain entries: bit `i` is set when an entry
    // is inserted into slot `i` and cleared when that slot is taken or
    // becomes empty after a cancel.
    occupied: u64,

    // Slots of this level, each one a list of clocks.
    slots: [LinkedList<Clock>; SLOTS_NUM],
}
315 
316 impl Level {
317     // Specify the level and create a Level structure.
new(level: usize) -> Self318     pub(crate) fn new(level: usize) -> Self {
319         let mut slots: [MaybeUninit<LinkedList<Clock>>; SLOTS_NUM] =
320             unsafe { MaybeUninit::uninit().assume_init() };
321 
322         for slot in slots.iter_mut() {
323             *slot = MaybeUninit::new(Default::default());
324         }
325 
326         unsafe {
327             let slots = mem::transmute::<_, [LinkedList<Clock>; SLOTS_NUM]>(slots);
328             Self {
329                 level,
330                 occupied: 0,
331                 slots,
332             }
333         }
334     }
335 
336     // Based on the elapsed which the current time wheel is running,
337     // and the expected expiration time of the clock_entry,
338     // find the corresponding slot and insert it.
insert(&mut self, mut clock_entry: NonNull<Clock>)339     pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) {
340         // This duration represents how long it takes for the current slot to complete,
341         // at least 0.
342         let duration = unsafe { clock_entry.as_ref().expiration() };
343 
344         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
345         // `Sleep` here does not go into `Ready`.
346         unsafe { clock_entry.as_mut().set_duration(duration) };
347 
348         let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
349         self.slots[slot].push_front(clock_entry);
350 
351         self.occupied |= 1 << slot;
352     }
353 
cancel(&mut self, clock_entry: NonNull<Clock>)354     pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
355         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
356         // `Sleep` here does not go into `Ready`.
357         let duration = unsafe { clock_entry.as_ref().duration() };
358 
359         let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
360 
361         // Caller has unique access to the linked list.
362         // The clock entry is guaranteed to be inside the wheel, so we need to unset the
363         // occupied bit.
364         unsafe {
365             self.slots[slot].remove(clock_entry);
366         }
367 
368         if self.slots[slot].is_empty() {
369             // Unset the bit
370             self.occupied &= !(1 << slot);
371         }
372     }
373 
374     // Return where the next expiration is located, and its deadline.
next_expiration(&self, now: u64) -> Option<Expiration>375     pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
376         let slot = self.next_occupied_slot(now)?;
377 
378         let deadline = Self::calculate_deadline(slot, self.level, now);
379 
380         Some(Expiration {
381             level: self.level,
382             slot,
383             deadline,
384         })
385     }
386 
calculate_deadline(slot: usize, level: usize, now: u64) -> u64387     fn calculate_deadline(slot: usize, level: usize, now: u64) -> u64 {
388         let slot_range = slot_range(level);
389         let level_range = slot_range * SLOTS_NUM as u64;
390         let level_start = now & !(level_range - 1);
391         // Add the time of the last slot at this level to represent a time period.
392         let mut deadline = level_start + slot as u64 * slot_range;
393 
394         if deadline <= now {
395             // This only happened when Duration > MAX_DURATION
396             deadline += level_range;
397         }
398 
399         deadline
400     }
401 
402     // Find the next slot that needs to be executed.
next_occupied_slot(&self, now: u64) -> Option<usize>403     pub(crate) fn next_occupied_slot(&self, now: u64) -> Option<usize> {
404         if self.occupied == 0 {
405             return None;
406         }
407 
408         let now_slot = now / slot_range(self.level);
409         let occupied = self.occupied.rotate_right(now_slot as u32);
410         let zeros = occupied.trailing_zeros();
411         let slot = (zeros as u64 + now_slot) % SLOTS_NUM as u64;
412 
413         Some(slot as usize)
414     }
415 
416     // Fetch all timers in a slot of the corresponding level.
take_slot(&mut self, slot: usize) -> LinkedList<Clock>417     pub(crate) fn take_slot(&mut self, slot: usize) -> LinkedList<Clock> {
418         self.occupied &= !(1 << slot);
419         mem::take(&mut self.slots[slot])
420     }
421 }
422 
423 // All the slots before this level add up to approximately.
slot_range(level: usize) -> u64424 fn slot_range(level: usize) -> u64 {
425     SLOTS_NUM.pow(level as u32) as u64
426 }
427 
#[cfg(test)]
mod test {
    use crate::time::wheel::{Level, Wheel, LEVELS_NUM};
    cfg_net!(
        #[cfg(feature = "ffrt")]
        use crate::time::TimeDriver;
        use crate::time::{sleep, timeout};
        use crate::net::UdpSocket;
        use crate::task::JoinHandle;
        use std::net::SocketAddr;
        use std::time::Duration;
    );

    /// UT test cases for Wheel::new
    ///
    /// # Brief
    /// 1. Use Wheel::new to create a Wheel Struct.
    /// 2. Verify that the new wheel has not elapsed and owns one level per
    ///    LEVELS_NUM.
    #[test]
    fn ut_wheel_new_test() {
        let wheel = Wheel::new();
        let level_count = wheel.levels.len();

        assert_eq!(wheel.elapsed, 0);
        assert_eq!(level_count, LEVELS_NUM);
    }

    /// UT test cases for Sleep drop.
    ///
    /// # Brief
    /// 1. Use timeout to create a Timeout Struct.
    /// 2. Enable the Sleep Struct corresponding to the Timeout Struct to enter
    ///    the Pending state.
    /// 3. Verify the change of the internal TimerHandle during Sleep Struct
    ///    drop.
    #[test]
    #[cfg(feature = "net")]
    fn ut_sleep_drop() {
        const ADDR: &str = "127.0.0.1:0";

        // Waits one second, then sends a datagram to the address announced by
        // the receiver.
        async fn udp_sender(rx: crate::sync::oneshot::Receiver<SocketAddr>) {
            let sender = UdpSocket::bind(ADDR).await.unwrap();
            let buf = [2; 10];
            sleep(Duration::from_secs(1)).await;
            let receiver_addr = rx.await.unwrap();
            sender.send_to(buf.as_slice(), receiver_addr).await.unwrap();
        }

        // Announces its own address, then expects a datagram within two
        // seconds.
        async fn udp_receiver(tx: crate::sync::oneshot::Sender<SocketAddr>) {
            let receiver = UdpSocket::bind(ADDR).await.unwrap();
            let addr = receiver.local_addr().unwrap();
            tx.send(addr).unwrap();
            let mut buf = [0; 10];
            let received = timeout(Duration::from_secs(2), receiver.recv_from(&mut buf[..])).await;
            assert!(received.is_ok());
        }

        let (tx, rx) = crate::sync::oneshot::channel();
        let tasks: Vec<JoinHandle<()>> = vec![
            crate::spawn(udp_sender(rx)),
            crate::spawn(udp_receiver(tx)),
        ];
        for task in tasks {
            let _ = crate::block_on(task);
        }
        #[cfg(feature = "ffrt")]
        let lock = TimeDriver::get_ref().wheel.lock().unwrap();
        #[cfg(feature = "ffrt")]
        for slot in lock.levels[1].slots.iter() {
            assert!(slot.is_empty());
        }
    }

    /// UT test cases for Level::calculate_deadline
    ///
    /// # Brief
    /// 1. Use Level::calculate_deadline() to calculate the deadline.
    /// 2. Verify the deadline is right.
    #[test]
    fn ut_wheel_calculate_deadline() {
        // (slot, level, now, expected deadline)
        const CASES: [(usize, usize, u64, u64); 8] = [
            (36, 0, 95, 100),
            (1, 1, 63, 64),
            (37, 0, 79, 101),
            (31, 1, 960, 1984),
            (61, 1, 7001, 8000),
            (2, 2, 8001, 8192),
            (12, 1, 8192, 8960),
            (40, 0, 8960, 9000),
        ];

        for &(slot, level, now, expected) in CASES.iter() {
            let deadline = Level::calculate_deadline(slot, level, now);
            assert_eq!(deadline, expected);
        }
    }
}
526