1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io;
15 use std::io::{Error, ErrorKind, Write};
16 use std::mem::MaybeUninit;
17 use std::os::raw::c_int;
18 use std::sync::atomic::AtomicBool;
19 use std::sync::atomic::Ordering::{Acquire, Release, SeqCst};
20 use std::sync::Once;
21 
22 use ylong_io::UnixStream;
23 use ylong_signal::register_signal_action;
24 
25 use crate::signal::SignalKind;
26 use crate::sync::watch::{channel, Receiver, Sender};
27 
28 pub(crate) struct Event {
29     inner: Sender<()>,
30     notify: AtomicBool,
31     once: Once,
32     is_registered: AtomicBool,
33 }
34 
35 impl Default for Event {
default() -> Self36     fn default() -> Self {
37         let (tx, _) = channel(());
38         Self {
39             inner: tx,
40             notify: AtomicBool::new(false),
41             once: Once::new(),
42             is_registered: AtomicBool::new(false),
43         }
44     }
45 }
46 
47 impl Event {
register<F>(&self, signal_kind: c_int, f: F) -> io::Result<()> where F: Fn() + Sync + Send + 'static,48     pub(crate) fn register<F>(&self, signal_kind: c_int, f: F) -> io::Result<()>
49     where
50         F: Fn() + Sync + Send + 'static,
51     {
52         let mut register_res = Ok(());
53         self.once.call_once(|| {
54             register_res = unsafe { register_signal_action(signal_kind, f) };
55             if register_res.is_ok() {
56                 self.is_registered.store(true, Release);
57             }
58         });
59         register_res?;
60         if self.is_registered.load(Acquire) {
61             Ok(())
62         } else {
63             Err(Error::new(ErrorKind::Other, "Failed to register signal"))
64         }
65     }
66 }
67 
68 struct SignalStream {
69     sender: UnixStream,
70     receiver: UnixStream,
71 }
72 
73 impl Default for SignalStream {
default() -> Self74     fn default() -> Self {
75         let (sender, receiver) = UnixStream::pair()
76             .unwrap_or_else(|e| panic!("failed to create a pair of UnixStream, error: {e}"));
77         Self { sender, receiver }
78     }
79 }
80 
81 pub(crate) struct Registry {
82     stream: SignalStream,
83     events: Vec<Event>,
84 }
85 
86 impl Default for Registry {
default() -> Self87     fn default() -> Self {
88         Self {
89             stream: SignalStream::default(),
90             events: (0..=SignalKind::get_max())
91                 .map(|_| Event::default())
92                 .collect(),
93         }
94     }
95 }
96 
97 impl Registry {
get_instance() -> &'static Registry98     pub(crate) fn get_instance() -> &'static Registry {
99         static mut REGISTRY: MaybeUninit<Registry> = MaybeUninit::uninit();
100         static REGISTRY_ONCE: Once = Once::new();
101         unsafe {
102             REGISTRY_ONCE.call_once(|| {
103                 REGISTRY = MaybeUninit::new(Registry::default());
104             });
105             &*REGISTRY.as_ptr()
106         }
107     }
108 
get_event(&self, event_id: usize) -> &Event109     pub(crate) fn get_event(&self, event_id: usize) -> &Event {
110         // Invalid signal kinds have been forbidden, the scope of signal kinds has been
111         // protected.
112         self.events
113             .get(event_id)
114             .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
115     }
116 
listen_to_event(&self, event_id: usize) -> Receiver<()>117     pub(crate) fn listen_to_event(&self, event_id: usize) -> Receiver<()> {
118         // Invalid signal kinds have been forbidden, the scope of signal kinds has been
119         // protected.
120         self.events
121             .get(event_id)
122             .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
123             .inner
124             .subscribe()
125     }
126 
notify_event(&self, event_id: usize)127     pub(crate) fn notify_event(&self, event_id: usize) {
128         if let Some(event) = self.events.get(event_id) {
129             event.notify.store(true, SeqCst);
130         }
131     }
132 
broadcast(&self)133     pub(crate) fn broadcast(&self) {
134         for event in &self.events {
135             if event.notify.swap(false, SeqCst) {
136                 let _ = event.inner.send(());
137             }
138         }
139     }
140 
write(&self, buf: &[u8]) -> io::Result<usize>141     pub(crate) fn write(&self, buf: &[u8]) -> io::Result<usize> {
142         let mut sender = &self.stream.sender;
143         sender.write(buf)
144     }
145 
try_clone_stream(&self) -> io::Result<UnixStream>146     pub(crate) fn try_clone_stream(&self) -> io::Result<UnixStream> {
147         self.stream.receiver.try_clone()
148     }
149 }
150