1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io::{Read, Write};
15 use std::{cmp, io};
16 
17 use crate::io::ReadBuf;
18 use crate::task::JoinHandle;
19 
/// Upper bound, in bytes (2 MiB), on how large `BufInner::set_len` sizes the
/// buffer and on how many bytes `BufInner::clone_from` copies in one call.
const MAX_BUF: usize = 2 * 1024 * 1024;
21 
/// Progress of the blocking operation that backs an asynchronous std handle.
///
/// `T` is the type of the handle that is moved into the blocking task and
/// handed back, together with the buffer, when the task finishes.
pub(crate) enum State<T> {
    /// No blocking task is in flight. The buffer is stored here and is moved
    /// out with `Option::take` when an operation starts.
    Idle(Option<BufInner>),
    /// A blocking task is running. Its output is the I/O result, the buffer
    /// and the handle, so that both can be stored again once it completes.
    Poll(JoinHandle<(io::Result<usize>, BufInner, T)>),
}
26 
27 impl<T> State<T> {
init() -> Self28     pub(crate) fn init() -> Self {
29         State::Idle(Some(BufInner::new()))
30     }
31 }
32 
/// Byte buffer shuttled between the async side and the blocking task.
pub(crate) struct BufInner {
    /// Backing storage for the buffered bytes.
    inner: Vec<u8>,
    /// Index of the first byte of `inner` that has not been consumed yet;
    /// everything from `pos` to the end of `inner` is still pending.
    pos: usize,
}
37 
38 impl BufInner {
new() -> Self39     fn new() -> Self {
40         BufInner {
41             inner: Vec::with_capacity(0),
42             pos: 0,
43         }
44     }
45 
is_empty(&self) -> bool46     pub(crate) fn is_empty(&self) -> bool {
47         self.len() == 0
48     }
49 
len(&self) -> usize50     pub(crate) fn len(&self) -> usize {
51         self.inner.len() - self.pos
52     }
53 
bytes(&self) -> &[u8]54     fn bytes(&self) -> &[u8] {
55         &self.inner[self.pos..]
56     }
57 
set_len(&mut self, buf: &mut ReadBuf<'_>)58     pub(crate) fn set_len(&mut self, buf: &mut ReadBuf<'_>) {
59         let len = cmp::min(buf.remaining(), MAX_BUF);
60         if self.inner.len() < len {
61             self.inner.reserve(len - self.len());
62         }
63         unsafe {
64             self.inner.set_len(len);
65         }
66     }
67 
clone_from(&mut self, buf: &[u8]) -> usize68     pub(crate) fn clone_from(&mut self, buf: &[u8]) -> usize {
69         let n = cmp::min(buf.len(), MAX_BUF);
70         self.inner.extend_from_slice(&buf[..n]);
71         n
72     }
73 
clone_into(&mut self, buf: &mut ReadBuf<'_>) -> usize74     pub(crate) fn clone_into(&mut self, buf: &mut ReadBuf<'_>) -> usize {
75         let n = cmp::min(self.len(), buf.remaining());
76         buf.append(&self.bytes()[..n]);
77         self.pos += n;
78 
79         if self.pos == self.inner.len() {
80             self.inner.truncate(0);
81             self.pos = 0;
82         }
83         n
84     }
85 
read_from<T: Read>(&mut self, std: &mut T) -> io::Result<usize>86     pub(crate) fn read_from<T: Read>(&mut self, std: &mut T) -> io::Result<usize> {
87         let res = loop {
88             match std.read(&mut self.inner) {
89                 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
90                 res => break res,
91             }
92         };
93 
94         match res {
95             Ok(n) => self.inner.truncate(n),
96             Err(_) => self.inner.clear(),
97         }
98 
99         res
100     }
101 
write_into<T: Write>(&mut self, std: &mut T) -> io::Result<()>102     pub(crate) fn write_into<T: Write>(&mut self, std: &mut T) -> io::Result<()> {
103         let res = std.write_all(&self.inner);
104         self.inner.clear();
105         res
106     }
107 }
108 
/// Expands to `poll_write`, `poll_flush` and `poll_shutdown` methods that run
/// the actual write/flush on a blocking task.
///
/// The generated code reads and writes the fields `state`, `std` (an
/// `Option` holding the handle) and `has_written` on `self`, and refers to
/// `Pin`, `Context`, `Poll`, `State`, `io` and `spawn_blocking`.
// NOTE(review): those names presumably have to be in scope where the macro is
// expanded — confirm against the call sites.
macro_rules! std_async_write {
    () => {
        // Copies `buf` (capped by the buffer's own limit) into the internal
        // buffer, writes it out on a blocking task and resolves to the number
        // of bytes accepted once that task has finished.
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            loop {
                match self.state {
                    State::Idle(ref mut buf_op) => {
                        let mut buf_inner = buf_op.take().unwrap();

                        if !buf_inner.is_empty() {
                            // Hand the buffer back before bailing out; leaving
                            // `Idle(None)` behind would make the next poll
                            // panic on `unwrap()`.
                            *buf_op = Some(buf_inner);
                            return Poll::Ready(Err(io::Error::new(
                                io::ErrorKind::AlreadyExists,
                                "inner Buf must be empty before poll!",
                            )));
                        }

                        let n = buf_inner.clone_from(buf);

                        let mut std = self.std.take().unwrap();

                        // The buffer and the handle travel into the task and
                        // come back in its output.
                        let handle = spawn_blocking(move || {
                            let res = buf_inner.write_into(&mut std).map(|_| n);

                            (res, buf_inner, std)
                        });

                        self.state = State::Poll(handle);
                        self.has_written = true;
                    }
                    // NOTE(review): this awaits whichever task is in flight,
                    // which could also be a flush started by `poll_flush`
                    // (its result carries 0) — confirm callers never
                    // interleave the two.
                    State::Poll(ref mut join_handle) => {
                        let (res, buf_inner, std) = match Pin::new(join_handle).poll(cx)? {
                            Poll::Ready(t) => t,
                            Poll::Pending => return Poll::Pending,
                        };
                        // Restore the buffer and the handle before surfacing
                        // the task's result.
                        self.state = State::Idle(Some(buf_inner));
                        self.std = Some(std);

                        let n = res?;
                        return Poll::Ready(Ok(n));
                    }
                }
            }
        }

        // Flushes the handle on a blocking task if something has been written
        // since the last flush; otherwise resolves immediately.
        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            loop {
                let has_written = self.has_written;
                match self.state {
                    State::Idle(ref mut buf_cell) => {
                        if !has_written {
                            return Poll::Ready(Ok(()));
                        }
                        // The buffer is not used by the flush; it only rides
                        // along so it can be stored again afterwards.
                        let buf = buf_cell.take().unwrap();
                        let mut inner = self.std.take().unwrap();

                        self.state = State::Poll(spawn_blocking(move || {
                            let res = inner.flush().map(|_| 0);
                            (res, buf, inner)
                        }));

                        self.has_written = false;
                    }
                    State::Poll(ref mut join_handle) => {
                        let (res, buf, std) = match Pin::new(join_handle).poll(cx)? {
                            Poll::Ready(t) => t,
                            Poll::Pending => return Poll::Pending,
                        };
                        self.state = State::Idle(Some(buf));
                        self.std = Some(std);

                        // Propagate a failure; on success loop back to the
                        // idle branch, which decides whether more is needed.
                        res?;
                    }
                }
            }
        }

        // Shutdown does no work: it resolves successfully right away.
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    };
}
193 pub(crate) use std_async_write;
194 
#[cfg(test)]
mod test {
    #[cfg(unix)]
    use std::os::fd::{AsFd, AsRawFd};

