1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io; 15 use std::io::{Error, ErrorKind, Write}; 16 use std::mem::MaybeUninit; 17 use std::os::raw::c_int; 18 use std::sync::atomic::AtomicBool; 19 use std::sync::atomic::Ordering::{Acquire, Release, SeqCst}; 20 use std::sync::Once; 21 22 use ylong_io::UnixStream; 23 use ylong_signal::register_signal_action; 24 25 use crate::signal::SignalKind; 26 use crate::sync::watch::{channel, Receiver, Sender}; 27 28 pub(crate) struct Event { 29 inner: Sender<()>, 30 notify: AtomicBool, 31 once: Once, 32 is_registered: AtomicBool, 33 } 34 35 impl Default for Event { default() -> Self36 fn default() -> Self { 37 let (tx, _) = channel(()); 38 Self { 39 inner: tx, 40 notify: AtomicBool::new(false), 41 once: Once::new(), 42 is_registered: AtomicBool::new(false), 43 } 44 } 45 } 46 47 impl Event { register<F>(&self, signal_kind: c_int, f: F) -> io::Result<()> where F: Fn() + Sync + Send + 'static,48 pub(crate) fn register<F>(&self, signal_kind: c_int, f: F) -> io::Result<()> 49 where 50 F: Fn() + Sync + Send + 'static, 51 { 52 let mut register_res = Ok(()); 53 self.once.call_once(|| { 54 register_res = unsafe { register_signal_action(signal_kind, f) }; 55 if register_res.is_ok() { 56 self.is_registered.store(true, Release); 57 } 58 }); 59 register_res?; 60 if self.is_registered.load(Acquire) { 61 Ok(()) 62 } else { 63 Err(Error::new(ErrorKind::Other, "Failed to register signal")) 64 } 65 } 66 } 67 68 struct SignalStream { 69 sender: UnixStream, 70 receiver: UnixStream, 71 } 72 73 impl Default for SignalStream { default() -> Self74 fn default() -> Self { 75 let (sender, receiver) = UnixStream::pair() 76 .unwrap_or_else(|e| panic!("failed to create a pair of UnixStream, error: {e}")); 77 Self { sender, receiver } 78 } 79 } 80 81 pub(crate) struct Registry { 82 stream: SignalStream, 83 events: Vec<Event>, 84 } 85 86 impl Default for Registry { default() -> Self87 fn default() -> Self { 88 Self { 89 stream: SignalStream::default(), 90 events: (0..=SignalKind::get_max()) 91 .map(|_| Event::default()) 92 .collect(), 93 } 94 } 95 } 96 97 impl Registry { get_instance() -> &'static Registry98 pub(crate) fn get_instance() -> &'static Registry { 99 static mut REGISTRY: MaybeUninit<Registry> = MaybeUninit::uninit(); 100 static REGISTRY_ONCE: Once = Once::new(); 101 unsafe { 102 REGISTRY_ONCE.call_once(|| { 103 REGISTRY = MaybeUninit::new(Registry::default()); 104 }); 105 &*REGISTRY.as_ptr() 106 } 107 } 108 get_event(&self, event_id: usize) -> &Event109 pub(crate) fn get_event(&self, event_id: usize) -> &Event { 110 // Invalid signal kinds have been forbidden, the scope of signal kinds has been 111 // protected. 112 self.events 113 .get(event_id) 114 .unwrap_or_else(|| panic!("invalid event_id: {}", event_id)) 115 } 116 listen_to_event(&self, event_id: usize) -> Receiver<()>117 pub(crate) fn listen_to_event(&self, event_id: usize) -> Receiver<()> { 118 // Invalid signal kinds have been forbidden, the scope of signal kinds has been 119 // protected. 120 self.events 121 .get(event_id) 122 .unwrap_or_else(|| panic!("invalid event_id: {}", event_id)) 123 .inner 124 .subscribe() 125 } 126 notify_event(&self, event_id: usize)127 pub(crate) fn notify_event(&self, event_id: usize) { 128 if let Some(event) = self.events.get(event_id) { 129 event.notify.store(true, SeqCst); 130 } 131 } 132 broadcast(&self)133 pub(crate) fn broadcast(&self) { 134 for event in &self.events { 135 if event.notify.swap(false, SeqCst) { 136 let _ = event.inner.send(()); 137 } 138 } 139 } 140 write(&self, buf: &[u8]) -> io::Result<usize>141 pub(crate) fn write(&self, buf: &[u8]) -> io::Result<usize> { 142 let mut sender = &self.stream.sender; 143 sender.write(buf) 144 } 145 try_clone_stream(&self) -> io::Result<UnixStream>146 pub(crate) fn try_clone_stream(&self) -> io::Result<UnixStream> { 147 self.stream.receiver.try_clone() 148 } 149 } 150