1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::fmt::Error;
15 use std::mem;
16 use std::mem::MaybeUninit;
17 use std::ptr::{addr_of_mut, NonNull};
18 use std::sync::atomic::AtomicBool;
19 use std::sync::atomic::Ordering::Relaxed;
20 use std::task::Waker;
21 use std::time::Duration;
22
23 use crate::util::linked_list::{Link, LinkedList, Node};
24
// Number of slots held by every level of the wheel.
const SLOTS_NUM: usize = 64;

// Number of levels that make up the wheel.
const LEVELS_NUM: usize = 6;

// Maximum sleep duration the wheel can represent: every level resolves
// 6 bits of the expiration, giving 2^(6 * LEVELS_NUM) - 1.
pub(crate) const MAX_DURATION: u64 = (1u64 << (6 * LEVELS_NUM)) - 1;
33
34 // Struct for timing and waking up corresponding tasks on the timing wheel.
35 pub(crate) struct Clock {
36 // Expected expiration time.
37 expiration: u64,
38
39 // The level to which the clock will be inserted.
40 level: usize,
41
42 // Elapsed time duration.
43 duration: u64,
44
45 // The result obtained when the corresponding Sleep structure is woken up by
46 // which can be used to determine if the Future is completed correctly.
47 result: AtomicBool,
48
49 // Corresponding waker,
50 // which is used to wake up sleep coroutine.
51 waker: Option<Waker>,
52
53 // Linked_list node.
54 node: Node<Clock>,
55 }
56
57 impl Clock {
58 // Creates a default Clock structure.
new() -> Self59 pub(crate) fn new() -> Self {
60 Self {
61 expiration: 0,
62 level: 0,
63 duration: 0,
64 result: AtomicBool::new(false),
65 waker: None,
66 node: Node::new(),
67 }
68 }
69
70 // Returns the expected expiration time.
expiration(&self) -> u6471 pub(crate) fn expiration(&self) -> u64 {
72 self.expiration
73 }
74
75 // Sets the expected expiration time
set_expiration(&mut self, expiration: u64)76 pub(crate) fn set_expiration(&mut self, expiration: u64) {
77 self.expiration = expiration;
78 }
79
80 // Returns the level to which the clock will be inserted.
level(&self) -> usize81 pub(crate) fn level(&self) -> usize {
82 self.level
83 }
84
85 // Sets the level to which the clock will be inserted.
set_level(&mut self, level: usize)86 pub(crate) fn set_level(&mut self, level: usize) {
87 self.level = level;
88 }
89
duration(&self) -> u6490 pub(crate) fn duration(&self) -> u64 {
91 self.duration
92 }
93
set_duration(&mut self, duration: u64)94 pub(crate) fn set_duration(&mut self, duration: u64) {
95 self.duration = duration;
96 }
97
98 // Returns the corresponding waker.
take_waker(&mut self) -> Option<Waker>99 pub(crate) fn take_waker(&mut self) -> Option<Waker> {
100 self.waker.take()
101 }
102
103 // Sets the corresponding waker.
set_waker(&mut self, waker: Waker)104 pub(crate) fn set_waker(&mut self, waker: Waker) {
105 self.waker = Some(waker);
106 }
107
108 // Returns the result.
result(&self) -> bool109 pub(crate) fn result(&self) -> bool {
110 self.result.load(Relaxed)
111 }
112
113 // Sets the result.
set_result(&mut self, result: bool)114 pub(crate) fn set_result(&mut self, result: bool) {
115 self.result.store(result, Relaxed);
116 }
117 }
118
119 impl Default for Clock {
default() -> Self120 fn default() -> Self {
121 Clock::new()
122 }
123 }
124
125 unsafe impl Link for Clock {
node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>> where Self: Sized,126 unsafe fn node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>>
127 where
128 Self: Sized,
129 {
130 let node_ptr = addr_of_mut!(ptr.as_mut().node);
131 NonNull::new_unchecked(node_ptr)
132 }
133 }
134
135 pub(crate) enum TimeOut {
136 ClockEntry(NonNull<Clock>),
137 Duration(Duration),
138 None,
139 }
140
// Position and deadline of the next slot to expire, as produced by
// `Level::next_expiration`.
pub(crate) struct Expiration {
    // Level that owns the slot.
    level: usize,
    // Index of the slot inside that level.
    slot: usize,
    // Time at which the slot becomes due.
    deadline: u64,
}
146
147 pub(crate) struct Wheel {
148 // Since the wheel started,
149 // the number of milliseconds elapsed.
150 elapsed: u64,
151
152 // The time wheel levels are similar to a multi-layered dial.
153 //
154 // levels:
155 //
156 // 1 ms slots == 64 ms range
157 // 64 ms slots ~= 4 sec range
158 // 4 sec slots ~= 4 min range
159 // 4 min slots ~= 4 hr range
160 // 4 hr slots ~= 12 day range
161 // 12 day slots ~= 2 yr range
162 levels: Vec<Level>,
163
164 // These corresponding timers have expired,
165 // and are ready to be triggered.
166 trigger: LinkedList<Clock>,
167 }
168
169 impl Wheel {
170 // Creates a new timing wheel.
new() -> Self171 pub(crate) fn new() -> Self {
172 let levels = (0..LEVELS_NUM).map(Level::new).collect();
173
174 Self {
175 elapsed: 0,
176 levels,
177 trigger: Default::default(),
178 }
179 }
180
181 // Return the elapsed.
elapsed(&self) -> u64182 pub(crate) fn elapsed(&self) -> u64 {
183 self.elapsed
184 }
185
186 // Set the elapsed.
set_elapsed(&mut self, elapsed: u64)187 pub(crate) fn set_elapsed(&mut self, elapsed: u64) {
188 self.elapsed = elapsed;
189 }
190
191 // Compare the timing wheel elapsed with the expiration,
192 // from which to decide which level to insert.
find_level(expiration: u64, elapsed: u64) -> usize193 pub(crate) fn find_level(expiration: u64, elapsed: u64) -> usize {
194 // 0011 1111
195 const SLOT_MASK: u64 = (1 << 6) - 1;
196
197 // Use the time difference value to find at which level.
198 // Use XOR to determine the insertion of inspiration currently need to be
199 // inserted into the level, using binary operations to determine which bit has
200 // changed, and further determine which level. If don't use XOR, it will lead to
201 // the re-insertion of the level of the calculation error, and insert to an
202 // incorrect level.
203 let mut masked = (expiration ^ elapsed) | SLOT_MASK;
204 // 1111 1111 1111 1111 1111 1111 1111 1111 1111
205 if masked >= MAX_DURATION {
206 masked = MAX_DURATION - 1;
207 }
208
209 let leading_zeros = masked.leading_zeros() as usize;
210 // Calculate how many valid bits there are.
211 let significant = 63 - leading_zeros;
212
213 // One level per 6 bit,
214 // one slots has 2^6 slots.
215 significant / 6
216 }
217
218 // Insert the corresponding TimerHandle into the specified position in the
219 // timing wheel.
insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error>220 pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error> {
221 let expiration = unsafe { clock_entry.as_ref().expiration() };
222
223 if expiration <= self.elapsed() {
224 // This means that the timeout period has passed,
225 // and the time should be triggered immediately.
226 return Err(Error);
227 }
228
229 let level = Self::find_level(expiration, self.elapsed());
230 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
231 // `Sleep` here does not go into `Ready`.
232 unsafe { clock_entry.as_mut().set_level(level) };
233 self.levels[level].insert(clock_entry);
234 Ok(expiration)
235 }
236
cancel(&mut self, clock_entry: NonNull<Clock>)237 pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
238 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
239 // `Sleep` here does not go into `Ready`.
240 let level = unsafe { clock_entry.as_ref().level() };
241 self.levels[level].cancel(clock_entry);
242 }
243
244 // Return where the next expiration is located, and its deadline.
next_expiration(&self) -> Option<Expiration>245 pub(crate) fn next_expiration(&self) -> Option<Expiration> {
246 for level in 0..LEVELS_NUM {
247 if let Some(expiration) = self.levels[level].next_expiration(self.elapsed()) {
248 return Some(expiration);
249 }
250 }
251
252 None
253 }
254
255 // Retrieve the corresponding expired TimerHandle.
process_expiration(&mut self, expiration: &Expiration)256 pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
257 let mut handles = self.levels[expiration.level].take_slot(expiration.slot);
258 while let Some(mut item) = handles.pop_back() {
259 let expected_expiration = unsafe { item.as_ref().expiration() };
260 if expected_expiration > expiration.deadline {
261 let level = Self::find_level(expected_expiration, expiration.deadline);
262
263 unsafe { item.as_mut().set_level(level) };
264
265 self.levels[level].insert(item);
266 } else {
267 self.trigger.push_front(item);
268 }
269 }
270 }
271
272 // Determine which timers have timed out at the current time.
poll(&mut self, now: u64) -> TimeOut273 pub(crate) fn poll(&mut self, now: u64) -> TimeOut {
274 loop {
275 if let Some(handle) = self.trigger.pop_back() {
276 return TimeOut::ClockEntry(handle);
277 }
278
279 let expiration = self.next_expiration();
280
281 match expiration {
282 Some(ref expiration) if expiration.deadline > now => {
283 return TimeOut::Duration(Duration::from_millis(expiration.deadline - now))
284 }
285 Some(ref expiration) => {
286 self.process_expiration(expiration);
287 self.set_elapsed(expiration.deadline);
288 }
289 None => {
290 self.set_elapsed(now);
291 break;
292 }
293 }
294 }
295
296 match self.trigger.pop_back() {
297 None => TimeOut::None,
298 Some(handle) => TimeOut::ClockEntry(handle),
299 }
300 }
301 }
302
303 // Level in the wheel.
304 // All level contains 64 slots.
305 pub struct Level {
306 // current level
307 level: usize,
308
309 // Determine which slot contains entries based on occupied bit.
310 occupied: u64,
311
312 // slots in a level.
313 slots: [LinkedList<Clock>; SLOTS_NUM],
314 }
315
316 impl Level {
317 // Specify the level and create a Level structure.
new(level: usize) -> Self318 pub(crate) fn new(level: usize) -> Self {
319 let mut slots: [MaybeUninit<LinkedList<Clock>>; SLOTS_NUM] =
320 unsafe { MaybeUninit::uninit().assume_init() };
321
322 for slot in slots.iter_mut() {
323 *slot = MaybeUninit::new(Default::default());
324 }
325
326 unsafe {
327 let slots = mem::transmute::<_, [LinkedList<Clock>; SLOTS_NUM]>(slots);
328 Self {
329 level,
330 occupied: 0,
331 slots,
332 }
333 }
334 }
335
336 // Based on the elapsed which the current time wheel is running,
337 // and the expected expiration time of the clock_entry,
338 // find the corresponding slot and insert it.
insert(&mut self, mut clock_entry: NonNull<Clock>)339 pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) {
340 // This duration represents how long it takes for the current slot to complete,
341 // at least 0.
342 let duration = unsafe { clock_entry.as_ref().expiration() };
343
344 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
345 // `Sleep` here does not go into `Ready`.
346 unsafe { clock_entry.as_mut().set_duration(duration) };
347
348 let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
349 self.slots[slot].push_front(clock_entry);
350
351 self.occupied |= 1 << slot;
352 }
353
cancel(&mut self, clock_entry: NonNull<Clock>)354 pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
355 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
356 // `Sleep` here does not go into `Ready`.
357 let duration = unsafe { clock_entry.as_ref().duration() };
358
359 let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
360
361 // Caller has unique access to the linked list.
362 // The clock entry is guaranteed to be inside the wheel, so we need to unset the
363 // occupied bit.
364 unsafe {
365 self.slots[slot].remove(clock_entry);
366 }
367
368 if self.slots[slot].is_empty() {
369 // Unset the bit
370 self.occupied &= !(1 << slot);
371 }
372 }
373
374 // Return where the next expiration is located, and its deadline.
next_expiration(&self, now: u64) -> Option<Expiration>375 pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
376 let slot = self.next_occupied_slot(now)?;
377
378 let deadline = Self::calculate_deadline(slot, self.level, now);
379
380 Some(Expiration {
381 level: self.level,
382 slot,
383 deadline,
384 })
385 }
386
calculate_deadline(slot: usize, level: usize, now: u64) -> u64387 fn calculate_deadline(slot: usize, level: usize, now: u64) -> u64 {
388 let slot_range = slot_range(level);
389 let level_range = slot_range * SLOTS_NUM as u64;
390 let level_start = now & !(level_range - 1);
391 // Add the time of the last slot at this level to represent a time period.
392 let mut deadline = level_start + slot as u64 * slot_range;
393
394 if deadline <= now {
395 // This only happened when Duration > MAX_DURATION
396 deadline += level_range;
397 }
398
399 deadline
400 }
401
402 // Find the next slot that needs to be executed.
next_occupied_slot(&self, now: u64) -> Option<usize>403 pub(crate) fn next_occupied_slot(&self, now: u64) -> Option<usize> {
404 if self.occupied == 0 {
405 return None;
406 }
407
408 let now_slot = now / slot_range(self.level);
409 let occupied = self.occupied.rotate_right(now_slot as u32);
410 let zeros = occupied.trailing_zeros();
411 let slot = (zeros as u64 + now_slot) % SLOTS_NUM as u64;
412
413 Some(slot as usize)
414 }
415
416 // Fetch all timers in a slot of the corresponding level.
take_slot(&mut self, slot: usize) -> LinkedList<Clock>417 pub(crate) fn take_slot(&mut self, slot: usize) -> LinkedList<Clock> {
418 self.occupied &= !(1 << slot);
419 mem::take(&mut self.slots[slot])
420 }
421 }
422
423 // All the slots before this level add up to approximately.
slot_range(level: usize) -> u64424 fn slot_range(level: usize) -> u64 {
425 SLOTS_NUM.pow(level as u32) as u64
426 }
427
#[cfg(test)]
mod test {
    use crate::time::wheel::{Level, Wheel, LEVELS_NUM};
    cfg_net!(
        #[cfg(feature = "ffrt")]
        use crate::time::TimeDriver;
        use crate::time::{sleep, timeout};
        use crate::net::UdpSocket;
        use crate::task::JoinHandle;
        use std::net::SocketAddr;
        use std::time::Duration;
    );

