1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
16 
17 use crate::error::ErrorKind;
18 
/// Flag bit: the task is currently being executed.
const RUNNING: usize = 1 << 0;
/// Flag bit: the task is in the schedule list.
const SCHEDULING: usize = 1 << 2;
/// Flag bit: the task has finished.
const FINISHED: usize = 1 << 3;
/// Flag bit: the task has been canceled.
const CANCELED: usize = 1 << 4;
/// Flag bit: the finished result must be sent back to the join handle.
const CARE_JOIN_HANDLE: usize = 1 << 5;
/// Flag bit: the task currently holds a waker to the join handle.
const JOIN_WAKER: usize = 1 << 6;

/// Number of low bits reserved for flags; the reference count is stored in
/// the bits above them.
const RC_SHIFT: usize = 10;

/// One unit of the reference count.
const REF_ONE: usize = 1 << RC_SHIFT;

/// Mask selecting the reference-count bits of a state word (everything above
/// the low `RC_SHIFT` flag bits).
const RC_MASK: usize = !(REF_ONE - 1);

/// Initial state: two references (one held by the join handle, one held by
/// the task itself), already in the schedule list, and with the join handle
/// interested in the result.
const INIT: usize = (2 * REF_ONE) | SCHEDULING | CARE_JOIN_HANDLE;
42 
43 #[inline]
ref_count(state: usize) -> usize44 pub(crate) fn ref_count(state: usize) -> usize {
45     (state & RC_MASK) >> RC_SHIFT
46 }
47 
48 #[inline]
is_last_ref_count(prev: usize) -> bool49 pub(crate) fn is_last_ref_count(prev: usize) -> bool {
50     ref_count(prev) == 1
51 }
52 
53 #[inline]
is_canceled(cur: usize) -> bool54 pub(crate) fn is_canceled(cur: usize) -> bool {
55     cur & CANCELED == CANCELED
56 }
57 
58 #[inline]
is_care_join_handle(cur: usize) -> bool59 pub(crate) fn is_care_join_handle(cur: usize) -> bool {
60     cur & CARE_JOIN_HANDLE == CARE_JOIN_HANDLE
61 }
62 
63 #[inline]
is_finished(cur: usize) -> bool64 pub(crate) fn is_finished(cur: usize) -> bool {
65     cur & FINISHED == FINISHED
66 }
67 
68 #[inline]
is_set_waker(cur: usize) -> bool69 pub(crate) fn is_set_waker(cur: usize) -> bool {
70     cur & JOIN_WAKER == JOIN_WAKER
71 }
72 
73 #[inline]
is_scheduling(cur: usize) -> bool74 pub(crate) fn is_scheduling(cur: usize) -> bool {
75     cur & SCHEDULING == SCHEDULING
76 }
77 
78 #[inline]
is_running(cur: usize) -> bool79 pub(crate) fn is_running(cur: usize) -> bool {
80     cur & RUNNING == RUNNING
81 }
82 
83 // A task need to satisfy these state requirements in order to get pushed back
84 // to the schedule list.
85 #[inline]
need_enqueue(cur: usize) -> bool86 pub(crate) fn need_enqueue(cur: usize) -> bool {
87     (cur & SCHEDULING != SCHEDULING) && (cur & RUNNING != RUNNING) && (cur & FINISHED != FINISHED)
88 }
89 
/// Outcome reported by the `TaskState` transitions that return more than a
/// plain success/failure.
///
/// The enum derives the common comparison and formatting traits so that
/// results can be logged and compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StateAction {
    /// The transition was applied and nothing else was signalled.
    Success,
    /// The `CANCELED` bit was found set. Carries the state word involved:
    /// the newly written state for `turning_to_running`, the unchanged
    /// observed state for `turning_to_idle`.
    Canceled(usize),
    /// The transition was rejected. Carries the state word that was observed
    /// when the precondition check failed.
    Failed(usize),
    /// Returned by `turning_to_idle` when the `SCHEDULING` bit was set while
    /// the `RUNNING` bit was being cleared.
    Enqueue,
}
96 
97 pub(crate) struct TaskState(AtomicUsize);
98 impl TaskState {
99     #[inline]
new() -> Self100     pub(crate) fn new() -> Self {
101         TaskState(AtomicUsize::new(INIT))
102     }
103 
104     #[inline]
dec_ref(&self) -> usize105     pub(crate) fn dec_ref(&self) -> usize {
106         self.0.fetch_sub(REF_ONE, AcqRel)
107     }
108 
109     #[inline]
inc_ref(&self)110     pub(crate) fn inc_ref(&self) {
111         self.0.fetch_add(REF_ONE, AcqRel);
112     }
113 
114     #[inline]
get_current_state(&self) -> usize115     pub(crate) fn get_current_state(&self) -> usize {
116         self.0.load(Acquire)
117     }
118 
119     /// Turns the task state into running. Contains CAS operations.
120     ///
121     /// Fails when the task is already running, scheduling or is already
122     /// finished.
turning_to_running(&self) -> StateAction123     pub(crate) fn turning_to_running(&self) -> StateAction {
124         let mut cur = self.get_current_state();
125         loop {
126             let mut action = StateAction::Success;
127 
128             if is_running(cur) || is_finished(cur) || !is_scheduling(cur) {
129                 return StateAction::Failed(cur);
130             }
131 
132             let mut next = cur;
133             next &= !SCHEDULING;
134             next |= RUNNING;
135             if is_canceled(next) {
136                 action = StateAction::Canceled(next);
137             }
138 
139             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
140             match res {
141                 Ok(_) => return action,
142                 Err(actual) => cur = actual,
143             }
144         }
145     }
146 
147     /// Turns the task state into finished. Contains CAS operations.
148     ///
149     /// Fails when the task is already finished or is not running.
turning_to_finish(&self) -> Result<usize, ErrorKind>150     pub(crate) fn turning_to_finish(&self) -> Result<usize, ErrorKind> {
151         let mut cur = self.get_current_state();
152 
153         loop {
154             if is_finished(cur) {
155                 return Err(ErrorKind::TaskShutdown);
156             }
157 
158             if !is_running(cur) {
159                 return Err(ErrorKind::TaskStateInvalid);
160             }
161             let mut next = cur;
162             next &= !RUNNING;
163             next |= FINISHED;
164 
165             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
166             match res {
167                 Ok(_) => return Ok(next),
168                 Err(actual) => cur = actual,
169             }
170         }
171     }
172 
173     /// Turns the task state into idle. Contains CAS operations.
174     ///
175     /// Fails when the task is canceled or running.
turning_to_idle(&self) -> StateAction176     pub(crate) fn turning_to_idle(&self) -> StateAction {
177         let mut cur = self.get_current_state();
178 
179         loop {
180             let mut action = StateAction::Success;
181 
182             if !is_running(cur) {
183                 return StateAction::Failed(cur);
184             }
185 
186             if is_canceled(cur) {
187                 return StateAction::Canceled(cur);
188             }
189 
190             let mut next = cur;
191             next &= !RUNNING;
192 
193             if is_scheduling(next) {
194                 action = StateAction::Enqueue;
195             }
196 
197             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
198             match res {
199                 Ok(_) => return action,
200                 Err(actual) => cur = actual,
201             }
202         }
203     }
204 
205     /// Turns the task state into scheduling. Returns the old state value.
206     #[inline]
turn_to_scheduling(&self) -> usize207     pub(crate) fn turn_to_scheduling(&self) -> usize {
208         self.0.fetch_or(SCHEDULING, AcqRel)
209     }
210 
211     /// Turns the task state into unset_waker. Contains CAS operations.
212     ///
213     /// Fails when the task is already finished.
turn_to_un_set_waker(&self) -> Result<usize, usize>214     pub(crate) fn turn_to_un_set_waker(&self) -> Result<usize, usize> {
215         let mut cur = self.get_current_state();
216 
217         loop {
218             if !is_care_join_handle(cur) || !is_set_waker(cur) {
219                 return Err(cur);
220             }
221 
222             if is_finished(cur) {
223                 return Err(cur);
224             }
225 
226             let mut next = cur;
227             next &= !JOIN_WAKER;
228 
229             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
230             match res {
231                 Ok(_) => return Ok(next),
232                 Err(actual) => cur = actual,
233             }
234         }
235     }
236 
237     /// Turns off the Join_Waker bit of the task state. Contains CAS operations.
238     ///
239     /// Fails when the task is already finished.
turn_to_set_waker(&self) -> Result<usize, usize>240     pub(crate) fn turn_to_set_waker(&self) -> Result<usize, usize> {
241         let mut cur = self.get_current_state();
242 
243         loop {
244             if !is_care_join_handle(cur) || is_set_waker(cur) {
245                 return Err(cur);
246             }
247             if is_finished(cur) {
248                 return Err(cur);
249             }
250 
251             let mut next = cur;
252             next |= JOIN_WAKER;
253 
254             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
255             match res {
256                 Ok(_) => return Ok(next),
257                 Err(actual) => cur = actual,
258             }
259         }
260     }
261 
turn_to_canceled_and_scheduled(&self) -> bool262     pub(crate) fn turn_to_canceled_and_scheduled(&self) -> bool {
263         let mut cur = self.get_current_state();
264 
265         loop {
266             if is_canceled(cur) || is_finished(cur) {
267                 return false;
268             }
269 
270             let mut next = cur;
271             let need_schedule = if is_running(cur) {
272                 next |= SCHEDULING;
273                 next |= CANCELED;
274                 false
275             } else {
276                 next |= CANCELED;
277                 if !is_scheduling(next) {
278                     next |= SCHEDULING;
279                     true
280                 } else {
281                     false
282                 }
283             };
284 
285             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
286             match res {
287                 Ok(_) => return need_schedule,
288                 Err(actual) => cur = actual,
289             }
290         }
291     }
292 
293     /// Turns off the CARE_JOIN_HANDLE bit of the task state. Contains CAS
294     /// operations.
295     ///
296     /// Fails when the task is already finished.
turn_to_un_join_handle(&self) -> Result<usize, ()>297     pub(crate) fn turn_to_un_join_handle(&self) -> Result<usize, ()> {
298         let mut cur = self.get_current_state();
299 
300         loop {
301             if is_finished(cur) {
302                 return Err(());
303             }
304 
305             let mut next = cur;
306             next &= !CARE_JOIN_HANDLE;
307 
308             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
309             match res {
310                 Ok(_) => return Ok(next),
311                 Err(actual) => cur = actual,
312             }
313         }
314     }
315 
316     /// Attempts to turn off the CARE_JOIN_HANDLE bit of the task state.
317     ///
318     /// Returns true if successfully changed. Otherwise, returns false.
try_turning_to_un_join_handle(&self) -> bool319     pub(crate) fn try_turning_to_un_join_handle(&self) -> bool {
320         let old = INIT;
321         let new = (INIT - REF_ONE) & !CARE_JOIN_HANDLE;
322         self.0.compare_exchange(old, new, Relaxed, Relaxed) == Ok(old)
323     }
324 }
325 
#[cfg(test)]
mod test {
    use std::sync::atomic::Ordering::{Acquire, Release};

