1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::fmt::{Debug, Formatter};
15 use std::io;
16 use std::io::{Read, Write};
17 use std::mem::MaybeUninit;
18 use std::ops::Deref;
19 use std::sync::Arc;
20 use std::task::{Context, Poll};
21 
22 #[cfg(target_os = "linux")]
23 use libc::{gid_t, uid_t};
24 use ylong_io::{Interest, Source};
25 
26 use crate::executor::Handle;
27 use crate::io::{poll_ready, ReadBuf};
28 use crate::net::{ReadyEvent, ScheduleIO};
29 use crate::util::slab::Ref;
30 
31 /// Wrapper that turns a sync `Source` io into an async one. This struct
32 /// interacts with the reactor of the runtime.
33 pub(crate) struct AsyncSource<E: Source> {
34     /// Sync io that implements `Source` trait.
35     io: Option<E>,
36 
37     /// Entry list of the runtime's reactor, `AsyncSource` object will be
38     /// registered into it when created.
39     pub(crate) entry: Ref<ScheduleIO>,
40 
41     /// Handle to the IO Driver, used for deregistration
42     pub(crate) handle: Arc<Handle>,
43 }
44 
45 impl<E: Source> AsyncSource<E> {
46     #[cfg(target_os = "linux")]
fchown(&self, uid: uid_t, gid: gid_t) -> io::Result<()>47     pub fn fchown(&self, uid: uid_t, gid: gid_t) -> io::Result<()> {
48         syscall!(fchown(self.get_fd(), uid, gid))?;
49         Ok(())
50     }
51 
52     /// Wraps a `Source` object into an `AsyncSource`. When the `AsyncSource`
53     /// object is created, it's fd will be registered into runtime's
54     /// reactor.
55     ///
56     /// If `interest` passed in is None, the interested event for fd
57     /// registration will be both readable and writable.
58     ///
59     /// # Error
60     ///
61     /// If no reactor is found or fd registration fails, an error will be
62     /// returned.
new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>>63     pub fn new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>> {
64         let inner = Handle::get_handle()?;
65 
66         let interest = interest.unwrap_or_else(|| Interest::READABLE | Interest::WRITABLE);
67         let entry = inner.io_register(&mut io, interest)?;
68         Ok(AsyncSource {
69             io: Some(io),
70             entry,
71             handle: inner,
72         })
73     }
74 
75     /// Asynchronously waits for events to happen. If the io returns
76     /// `EWOULDBLOCK`, the readiness of the io will be reset. Otherwise, the
77     /// corresponding event will be returned.
async_process<F, R>(&self, interest: Interest, mut op: F) -> io::Result<R> where F: FnMut() -> io::Result<R>,78     pub(crate) async fn async_process<F, R>(&self, interest: Interest, mut op: F) -> io::Result<R>
79     where
80         F: FnMut() -> io::Result<R>,
81     {
82         loop {
83             let ready = self.entry.readiness(interest).await?;
84             match op() {
85                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
86                     self.entry.clear_readiness(ready);
87                 }
88                 x => return x,
89             }
90         }
91     }
92 
93     #[cfg(target_os = "linux")]
94     cfg_process! {
95         /// Deregisters the io and return it.
96         pub(crate) fn io_take(mut self) -> io::Result<E> {
97             // before AsyncSource drop, io is always Some().
98             let mut io = self.io.take().unwrap();
99             self.handle.io_deregister(&mut io)?;
100             Ok(io)
101         }
102     }
103 
104     cfg_net! {
105         pub(crate) fn poll_ready(
106             &self,
107             cx: &mut Context<'_>,
108             interest: Interest,
109         ) -> Poll<io::Result<ReadyEvent>> {
110             let ready = self.entry.poll_readiness(cx, interest);
111             let x = match ready {
112                 Poll::Ready(x) => x,
113                 Poll::Pending => return Poll::Pending,
114             };
115 
116             Poll::Ready(Ok(x))
117         }
118 
119         pub(crate) fn poll_io<R>(
120             &self,
121             cx: &mut Context<'_>,
122             interest: Interest,
123             mut f: impl FnMut() -> io::Result<R>,
124         ) -> Poll<io::Result<R>> {
125             loop {
126                 let ready = poll_ready!(self.poll_ready(cx, interest))?;
127 
128                 match f() {
129                     Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
130                         self.entry.clear_readiness(ready);
131                     }
132                     x => return Poll::Ready(x),
133                 }
134             }
135         }
136 
137         pub(crate) fn try_io<R> (
138             &self,
139             interest: Interest,
140             mut f: impl FnMut() -> io::Result<R>,
141         ) -> io::Result<R> {
142             let event = self.entry.get_readiness(interest);
143 
144             if event.ready.is_empty() {
145                 return Err(io::ErrorKind::WouldBlock.into());
146             }
147 
148             match f() {
149                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
150                     self.entry.clear_readiness(event);
151                     Err(io::ErrorKind::WouldBlock.into())
152                 }
153                 res => res,
154             }
155         }
156 
157         #[inline]
158         pub(crate) fn poll_read_io<R>(
159             &self,
160             cx: &mut Context<'_>,
161             f: impl FnMut() -> io::Result<R>,
162         ) -> Poll<io::Result<R>> {
163             self.poll_io(cx, Interest::READABLE, f)
164         }
165 
166         #[inline]
167         pub(crate) fn poll_write_io<R>(
168             &self,
169             cx: &mut Context<'_>,
170             f: impl FnMut() -> io::Result<R>,
171         ) -> Poll<io::Result<R>> {
172             self.poll_io(cx, Interest::WRITABLE, f)
173         }
174 
175         pub(crate) fn poll_read<'a>(
176             &'a self,
177             cx: &mut Context<'_>,
178             buf: &mut ReadBuf<'_>,
179         ) -> Poll<io::Result<()>>
180         where
181             &'a E: io::Read + 'a,
182         {
183             let ret = self.poll_read_io(cx, || unsafe {
184                 let slice = &mut *(buf.unfilled_mut() as *mut [MaybeUninit<u8>] as *mut [u8]);
185                 // before AsyncSource drop, io is always Some().
186                 self.io.as_ref().unwrap().read(slice)
187             });
188             let r_len = match ret {
189                 Poll::Ready(Ok(x)) => x,
190                 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
191                 Poll::Pending => return Poll::Pending,
192             };
193             buf.assume_init(r_len);
194             buf.advance(r_len);
195 
196             Poll::Ready(Ok(()))
197         }
198 
199         pub(crate) fn poll_write<'a>(
200             &'a self,
201             cx: &mut Context<'_>,
202             buf: &[u8],
203         ) -> Poll<io::Result<usize>>
204         where
205             &'a E: io::Write + 'a,
206         {
207             self.poll_write_io(cx, || {
208                 // before AsyncSource drop, io is always Some().
209                 self.io.as_ref().unwrap().write(buf)
210             })
211         }
212 
213         pub(crate) fn poll_write_vectored<'a>(
214             &'a self,
215             cx: &mut Context<'_>,
216             bufs: &[io::IoSlice<'_>],
217         ) -> Poll<io::Result<usize>>
218         where
219             &'a E: io::Write + 'a,
220         {
221             self.poll_write_io(cx, || {
222                 // before AsyncSource drop, io is always Some().
223                 self.io.as_ref().unwrap().write_vectored(bufs)
224             })
225         }
226     }
227 }
228 
229 impl<E: Source + Debug> Debug for AsyncSource<E> {
fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result230     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231         f.debug_struct("AsyncSource").field("io", &self.io).finish()
232     }
233 }
234 
235 impl<E: Source> Deref for AsyncSource<E> {
236     type Target = E;
237 
deref(&self) -> &Self::Target238     fn deref(&self) -> &Self::Target {
239         // before AsyncSource drop, io is always Some().
240         self.io.as_ref().unwrap()
241     }
242 }
243 
244 // Deregisters fd when the `AsyncSource` object get dropped.
245 impl<E: Source> Drop for AsyncSource<E> {
drop(&mut self)246     fn drop(&mut self) {
247         if let Some(mut io) = self.io.take() {
248             let _ = self.handle.io_deregister(&mut io);
249         }
250     }
251 }
252