    /// UT test cases for Wheel::new
    ///
    /// # Brief
    /// 1. Create a Wheel with Wheel::new.
    /// 2. Check that nothing has elapsed and that all levels exist.
    #[test]
    fn ut_wheel_new_test() {
        let wheel = Wheel::new();
        assert_eq!(wheel.elapsed, 0);
        assert_eq!(wheel.levels.len(), LEVELS_NUM);
    }

    /// UT test cases for Sleep drop.
    ///
    /// # Brief
    /// 1. Wrap a UDP receive in a timeout so that a Timeout is created.
    /// 2. Let the Sleep inside the Timeout enter the Pending state while the
    ///    sender is still sleeping.
    /// 3. After the tasks finish, check that the dropped Sleep left no entry
    ///    behind in the wheel (checked with the `ffrt` feature only).
    #[test]
    #[cfg(feature = "net")]
    fn ut_sleep_drop() {
        const ADDR: &str = "127.0.0.1:0";

        // Sleeps for a second, then sends a datagram to the address it is
        // told about through the oneshot channel.
        async fn udp_sender(rx: crate::sync::oneshot::Receiver<SocketAddr>) {
            let sender = UdpSocket::bind(ADDR).await.unwrap();
            let buf = [2; 10];
            sleep(Duration::from_secs(1)).await;
            let receiver_addr = rx.await.unwrap();
            sender.send_to(buf.as_slice(), receiver_addr).await.unwrap();
        }

        // Publishes its own address, then waits up to two seconds for the
        // datagram; the receive has to finish before the timeout fires.
        async fn udp_receiver(tx: crate::sync::oneshot::Sender<SocketAddr>) {
            let receiver = UdpSocket::bind(ADDR).await.unwrap();
            let addr = receiver.local_addr().unwrap();
            tx.send(addr).unwrap();
            let mut buf = [0; 10];
            let received = timeout(Duration::from_secs(2), receiver.recv_from(&mut buf[..])).await;
            assert!(received.is_ok());
        }

        let (tx, rx) = crate::sync::oneshot::channel();
        let tasks: Vec<JoinHandle<()>> = vec![
            crate::spawn(udp_sender(rx)),
            crate::spawn(udp_receiver(tx)),
        ];
        for task in tasks {
            let _ = crate::block_on(task);
        }

        #[cfg(feature = "ffrt")]
        let wheel = TimeDriver::get_ref().wheel.lock().unwrap();
        #[cfg(feature = "ffrt")]
        for slot in wheel.levels[1].slots.iter() {
            assert!(slot.is_empty());
        }
    }

    /// UT test cases for Level::calculate_deadline
    ///
    /// # Brief
    /// 1. Call Level::calculate_deadline() for several slot/level/now
    ///    combinations.
    /// 2. Verify that every returned deadline is right.
    #[test]
    fn ut_wheel_calculate_deadline() {
        // (slot, level, now, expected deadline)
        const CASES: [(usize, usize, u64, u64); 8] = [
            (36, 0, 95, 100),
            (1, 1, 63, 64),
            (37, 0, 79, 101),
            (31, 1, 960, 1984),
            (61, 1, 7001, 8000),
            (2, 2, 8001, 8192),
            (12, 1, 8192, 8960),
            (40, 0, 8960, 9000),
        ];

        for &(slot, level, now, expected) in CASES.iter() {
            let deadline = Level::calculate_deadline(slot, level, now);
            assert_eq!(deadline, expected);
        }
    }
}
526