    use crate::error::ErrorKind;
    use crate::task::state::{
        is_last_ref_count, need_enqueue, ref_count, StateAction, TaskState, CANCELED,
        CARE_JOIN_HANDLE, FINISHED, INIT, JOIN_WAKER, REF_ONE, RUNNING, SCHEDULING,
    };

    /// Builds a `TaskState` whose raw state word is exactly `bits`.
    fn state_with(bits: usize) -> TaskState {
        let task_state = TaskState::new();
        task_state.0.store(bits, Release);
        task_state
    }

    /// UT test cases for TaskState::new()
    ///
    /// # Brief
    /// 1. Verify that the state of a freshly created task is INIT
    #[test]
    fn ut_task_state_new() {
        let task_state = TaskState::new();
        assert_eq!(task_state.0.load(Acquire), INIT);
    }

    /// UT test cases for TaskState::dec_ref()
    ///
    /// # Brief
    /// 1. Verify that dec_ref returns the previous state (INIT)
    /// 2. Verify that the stored state becomes INIT - REF_ONE
    #[test]
    fn ut_task_state_dec_ref() {
        let task_state = TaskState::new();
        assert_eq!(task_state.dec_ref(), INIT);
        assert_eq!(task_state.0.load(Acquire), INIT - REF_ONE);
    }

