1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Unbounded channel
15 
16 pub(crate) mod queue;
17 
18 use std::task::{Context, Poll};
19 
20 use crate::futures::poll_fn;
21 use crate::sync::error::{RecvError, SendError, TryRecvError};
22 use crate::sync::mpsc::unbounded::queue::Queue;
23 use crate::sync::mpsc::{channel, Container, Rx, Tx};
24 
25 cfg_time!(
26     use crate::time::timeout;
27     use std::time::Duration;
28     use crate::sync::error::RecvTimeoutError;
29 );
30 /// The sender of unbounded channel.
31 /// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by
32 /// the [`unbounded_channel`] function.
33 ///
34 /// # Examples
35 ///
36 /// ```
37 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
38 /// async fn io_func() {
39 ///     let (tx, mut rx) = unbounded_channel();
40 ///     let tx2 = tx.clone();
41 ///     assert!(tx.send(1).is_ok());
42 ///     assert!(!tx.is_closed());
43 ///     assert!(tx.is_same(&tx2));
44 ///     let handle = ylong_runtime::spawn(async move {
45 ///         assert_eq!(rx.recv().await, Ok(1));
46 ///     });
47 /// }
48 /// ```
49 pub struct UnboundedSender<T> {
50     channel: Tx<Queue<T>>,
51 }
52 
53 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self54     fn clone(&self) -> Self {
55         UnboundedSender {
56             channel: self.channel.clone(),
57         }
58     }
59 }
60 
61 /// The receiver of unbounded channel.
62 /// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by
63 /// the [`unbounded_channel`] function.
64 ///
65 /// # Examples
66 ///
67 /// ```
68 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
69 /// async fn io_func() {
70 ///     let (tx, mut rx) = unbounded_channel();
71 ///     assert!(rx.try_recv().is_err());
72 ///     assert!(tx.send(1).is_ok());
73 ///     let handle = ylong_runtime::spawn(async move {
74 ///         assert_eq!(rx.len(), 1);
75 ///         assert_eq!(rx.recv().await, Ok(1));
76 ///     });
77 /// }
78 /// ```
79 pub struct UnboundedReceiver<T> {
80     channel: Rx<Queue<T>>,
81 }
82 
83 /// Creates a new mpsc channel and returns a `Sender` and `Receiver` handle
84 /// pair.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
90 /// async fn io_func() {
91 ///     let (tx, mut rx) = unbounded_channel();
92 ///     let handle = ylong_runtime::spawn(async move {
93 ///         assert_eq!(rx.recv().await, Ok(1));
94 ///     });
95 ///     assert!(tx.send(1).is_ok());
96 /// }
97 /// ```
unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)98 pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
99     let queue = Queue::new();
100     let (tx, rx) = channel(queue);
101     (UnboundedSender::new(tx), UnboundedReceiver::new(rx))
102 }
103 
104 impl<T> UnboundedSender<T> {
new(channel: Tx<Queue<T>>) -> UnboundedSender<T>105     fn new(channel: Tx<Queue<T>>) -> UnboundedSender<T> {
106         UnboundedSender { channel }
107     }
108 
109     /// Sends values to the associated receiver.
110     ///
111     /// An error containing the sent value would be returned if the receiver is
112     /// closed or dropped.
113     ///
114     /// # Examples
115     ///
116     /// ```
117     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
118     /// let (tx, mut rx) = unbounded_channel();
119     /// assert!(tx.send(1).is_ok());
120     /// assert_eq!(rx.try_recv().unwrap(), 1);
121     /// ```
send(&self, value: T) -> Result<(), SendError<T>>122     pub fn send(&self, value: T) -> Result<(), SendError<T>> {
123         self.channel.send(value)
124     }
125 
126     /// Checks whether the channel is closed. If so, the sender could not
127     /// send values anymore. It returns true if the [`UnboundedReceiver`] is
128     /// dropped or calls the [`close`] method.
129     ///
130     /// [`close`]: UnboundedReceiver::close
131     ///
132     /// # Examples
133     ///
134     /// ```
135     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
136     /// let (tx, rx) = unbounded_channel::<isize>();
137     /// assert!(!tx.is_closed());
138     /// drop(rx);
139     /// assert!(tx.is_closed());
140     /// ```
is_closed(&self) -> bool141     pub fn is_closed(&self) -> bool {
142         self.channel.is_close()
143     }
144 
145     /// Checks whether the sender and another sender belong to the same channel.
146     ///
147     /// # Examples
148     ///
149     /// ```
150     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
151     /// let (tx, rx) = unbounded_channel::<isize>();
152     /// let tx2 = tx.clone();
153     /// assert!(tx.is_same(&tx2));
154     /// ```
is_same(&self, other: &Self) -> bool155     pub fn is_same(&self, other: &Self) -> bool {
156         self.channel.is_same(&other.channel)
157     }
158 
159     /// Gets the number of values in the channel.
160     ///
161     /// # Examples
162     ///
163     /// ```
164     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
165     /// let (tx, rx) = unbounded_channel();
166     /// assert_eq!(tx.len(), 0);
167     /// tx.send(1).unwrap();
168     /// assert_eq!(tx.len(), 1);
169     /// ```
len(&self) -> usize170     pub fn len(&self) -> usize {
171         self.channel.len()
172     }
173 
174     /// Returns `true` if the channel contains no elements.
175     ///
176     /// # Examples
177     ///
178     /// ```
179     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
180     /// let (tx, rx) = unbounded_channel();
181     /// assert!(tx.is_empty());
182     /// tx.send(1).unwrap();
183     /// assert!(!tx.is_empty());
184     /// ```
is_empty(&self) -> bool185     pub fn is_empty(&self) -> bool {
186         self.len() == 0
187     }
188 }
189 
190 impl<T> Drop for UnboundedSender<T> {
drop(&mut self)191     fn drop(&mut self) {
192         self.channel.close();
193     }
194 }
195 
196 impl<T> UnboundedReceiver<T> {
new(channel: Rx<Queue<T>>) -> UnboundedReceiver<T>197     fn new(channel: Rx<Queue<T>>) -> UnboundedReceiver<T> {
198         UnboundedReceiver { channel }
199     }
200 
201     /// Gets the number of values in the channel.
202     ///
203     /// # Examples
204     ///
205     /// ```
206     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
207     /// let (tx, rx) = unbounded_channel();
208     /// tx.send(1).unwrap();
209     /// tx.send(2).unwrap();
210     /// assert_eq!(rx.len(), 2);
211     /// ```
len(&self) -> usize212     pub fn len(&self) -> usize {
213         self.channel.len()
214     }
215 
216     /// Returns `true` if the channel contains no elements.
217     ///
218     /// # Examples
219     ///
220     /// ```
221     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
222     /// let (tx, rx) = unbounded_channel();
223     /// assert!(rx.is_empty());
224     /// tx.send(1).unwrap();
225     /// assert!(!rx.is_empty());
226     /// ```
is_empty(&self) -> bool227     pub fn is_empty(&self) -> bool {
228         self.len() == 0
229     }
230 
231     /// Attempts to receive a value from the associated [`UnboundedSender`].
232     ///
233     /// # Return value
234     /// * `Ok(T)` if receiving a value successfully.
235     /// * `Err(TryRecvError::Empty)` if no value has been sent yet.
236     /// * `Err(TryRecvError::Closed)` if all senders have been dropped.
237     ///
238     /// # Examples
239     ///
240     /// ```
241     /// use ylong_runtime::sync::error::TryRecvError;
242     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
243     /// let (tx, mut rx) = unbounded_channel();
244     /// match rx.try_recv() {
245     ///     Err(TryRecvError::Empty) => {}
246     ///     _ => panic!("This won't happen"),
247     /// }
248     /// tx.send(1).unwrap();
249     /// match rx.try_recv() {
250     ///     Ok(_) => {}
251     ///     _ => panic!("This won't happen"),
252     /// }
253     /// drop(tx);
254     /// match rx.try_recv() {
255     ///     Err(TryRecvError::Closed) => {}
256     ///     _ => panic!("This won't happen"),
257     /// }
258     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>259     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
260         self.channel.try_recv()
261     }
262 
263     /// Polls to receive a value from the associated [`UnboundedSender`].
264     ///
265     /// When the sender has not yet sent a message, calling this method will
266     /// return pending, and the waker from the Context will receive a
267     /// wakeup when the message arrives or when the channel is closed. Multiple
268     /// calls to this method, only the waker from the last call will receive a
269     /// wakeup.
270     ///
271     /// # Return value
272     /// * `Poll::Pending` if no messages in the channel now, but the channel is
273     ///   not closed.
274     /// * `Poll::Ready(Ok(T))` if receiving a value successfully.
275     /// * `Poll::Ready(Err(RecvError))` in the following situations: 1. All
276     ///   senders have been dropped or the channel is closed. 2. No messages
277     ///   remaining.
278     ///
279     /// # Examples
280     ///
281     /// ```
282     /// use ylong_runtime::futures::poll_fn;
283     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
284     /// async fn io_func() {
285     ///     let (tx, mut rx) = unbounded_channel();
286     ///     let handle = ylong_runtime::spawn(async move {
287     ///         let msg = poll_fn(|cx| rx.poll_recv(cx)).await;
288     ///         assert_eq!(msg, Ok(1));
289     ///     });
290     ///     assert!(tx.send(1).is_ok());
291     /// }
292     /// ```
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>293     pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
294         self.channel.poll_recv(cx)
295     }
296 
297     /// Receives a value from the associated [`UnboundedSender`].
298     ///
299     /// The `receiver` can still receive all sent messages in the channel after
300     /// the channel is closed.
301     ///
302     /// # Return value
303     /// * `Ok(T)` if receiving a value successfully.
304     /// * `Err(RecvError)` in the following situations: 1. All senders have been
305     ///   dropped or the channel is closed. 2. No messages remaining.
306     ///
307     /// # Examples
308     ///
309     /// ```
310     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
311     /// async fn io_func() {
312     ///     let (tx, mut rx) = unbounded_channel();
313     ///     let handle = ylong_runtime::spawn(async move {
314     ///         assert_eq!(rx.recv().await, Ok(1));
315     ///     });
316     ///     assert!(tx.send(1).is_ok());
317     /// }
318     /// ```
recv(&mut self) -> Result<T, RecvError>319     pub async fn recv(&mut self) -> Result<T, RecvError> {
320         poll_fn(|cx| self.channel.poll_recv(cx)).await
321     }
322 
323     /// Attempts to receive a value from the associated [`UnboundedSender`] in a
324     /// limited amount of time.
325     ///
326     /// The `receiver` can still receive all sent messages in the channel after
327     /// the channel is closed.
328     ///
329     /// # Return value
330     /// * `Ok(T)` if receiving a value successfully.
331     /// * `Err(RecvTimeoutError::Closed)` if all senders have been dropped.
332     /// * `Err(RecvTimeoutError::TimeOut)` if receiving timeout has elapsed.
333     ///
334     /// # Examples
335     ///
336     /// ```
337     /// use std::time::Duration;
338     ///
339     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
340     /// async fn io_func() {
341     ///     let (tx, mut rx) = unbounded_channel();
342     ///     let handle = ylong_runtime::spawn(async move {
343     ///         tx.send(1).unwrap();
344     ///         assert_eq!(rx.recv_timeout(Duration::from_millis(10)).await, Ok(1));
345     ///     });
346     /// }
347     /// ```
348     #[cfg(feature = "time")]
recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError>349     pub async fn recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError> {
350         match timeout(time, self.recv()).await {
351             Ok(res) => res.map_err(|_| RecvTimeoutError::Closed),
352             Err(_) => Err(RecvTimeoutError::Timeout),
353         }
354     }
355 
356     /// Closes the channel, prevents the `Sender` from sending more values.
357     ///
358     /// The `Sender` will fail to call [`send`] after the `Receiver` called
359     /// `close`. It will do nothing if the channel is already closed.
360     ///
361     /// [`send`]: UnboundedSender::send
362     ///
363     /// # Examples
364     /// ```
365     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
366     /// async fn io_func() {
367     ///     let (tx, mut rx) = unbounded_channel();
368     ///     assert!(!tx.is_closed());
369     ///
370     ///     rx.close();
371     ///
372     ///     assert!(tx.is_closed());
373     ///     assert!(tx.send("no receive").is_err());
374     /// }
375     /// ```
376     ///
377     /// Receive a value sent **before** calling `close`
378     ///
379     /// ```
380     /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
381     /// async fn io_func() {
382     ///     let (tx, mut rx) = unbounded_channel();
383     ///     assert!(tx.send("Hello").is_ok());
384     ///
385     ///     rx.close();
386     ///
387     ///     let msg = rx.try_recv().unwrap();
388     ///     assert_eq!(msg, "Hello");
389     /// }
390     /// ```
close(&mut self)391     pub fn close(&mut self) {
392         self.channel.close();
393     }
394 }
395 
396 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)397     fn drop(&mut self) {
398         self.channel.close();
399     }
400 }
401