1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Unbounded channel
15
16 pub(crate) mod queue;
17
18 use std::task::{Context, Poll};
19
20 use crate::futures::poll_fn;
21 use crate::sync::error::{RecvError, SendError, TryRecvError};
22 use crate::sync::mpsc::unbounded::queue::Queue;
23 use crate::sync::mpsc::{channel, Container, Rx, Tx};
24
25 cfg_time!(
26 use crate::time::timeout;
27 use std::time::Duration;
28 use crate::sync::error::RecvTimeoutError;
29 );
30 /// The sender of unbounded channel.
31 /// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by
32 /// the [`unbounded_channel`] function.
33 ///
34 /// # Examples
35 ///
36 /// ```
37 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
38 /// async fn io_func() {
39 /// let (tx, mut rx) = unbounded_channel();
40 /// let tx2 = tx.clone();
41 /// assert!(tx.send(1).is_ok());
42 /// assert!(!tx.is_closed());
43 /// assert!(tx.is_same(&tx2));
44 /// let handle = ylong_runtime::spawn(async move {
45 /// assert_eq!(rx.recv().await, Ok(1));
46 /// });
47 /// }
48 /// ```
49 pub struct UnboundedSender<T> {
50 channel: Tx<Queue<T>>,
51 }
52
53 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self54 fn clone(&self) -> Self {
55 UnboundedSender {
56 channel: self.channel.clone(),
57 }
58 }
59 }
60
61 /// The receiver of unbounded channel.
62 /// A [`UnboundedSender`] and [`UnboundedReceiver`] handle pair are created by
63 /// the [`unbounded_channel`] function.
64 ///
65 /// # Examples
66 ///
67 /// ```
68 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
69 /// async fn io_func() {
70 /// let (tx, mut rx) = unbounded_channel();
71 /// assert!(rx.try_recv().is_err());
72 /// assert!(tx.send(1).is_ok());
73 /// let handle = ylong_runtime::spawn(async move {
74 /// assert_eq!(rx.len(), 1);
75 /// assert_eq!(rx.recv().await, Ok(1));
76 /// });
77 /// }
78 /// ```
79 pub struct UnboundedReceiver<T> {
80 channel: Rx<Queue<T>>,
81 }
82
83 /// Creates a new mpsc channel and returns a `Sender` and `Receiver` handle
84 /// pair.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
90 /// async fn io_func() {
91 /// let (tx, mut rx) = unbounded_channel();
92 /// let handle = ylong_runtime::spawn(async move {
93 /// assert_eq!(rx.recv().await, Ok(1));
94 /// });
95 /// assert!(tx.send(1).is_ok());
96 /// }
97 /// ```
unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)98 pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
99 let queue = Queue::new();
100 let (tx, rx) = channel(queue);
101 (UnboundedSender::new(tx), UnboundedReceiver::new(rx))
102 }
103
104 impl<T> UnboundedSender<T> {
new(channel: Tx<Queue<T>>) -> UnboundedSender<T>105 fn new(channel: Tx<Queue<T>>) -> UnboundedSender<T> {
106 UnboundedSender { channel }
107 }
108
109 /// Sends values to the associated receiver.
110 ///
111 /// An error containing the sent value would be returned if the receiver is
112 /// closed or dropped.
113 ///
114 /// # Examples
115 ///
116 /// ```
117 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
118 /// let (tx, mut rx) = unbounded_channel();
119 /// assert!(tx.send(1).is_ok());
120 /// assert_eq!(rx.try_recv().unwrap(), 1);
121 /// ```
send(&self, value: T) -> Result<(), SendError<T>>122 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
123 self.channel.send(value)
124 }
125
126 /// Checks whether the channel is closed. If so, the sender could not
127 /// send values anymore. It returns true if the [`UnboundedReceiver`] is
128 /// dropped or calls the [`close`] method.
129 ///
130 /// [`close`]: UnboundedReceiver::close
131 ///
132 /// # Examples
133 ///
134 /// ```
135 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
136 /// let (tx, rx) = unbounded_channel::<isize>();
137 /// assert!(!tx.is_closed());
138 /// drop(rx);
139 /// assert!(tx.is_closed());
140 /// ```
is_closed(&self) -> bool141 pub fn is_closed(&self) -> bool {
142 self.channel.is_close()
143 }
144
145 /// Checks whether the sender and another sender belong to the same channel.
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
151 /// let (tx, rx) = unbounded_channel::<isize>();
152 /// let tx2 = tx.clone();
153 /// assert!(tx.is_same(&tx2));
154 /// ```
is_same(&self, other: &Self) -> bool155 pub fn is_same(&self, other: &Self) -> bool {
156 self.channel.is_same(&other.channel)
157 }
158
159 /// Gets the number of values in the channel.
160 ///
161 /// # Examples
162 ///
163 /// ```
164 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
165 /// let (tx, rx) = unbounded_channel();
166 /// assert_eq!(tx.len(), 0);
167 /// tx.send(1).unwrap();
168 /// assert_eq!(tx.len(), 1);
169 /// ```
len(&self) -> usize170 pub fn len(&self) -> usize {
171 self.channel.len()
172 }
173
174 /// Returns `true` if the channel contains no elements.
175 ///
176 /// # Examples
177 ///
178 /// ```
179 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
180 /// let (tx, rx) = unbounded_channel();
181 /// assert!(tx.is_empty());
182 /// tx.send(1).unwrap();
183 /// assert!(!tx.is_empty());
184 /// ```
is_empty(&self) -> bool185 pub fn is_empty(&self) -> bool {
186 self.len() == 0
187 }
188 }
189
190 impl<T> Drop for UnboundedSender<T> {
drop(&mut self)191 fn drop(&mut self) {
192 self.channel.close();
193 }
194 }
195
196 impl<T> UnboundedReceiver<T> {
new(channel: Rx<Queue<T>>) -> UnboundedReceiver<T>197 fn new(channel: Rx<Queue<T>>) -> UnboundedReceiver<T> {
198 UnboundedReceiver { channel }
199 }
200
201 /// Gets the number of values in the channel.
202 ///
203 /// # Examples
204 ///
205 /// ```
206 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
207 /// let (tx, rx) = unbounded_channel();
208 /// tx.send(1).unwrap();
209 /// tx.send(2).unwrap();
210 /// assert_eq!(rx.len(), 2);
211 /// ```
len(&self) -> usize212 pub fn len(&self) -> usize {
213 self.channel.len()
214 }
215
216 /// Returns `true` if the channel contains no elements.
217 ///
218 /// # Examples
219 ///
220 /// ```
221 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
222 /// let (tx, rx) = unbounded_channel();
223 /// assert!(rx.is_empty());
224 /// tx.send(1).unwrap();
225 /// assert!(!rx.is_empty());
226 /// ```
is_empty(&self) -> bool227 pub fn is_empty(&self) -> bool {
228 self.len() == 0
229 }
230
231 /// Attempts to receive a value from the associated [`UnboundedSender`].
232 ///
233 /// # Return value
234 /// * `Ok(T)` if receiving a value successfully.
235 /// * `Err(TryRecvError::Empty)` if no value has been sent yet.
236 /// * `Err(TryRecvError::Closed)` if all senders have been dropped.
237 ///
238 /// # Examples
239 ///
240 /// ```
241 /// use ylong_runtime::sync::error::TryRecvError;
242 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
243 /// let (tx, mut rx) = unbounded_channel();
244 /// match rx.try_recv() {
245 /// Err(TryRecvError::Empty) => {}
246 /// _ => panic!("This won't happen"),
247 /// }
248 /// tx.send(1).unwrap();
249 /// match rx.try_recv() {
250 /// Ok(_) => {}
251 /// _ => panic!("This won't happen"),
252 /// }
253 /// drop(tx);
254 /// match rx.try_recv() {
255 /// Err(TryRecvError::Closed) => {}
256 /// _ => panic!("This won't happen"),
257 /// }
258 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>259 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
260 self.channel.try_recv()
261 }
262
263 /// Polls to receive a value from the associated [`UnboundedSender`].
264 ///
265 /// When the sender has not yet sent a message, calling this method will
266 /// return pending, and the waker from the Context will receive a
267 /// wakeup when the message arrives or when the channel is closed. Multiple
268 /// calls to this method, only the waker from the last call will receive a
269 /// wakeup.
270 ///
271 /// # Return value
272 /// * `Poll::Pending` if no messages in the channel now, but the channel is
273 /// not closed.
274 /// * `Poll::Ready(Ok(T))` if receiving a value successfully.
275 /// * `Poll::Ready(Err(RecvError))` in the following situations: 1. All
276 /// senders have been dropped or the channel is closed. 2. No messages
277 /// remaining.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// use ylong_runtime::futures::poll_fn;
283 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
284 /// async fn io_func() {
285 /// let (tx, mut rx) = unbounded_channel();
286 /// let handle = ylong_runtime::spawn(async move {
287 /// let msg = poll_fn(|cx| rx.poll_recv(cx)).await;
288 /// assert_eq!(msg, Ok(1));
289 /// });
290 /// assert!(tx.send(1).is_ok());
291 /// }
292 /// ```
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>293 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
294 self.channel.poll_recv(cx)
295 }
296
297 /// Receives a value from the associated [`UnboundedSender`].
298 ///
299 /// The `receiver` can still receive all sent messages in the channel after
300 /// the channel is closed.
301 ///
302 /// # Return value
303 /// * `Ok(T)` if receiving a value successfully.
304 /// * `Err(RecvError)` in the following situations: 1. All senders have been
305 /// dropped or the channel is closed. 2. No messages remaining.
306 ///
307 /// # Examples
308 ///
309 /// ```
310 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
311 /// async fn io_func() {
312 /// let (tx, mut rx) = unbounded_channel();
313 /// let handle = ylong_runtime::spawn(async move {
314 /// assert_eq!(rx.recv().await, Ok(1));
315 /// });
316 /// assert!(tx.send(1).is_ok());
317 /// }
318 /// ```
recv(&mut self) -> Result<T, RecvError>319 pub async fn recv(&mut self) -> Result<T, RecvError> {
320 poll_fn(|cx| self.channel.poll_recv(cx)).await
321 }
322
323 /// Attempts to receive a value from the associated [`UnboundedSender`] in a
324 /// limited amount of time.
325 ///
326 /// The `receiver` can still receive all sent messages in the channel after
327 /// the channel is closed.
328 ///
329 /// # Return value
330 /// * `Ok(T)` if receiving a value successfully.
331 /// * `Err(RecvTimeoutError::Closed)` if all senders have been dropped.
332 /// * `Err(RecvTimeoutError::TimeOut)` if receiving timeout has elapsed.
333 ///
334 /// # Examples
335 ///
336 /// ```
337 /// use std::time::Duration;
338 ///
339 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
340 /// async fn io_func() {
341 /// let (tx, mut rx) = unbounded_channel();
342 /// let handle = ylong_runtime::spawn(async move {
343 /// tx.send(1).unwrap();
344 /// assert_eq!(rx.recv_timeout(Duration::from_millis(10)).await, Ok(1));
345 /// });
346 /// }
347 /// ```
348 #[cfg(feature = "time")]
recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError>349 pub async fn recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError> {
350 match timeout(time, self.recv()).await {
351 Ok(res) => res.map_err(|_| RecvTimeoutError::Closed),
352 Err(_) => Err(RecvTimeoutError::Timeout),
353 }
354 }
355
356 /// Closes the channel, prevents the `Sender` from sending more values.
357 ///
358 /// The `Sender` will fail to call [`send`] after the `Receiver` called
359 /// `close`. It will do nothing if the channel is already closed.
360 ///
361 /// [`send`]: UnboundedSender::send
362 ///
363 /// # Examples
364 /// ```
365 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
366 /// async fn io_func() {
367 /// let (tx, mut rx) = unbounded_channel();
368 /// assert!(!tx.is_closed());
369 ///
370 /// rx.close();
371 ///
372 /// assert!(tx.is_closed());
373 /// assert!(tx.send("no receive").is_err());
374 /// }
375 /// ```
376 ///
377 /// Receive a value sent **before** calling `close`
378 ///
379 /// ```
380 /// use ylong_runtime::sync::mpsc::unbounded::unbounded_channel;
381 /// async fn io_func() {
382 /// let (tx, mut rx) = unbounded_channel();
383 /// assert!(tx.send("Hello").is_ok());
384 ///
385 /// rx.close();
386 ///
387 /// let msg = rx.try_recv().unwrap();
388 /// assert_eq!(msg, "Hello");
389 /// }
390 /// ```
close(&mut self)391 pub fn close(&mut self) {
392 self.channel.close();
393 }
394 }
395
396 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)397 fn drop(&mut self) {
398 self.channel.close();
399 }
400 }
401