    /// UT test cases for TaskState::inc_ref()
    ///
    /// # Brief
    /// 1. Verify that the stored state becomes INIT + REF_ONE
    #[test]
    fn ut_task_state_inc_ref() {
        let task_state = TaskState::new();
        task_state.inc_ref();
        assert_eq!(task_state.0.load(Acquire), INIT + REF_ONE);
    }

    /// UT test cases for TaskState::get_current_state()
    ///
    /// # Brief
    /// 1. Verify that the state of a freshly created task is INIT
    /// 2. Verify that a directly stored state is reported back unchanged
    #[test]
    fn ut_task_state_get_current_state() {
        let task_state = TaskState::new();
        assert_eq!(task_state.get_current_state(), INIT);

        let task_state = state_with(INIT | CANCELED);
        assert_eq!(task_state.get_current_state(), INIT | CANCELED);
    }

    /// UT test cases for the free state predicates
    ///
    /// # Brief
    /// 1. INIT holds two references, so it is not the last reference
    /// 2. INIT - REF_ONE holds exactly one reference
    /// 3. need_enqueue is true only when the task is neither scheduling, nor
    ///    running, nor finished
    #[test]
    fn ut_task_state_predicates() {
        assert_eq!(ref_count(INIT), 2);
        assert!(!is_last_ref_count(INIT));
        assert_eq!(ref_count(INIT - REF_ONE), 1);
        assert!(is_last_ref_count(INIT - REF_ONE));

        let idle = INIT & !SCHEDULING;
        assert!(need_enqueue(idle));
        assert!(need_enqueue(idle | CANCELED));
        assert!(!need_enqueue(INIT));
        assert!(!need_enqueue(idle | RUNNING));
        assert!(!need_enqueue(idle | FINISHED));
    }

