1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cell::UnsafeCell;
15 use std::hint::spin_loop;
16 use std::ops::{Deref, DerefMut};
17 use std::sync::atomic::{AtomicUsize, Ordering};
18 use std::task::Waker;
19 
20 use crate::util::slots::{Slots, SlotsError};
21 
/// Bit 0 (the least significant bit) of the flag: set while the list is
/// locked.
const LOCKED: usize = 0b01;
/// Bit 1 of the flag: set when the lock is released while the list still
/// holds at least one waker, i.e. there is someone who can be notified.
const NOTIFIABLE: usize = 0b10;
26 
/// The data stored inside a `WakerList`; it is reached through the `Lock`
/// guard, which dereferences to this type.
pub(crate) struct Inner {
    /// The stored wakers. Entries are appended with `push_back`, taken from
    /// the front with `pop_front`, or removed by key with `remove`.
    wake_list: Slots<Waker>,
}
30 
/// A list of wakers guarded by a spin lock whose state lives in `flag`.
pub(crate) struct WakerList {
    /// Bit set made of `LOCKED` and `NOTIFIABLE`.
    flag: AtomicUsize,
    /// The waker storage. Non-test code in this file reaches it only through
    /// the `Lock` guard returned by `lock`.
    inner: UnsafeCell<Inner>,
}
36 
/// Safety: `WakerList` is not automatically `Sync` because it contains an
/// `UnsafeCell`. However, the list is locked first whenever its contents are
/// accessed, so it is safe for `WakerList` to be sent and borrowed across
/// threads.
unsafe impl Sync for WakerList {}
unsafe impl Send for WakerList {}
42 
43 impl WakerList {
44     #[inline]
new() -> WakerList45     pub fn new() -> WakerList {
46         WakerList {
47             flag: AtomicUsize::new(0),
48             inner: UnsafeCell::new(Inner {
49                 wake_list: Slots::new(),
50             }),
51         }
52     }
53 
54     /// Pushes a waker into the list and return its index in the list.
insert(&self, waker: Waker) -> usize55     pub fn insert(&self, waker: Waker) -> usize {
56         let mut list = self.lock();
57 
58         list.wake_list.push_back(waker)
59     }
60 
61     /// Removes the waker corresponding to the key.
remove(&self, key: usize) -> Result<Waker, SlotsError>62     pub fn remove(&self, key: usize) -> Result<Waker, SlotsError> {
63         let mut inner = self.lock();
64         inner.wake_list.remove(key)
65     }
66 
67     /// Wakes up one more member, no matter whether someone is being waking up
68     /// at the same time. This method is an atomic operation. If a
69     /// non-atomic operation is required, call `lock` first and then call
70     /// `notify_one`.
71     #[inline]
notify_one(&self) -> bool72     pub fn notify_one(&self) -> bool {
73         self.notify(Notify::One)
74     }
75 
76     /// Wakes up all members in the WakerList, and return the result.
77     /// This method is an atomic operation. If a non-atomic operation is
78     /// required, call `lock` first and then call `notify_all`.
79     #[inline]
notify_all(&self) -> bool80     pub fn notify_all(&self) -> bool {
81         self.notify(Notify::All)
82     }
83 
notify(&self, notify_type: Notify) -> bool84     fn notify(&self, notify_type: Notify) -> bool {
85         if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
86             let mut inner = self.lock();
87             inner.notify(notify_type)
88         } else {
89             false
90         }
91     }
92 
93     /// Locks up the WakerList. If it has been already locked, spin loop until
94     /// fetch the lock.
lock(&self) -> Lock<'_>95     pub fn lock(&self) -> Lock<'_> {
96         // This condition will be false only if the flag is LOCKED.
97         while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
98             spin_loop();
99         }
100         Lock { waker_set: self }
101     }
102 }
103 
104 impl Inner {
105     /// Wakes up one or more members in the WakerList, and return the result.
106     #[inline]
notify(&mut self, notify_type: Notify) -> bool107     fn notify(&mut self, notify_type: Notify) -> bool {
108         let mut is_wake = false;
109         while let Some(waker) = self.wake_list.pop_front() {
110             waker.wake();
111             is_wake = true;
112 
113             if notify_type == Notify::One {
114                 return is_wake;
115             }
116         }
117         is_wake
118     }
119 
120     /// Wakes up one more member, no matter whether someone is being waking up
121     /// at the same time.
122     #[inline]
notify_one(&mut self) -> bool123     pub fn notify_one(&mut self) -> bool {
124         self.notify(Notify::One)
125     }
126 
127     /// Wakes up all members in the WakerList, and return the result.
128     #[inline]
notify_all(&mut self) -> bool129     pub fn notify_all(&mut self) -> bool {
130         self.notify(Notify::All)
131     }
132 }
133 
/// The guard holding the WakerList.
///
/// It is returned by `WakerList::lock` and dereferences to `Inner`. Dropping
/// it rewrites the flag of the list, which clears `LOCKED`.
pub(crate) struct Lock<'a> {
    /// The list this guard keeps locked.
    waker_set: &'a WakerList,
}
138 
139 impl Drop for Lock<'_> {
140     #[inline]
drop(&mut self)141     fn drop(&mut self) {
142         let mut flag = 0;
143         // If there're members that can be notified, set the third left most bit, which
144         // means to add NOTIFIABLE state to the flag.
145         if !self.wake_list.is_empty() {
146             flag |= NOTIFIABLE;
147         }
148         self.waker_set.flag.store(flag, Ordering::SeqCst);
149     }
150 }
151 
152 impl Deref for Lock<'_> {
153     type Target = Inner;
154 
deref(&self) -> &Inner155     fn deref(&self) -> &Inner {
156         unsafe { &*self.waker_set.inner.get() }
157     }
158 }
159 
160 impl DerefMut for Lock<'_> {
deref_mut(&mut self) -> &mut Inner161     fn deref_mut(&mut self) -> &mut Inner {
162         unsafe { &mut *self.waker_set.inner.get() }
163     }
164 }
165 
/// Selects how many members a notification wakes up.
#[derive(Copy, Clone, PartialEq, Eq)]
enum Notify {
    /// Wake up one more member based on the current state.
    One,
    /// Wake up all members.
    All,
}
173 
#[cfg(test)]
mod tests {
    use super::*;

    /// UT test cases for WakerList::new().
    ///
    /// # Brief
    /// 1. Check the initial value of flag.
    /// 2. Check that the initial wake_list has a length of zero.
    #[test]
    fn ut_wakelist_new_01() {
        let wakelist = WakerList::new();

        let flag = wakelist.flag.load(Ordering::SeqCst);
        assert_eq!(flag, 0);

        // SAFETY: `wakelist` is local to this test and has not been shared,
        // so nothing else can access the cell while it is read here.
        let inner = unsafe { &*wakelist.inner.get() };
        assert_eq!(inner.wake_list.len, 0);
    }
}
192