1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io::{Read, Write}; 15 use std::{cmp, io}; 16 17 use crate::io::ReadBuf; 18 use crate::task::JoinHandle; 19 20 const MAX_BUF: usize = 2 * 1024 * 1024; 21 22 pub(crate) enum State<T> { 23 Idle(Option<BufInner>), 24 Poll(JoinHandle<(io::Result<usize>, BufInner, T)>), 25 } 26 27 impl<T> State<T> { init() -> Self28 pub(crate) fn init() -> Self { 29 State::Idle(Some(BufInner::new())) 30 } 31 } 32 33 pub(crate) struct BufInner { 34 inner: Vec<u8>, 35 pos: usize, 36 } 37 38 impl BufInner { new() -> Self39 fn new() -> Self { 40 BufInner { 41 inner: Vec::with_capacity(0), 42 pos: 0, 43 } 44 } 45 is_empty(&self) -> bool46 pub(crate) fn is_empty(&self) -> bool { 47 self.len() == 0 48 } 49 len(&self) -> usize50 pub(crate) fn len(&self) -> usize { 51 self.inner.len() - self.pos 52 } 53 bytes(&self) -> &[u8]54 fn bytes(&self) -> &[u8] { 55 &self.inner[self.pos..] 56 } 57 set_len(&mut self, buf: &mut ReadBuf<'_>)58 pub(crate) fn set_len(&mut self, buf: &mut ReadBuf<'_>) { 59 let len = cmp::min(buf.remaining(), MAX_BUF); 60 if self.inner.len() < len { 61 self.inner.reserve(len - self.len()); 62 } 63 unsafe { 64 self.inner.set_len(len); 65 } 66 } 67 clone_from(&mut self, buf: &[u8]) -> usize68 pub(crate) fn clone_from(&mut self, buf: &[u8]) -> usize { 69 let n = cmp::min(buf.len(), MAX_BUF); 70 self.inner.extend_from_slice(&buf[..n]); 71 n 72 } 73 clone_into(&mut self, buf: &mut ReadBuf<'_>) -> usize74 pub(crate) fn clone_into(&mut self, buf: &mut ReadBuf<'_>) -> usize { 75 let n = cmp::min(self.len(), buf.remaining()); 76 buf.append(&self.bytes()[..n]); 77 self.pos += n; 78 79 if self.pos == self.inner.len() { 80 self.inner.truncate(0); 81 self.pos = 0; 82 } 83 n 84 } 85 read_from<T: Read>(&mut self, std: &mut T) -> io::Result<usize>86 pub(crate) fn read_from<T: Read>(&mut self, std: &mut T) -> io::Result<usize> { 87 let res = loop { 88 match std.read(&mut self.inner) { 89 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} 90 res => break res, 91 } 92 }; 93 94 match res { 95 Ok(n) => self.inner.truncate(n), 96 Err(_) => self.inner.clear(), 97 } 98 99 res 100 } 101 write_into<T: Write>(&mut self, std: &mut T) -> io::Result<()>102 pub(crate) fn write_into<T: Write>(&mut self, std: &mut T) -> io::Result<()> { 103 let res = std.write_all(&self.inner); 104 self.inner.clear(); 105 res 106 } 107 } 108 109 macro_rules! std_async_write { 110 () => { 111 fn poll_write( 112 mut self: Pin<&mut Self>, 113 cx: &mut Context<'_>, 114 buf: &[u8], 115 ) -> Poll<io::Result<usize>> { 116 loop { 117 match self.state { 118 State::Idle(ref mut buf_op) => { 119 let mut buf_inner = buf_op.take().unwrap(); 120 121 if !buf_inner.is_empty() { 122 return Poll::Ready(Err(io::Error::new( 123 io::ErrorKind::AlreadyExists, 124 "inner Buf must be empty before poll!", 125 ))); 126 } 127 128 let n = buf_inner.clone_from(buf); 129 130 let mut std = self.std.take().unwrap(); 131 132 let handle = spawn_blocking(move || { 133 let res = buf_inner.write_into(&mut std).map(|_| n); 134 135 (res, buf_inner, std) 136 }); 137 138 self.state = State::Poll(handle); 139 self.has_written = true; 140 } 141 State::Poll(ref mut join_handle) => { 142 let (res, buf_inner, std) = match Pin::new(join_handle).poll(cx)? { 143 Poll::Ready(t) => t, 144 Poll::Pending => return Poll::Pending, 145 }; 146 self.state = State::Idle(Some(buf_inner)); 147 self.std = Some(std); 148 149 let n = res?; 150 return Poll::Ready(Ok(n)); 151 } 152 } 153 } 154 } 155 156 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 157 loop { 158 let has_written = self.has_written; 159 match self.state { 160 State::Idle(ref mut buf_cell) => { 161 if !has_written { 162 return Poll::Ready(Ok(())); 163 } 164 let buf = buf_cell.take().unwrap(); 165 let mut inner = self.std.take().unwrap(); 166 167 self.state = State::Poll(spawn_blocking(move || { 168 let res = inner.flush().map(|_| 0); 169 (res, buf, inner) 170 })); 171 172 self.has_written = false; 173 } 174 State::Poll(ref mut join_handle) => { 175 let (res, buf, std) = match Pin::new(join_handle).poll(cx)? { 176 Poll::Ready(t) => t, 177 Poll::Pending => return Poll::Pending, 178 }; 179 self.state = State::Idle(Some(buf)); 180 self.std = Some(std); 181 182 res?; 183 } 184 } 185 } 186 } 187 188 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 189 Poll::Ready(Ok(())) 190 } 191 }; 192 } 193 pub(crate) use std_async_write; 194 195 #[cfg(test)] 196 mod test { 197 #[cfg(unix)] 198 use std::os::fd::{AsFd, AsRawFd}; 199 200 use crate::io::stdio::BufInner; 201 use crate::io::{AsyncWriteExt, ReadBuf}; 202 203 /// UT test cases for `stdout` and `stderr``. 204 /// 205 /// # Brief 206 /// 1. create a `stdout` and a `stderr`. 207 /// 2. write something into `stdout` and `stderr`. 208 /// 3. check operation is ok. 209 #[test] ut_test_stdio_basic()210 fn ut_test_stdio_basic() { 211 let mut buf_inner = BufInner::new(); 212 assert_eq!(buf_inner.pos, 0); 213 assert!(buf_inner.inner.is_empty()); 214 assert!(buf_inner.is_empty()); 215 216 let mut buf = [1; 10]; 217 let mut read_buf = ReadBuf::new(&mut buf); 218 buf_inner.set_len(&mut read_buf); 219 assert_eq!(buf_inner.len(), 10); 220 221 let mut buf = [0; 20]; 222 let mut read_buf = ReadBuf::new(&mut buf); 223 let n = buf_inner.clone_into(&mut read_buf); 224 assert_eq!(n, 10); 225 } 226 227 /// UT test cases for `stdout` and `stderr``. 228 /// 229 /// # Brief 230 /// 1. create a `stdout` and a `stderr`. 231 /// 2. write something into `stdout` and `stderr`. 232 /// 3. check operation is ok. 233 #[test] ut_test_stdio_write()234 fn ut_test_stdio_write() { 235 let handle = crate::spawn(async { 236 let mut stdout = crate::io::stdout(); 237 #[cfg(unix)] 238 assert!(stdout.as_fd().as_raw_fd() >= 0); 239 #[cfg(unix)] 240 assert!(stdout.as_raw_fd() >= 0); 241 let res = stdout.write_all(b"something").await; 242 assert!(res.is_ok()); 243 let res = stdout.flush().await; 244 assert!(res.is_ok()); 245 let res = stdout.shutdown().await; 246 assert!(res.is_ok()); 247 248 let mut stderr = crate::io::stderr(); 249 #[cfg(unix)] 250 assert!(stderr.as_fd().as_raw_fd() >= 0); 251 #[cfg(unix)] 252 assert!(stderr.as_raw_fd() >= 0); 253 let res = stderr.write_all(b"something").await; 254 assert!(res.is_ok()); 255 let res = stderr.flush().await; 256 assert!(res.is_ok()); 257 let res = stderr.shutdown().await; 258 assert!(res.is_ok()); 259 }); 260 let _ = crate::block_on(handle); 261 } 262 } 263