    /// UT test cases for TaskState::turning_to_running()
    ///
    /// # Brief
    /// 1. A scheduling task becomes running and the call returns Success
    /// 2. A task that is already running is rejected with Failed(state)
    /// 3. A finished task is rejected with Failed(state)
    /// 4. A task that is not scheduling is rejected with Failed(state)
    /// 5. A canceled scheduling task becomes running and the call returns
    ///    Canceled carrying the newly written state
    #[test]
    fn ut_task_state_turning_to_running() {
        let task_state = TaskState::new();
        let running = (INIT & !SCHEDULING) | RUNNING;
        assert!(matches!(
            task_state.turning_to_running(),
            StateAction::Success
        ));
        assert_eq!(task_state.0.load(Acquire), running);

        match task_state.turning_to_running() {
            StateAction::Failed(seen) => assert_eq!(seen, running),
            _ => panic!("a running task must not be started again"),
        }

        let task_state = state_with(INIT | FINISHED);
        match task_state.turning_to_running() {
            StateAction::Failed(seen) => assert_eq!(seen, INIT | FINISHED),
            _ => panic!("a finished task must not be started"),
        }

        let task_state = state_with(INIT & !SCHEDULING);
        match task_state.turning_to_running() {
            StateAction::Failed(seen) => assert_eq!(seen, INIT & !SCHEDULING),
            _ => panic!("a task outside the schedule list must not be started"),
        }

        let task_state = state_with(INIT | CANCELED);
        let canceled_running = ((INIT | CANCELED) & !SCHEDULING) | RUNNING;
        match task_state.turning_to_running() {
            StateAction::Canceled(next) => assert_eq!(next, canceled_running),
            _ => panic!("a canceled task must report Canceled"),
        }
        assert_eq!(task_state.0.load(Acquire), canceled_running);
    }

    /// UT test cases for TaskState::turning_to_finish()
    ///
    /// # Brief
    /// 1. A running task becomes finished and the new state is returned
    /// 2. A task that is already finished is rejected with TaskShutdown
    /// 3. A task that is not running is rejected with TaskStateInvalid and its
    ///    state is left untouched
    #[test]
    fn ut_task_state_turning_to_finish() {
        let task_state = TaskState::new();
        assert!(matches!(
            task_state.turning_to_running(),
            StateAction::Success
        ));
        let finished = (INIT & !SCHEDULING) | FINISHED;
        assert_eq!(task_state.turning_to_finish().unwrap(), finished);
        assert_eq!(task_state.0.load(Acquire), finished);
        assert!(matches!(
            task_state.turning_to_finish(),
            Err(ErrorKind::TaskShutdown)
        ));

        let task_state = TaskState::new();
        assert!(matches!(
            task_state.turning_to_finish(),
            Err(ErrorKind::TaskStateInvalid)
        ));
        assert_eq!(task_state.0.load(Acquire), INIT);
    }