    use crate::io::stdio::BufInner;
    use crate::io::{AsyncWriteExt, ReadBuf};

    /// UT test cases for `BufInner`.
    ///
    /// # Brief
    /// 1. create a `BufInner` and check that it starts out empty.
    /// 2. size it with `set_len` from a 10-byte `ReadBuf` and check the
    ///    reported length is 10.
    /// 3. copy it into a 20-byte `ReadBuf` with `clone_into` and check that
    ///    10 bytes were copied.
    #[test]
    fn ut_test_stdio_basic() {
        let mut buf_inner = BufInner::new();
        assert_eq!(buf_inner.pos, 0);
        assert!(buf_inner.inner.is_empty());
        assert!(buf_inner.is_empty());

        let mut buf = [1; 10];
        let mut read_buf = ReadBuf::new(&mut buf);
        buf_inner.set_len(&mut read_buf);
        assert_eq!(buf_inner.len(), 10);

        let mut buf = [0; 20];
        let mut read_buf = ReadBuf::new(&mut buf);
        let n = buf_inner.clone_into(&mut read_buf);
        assert_eq!(n, 10);
    }

    /// UT test cases for `stdout` and `stderr`.
    ///
    /// # Brief
    /// 1. create a `stdout` and a `stderr` inside a spawned task.
    /// 2. on unix, check that their raw file descriptors are non-negative.
    /// 3. write something into `stdout` and `stderr`, then flush and shut
    ///    them down.
    /// 4. check every operation is ok.
    #[test]
    fn ut_test_stdio_write() {
        let handle = crate::spawn(async {
            let mut stdout = crate::io::stdout();
            #[cfg(unix)]
            assert!(stdout.as_fd().as_raw_fd() >= 0);
            #[cfg(unix)]
            assert!(stdout.as_raw_fd() >= 0);
            let res = stdout.write_all(b"something").await;
            assert!(res.is_ok());
            let res = stdout.flush().await;
            assert!(res.is_ok());
            let res = stdout.shutdown().await;
            assert!(res.is_ok());

            let mut stderr = crate::io::stderr();
            #[cfg(unix)]
            assert!(stderr.as_fd().as_raw_fd() >= 0);
            #[cfg(unix)]
            assert!(stderr.as_raw_fd() >= 0);
            let res = stderr.write_all(b"something").await;
            assert!(res.is_ok());
            let res = stderr.flush().await;
            assert!(res.is_ok());
            let res = stderr.shutdown().await;
            assert!(res.is_ok());
        });
        // NOTE(review): the result of `block_on` is discarded; if it carries
        // the task's failure, a failed assertion inside the task would not
        // fail this test — confirm.
        let _ = crate::block_on(handle);
    }
}
263