1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::fmt::{Debug, Formatter}; 15 use std::io; 16 use std::io::{Read, Write}; 17 use std::mem::MaybeUninit; 18 use std::ops::Deref; 19 use std::sync::Arc; 20 use std::task::{Context, Poll}; 21 22 #[cfg(target_os = "linux")] 23 use libc::{gid_t, uid_t}; 24 use ylong_io::{Interest, Source}; 25 26 use crate::executor::Handle; 27 use crate::io::{poll_ready, ReadBuf}; 28 use crate::net::{ReadyEvent, ScheduleIO}; 29 use crate::util::slab::Ref; 30 31 /// Wrapper that turns a sync `Source` io into an async one. This struct 32 /// interacts with the reactor of the runtime. 33 pub(crate) struct AsyncSource<E: Source> { 34 /// Sync io that implements `Source` trait. 35 io: Option<E>, 36 37 /// Entry list of the runtime's reactor, `AsyncSource` object will be 38 /// registered into it when created. 39 pub(crate) entry: Ref<ScheduleIO>, 40 41 /// Handle to the IO Driver, used for deregistration 42 pub(crate) handle: Arc<Handle>, 43 } 44 45 impl<E: Source> AsyncSource<E> { 46 #[cfg(target_os = "linux")] fchown(&self, uid: uid_t, gid: gid_t) -> io::Result<()>47 pub fn fchown(&self, uid: uid_t, gid: gid_t) -> io::Result<()> { 48 syscall!(fchown(self.get_fd(), uid, gid))?; 49 Ok(()) 50 } 51 52 /// Wraps a `Source` object into an `AsyncSource`. When the `AsyncSource` 53 /// object is created, it's fd will be registered into runtime's 54 /// reactor. 55 /// 56 /// If `interest` passed in is None, the interested event for fd 57 /// registration will be both readable and writable. 58 /// 59 /// # Error 60 /// 61 /// If no reactor is found or fd registration fails, an error will be 62 /// returned. new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>>63 pub fn new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>> { 64 let inner = Handle::get_handle()?; 65 66 let interest = interest.unwrap_or_else(|| Interest::READABLE | Interest::WRITABLE); 67 let entry = inner.io_register(&mut io, interest)?; 68 Ok(AsyncSource { 69 io: Some(io), 70 entry, 71 handle: inner, 72 }) 73 } 74 75 /// Asynchronously waits for events to happen. If the io returns 76 /// `EWOULDBLOCK`, the readiness of the io will be reset. Otherwise, the 77 /// corresponding event will be returned. async_process<F, R>(&self, interest: Interest, mut op: F) -> io::Result<R> where F: FnMut() -> io::Result<R>,78 pub(crate) async fn async_process<F, R>(&self, interest: Interest, mut op: F) -> io::Result<R> 79 where 80 F: FnMut() -> io::Result<R>, 81 { 82 loop { 83 let ready = self.entry.readiness(interest).await?; 84 match op() { 85 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 86 self.entry.clear_readiness(ready); 87 } 88 x => return x, 89 } 90 } 91 } 92 93 #[cfg(target_os = "linux")] 94 cfg_process! { 95 /// Deregisters the io and return it. 96 pub(crate) fn io_take(mut self) -> io::Result<E> { 97 // before AsyncSource drop, io is always Some(). 98 let mut io = self.io.take().unwrap(); 99 self.handle.io_deregister(&mut io)?; 100 Ok(io) 101 } 102 } 103 104 cfg_net! { 105 pub(crate) fn poll_ready( 106 &self, 107 cx: &mut Context<'_>, 108 interest: Interest, 109 ) -> Poll<io::Result<ReadyEvent>> { 110 let ready = self.entry.poll_readiness(cx, interest); 111 let x = match ready { 112 Poll::Ready(x) => x, 113 Poll::Pending => return Poll::Pending, 114 }; 115 116 Poll::Ready(Ok(x)) 117 } 118 119 pub(crate) fn poll_io<R>( 120 &self, 121 cx: &mut Context<'_>, 122 interest: Interest, 123 mut f: impl FnMut() -> io::Result<R>, 124 ) -> Poll<io::Result<R>> { 125 loop { 126 let ready = poll_ready!(self.poll_ready(cx, interest))?; 127 128 match f() { 129 Err(e) if e.kind() == io::ErrorKind::WouldBlock => { 130 self.entry.clear_readiness(ready); 131 } 132 x => return Poll::Ready(x), 133 } 134 } 135 } 136 137 pub(crate) fn try_io<R> ( 138 &self, 139 interest: Interest, 140 mut f: impl FnMut() -> io::Result<R>, 141 ) -> io::Result<R> { 142 let event = self.entry.get_readiness(interest); 143 144 if event.ready.is_empty() { 145 return Err(io::ErrorKind::WouldBlock.into()); 146 } 147 148 match f() { 149 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 150 self.entry.clear_readiness(event); 151 Err(io::ErrorKind::WouldBlock.into()) 152 } 153 res => res, 154 } 155 } 156 157 #[inline] 158 pub(crate) fn poll_read_io<R>( 159 &self, 160 cx: &mut Context<'_>, 161 f: impl FnMut() -> io::Result<R>, 162 ) -> Poll<io::Result<R>> { 163 self.poll_io(cx, Interest::READABLE, f) 164 } 165 166 #[inline] 167 pub(crate) fn poll_write_io<R>( 168 &self, 169 cx: &mut Context<'_>, 170 f: impl FnMut() -> io::Result<R>, 171 ) -> Poll<io::Result<R>> { 172 self.poll_io(cx, Interest::WRITABLE, f) 173 } 174 175 pub(crate) fn poll_read<'a>( 176 &'a self, 177 cx: &mut Context<'_>, 178 buf: &mut ReadBuf<'_>, 179 ) -> Poll<io::Result<()>> 180 where 181 &'a E: io::Read + 'a, 182 { 183 let ret = self.poll_read_io(cx, || unsafe { 184 let slice = &mut *(buf.unfilled_mut() as *mut [MaybeUninit<u8>] as *mut [u8]); 185 // before AsyncSource drop, io is always Some(). 186 self.io.as_ref().unwrap().read(slice) 187 }); 188 let r_len = match ret { 189 Poll::Ready(Ok(x)) => x, 190 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), 191 Poll::Pending => return Poll::Pending, 192 }; 193 buf.assume_init(r_len); 194 buf.advance(r_len); 195 196 Poll::Ready(Ok(())) 197 } 198 199 pub(crate) fn poll_write<'a>( 200 &'a self, 201 cx: &mut Context<'_>, 202 buf: &[u8], 203 ) -> Poll<io::Result<usize>> 204 where 205 &'a E: io::Write + 'a, 206 { 207 self.poll_write_io(cx, || { 208 // before AsyncSource drop, io is always Some(). 209 self.io.as_ref().unwrap().write(buf) 210 }) 211 } 212 213 pub(crate) fn poll_write_vectored<'a>( 214 &'a self, 215 cx: &mut Context<'_>, 216 bufs: &[io::IoSlice<'_>], 217 ) -> Poll<io::Result<usize>> 218 where 219 &'a E: io::Write + 'a, 220 { 221 self.poll_write_io(cx, || { 222 // before AsyncSource drop, io is always Some(). 223 self.io.as_ref().unwrap().write_vectored(bufs) 224 }) 225 } 226 } 227 } 228 229 impl<E: Source + Debug> Debug for AsyncSource<E> { fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result230 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 231 f.debug_struct("AsyncSource").field("io", &self.io).finish() 232 } 233 } 234 235 impl<E: Source> Deref for AsyncSource<E> { 236 type Target = E; 237 deref(&self) -> &Self::Target238 fn deref(&self) -> &Self::Target { 239 // before AsyncSource drop, io is always Some(). 240 self.io.as_ref().unwrap() 241 } 242 } 243 244 // Deregisters fd when the `AsyncSource` object get dropped. 245 impl<E: Source> Drop for AsyncSource<E> { drop(&mut self)246 fn drop(&mut self) { 247 if let Some(mut io) = self.io.take() { 248 let _ = self.handle.io_deregister(&mut io); 249 } 250 } 251 } 252