    /// UT test cases for turning_to_idle
    ///
    /// # Brief
    /// 1. Create a TaskState, set it to Canceled & Running
    /// 2. Call turning_to_idle, check that it returns StateAction::Canceled
    ///    and leaves the state untouched
    /// 3. Create a TaskState, leave it at INIT (not running)
    /// 4. Call turning_to_idle, check that it returns StateAction::Failed
    /// 5. Create a TaskState, set it to Running and not scheduling
    /// 6. Call turning_to_idle, check that it returns StateAction::Success
    ///    and clears the running flag
    /// 7. Create a TaskState, set it to Running and scheduling
    /// 8. Call turning_to_idle, check that it returns StateAction::Enqueue
    ///    and clears the running flag
    #[test]
    fn ut_task_state_turning_to_idle() {
        let canceled_running = INIT | CANCELED | RUNNING;
        let task_state = state_with(canceled_running);
        match task_state.turning_to_idle() {
            StateAction::Canceled(cur) => assert_eq!(cur, canceled_running),
            _ => panic!("a canceled running task must report Canceled"),
        }
        assert_eq!(task_state.0.load(Acquire), canceled_running);

        let task_state = TaskState::new();
        match task_state.turning_to_idle() {
            StateAction::Failed(cur) => assert_eq!(cur, INIT),
            _ => panic!("a task that is not running must report Failed"),
        }

        let task_state = state_with((INIT | RUNNING) & !SCHEDULING);
        assert!(matches!(task_state.turning_to_idle(), StateAction::Success));
        assert_eq!(task_state.0.load(Acquire), INIT & !SCHEDULING);

        let task_state = state_with(INIT | RUNNING);
        assert!(matches!(task_state.turning_to_idle(), StateAction::Enqueue));
        assert_eq!(task_state.0.load(Acquire), INIT);
    }

    /// UT test cases for TaskState::turn_to_scheduling()
    ///
    /// # Brief
    /// 1. On a task that is already scheduling, the old state is returned and
    ///    nothing changes
    /// 2. On a task that is not scheduling, the old state is returned and the
    ///    scheduling flag is set afterwards
    #[test]
    fn ut_task_state_turning_to_scheduling() {
        let task_state = TaskState::new();
        assert_eq!(task_state.turn_to_scheduling(), INIT);
        assert_eq!(task_state.0.load(Acquire), INIT);

        let idle = INIT & !SCHEDULING;
        let task_state = state_with(idle);
        assert_eq!(task_state.turn_to_scheduling(), idle);
        assert_eq!(task_state.0.load(Acquire), INIT);
    }

    /// UT test cases for TaskState::turn_to_un_set_waker()
    ///
    /// # Brief
    /// 1. The join handle is not cared about and no waker is set: failure
    /// 2. The join handle is cared about but no waker is set: failure
    /// 3. The waker is set but the task is finished: failure
    /// 4. The waker is set and the task is not finished: the waker flag is
    ///    cleared and the new state is returned
    #[test]
    fn ut_task_state_turn_to_un_set_waker() {
        let detached = INIT & !CARE_JOIN_HANDLE & !JOIN_WAKER;
        let task_state = state_with(detached);
        assert_eq!(task_state.turn_to_un_set_waker(), Err(detached));

        let task_state = TaskState::new();
        assert_eq!(task_state.turn_to_un_set_waker(), Err(INIT));

        let finished = INIT | JOIN_WAKER | FINISHED;
        let task_state = state_with(finished);
        assert_eq!(task_state.turn_to_un_set_waker(), Err(finished));
        assert_eq!(task_state.0.load(Acquire), finished);

        let task_state = state_with(INIT | JOIN_WAKER);
        assert_eq!(task_state.turn_to_un_set_waker(), Ok(INIT));
        assert_eq!(task_state.0.load(Acquire), INIT);
    }

