1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
16
17 use crate::error::ErrorKind;
18
19 /// Task is currently running
20 const RUNNING: usize = 0b0001;
21 /// Task is in the schedule list
22 const SCHEDULING: usize = 0b0100;
23 /// Task has finished
24 const FINISHED: usize = 0b1000;
25 /// Task gets canceled
26 const CANCELED: usize = 0b1_0000;
27 /// Task needs to send the finished result back to the join handle
28 const CARE_JOIN_HANDLE: usize = 0b10_0000;
29 /// Task currently holds a waker to the join handle
30 const JOIN_WAKER: usize = 0b100_0000;
31
32 const RC_MASK: usize = !0b11_1111_1111;
33
34 const RC_SHIFT: usize = RC_MASK.count_zeros() as usize;
35
36 /// Reference count
37 const REF_ONE: usize = 1 << RC_SHIFT;
38
39 /// Initial state contains two ref count, one is held by join_handle, another
40 /// one is held by task itself.
41 const INIT: usize = CARE_JOIN_HANDLE | SCHEDULING | (REF_ONE * 2);
42
43 #[inline]
ref_count(state: usize) -> usize44 pub(crate) fn ref_count(state: usize) -> usize {
45 (state & RC_MASK) >> RC_SHIFT
46 }
47
48 #[inline]
is_last_ref_count(prev: usize) -> bool49 pub(crate) fn is_last_ref_count(prev: usize) -> bool {
50 ref_count(prev) == 1
51 }
52
53 #[inline]
is_canceled(cur: usize) -> bool54 pub(crate) fn is_canceled(cur: usize) -> bool {
55 cur & CANCELED == CANCELED
56 }
57
58 #[inline]
is_care_join_handle(cur: usize) -> bool59 pub(crate) fn is_care_join_handle(cur: usize) -> bool {
60 cur & CARE_JOIN_HANDLE == CARE_JOIN_HANDLE
61 }
62
63 #[inline]
is_finished(cur: usize) -> bool64 pub(crate) fn is_finished(cur: usize) -> bool {
65 cur & FINISHED == FINISHED
66 }
67
68 #[inline]
is_set_waker(cur: usize) -> bool69 pub(crate) fn is_set_waker(cur: usize) -> bool {
70 cur & JOIN_WAKER == JOIN_WAKER
71 }
72
73 #[inline]
is_scheduling(cur: usize) -> bool74 pub(crate) fn is_scheduling(cur: usize) -> bool {
75 cur & SCHEDULING == SCHEDULING
76 }
77
78 #[inline]
is_running(cur: usize) -> bool79 pub(crate) fn is_running(cur: usize) -> bool {
80 cur & RUNNING == RUNNING
81 }
82
83 // A task need to satisfy these state requirements in order to get pushed back
84 // to the schedule list.
85 #[inline]
need_enqueue(cur: usize) -> bool86 pub(crate) fn need_enqueue(cur: usize) -> bool {
87 (cur & SCHEDULING != SCHEDULING) && (cur & RUNNING != RUNNING) && (cur & FINISHED != FINISHED)
88 }
89
90 pub(crate) enum StateAction {
91 Success,
92 Canceled(usize),
93 Failed(usize),
94 Enqueue,
95 }
96
97 pub(crate) struct TaskState(AtomicUsize);
98 impl TaskState {
99 #[inline]
new() -> Self100 pub(crate) fn new() -> Self {
101 TaskState(AtomicUsize::new(INIT))
102 }
103
104 #[inline]
dec_ref(&self) -> usize105 pub(crate) fn dec_ref(&self) -> usize {
106 self.0.fetch_sub(REF_ONE, AcqRel)
107 }
108
109 #[inline]
inc_ref(&self)110 pub(crate) fn inc_ref(&self) {
111 self.0.fetch_add(REF_ONE, AcqRel);
112 }
113
114 #[inline]
get_current_state(&self) -> usize115 pub(crate) fn get_current_state(&self) -> usize {
116 self.0.load(Acquire)
117 }
118
119 /// Turns the task state into running. Contains CAS operations.
120 ///
121 /// Fails when the task is already running, scheduling or is already
122 /// finished.
turning_to_running(&self) -> StateAction123 pub(crate) fn turning_to_running(&self) -> StateAction {
124 let mut cur = self.get_current_state();
125 loop {
126 let mut action = StateAction::Success;
127
128 if is_running(cur) || is_finished(cur) || !is_scheduling(cur) {
129 return StateAction::Failed(cur);
130 }
131
132 let mut next = cur;
133 next &= !SCHEDULING;
134 next |= RUNNING;
135 if is_canceled(next) {
136 action = StateAction::Canceled(next);
137 }
138
139 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
140 match res {
141 Ok(_) => return action,
142 Err(actual) => cur = actual,
143 }
144 }
145 }
146
147 /// Turns the task state into finished. Contains CAS operations.
148 ///
149 /// Fails when the task is already finished or is not running.
turning_to_finish(&self) -> Result<usize, ErrorKind>150 pub(crate) fn turning_to_finish(&self) -> Result<usize, ErrorKind> {
151 let mut cur = self.get_current_state();
152
153 loop {
154 if is_finished(cur) {
155 return Err(ErrorKind::TaskShutdown);
156 }
157
158 if !is_running(cur) {
159 return Err(ErrorKind::TaskStateInvalid);
160 }
161 let mut next = cur;
162 next &= !RUNNING;
163 next |= FINISHED;
164
165 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
166 match res {
167 Ok(_) => return Ok(next),
168 Err(actual) => cur = actual,
169 }
170 }
171 }
172
173 /// Turns the task state into idle. Contains CAS operations.
174 ///
175 /// Fails when the task is canceled or running.
turning_to_idle(&self) -> StateAction176 pub(crate) fn turning_to_idle(&self) -> StateAction {
177 let mut cur = self.get_current_state();
178
179 loop {
180 let mut action = StateAction::Success;
181
182 if !is_running(cur) {
183 return StateAction::Failed(cur);
184 }
185
186 if is_canceled(cur) {
187 return StateAction::Canceled(cur);
188 }
189
190 let mut next = cur;
191 next &= !RUNNING;
192
193 if is_scheduling(next) {
194 action = StateAction::Enqueue;
195 }
196
197 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
198 match res {
199 Ok(_) => return action,
200 Err(actual) => cur = actual,
201 }
202 }
203 }
204
205 /// Turns the task state into scheduling. Returns the old state value.
206 #[inline]
turn_to_scheduling(&self) -> usize207 pub(crate) fn turn_to_scheduling(&self) -> usize {
208 self.0.fetch_or(SCHEDULING, AcqRel)
209 }
210
211 /// Turns the task state into unset_waker. Contains CAS operations.
212 ///
213 /// Fails when the task is already finished.
turn_to_un_set_waker(&self) -> Result<usize, usize>214 pub(crate) fn turn_to_un_set_waker(&self) -> Result<usize, usize> {
215 let mut cur = self.get_current_state();
216
217 loop {
218 if !is_care_join_handle(cur) || !is_set_waker(cur) {
219 return Err(cur);
220 }
221
222 if is_finished(cur) {
223 return Err(cur);
224 }
225
226 let mut next = cur;
227 next &= !JOIN_WAKER;
228
229 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
230 match res {
231 Ok(_) => return Ok(next),
232 Err(actual) => cur = actual,
233 }
234 }
235 }
236
237 /// Turns off the Join_Waker bit of the task state. Contains CAS operations.
238 ///
239 /// Fails when the task is already finished.
turn_to_set_waker(&self) -> Result<usize, usize>240 pub(crate) fn turn_to_set_waker(&self) -> Result<usize, usize> {
241 let mut cur = self.get_current_state();
242
243 loop {
244 if !is_care_join_handle(cur) || is_set_waker(cur) {
245 return Err(cur);
246 }
247 if is_finished(cur) {
248 return Err(cur);
249 }
250
251 let mut next = cur;
252 next |= JOIN_WAKER;
253
254 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
255 match res {
256 Ok(_) => return Ok(next),
257 Err(actual) => cur = actual,
258 }
259 }
260 }
261
turn_to_canceled_and_scheduled(&self) -> bool262 pub(crate) fn turn_to_canceled_and_scheduled(&self) -> bool {
263 let mut cur = self.get_current_state();
264
265 loop {
266 if is_canceled(cur) || is_finished(cur) {
267 return false;
268 }
269
270 let mut next = cur;
271 let need_schedule = if is_running(cur) {
272 next |= SCHEDULING;
273 next |= CANCELED;
274 false
275 } else {
276 next |= CANCELED;
277 if !is_scheduling(next) {
278 next |= SCHEDULING;
279 true
280 } else {
281 false
282 }
283 };
284
285 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
286 match res {
287 Ok(_) => return need_schedule,
288 Err(actual) => cur = actual,
289 }
290 }
291 }
292
293 /// Turns off the CARE_JOIN_HANDLE bit of the task state. Contains CAS
294 /// operations.
295 ///
296 /// Fails when the task is already finished.
turn_to_un_join_handle(&self) -> Result<usize, ()>297 pub(crate) fn turn_to_un_join_handle(&self) -> Result<usize, ()> {
298 let mut cur = self.get_current_state();
299
300 loop {
301 if is_finished(cur) {
302 return Err(());
303 }
304
305 let mut next = cur;
306 next &= !CARE_JOIN_HANDLE;
307
308 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
309 match res {
310 Ok(_) => return Ok(next),
311 Err(actual) => cur = actual,
312 }
313 }
314 }
315
316 /// Attempts to turn off the CARE_JOIN_HANDLE bit of the task state.
317 ///
318 /// Returns true if successfully changed. Otherwise, returns false.
try_turning_to_un_join_handle(&self) -> bool319 pub(crate) fn try_turning_to_un_join_handle(&self) -> bool {
320 let old = INIT;
321 let new = (INIT - REF_ONE) & !CARE_JOIN_HANDLE;
322 self.0.compare_exchange(old, new, Relaxed, Relaxed) == Ok(old)
323 }
324 }
325
326 #[cfg(test)]
327 mod test {
328 use std::sync::atomic::Ordering::{Acquire, Release};
329
330 use crate::task::state::{
331 StateAction, TaskState, CANCELED, CARE_JOIN_HANDLE, FINISHED, INIT, JOIN_WAKER, REF_ONE,
332 RUNNING, SCHEDULING,
333 };
334
335 /// UT test cases for TaskState::new()
336 ///
337 /// # Brief
338 /// 1. Verify that the status of the initialized completed task is INIT
339 #[test]
ut_task_state_new()340 fn ut_task_state_new() {
341 let task_state = TaskState::new();
342 assert_eq!(task_state.0.load(Acquire), INIT);
343 }
344
345 /// UT test cases for TaskState::dec_ref()
346 ///
347 /// # Brief
348 /// 1. Verify that the status of the initialized completed task is
349 /// INIT.wrapping_sub(REF_ONE) value should be INIT.wrapping_sub(REF_ONE)
350 #[test]
ut_task_state_dec_ref()351 fn ut_task_state_dec_ref() {
352 let task_state = TaskState::new();
353 task_state.dec_ref();
354 assert_eq!(task_state.0.load(Acquire), INIT.wrapping_sub(REF_ONE))
355 }
356
357 /// UT test cases for TaskState::inc_ref()
358 ///
359 /// # Brief
360 /// 1. Verify that the status of the initialized completed task is
361 /// INIT.wrapping_add(REF_ONE)
362 #[test]
ut_task_state_inc_ref()363 fn ut_task_state_inc_ref() {
364 let task_state = TaskState::new();
365 task_state.inc_ref();
366 assert_eq!(task_state.0.load(Acquire), INIT.wrapping_add(REF_ONE));
367 }
368
369 /// UT test cases for TaskState::get_current_state()
370 ///
371 /// # Brief
372 /// 1. Verify that the status of the initialized completed task is INIT
373 #[test]
ut_task_state_get_current_state()374 fn ut_task_state_get_current_state() {
375 let task_state = TaskState::new();
376 assert_eq!(task_state.get_current_state(), INIT);
377 }
378
379 /// UT test cases for TaskState::turning_to_running()
380 ///
381 /// # Brief
382 /// 1. (cur & RUNNING == RUNNING) || (cur & FINISHED == FINISHED) == true,
383 /// represents the current state is already running state or has ended
384 /// the state, the state does not information is not correct, directly
385 /// return failure
386 /// 2. (cur & RUNNING == RUNNING) || (cur & FINISHED == FINISHED) == false,
387 /// cur & SCHEDULING != SCHEDULING == true, means the current state is
388 /// not schedule state, and the status information is not correct, so it
389 /// returns an error directly
390 #[test]
ut_task_state_turning_to_running()391 fn ut_task_state_turning_to_running() {
392 let task_state = TaskState::new();
393 let mut test_task_state = INIT;
394 test_task_state &= !SCHEDULING;
395 test_task_state |= RUNNING;
396
397 match task_state.turning_to_running() {
398 StateAction::Success => {}
399 _ => panic!(),
400 }
401
402 match task_state.turning_to_running() {
403 StateAction::Failed(x) => assert_eq!(x, test_task_state),
404 _ => panic!(),
405 }
406 }
407
408 /// UT test cases for TaskState::turning_to_finish()
409 ///
410 /// # Brief
411 /// 1. cur & FINISHED == FINISHED == true, Represents the current state is
412 /// already the end state, the state does not information is not correct,
413 /// directly return failure
414 /// 2. cur & FINISHED == FINISHED == false, cur & RUNNING != RUNNING ==
415 /// true, means the current state is not running, and the status
416 /// information is not correct, so the error is returned directly
417 #[test]
ut_task_state_turning_to_finish()418 fn ut_task_state_turning_to_finish() {
419 let task_state = TaskState::new();
420 task_state.turning_to_running();
421 let mut test_task_state = INIT;
422 test_task_state &= !RUNNING;
423 test_task_state |= FINISHED;
424 test_task_state &= !SCHEDULING;
425 let ret = task_state.turning_to_finish().unwrap();
426 assert_eq!(ret, test_task_state);
427 assert!(task_state.turning_to_finish().is_err());
428 }
429
430 /// UT test cases for turning_to_idle
431 ///
432 /// # Brief
433 /// 1. Create a TaskState, set it to Canceled & Running
434 /// 2. Call turning_to_idle, check if return value equals to
435 /// StateAction::canceled
436 /// 3. Create a TaskState, set it to init
437 /// 4. Call turning_to_idle, check if return value equals to
438 /// StateAction::Failed
439 /// 5. Create a TaskState, set it to Running and not scheduling
440 /// 6. Call turning_to_idle, check if return value equals to
441 /// StateAction::Success
442 /// 7. Create a TaskState, set it to Running and scheduling
443 #[test]
ut_task_state_turning_to_idle()444 fn ut_task_state_turning_to_idle() {
445 let task_state = TaskState::new();
446 let mut next_state = task_state.0.load(Acquire);
447 next_state |= CANCELED;
448 next_state |= RUNNING;
449 task_state.0.store(next_state, Release);
450 match task_state.turning_to_idle() {
451 StateAction::Canceled(cur) => assert_eq!(cur, next_state),
452 _ => panic!(),
453 }
454
455 let task_state = TaskState::new();
456 match task_state.turning_to_idle() {
457 StateAction::Failed(cur) => assert_eq!(cur, INIT),
458 _ => panic!(),
459 }
460
461 let task_state = TaskState::new();
462 let mut next_state = task_state.0.load(Acquire);
463 next_state |= RUNNING;
464 next_state &= !SCHEDULING;
465 task_state.0.store(next_state, Release);
466 let mut test_state = next_state;
467 test_state &= !RUNNING;
468 match task_state.turning_to_idle() {
469 StateAction::Success => assert_eq!(task_state.0.load(Acquire), test_state),
470 _ => panic!(),
471 }
472
473 let task_state = TaskState::new();
474 let mut next_state = task_state.0.load(Acquire);
475 next_state |= RUNNING;
476 next_state |= SCHEDULING;
477 task_state.0.store(next_state, Release);
478 match task_state.turning_to_idle() {
479 StateAction::Enqueue => {}
480 _ => panic!(),
481 }
482 }
483
484 /// UT test cases for TaskState::turn_to_scheduling()
485 ///
486 /// # Brief
487 /// 1. Check if the state transition is SCHEDULING
488 #[test]
ut_task_state_turning_to_scheduling()489 fn ut_task_state_turning_to_scheduling() {
490 let task_state = TaskState::new();
491 let mut test_state = task_state.0.load(Acquire);
492 test_state |= SCHEDULING;
493 assert_eq!(task_state.turn_to_scheduling(), test_state);
494 }
495
496 /// UT test cases for TaskState::turn_to_un_set_waker()
497 ///
498 /// # Brief
499 /// 1. !is_care_join_handle(cur) || !is_set_waker(cur) == true, means that
500 /// the current state is neither focused on hooks nor set waker
501 /// 2. !is_care_join_handle(cur) || !is_set_waker(cur) == false, cur &
502 /// FINISHED == FINISHED == true, means the current status is FINISHED,
503 /// directly return failure
504 /// 3. !is_care_join_handle(cur) || !is_set_waker(cur) == false, cur &
505 /// FINISHED == FINISHED == false
506 #[test]
ut_task_state_turn_to_un_set_waker()507 fn ut_task_state_turn_to_un_set_waker() {
508 let task_state = TaskState::new();
509 let mut next_state = task_state.0.load(Acquire);
510 next_state &= !CARE_JOIN_HANDLE;
511 next_state &= !JOIN_WAKER;
512 task_state.0.store(next_state, Release);
513 assert!(task_state.turn_to_un_set_waker().is_err());
514
515 let task_state = TaskState::new();
516 let mut next_state = task_state.0.load(Acquire);
517 next_state |= CARE_JOIN_HANDLE;
518 next_state |= JOIN_WAKER;
519 next_state |= FINISHED;
520 task_state.0.store(next_state, Release);
521 assert!(task_state.turn_to_un_set_waker().is_err());
522
523 let task_state = TaskState::new();
524 let mut next_state = task_state.0.load(Acquire);
525 next_state |= CARE_JOIN_HANDLE;
526 next_state |= JOIN_WAKER;
527 next_state &= !FINISHED;
528 task_state.0.store(next_state, Release);
529 assert!(task_state.turn_to_un_set_waker().is_ok());
530 }
531
532 /// UT test cases for TaskState::turn_to_set_waker()
533 ///
534 /// # Brief
535 /// 1. !is_care_join_handle(cur) || is_set_waker(cur) == true, means that
536 /// the current state is neither concerned with hooks, has set waker
537 /// 2. !is_care_join_handle(cur) || is_set_waker(cur) == false, cur &
538 /// FINISHED == FINISHED == true, means the current status is FINISHED,
539 /// directly return failure
540 /// 3. !is_care_join_handle(cur) || is_set_waker(cur) == false, cur &
541 /// FINISHED == FINISHED == false
542 #[test]
ut_task_state_turn_to_set_waker()543 fn ut_task_state_turn_to_set_waker() {
544 let task_state = TaskState::new();
545 let mut next_state = task_state.0.load(Acquire);
546 next_state &= !CARE_JOIN_HANDLE;
547 next_state |= JOIN_WAKER;
548 task_state.0.store(next_state, Release);
549 assert!(task_state.turn_to_set_waker().is_err());
550
551 let task_state = TaskState::new();
552 let mut next_state = task_state.0.load(Acquire);
553 next_state |= CARE_JOIN_HANDLE;
554 next_state &= !JOIN_WAKER;
555 next_state |= FINISHED;
556 task_state.0.store(next_state, Release);
557 assert!(task_state.turn_to_set_waker().is_err());
558
559 let task_state = TaskState::new();
560 let mut next_state = task_state.0.load(Acquire);
561 next_state |= CARE_JOIN_HANDLE;
562 next_state &= !JOIN_WAKER;
563 next_state &= !FINISHED;
564 task_state.0.store(next_state, Release);
565 assert!(task_state.turn_to_set_waker().is_ok());
566 }
567
568 /// UT test cases for TaskState::turn_to_un_join_handle()
569 ///
570 /// # Brief
571 /// 1. cur & FINISHED == FINISHED == true, means the current state is
572 /// FINISHED state, directly return failure
573 /// 2. cur & FINISHED == FINISHED == false
574 #[test]
ut_task_state_turn_to_un_join_handle()575 fn ut_task_state_turn_to_un_join_handle() {
576 let task_state = TaskState::new();
577 let mut next_state = task_state.0.load(Acquire);
578 next_state |= FINISHED;
579 task_state.0.store(next_state, Release);
580 assert!(task_state.turn_to_un_join_handle().is_err());
581
582 let task_state = TaskState::new();
583 let mut next_state = task_state.0.load(Acquire);
584 next_state &= !FINISHED;
585 task_state.0.store(next_state, Release);
586 assert!(task_state.turn_to_un_join_handle().is_ok());
587 }
588
589 /// UT test cases for TaskState::try_turning_to_un_join_handle()
590 ///
591 /// # Brief
592 /// 1. After calling this function, check if the status is modified to
593 /// CARE_JOIN_HANDLE
594 #[test]
ut_task_state_turning_to_un_join_handle()595 fn ut_task_state_turning_to_un_join_handle() {
596 let task_state = TaskState::new();
597 assert!(task_state.try_turning_to_un_join_handle());
598 }
599 }
600