1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io;
15 use std::ops::DerefMut;
16 use std::pin::Pin;
17 use std::task::{Context, Poll};
18 
19 use crate::io::read_task::{LinesTask, ReadLineTask, ReadUtilTask, SplitTask};
20 use crate::io::AsyncRead;
21 
22 /// It is an asynchronous version of [`std::io::BufRead`].
23 ///
24 /// A `AsyncBufRead` is a type of `AsyncRead`er which has an internal buffer,
25 /// allowing it to perform extra ways of reading, such as `read_line`.
26 pub trait AsyncBufRead: AsyncRead {
27     /// Returns the contents of the internal buffer, trying to fill it with more
28     /// data from the inner reader if it is empty.
29     ///
30     /// This method is non-blocking. If the underlying reader is unable to
31     /// perform a read at the time, this method would return a
32     /// `Poll::Pending`. If there is data inside the buffer or the read is
33     /// successfully performed, then it would return a `Poll::Ready(&[u8])`.
34     ///
35     /// This method is a low-level call. It needs to be paired up with calls to
36     /// [`Self::consume`] method to function properly.
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>37     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>;
38 
39     /// Tells this buffer that `amt` bytes have been consumed from the buffer,
40     /// so they should no longer be returned in calls to `read`.
41     ///
42     /// This method is a low-level call. It has to be called after a call to
43     /// [`Self::poll_fill_buf`] in order to function properly.
44     ///
45     /// The `amt` must be `<=` the number of bytes in the buffer returned by
46     /// [`Self::poll_fill_buf`]
consume(self: Pin<&mut Self>, amt: usize)47     fn consume(self: Pin<&mut Self>, amt: usize);
48 }
49 
50 // Auto-implementation for Box object
51 impl<T: AsyncBufRead + Unpin + ?Sized> AsyncBufRead for Box<T> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>52     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
53         Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
54     }
55 
consume(mut self: Pin<&mut Self>, amt: usize)56     fn consume(mut self: Pin<&mut Self>, amt: usize) {
57         Pin::new(&mut **self).consume(amt)
58     }
59 }
60 
61 // Auto-implementation for mutable reference.
62 impl<T: AsyncBufRead + Unpin + ?Sized> AsyncBufRead for &mut T {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>63     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
64         Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
65     }
66 
consume(mut self: Pin<&mut Self>, amt: usize)67     fn consume(mut self: Pin<&mut Self>, amt: usize) {
68         Pin::new(&mut **self).consume(amt)
69     }
70 }
71 
72 // Auto-implementation for Pinned object.
73 impl<T> AsyncBufRead for Pin<T>
74 where
75     T: DerefMut + Unpin,
76     T::Target: AsyncBufRead,
77 {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>78     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
79         self.get_mut().as_mut().poll_fill_buf(cx)
80     }
81 
consume(self: Pin<&mut Self>, amt: usize)82     fn consume(self: Pin<&mut Self>, amt: usize) {
83         self.get_mut().as_mut().consume(amt)
84     }
85 }
86 
87 /// An external trait that is automatically implemented for any object that has
88 /// the AsyncBufRead trait. Provides std-like reading methods such as
89 /// `read_until`, `read_line`, `split`, `lines`. Every method in this trait
90 /// returns a future object. Awaits on the future will complete the task, but it
91 /// doesn't guarantee whether the task will finished immediately or
92 /// asynchronously.
93 pub trait AsyncBufReadExt: AsyncBufRead {
94     /// Asynchronously reads data from the underlying stream into the `buf`
95     /// until the desired delimiter appears or EOF is reached.
96     ///
97     /// If successful, this function will return the total number of bytes read.
98     ///
99     /// # Examples
100     /// ```no run
101     /// let mut file = File::open("foo.txt").await?;
102     /// let mut res = vec![];
103     /// let mut buf_reader = AsyncBufReader::new(file);
104     /// let ret = buf_reader.read_until(b':', &mut res).await?;
105     /// ```
read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUtilTask<'a, Self> where Self: Unpin,106     fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUtilTask<'a, Self>
107     where
108         Self: Unpin,
109     {
110         ReadUtilTask::new(self, byte, buf)
111     }
112 
113     /// Asynchronously reads data from the underlying stream into the `buf`
114     /// until the delimiter '\n' appears or EOF is reached.
115     ///
116     /// If successful, this function will return the total number of bytes read.
117     ///
118     /// # Examples
119     /// ```no run
120     /// let mut file = File::open("foo.txt").await?;
121     /// let mut res = String::new();
122     /// let mut buf_reader = AsyncBufReader::new(file);
123     /// let ret = buf_reader.read_line(&mut res).await?;
124     /// ```
read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineTask<'a, Self> where Self: Unpin,125     fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineTask<'a, Self>
126     where
127         Self: Unpin,
128     {
129         ReadLineTask::new(self, buf)
130     }
131 
132     /// Asynchronously reads data from the underlying stream until EOF is
133     /// reached and splits it on a delimiter.
134     ///
135     /// # Examples
136     /// ```no run
137     /// let mut file = File::open("foo.txt").await?;
138     /// let mut buf_reader = AsyncBufReader::new(file);
139     /// let mut segments = buf_reader.split(b'-');
140     /// assert!(segments.next().await?.is_some());
141     /// ```
split(self, byte: u8) -> SplitTask<Self> where Self: Sized + Unpin,142     fn split(self, byte: u8) -> SplitTask<Self>
143     where
144         Self: Sized + Unpin,
145     {
146         SplitTask::new(self, byte)
147     }
148 
149     /// Asynchronously reads data from the underlying stream until EOF is
150     /// reached and splits it on a delimiter.
151     ///
152     /// # Examples
153     /// ```no run
154     /// let mut file = File::open("foo.txt").await?;
155     /// let mut buf_reader = AsyncBufReader::new(file);
156     /// let mut segments = buf_reader.lines();
157     /// assert!(segments.next_line().await?.is_some());
158     /// ```
lines(self) -> LinesTask<Self> where Self: Sized,159     fn lines(self) -> LinesTask<Self>
160     where
161         Self: Sized,
162     {
163         LinesTask::new(self)
164     }
165 }
166 
167 impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
168