    /// UT test cases for TaskState::turn_to_set_waker()
    ///
    /// # Brief
    /// 1. The join handle is not cared about: failure
    /// 2. A waker is already set: failure
    /// 3. No waker is set but the task is finished: failure
    /// 4. No waker is set and the task is not finished: the waker flag is set
    ///    and the new state is returned
    #[test]
    fn ut_task_state_turn_to_set_waker() {
        let detached = (INIT & !CARE_JOIN_HANDLE) | JOIN_WAKER;
        let task_state = state_with(detached);
        assert_eq!(task_state.turn_to_set_waker(), Err(detached));

        let task_state = state_with(INIT | JOIN_WAKER);
        assert_eq!(task_state.turn_to_set_waker(), Err(INIT | JOIN_WAKER));

        let finished = INIT | FINISHED;
        let task_state = state_with(finished);
        assert_eq!(task_state.turn_to_set_waker(), Err(finished));
        assert_eq!(task_state.0.load(Acquire), finished);

        let task_state = TaskState::new();
        assert_eq!(task_state.turn_to_set_waker(), Ok(INIT | JOIN_WAKER));
        assert_eq!(task_state.0.load(Acquire), INIT | JOIN_WAKER);
    }

    /// UT test cases for TaskState::turn_to_canceled_and_scheduled()
    ///
    /// # Brief
    /// 1. A scheduling task gets canceled; no extra scheduling is requested
    /// 2. An idle task gets canceled and scheduled; scheduling is requested
    /// 3. A running task gets canceled and scheduled; no extra scheduling is
    ///    requested
    /// 4. An already canceled task is left untouched
    /// 5. A finished task is left untouched
    #[test]
    fn ut_task_state_turn_to_canceled_and_scheduled() {
        let task_state = TaskState::new();
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(task_state.0.load(Acquire), INIT | CANCELED);

        let idle = INIT & !SCHEDULING;
        let task_state = state_with(idle);
        assert!(task_state.turn_to_canceled_and_scheduled());
        assert_eq!(task_state.0.load(Acquire), INIT | CANCELED);

        let task_state = state_with(idle | RUNNING);
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(task_state.0.load(Acquire), INIT | RUNNING | CANCELED);

        let task_state = state_with(idle | CANCELED);
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(task_state.0.load(Acquire), idle | CANCELED);

        let task_state = state_with(idle | FINISHED);
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(task_state.0.load(Acquire), idle | FINISHED);
    }

    /// UT test cases for TaskState::turn_to_un_join_handle()
    ///
    /// # Brief
    /// 1. The task is finished: failure, state left untouched
    /// 2. The task is not finished: the CARE_JOIN_HANDLE flag is cleared and
    ///    the new state is returned
    #[test]
    fn ut_task_state_turn_to_un_join_handle() {
        let task_state = state_with(INIT | FINISHED);
        assert_eq!(task_state.turn_to_un_join_handle(), Err(()));
        assert_eq!(task_state.0.load(Acquire), INIT | FINISHED);

        let task_state = TaskState::new();
        assert_eq!(
            task_state.turn_to_un_join_handle(),
            Ok(INIT & !CARE_JOIN_HANDLE)
        );
        assert_eq!(task_state.0.load(Acquire), INIT & !CARE_JOIN_HANDLE);
    }

    /// UT test cases for TaskState::try_turning_to_un_join_handle()
    ///
    /// # Brief
    /// 1. On a task still in INIT the call succeeds, one reference is given
    ///    up and the CARE_JOIN_HANDLE flag is cleared
    /// 2. A second call fails and leaves the state untouched
    /// 3. On a task that already left INIT the call fails
    #[test]
    fn ut_task_state_turning_to_un_join_handle() {
        let task_state = TaskState::new();
        let detached = (INIT - REF_ONE) & !CARE_JOIN_HANDLE;
        assert!(task_state.try_turning_to_un_join_handle());
        assert_eq!(task_state.0.load(Acquire), detached);

        assert!(!task_state.try_turning_to_un_join_handle());
        assert_eq!(task_state.0.load(Acquire), detached);

        let task_state = state_with(INIT | JOIN_WAKER);
        assert!(!task_state.try_turning_to_un_join_handle());
        assert_eq!(task_state.0.load(Acquire), INIT | JOIN_WAKER);
    }
}
600