tokio/sync/mpsc/bounded.rs
1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7 use crate::sync::mpsc::error::SendTimeoutError;
8 use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
    /// The transmit handle of the underlying channel, parameterized by the
    /// bounded-channel [`Semaphore`].
    chan: chan::Tx<T, Semaphore>,
}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// # #[tokio::main(flavor = "current_thread")]
44/// # async fn main() {
45/// let (tx, _rx) = channel::<i32>(15);
46/// let tx_weak = tx.downgrade();
47///
48/// // Upgrading will succeed because `tx` still exists.
49/// assert!(tx_weak.upgrade().is_some());
50///
51/// // If we drop `tx`, then it will fail.
52/// drop(tx);
53/// assert!(tx_weak.clone().upgrade().is_none());
54/// # }
55/// ```
pub struct WeakSender<T> {
    /// Shared (`Arc`) handle to the channel itself rather than a `chan::Tx`;
    /// per the type-level docs, holding it does not keep the channel open.
    chan: Arc<chan::Chan<T, Semaphore>>,
}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
pub struct Permit<'a, T> {
    /// Borrowed transmit handle of the channel this permit was reserved on.
    chan: &'a chan::Tx<T, Semaphore>,
}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
pub struct PermitIterator<'a, T> {
    /// Borrowed transmit handle of the channel the permits were reserved on.
    chan: &'a chan::Tx<T, Semaphore>,
    /// Permit count. NOTE(review): presumably the number of permits still to
    /// be yielded — confirm against the `Iterator` impl, which is not visible
    /// here.
    n: usize,
}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
    /// Owned transmit handle of the channel this permit was reserved on.
    /// NOTE(review): presumably wrapped in `Option` so the handle can be moved
    /// out when the permit is consumed — confirm against the impl.
    chan: Option<chan::Tx<T, Semaphore>>,
}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
    /// The channel receiver: the receive handle of the underlying channel,
    /// parameterized by the bounded-channel [`Semaphore`].
    chan: chan::Rx<T, Semaphore>,
}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages. Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0, or too large. Currently the maximum
131/// capacity is [`Semaphore::MAX_PERMITS`].
132///
133/// [`Semaphore::MAX_PERMITS`]: crate::sync::Semaphore::MAX_PERMITS
134///
135/// # Examples
136///
137/// ```rust
138/// use tokio::sync::mpsc;
139///
140/// # #[tokio::main(flavor = "current_thread")]
141/// # async fn main() {
142/// let (tx, mut rx) = mpsc::channel(100);
143///
144/// tokio::spawn(async move {
145/// for i in 0..10 {
146/// if let Err(_) = tx.send(i).await {
147/// println!("receiver dropped");
148/// return;
149/// }
150/// }
151/// });
152///
153/// while let Some(i) = rx.recv().await {
154/// println!("got = {}", i);
155/// }
156/// # }
157/// ```
158#[track_caller]
159pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
160 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
161 let semaphore = Semaphore {
162 semaphore: semaphore::Semaphore::new(buffer),
163 bound: buffer,
164 };
165 let (tx, rx) = chan::channel(semaphore);
166
167 let tx = Sender::new(tx);
168 let rx = Receiver::new(rx);
169
170 (tx, rx)
171}
172
/// Channel semaphore: pairs the semaphore implementation with a `usize`
/// representing the channel bound.
175#[derive(Debug)]
pub(crate) struct Semaphore {
    /// The semaphore implementation; `channel` creates it with `buffer`
    /// permits, and `Receiver::capacity` reports its available permits.
    pub(crate) semaphore: semaphore::Semaphore,
    /// The channel bound, i.e. the `buffer` value passed to `channel`.
    pub(crate) bound: usize,
}
180
181impl<T> Receiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
    // Wraps the receive handle of the underlying channel; no other state is set up.
    Receiver { chan }
}
185
186 /// Receives the next value for this receiver.
187 ///
188 /// This method returns `None` if the channel has been closed and there are
189 /// no remaining messages in the channel's buffer. This indicates that no
190 /// further values can ever be received from this `Receiver`. The channel is
191 /// closed when all senders have been dropped, or when [`close`] is called.
192 ///
193 /// If there are no messages in the channel's buffer, but the channel has
194 /// not yet been closed, this method will sleep until a message is sent or
195 /// the channel is closed. Note that if [`close`] is called, but there are
196 /// still outstanding [`Permits`] from before it was closed, the channel is
197 /// not considered closed by `recv` until the permits are released.
198 ///
199 /// # Cancel safety
200 ///
201 /// This method is cancel safe. If `recv` is used as the event in a
202 /// [`tokio::select!`](crate::select) statement and some other branch
203 /// completes first, it is guaranteed that no messages were received on this
204 /// channel.
205 ///
206 /// [`close`]: Self::close
207 /// [`Permits`]: struct@crate::sync::mpsc::Permit
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// use tokio::sync::mpsc;
213 ///
214 /// # #[tokio::main(flavor = "current_thread")]
215 /// # async fn main() {
216 /// let (tx, mut rx) = mpsc::channel(100);
217 ///
218 /// tokio::spawn(async move {
219 /// tx.send("hello").await.unwrap();
220 /// });
221 ///
222 /// assert_eq!(Some("hello"), rx.recv().await);
223 /// assert_eq!(None, rx.recv().await);
224 /// # }
225 /// ```
226 ///
227 /// Values are buffered:
228 ///
229 /// ```
230 /// use tokio::sync::mpsc;
231 ///
232 /// # #[tokio::main(flavor = "current_thread")]
233 /// # async fn main() {
234 /// let (tx, mut rx) = mpsc::channel(100);
235 ///
236 /// tx.send("hello").await.unwrap();
237 /// tx.send("world").await.unwrap();
238 ///
239 /// assert_eq!(Some("hello"), rx.recv().await);
240 /// assert_eq!(Some("world"), rx.recv().await);
241 /// # }
242 /// ```
243 pub async fn recv(&mut self) -> Option<T> {
244 use std::future::poll_fn;
245 poll_fn(|cx| self.chan.recv(cx)).await
246 }
247
248 /// Receives the next values for this receiver and extends `buffer`.
249 ///
250 /// This method extends `buffer` by no more than a fixed number of values
251 /// as specified by `limit`. If `limit` is zero, the function immediately
252 /// returns `0`. The return value is the number of values added to `buffer`.
253 ///
254 /// For `limit > 0`, if there are no messages in the channel's queue, but
255 /// the channel has not yet been closed, this method will sleep until a
256 /// message is sent or the channel is closed. Note that if [`close`] is
257 /// called, but there are still outstanding [`Permits`] from before it was
258 /// closed, the channel is not considered closed by `recv_many` until the
259 /// permits are released.
260 ///
261 /// For non-zero values of `limit`, this method will never return `0` unless
262 /// the channel has been closed and there are no remaining messages in the
263 /// channel's queue. This indicates that no further values can ever be
264 /// received from this `Receiver`. The channel is closed when all senders
265 /// have been dropped, or when [`close`] is called.
266 ///
267 /// The capacity of `buffer` is increased as needed.
268 ///
269 /// # Cancel safety
270 ///
271 /// This method is cancel safe. If `recv_many` is used as the event in a
272 /// [`tokio::select!`](crate::select) statement and some other branch
273 /// completes first, it is guaranteed that no messages were received on this
274 /// channel.
275 ///
276 /// [`close`]: Self::close
277 /// [`Permits`]: struct@crate::sync::mpsc::Permit
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// use tokio::sync::mpsc;
283 ///
284 /// # #[tokio::main(flavor = "current_thread")]
285 /// # async fn main() {
286 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
287 /// let limit = 2;
288 /// let (tx, mut rx) = mpsc::channel(100);
289 /// let tx2 = tx.clone();
290 /// tx2.send("first").await.unwrap();
291 /// tx2.send("second").await.unwrap();
292 /// tx2.send("third").await.unwrap();
293 ///
294 /// // Call `recv_many` to receive up to `limit` (2) values.
295 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
296 /// assert_eq!(vec!["first", "second"], buffer);
297 ///
298 /// // If the buffer is full, the next call to `recv_many`
299 /// // reserves additional capacity.
300 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
301 ///
302 /// tokio::spawn(async move {
303 /// tx.send("fourth").await.unwrap();
304 /// });
305 ///
306 /// // 'tx' is dropped, but `recv_many`
307 /// // is guaranteed not to return 0 as the channel
308 /// // is not yet closed.
309 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
310 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
311 ///
312 /// // Once the last sender is dropped, the channel is
313 /// // closed and `recv_many` returns 0, capacity unchanged.
314 /// drop(tx2);
315 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
316 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
317 /// # }
318 /// ```
319 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
320 use std::future::poll_fn;
321 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
322 }
323
324 /// Tries to receive the next value for this receiver.
325 ///
326 /// This method returns the [`Empty`] error if the channel is currently
327 /// empty, but there are still outstanding [senders] or [permits].
328 ///
329 /// This method returns the [`Disconnected`] error if the channel is
330 /// currently empty, and there are no outstanding [senders] or [permits].
331 ///
332 /// Unlike the [`poll_recv`] method, this method will never return an
333 /// [`Empty`] error spuriously.
334 ///
335 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
336 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
337 /// [`poll_recv`]: Self::poll_recv
338 /// [senders]: crate::sync::mpsc::Sender
339 /// [permits]: crate::sync::mpsc::Permit
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// use tokio::sync::mpsc;
345 /// use tokio::sync::mpsc::error::TryRecvError;
346 ///
347 /// # #[tokio::main(flavor = "current_thread")]
348 /// # async fn main() {
349 /// let (tx, mut rx) = mpsc::channel(100);
350 ///
351 /// tx.send("hello").await.unwrap();
352 ///
353 /// assert_eq!(Ok("hello"), rx.try_recv());
354 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
355 ///
356 /// tx.send("hello").await.unwrap();
357 /// // Drop the last sender, closing the channel.
358 /// drop(tx);
359 ///
360 /// assert_eq!(Ok("hello"), rx.try_recv());
361 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
362 /// # }
363 /// ```
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
    // Forwards to the wrapped channel receiver; its result is returned unchanged.
    self.chan.try_recv()
}
367
368 /// Blocking receive to call outside of asynchronous contexts.
369 ///
370 /// This method returns `None` if the channel has been closed and there are
371 /// no remaining messages in the channel's buffer. This indicates that no
372 /// further values can ever be received from this `Receiver`. The channel is
373 /// closed when all senders have been dropped, or when [`close`] is called.
374 ///
375 /// If there are no messages in the channel's buffer, but the channel has
376 /// not yet been closed, this method will block until a message is sent or
377 /// the channel is closed.
378 ///
379 /// This method is intended for use cases where you are sending from
380 /// asynchronous code to synchronous code, and will work even if the sender
381 /// is not using [`blocking_send`] to send the message.
382 ///
383 /// Note that if [`close`] is called, but there are still outstanding
384 /// [`Permits`] from before it was closed, the channel is not considered
385 /// closed by `blocking_recv` until the permits are released.
386 ///
387 /// [`close`]: Self::close
388 /// [`Permits`]: struct@crate::sync::mpsc::Permit
389 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
390 ///
391 /// # Panics
392 ///
393 /// This function panics if called within an asynchronous execution
394 /// context.
395 ///
396 /// # Examples
397 ///
398 /// ```
399 /// # #[cfg(not(target_family = "wasm"))]
400 /// # {
401 /// use std::thread;
402 /// use tokio::runtime::Runtime;
403 /// use tokio::sync::mpsc;
404 ///
405 /// fn main() {
406 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
407 ///
408 /// let sync_code = thread::spawn(move || {
409 /// assert_eq!(Some(10), rx.blocking_recv());
410 /// });
411 ///
412 /// Runtime::new()
413 /// .unwrap()
414 /// .block_on(async move {
415 /// let _ = tx.send(10).await;
416 /// });
417 /// sync_code.join().unwrap()
418 /// }
419 /// # }
420 /// ```
421 #[track_caller]
422 #[cfg(feature = "sync")]
423 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
pub fn blocking_recv(&mut self) -> Option<T> {
    // Drives the async `recv` future to completion on the calling thread via
    // the crate-internal `block_on` helper.
    crate::future::block_on(self.recv())
}
427
428 /// Variant of [`Self::recv_many`] for blocking contexts.
429 ///
430 /// The same conditions as in [`Self::blocking_recv`] apply.
431 #[track_caller]
432 #[cfg(feature = "sync")]
433 #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
    // Drives the async `recv_many` future to completion on the calling thread
    // via the crate-internal `block_on` helper.
    crate::future::block_on(self.recv_many(buffer, limit))
}
437
438 /// Closes the receiving half of a channel without dropping it.
439 ///
440 /// This prevents any further messages from being sent on the channel while
441 /// still enabling the receiver to drain messages that are buffered. Any
442 /// outstanding [`Permit`] values will still be able to send messages.
443 ///
444 /// To guarantee that no messages are dropped, after calling `close()`,
445 /// `recv()` must be called until `None` is returned. If there are
446 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
447 /// not return `None` until those are released.
448 ///
449 /// [`Permit`]: Permit
450 /// [`OwnedPermit`]: OwnedPermit
451 ///
452 /// # Examples
453 ///
454 /// ```
455 /// use tokio::sync::mpsc;
456 ///
457 /// # #[tokio::main(flavor = "current_thread")]
458 /// # async fn main() {
459 /// let (tx, mut rx) = mpsc::channel(20);
460 ///
461 /// tokio::spawn(async move {
462 /// let mut i = 0;
463 /// while let Ok(permit) = tx.reserve().await {
464 /// permit.send(i);
465 /// i += 1;
466 /// }
467 /// });
468 ///
469 /// rx.close();
470 ///
471 /// while let Some(msg) = rx.recv().await {
472 /// println!("got {}", msg);
473 /// }
474 ///
475 /// // Channel closed and no messages are lost.
476 /// # }
477 /// ```
pub fn close(&mut self) {
    // Forwards to the wrapped channel receiver, which performs the close.
    self.chan.close();
}
481
482 /// Checks if a channel is closed.
483 ///
484 /// This method returns `true` if the channel has been closed. The channel is closed
485 /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
486 ///
487 /// [`Sender`]: crate::sync::mpsc::Sender
488 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
489 ///
490 /// # Examples
491 /// ```
492 /// use tokio::sync::mpsc;
493 ///
494 /// # #[tokio::main(flavor = "current_thread")]
495 /// # async fn main() {
496 /// let (_tx, mut rx) = mpsc::channel::<()>(10);
497 /// assert!(!rx.is_closed());
498 ///
499 /// rx.close();
500 ///
501 /// assert!(rx.is_closed());
502 /// # }
503 /// ```
pub fn is_closed(&self) -> bool {
    // Forwards to the wrapped channel receiver; the flag is returned unchanged.
    self.chan.is_closed()
}
507
508 /// Checks if a channel is empty.
509 ///
510 /// This method returns `true` if the channel has no messages.
511 ///
512 /// # Examples
513 /// ```
514 /// use tokio::sync::mpsc;
515 ///
516 /// # #[tokio::main(flavor = "current_thread")]
517 /// # async fn main() {
518 /// let (tx, rx) = mpsc::channel(10);
519 /// assert!(rx.is_empty());
520 ///
521 /// tx.send(0).await.unwrap();
522 /// assert!(!rx.is_empty());
523 /// # }
524 ///
525 /// ```
pub fn is_empty(&self) -> bool {
    // Forwards to the wrapped channel receiver; the flag is returned unchanged.
    self.chan.is_empty()
}
529
530 /// Returns the number of messages in the channel.
531 ///
532 /// # Examples
533 /// ```
534 /// use tokio::sync::mpsc;
535 ///
536 /// # #[tokio::main(flavor = "current_thread")]
537 /// # async fn main() {
538 /// let (tx, rx) = mpsc::channel(10);
539 /// assert_eq!(0, rx.len());
540 ///
541 /// tx.send(0).await.unwrap();
542 /// assert_eq!(1, rx.len());
543 /// # }
544 /// ```
pub fn len(&self) -> usize {
    // Forwards to the wrapped channel receiver; the count is returned unchanged.
    self.chan.len()
}
548
549 /// Returns the current number of reservations which can immediately be
550 /// returned from the channel.
551 ///
552 /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
553 /// capacity with [`Sender::reserve`].
554 ///
555 /// The capacity goes up when values are received, unless there are
556 /// existing, non-cancelled calls to [`Sender::send`] or [`Sender::reserve`]
557 /// which have returned [`Poll::Pending`]. While those calls exist, reading
558 /// values from the [`Receiver`] transfers access to that capacity directly to
559 /// those callers, in FIFO order.
560 ///
/// This is distinct from [`max_capacity`], which always returns the buffer capacity initially
562 /// specified when calling [`channel`].
563 ///
564 /// # Examples
565 ///
566 /// ```
567 /// use tokio::sync::mpsc;
568 ///
569 /// # #[tokio::main(flavor = "current_thread")]
570 /// # async fn main() {
571 /// let (tx, mut rx) = mpsc::channel::<()>(5);
572 ///
573 /// assert_eq!(rx.capacity(), 5);
574 ///
575 /// // Making a reservation drops the capacity by one.
576 /// let permit = tx.reserve().await.unwrap();
577 /// assert_eq!(rx.capacity(), 4);
578 /// assert_eq!(rx.len(), 0);
579 ///
580 /// // Sending and receiving a value increases the capacity by one.
581 /// permit.send(());
582 /// assert_eq!(rx.len(), 1);
583 /// rx.recv().await.unwrap();
584 /// assert_eq!(rx.capacity(), 5);
585 ///
586 /// // Directly sending a message drops the capacity by one.
587 /// tx.send(()).await.unwrap();
588 /// assert_eq!(rx.capacity(), 4);
589 /// assert_eq!(rx.len(), 1);
590 ///
591 /// // Receiving the message increases the capacity by one.
592 /// rx.recv().await.unwrap();
593 /// assert_eq!(rx.capacity(), 5);
594 /// assert_eq!(rx.len(), 0);
595 /// # }
596 /// ```
597 /// [`capacity`]: Receiver::capacity
598 /// [`max_capacity`]: Receiver::max_capacity
599 pub fn capacity(&self) -> usize {
600 self.chan.semaphore().semaphore.available_permits()
601 }
602
603 /// Returns the maximum buffer capacity of the channel.
604 ///
605 /// The maximum capacity is the buffer capacity initially specified when calling
606 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
607 /// available buffer capacity: as messages are sent and received, the value
608 /// returned by [`capacity`] will go up or down, whereas the value
609 /// returned by [`max_capacity`] will remain constant.
610 ///
611 /// # Examples
612 ///
613 /// ```
614 /// use tokio::sync::mpsc;
615 ///
616 /// # #[tokio::main(flavor = "current_thread")]
617 /// # async fn main() {
618 /// let (tx, rx) = mpsc::channel::<()>(5);
619 ///
620 /// // both max capacity and capacity are the same at first
621 /// assert_eq!(rx.max_capacity(), 5);
622 /// assert_eq!(rx.capacity(), 5);
623 ///
624 /// // Making a reservation doesn't change the max capacity.
625 /// let permit = tx.reserve().await.unwrap();
626 /// assert_eq!(rx.max_capacity(), 5);
627 /// // but drops the capacity by one
628 /// assert_eq!(rx.capacity(), 4);
629 /// # }
630 /// ```
631 /// [`capacity`]: Receiver::capacity
632 /// [`max_capacity`]: Receiver::max_capacity
633 pub fn max_capacity(&self) -> usize {
634 self.chan.semaphore().bound
635 }
636
637 /// Polls to receive the next message on this channel.
638 ///
639 /// This method returns:
640 ///
641 /// * `Poll::Pending` if no messages are available but the channel is not
642 /// closed, or if a spurious failure happens.
643 /// * `Poll::Ready(Some(message))` if a message is available.
644 /// * `Poll::Ready(None)` if the channel has been closed and all messages
645 /// sent before it was closed have been received.
646 ///
647 /// When the method returns `Poll::Pending`, the `Waker` in the provided
648 /// `Context` is scheduled to receive a wakeup when a message is sent on any
/// sender, or when the channel is closed. Note that on multiple calls to
650 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
651 /// passed to the most recent call is scheduled to receive a wakeup.
652 ///
653 /// If this method returns `Poll::Pending` due to a spurious failure, then
654 /// the `Waker` will be notified when the situation causing the spurious
655 /// failure has been resolved. Note that receiving such a wakeup does not
656 /// guarantee that the next call will succeed — it could fail with another
657 /// spurious failure.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
    // Polls the wrapped channel receiver directly; this is the same call that
    // `recv` wraps in `poll_fn`.
    self.chan.recv(cx)
}
661
662 /// Polls to receive multiple messages on this channel, extending the provided buffer.
663 ///
664 /// This method returns:
665 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
666 /// spurious failure happens.
667 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
668 /// stored in `buffer`. This can be less than, or equal to, `limit`.
669 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
670 ///
671 /// When the method returns `Poll::Pending`, the `Waker` in the provided
672 /// `Context` is scheduled to receive a wakeup when a message is sent on any
/// sender, or when the channel is closed. Note that on multiple calls to
674 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
675 /// passed to the most recent call is scheduled to receive a wakeup.
676 ///
677 /// Note that this method does not guarantee that exactly `limit` messages
678 /// are received. Rather, if at least one message is available, it returns
679 /// as many messages as it can up to the given limit. This method returns
680 /// zero only if the channel is closed (or if `limit` is zero).
681 ///
682 /// # Examples
683 ///
684 /// ```
685 /// use std::task::{Context, Poll};
686 /// use std::pin::Pin;
687 /// use tokio::sync::mpsc;
688 /// use futures::Future;
689 ///
690 /// struct MyReceiverFuture<'a> {
691 /// receiver: mpsc::Receiver<i32>,
692 /// buffer: &'a mut Vec<i32>,
693 /// limit: usize,
694 /// }
695 ///
696 /// impl<'a> Future for MyReceiverFuture<'a> {
697 /// type Output = usize; // Number of messages received
698 ///
699 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
700 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
701 ///
702 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
703 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
704 /// Poll::Pending => Poll::Pending,
705 /// Poll::Ready(count) => Poll::Ready(count),
706 /// }
707 /// }
708 /// }
709 ///
710 /// # #[tokio::main(flavor = "current_thread")]
711 /// # async fn main() {
712 /// let (tx, rx) = mpsc::channel(32);
713 /// let mut buffer = Vec::new();
714 ///
715 /// let my_receiver_future = MyReceiverFuture {
716 /// receiver: rx,
717 /// buffer: &mut buffer,
718 /// limit: 3,
719 /// };
720 ///
721 /// for i in 0..10 {
722 /// tx.send(i).await.unwrap();
723 /// }
724 ///
725 /// let count = my_receiver_future.await;
726 /// assert_eq!(count, 3);
727 /// assert_eq!(buffer, vec![0,1,2])
728 /// # }
729 /// ```
pub fn poll_recv_many(
    &mut self,
    cx: &mut Context<'_>,
    buffer: &mut Vec<T>,
    limit: usize,
) -> Poll<usize> {
    // Polls the wrapped channel receiver directly; this is the same call that
    // `recv_many` wraps in `poll_fn`.
    self.chan.recv_many(cx, buffer, limit)
}
738
739 /// Returns the number of [`Sender`] handles.
pub fn sender_strong_count(&self) -> usize {
    // Forwards to the wrapped channel receiver; the count is returned unchanged.
    self.chan.sender_strong_count()
}
743
744 /// Returns the number of [`WeakSender`] handles.
pub fn sender_weak_count(&self) -> usize {
    // Forwards to the wrapped channel receiver; the count is returned unchanged.
    self.chan.sender_weak_count()
}
748}
749
750impl<T> fmt::Debug for Receiver<T> {
751 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
752 fmt.debug_struct("Receiver")
753 .field("chan", &self.chan)
754 .finish()
755 }
756}
757
// `Receiver<T>` is declared `Unpin` for every `T`; the impl places no bound on `T`.
impl<T> Unpin for Receiver<T> {}
759
760impl<T> Sender<T> {
pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
    // Wraps the transmit handle of the underlying channel; no other state is set up.
    Sender { chan }
}
764
765 /// Sends a value, waiting until there is capacity.
766 ///
767 /// A successful send occurs when it is determined that the other end of the
768 /// channel has not hung up already. An unsuccessful send would be one where
769 /// the corresponding receiver has already been closed. Note that a return
770 /// value of `Err` means that the data will never be received, but a return
771 /// value of `Ok` does not mean that the data will be received. It is
772 /// possible for the corresponding receiver to hang up immediately after
773 /// this function returns `Ok`.
774 ///
775 /// # Errors
776 ///
777 /// If the receive half of the channel is closed, either due to [`close`]
778 /// being called or the [`Receiver`] handle dropping, the function returns
779 /// an error. The error includes the value passed to `send`.
780 ///
781 /// [`close`]: Receiver::close
782 /// [`Receiver`]: Receiver
783 ///
784 /// # Cancel safety
785 ///
786 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
787 /// statement and some other branch completes first, then it is guaranteed
788 /// that the message was not sent. **However, in that case, the message
789 /// is dropped and will be lost.**
790 ///
791 /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
792 /// capacity, then use the returned [`Permit`] to send the message.
793 ///
794 /// This channel uses a queue to ensure that calls to `send` and `reserve`
795 /// are granted capacity in the order they were requested. Cancelling a
796 /// call to `send` makes you lose your place in the queue.
797 ///
798 /// # Examples
799 ///
800 /// In the following example, each call to `send` will block until the
801 /// previously sent value was received.
802 ///
803 /// ```rust
804 /// use tokio::sync::mpsc;
805 ///
806 /// # #[tokio::main(flavor = "current_thread")]
807 /// # async fn main() {
808 /// let (tx, mut rx) = mpsc::channel(1);
809 ///
810 /// tokio::spawn(async move {
811 /// for i in 0..10 {
812 /// if let Err(_) = tx.send(i).await {
813 /// println!("receiver dropped");
814 /// return;
815 /// }
816 /// }
817 /// });
818 ///
819 /// while let Some(i) = rx.recv().await {
820 /// println!("got = {}", i);
821 /// }
822 /// # }
823 /// ```
824 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
825 match self.reserve().await {
826 Ok(permit) => {
827 permit.send(value);
828 Ok(())
829 }
830 Err(_) => Err(SendError(value)),
831 }
832 }
833
834 /// Completes when the receiver has dropped.
835 ///
836 /// This allows the producers to get notified when interest in the produced
837 /// values is canceled and immediately stop doing work.
838 ///
839 /// # Cancel safety
840 ///
841 /// This method is cancel safe. Once the channel is closed, it stays closed
842 /// forever and all future calls to `closed` will return immediately.
843 ///
844 /// # Examples
845 ///
846 /// ```
847 /// use tokio::sync::mpsc;
848 ///
849 /// # #[tokio::main(flavor = "current_thread")]
850 /// # async fn main() {
851 /// let (tx1, rx) = mpsc::channel::<()>(1);
852 /// let tx2 = tx1.clone();
853 /// let tx3 = tx1.clone();
854 /// let tx4 = tx1.clone();
855 /// let tx5 = tx1.clone();
856 /// tokio::spawn(async move {
857 /// drop(rx);
858 /// });
859 ///
860 /// futures::join!(
861 /// tx1.closed(),
862 /// tx2.closed(),
863 /// tx3.closed(),
864 /// tx4.closed(),
865 /// tx5.closed()
866 /// );
867 /// println!("Receiver dropped");
868 /// # }
869 /// ```
pub async fn closed(&self) {
    // Awaits the `closed` future of the wrapped transmit handle; its output
    // is discarded.
    self.chan.closed().await;
}
873
/// Attempts to immediately send a message on this `Sender`.
875 ///
876 /// This method differs from [`send`] by returning immediately if the channel's
877 /// buffer is full or no receiver is waiting to acquire some data. Compared
878 /// with [`send`], this function has two failure cases instead of one (one for
879 /// disconnection, one for a full buffer).
880 ///
881 /// # Errors
882 ///
883 /// If the channel capacity has been reached, i.e., the channel has `n`
884 /// buffered values where `n` is the argument passed to [`channel`], then an
885 /// error is returned.
886 ///
887 /// If the receive half of the channel is closed, either due to [`close`]
888 /// being called or the [`Receiver`] handle dropping, the function returns
889 /// an error. The error includes the value passed to `send`.
890 ///
891 /// [`send`]: Sender::send
892 /// [`channel`]: channel
893 /// [`close`]: Receiver::close
894 ///
895 /// # Examples
896 ///
897 /// ```
898 /// use tokio::sync::mpsc;
899 ///
900 /// # #[tokio::main(flavor = "current_thread")]
901 /// # async fn main() {
902 /// // Create a channel with buffer size 1
903 /// let (tx1, mut rx) = mpsc::channel(1);
904 /// let tx2 = tx1.clone();
905 ///
906 /// tokio::spawn(async move {
907 /// tx1.send(1).await.unwrap();
908 /// tx1.send(2).await.unwrap();
909 /// // task waits until the receiver receives a value.
910 /// });
911 ///
912 /// tokio::spawn(async move {
913 /// // This will return an error and send
914 /// // no message if the buffer is full
915 /// let _ = tx2.try_send(3);
916 /// });
917 ///
918 /// let mut msg;
919 /// msg = rx.recv().await.unwrap();
920 /// println!("message {} received", msg);
921 ///
922 /// msg = rx.recv().await.unwrap();
923 /// println!("message {} received", msg);
924 ///
925 /// // Third message may have never been sent
926 /// match rx.recv().await {
927 /// Some(msg) => println!("message {} received", msg),
928 /// None => println!("the third message was never sent"),
929 /// }
930 /// # }
931 /// ```
932 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
933 match self.chan.semaphore().semaphore.try_acquire(1) {
934 Ok(()) => {}
935 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
936 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
937 }
938
939 // Send the message
940 self.chan.send(message);
941 Ok(())
942 }
943
944 /// Sends a value, waiting until there is capacity, but only for a limited time.
945 ///
946 /// Shares the same success and error conditions as [`send`], adding one more
947 /// condition for an unsuccessful send, which is when the provided timeout has
948 /// elapsed, and there is no capacity available.
949 ///
950 /// [`send`]: Sender::send
951 ///
952 /// # Errors
953 ///
954 /// If the receive half of the channel is closed, either due to [`close`]
955 /// being called or the [`Receiver`] having been dropped,
956 /// the function returns an error. The error includes the value passed to `send`.
957 ///
958 /// [`close`]: Receiver::close
959 /// [`Receiver`]: Receiver
960 ///
961 /// # Panics
962 ///
963 /// This function panics if it is called outside the context of a Tokio
964 /// runtime [with time enabled](crate::runtime::Builder::enable_time).
965 ///
966 /// # Examples
967 ///
968 /// In the following example, each call to `send_timeout` will block until the
969 /// previously sent value was received, unless the timeout has elapsed.
970 ///
971 /// ```rust
972 /// use tokio::sync::mpsc;
973 /// use tokio::time::{sleep, Duration};
974 ///
975 /// # #[tokio::main(flavor = "current_thread")]
976 /// # async fn main() {
977 /// let (tx, mut rx) = mpsc::channel(1);
978 ///
979 /// tokio::spawn(async move {
980 /// for i in 0..10 {
981 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
982 /// println!("send error: #{:?}", e);
983 /// return;
984 /// }
985 /// }
986 /// });
987 ///
988 /// while let Some(i) = rx.recv().await {
989 /// println!("got = {}", i);
990 /// sleep(Duration::from_millis(200)).await;
991 /// }
992 /// # }
993 /// ```
994 #[cfg(feature = "time")]
995 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
996 pub async fn send_timeout(
997 &self,
998 value: T,
999 timeout: Duration,
1000 ) -> Result<(), SendTimeoutError<T>> {
1001 let permit = match crate::time::timeout(timeout, self.reserve()).await {
1002 Err(_) => {
1003 return Err(SendTimeoutError::Timeout(value));
1004 }
1005 Ok(Err(_)) => {
1006 return Err(SendTimeoutError::Closed(value));
1007 }
1008 Ok(Ok(permit)) => permit,
1009 };
1010
1011 permit.send(value);
1012 Ok(())
1013 }
1014
1015 /// Blocking send to call outside of asynchronous contexts.
1016 ///
1017 /// This method is intended for use cases where you are sending from
1018 /// synchronous code to asynchronous code, and will work even if the
1019 /// receiver is not using [`blocking_recv`] to receive the message.
1020 ///
1021 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
1022 ///
1023 /// # Panics
1024 ///
1025 /// This function panics if called within an asynchronous execution
1026 /// context.
1027 ///
1028 /// # Examples
1029 ///
1030 /// ```
1031 /// # #[cfg(not(target_family = "wasm"))]
1032 /// # {
1033 /// use std::thread;
1034 /// use tokio::runtime::Runtime;
1035 /// use tokio::sync::mpsc;
1036 ///
1037 /// fn main() {
1038 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
1039 ///
1040 /// let sync_code = thread::spawn(move || {
1041 /// tx.blocking_send(10).unwrap();
1042 /// });
1043 ///
1044 /// Runtime::new().unwrap().block_on(async move {
1045 /// assert_eq!(Some(10), rx.recv().await);
1046 /// });
1047 /// sync_code.join().unwrap()
1048 /// }
1049 /// # }
1050 /// ```
1051 #[track_caller]
1052 #[cfg(feature = "sync")]
1053 #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
1054 pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
1055 crate::future::block_on(self.send(value))
1056 }
1057
1058 /// Checks if the channel has been closed. This happens when the
1059 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1060 /// called.
1061 ///
1062 /// [`Receiver`]: crate::sync::mpsc::Receiver
1063 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1064 ///
1065 /// ```
1066 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1067 /// assert!(!tx.is_closed());
1068 ///
1069 /// let tx2 = tx.clone();
1070 /// assert!(!tx2.is_closed());
1071 ///
1072 /// drop(rx);
1073 /// assert!(tx.is_closed());
1074 /// assert!(tx2.is_closed());
1075 /// ```
1076 pub fn is_closed(&self) -> bool {
1077 self.chan.is_closed()
1078 }
1079
1080 /// Waits for channel capacity. Once capacity to send one message is
1081 /// available, it is reserved for the caller.
1082 ///
1083 /// If the channel is full, the function waits for the number of unreceived
1084 /// messages to become less than the channel capacity. Capacity to send one
1085 /// message is reserved for the caller. A [`Permit`] is returned to track
1086 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1087 /// reserved capacity.
1088 ///
1089 /// Dropping [`Permit`] without sending a message releases the capacity back
1090 /// to the channel.
1091 ///
1092 /// [`Permit`]: Permit
1093 /// [`send`]: Permit::send
1094 ///
1095 /// # Cancel safety
1096 ///
1097 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1098 /// are granted capacity in the order they were requested. Cancelling a
1099 /// call to `reserve` makes you lose your place in the queue.
1100 ///
1101 /// # Examples
1102 ///
1103 /// ```
1104 /// use tokio::sync::mpsc;
1105 ///
1106 /// # #[tokio::main(flavor = "current_thread")]
1107 /// # async fn main() {
1108 /// let (tx, mut rx) = mpsc::channel(1);
1109 ///
1110 /// // Reserve capacity
1111 /// let permit = tx.reserve().await.unwrap();
1112 ///
1113 /// // Trying to send directly on the `tx` will fail due to no
1114 /// // available capacity.
1115 /// assert!(tx.try_send(123).is_err());
1116 ///
1117 /// // Sending on the permit succeeds
1118 /// permit.send(456);
1119 ///
1120 /// // The value sent on the permit is received
1121 /// assert_eq!(rx.recv().await.unwrap(), 456);
1122 /// # }
1123 /// ```
1124 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1125 self.reserve_inner(1).await?;
1126 Ok(Permit { chan: &self.chan })
1127 }
1128
1129 /// Waits for channel capacity. Once capacity to send `n` messages is
1130 /// available, it is reserved for the caller.
1131 ///
1132 /// If the channel is full or if there are fewer than `n` permits available, the function waits
1133 /// for the number of unreceived messages to become `n` less than the channel capacity.
/// Capacity to send `n` messages is then reserved for the caller.
1135 ///
1136 /// A [`PermitIterator`] is returned to track the reserved capacity.
1137 /// You can call this [`Iterator`] until it is exhausted to
1138 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1139 /// [`try_reserve_many`] except it awaits for the slots to become available.
1140 ///
1141 /// If the channel is closed, the function returns a [`SendError`].
1142 ///
1143 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1144 /// permits back to the channel.
1145 ///
1146 /// [`PermitIterator`]: PermitIterator
1147 /// [`Permit`]: Permit
1148 /// [`send`]: Permit::send
1149 /// [`try_reserve_many`]: Sender::try_reserve_many
1150 ///
1151 /// # Cancel safety
1152 ///
1153 /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1154 /// are granted capacity in the order they were requested. Cancelling a call to
1155 /// `reserve_many` makes you lose your place in the queue.
1156 ///
1157 /// # Examples
1158 ///
1159 /// ```
1160 /// use tokio::sync::mpsc;
1161 ///
1162 /// # #[tokio::main(flavor = "current_thread")]
1163 /// # async fn main() {
1164 /// let (tx, mut rx) = mpsc::channel(2);
1165 ///
1166 /// // Reserve capacity
1167 /// let mut permit = tx.reserve_many(2).await.unwrap();
1168 ///
1169 /// // Trying to send directly on the `tx` will fail due to no
1170 /// // available capacity.
1171 /// assert!(tx.try_send(123).is_err());
1172 ///
1173 /// // Sending with the permit iterator succeeds
1174 /// permit.next().unwrap().send(456);
1175 /// permit.next().unwrap().send(457);
1176 ///
1177 /// // The iterator should now be exhausted
1178 /// assert!(permit.next().is_none());
1179 ///
1180 /// // The value sent on the permit is received
1181 /// assert_eq!(rx.recv().await.unwrap(), 456);
1182 /// assert_eq!(rx.recv().await.unwrap(), 457);
1183 /// # }
1184 /// ```
1185 pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1186 self.reserve_inner(n).await?;
1187 Ok(PermitIterator {
1188 chan: &self.chan,
1189 n,
1190 })
1191 }
1192
1193 /// Waits for channel capacity, moving the `Sender` and returning an owned
1194 /// permit. Once capacity to send one message is available, it is reserved
1195 /// for the caller.
1196 ///
1197 /// This moves the sender _by value_, and returns an owned permit that can
1198 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1199 /// this method may be used in cases where the permit must be valid for the
1200 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1201 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1202 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1203 /// moved, it can be cloned prior to calling `reserve_owned`.
1204 ///
1205 /// If the channel is full, the function waits for the number of unreceived
1206 /// messages to become less than the channel capacity. Capacity to send one
1207 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1208 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1209 /// consumes the reserved capacity.
1210 ///
1211 /// Dropping the [`OwnedPermit`] without sending a message releases the
1212 /// capacity back to the channel.
1213 ///
1214 /// # Cancel safety
1215 ///
1216 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1217 /// are granted capacity in the order they were requested. Cancelling a
1218 /// call to `reserve_owned` makes you lose your place in the queue.
1219 ///
1220 /// # Examples
1221 /// Sending a message using an [`OwnedPermit`]:
1222 /// ```
1223 /// use tokio::sync::mpsc;
1224 ///
1225 /// # #[tokio::main(flavor = "current_thread")]
1226 /// # async fn main() {
1227 /// let (tx, mut rx) = mpsc::channel(1);
1228 ///
1229 /// // Reserve capacity, moving the sender.
1230 /// let permit = tx.reserve_owned().await.unwrap();
1231 ///
1232 /// // Send a message, consuming the permit and returning
1233 /// // the moved sender.
1234 /// let tx = permit.send(123);
1235 ///
1236 /// // The value sent on the permit is received.
1237 /// assert_eq!(rx.recv().await.unwrap(), 123);
1238 ///
1239 /// // The sender can now be used again.
1240 /// tx.send(456).await.unwrap();
1241 /// # }
1242 /// ```
1243 ///
1244 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1245 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1246 ///
1247 /// ```
1248 /// use tokio::sync::mpsc;
1249 ///
1250 /// # #[tokio::main(flavor = "current_thread")]
1251 /// # async fn main() {
1252 /// let (tx, mut rx) = mpsc::channel(1);
1253 ///
1254 /// // Clone the sender and reserve capacity.
1255 /// let permit = tx.clone().reserve_owned().await.unwrap();
1256 ///
1257 /// // Trying to send directly on the `tx` will fail due to no
1258 /// // available capacity.
1259 /// assert!(tx.try_send(123).is_err());
1260 ///
1261 /// // Sending on the permit succeeds.
1262 /// permit.send(456);
1263 ///
1264 /// // The value sent on the permit is received
1265 /// assert_eq!(rx.recv().await.unwrap(), 456);
1266 /// # }
1267 /// ```
1268 ///
1269 /// [`Sender::reserve`]: Sender::reserve
1270 /// [`OwnedPermit`]: OwnedPermit
1271 /// [`send`]: OwnedPermit::send
1272 /// [`Arc::clone`]: std::sync::Arc::clone
1273 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1274 self.reserve_inner(1).await?;
1275 Ok(OwnedPermit {
1276 chan: Some(self.chan),
1277 })
1278 }
1279
1280 async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1281 crate::trace::async_trace_leaf().await;
1282
1283 if n > self.max_capacity() {
1284 return Err(SendError(()));
1285 }
1286 match self.chan.semaphore().semaphore.acquire(n).await {
1287 Ok(()) => Ok(()),
1288 Err(_) => Err(SendError(())),
1289 }
1290 }
1291
1292 /// Tries to acquire a slot in the channel without waiting for the slot to become
1293 /// available.
1294 ///
1295 /// If the channel is full this function will return [`TrySendError`], otherwise
1296 /// if there is a slot available it will return a [`Permit`] that will then allow you
1297 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1298 /// [`reserve`] except it does not await for the slot to become available.
1299 ///
1300 /// Dropping [`Permit`] without sending a message releases the capacity back
1301 /// to the channel.
1302 ///
1303 /// [`Permit`]: Permit
1304 /// [`send`]: Permit::send
1305 /// [`reserve`]: Sender::reserve
1306 ///
1307 /// # Examples
1308 ///
1309 /// ```
1310 /// use tokio::sync::mpsc;
1311 ///
1312 /// # #[tokio::main(flavor = "current_thread")]
1313 /// # async fn main() {
1314 /// let (tx, mut rx) = mpsc::channel(1);
1315 ///
1316 /// // Reserve capacity
1317 /// let permit = tx.try_reserve().unwrap();
1318 ///
1319 /// // Trying to send directly on the `tx` will fail due to no
1320 /// // available capacity.
1321 /// assert!(tx.try_send(123).is_err());
1322 ///
1323 /// // Trying to reserve an additional slot on the `tx` will
1324 /// // fail because there is no capacity.
1325 /// assert!(tx.try_reserve().is_err());
1326 ///
1327 /// // Sending on the permit succeeds
1328 /// permit.send(456);
1329 ///
1330 /// // The value sent on the permit is received
1331 /// assert_eq!(rx.recv().await.unwrap(), 456);
1332 ///
1333 /// # }
1334 /// ```
1335 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1336 match self.chan.semaphore().semaphore.try_acquire(1) {
1337 Ok(()) => {}
1338 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1339 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1340 }
1341
1342 Ok(Permit { chan: &self.chan })
1343 }
1344
1345 /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1346 /// available.
1347 ///
1348 /// A [`PermitIterator`] is returned to track the reserved capacity.
1349 /// You can call this [`Iterator`] until it is exhausted to
1350 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1351 /// [`reserve_many`] except it does not await for the slots to become available.
1352 ///
1353 /// If there are fewer than `n` permits available on the channel, then
1354 /// this function will return a [`TrySendError::Full`]. If the channel is closed
1355 /// this function will return a [`TrySendError::Closed`].
1356 ///
1357 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1358 /// permits back to the channel.
1359 ///
1360 /// [`PermitIterator`]: PermitIterator
1361 /// [`send`]: Permit::send
1362 /// [`reserve_many`]: Sender::reserve_many
1363 ///
1364 /// # Examples
1365 ///
1366 /// ```
1367 /// use tokio::sync::mpsc;
1368 ///
1369 /// # #[tokio::main(flavor = "current_thread")]
1370 /// # async fn main() {
1371 /// let (tx, mut rx) = mpsc::channel(2);
1372 ///
1373 /// // Reserve capacity
1374 /// let mut permit = tx.try_reserve_many(2).unwrap();
1375 ///
1376 /// // Trying to send directly on the `tx` will fail due to no
1377 /// // available capacity.
1378 /// assert!(tx.try_send(123).is_err());
1379 ///
1380 /// // Trying to reserve an additional slot on the `tx` will
1381 /// // fail because there is no capacity.
1382 /// assert!(tx.try_reserve().is_err());
1383 ///
1384 /// // Sending with the permit iterator succeeds
1385 /// permit.next().unwrap().send(456);
1386 /// permit.next().unwrap().send(457);
1387 ///
1388 /// // The iterator should now be exhausted
1389 /// assert!(permit.next().is_none());
1390 ///
1391 /// // The value sent on the permit is received
1392 /// assert_eq!(rx.recv().await.unwrap(), 456);
1393 /// assert_eq!(rx.recv().await.unwrap(), 457);
1394 ///
1395 /// // Trying to call try_reserve_many with 0 will return an empty iterator
1396 /// let mut permit = tx.try_reserve_many(0).unwrap();
1397 /// assert!(permit.next().is_none());
1398 ///
1399 /// // Trying to call try_reserve_many with a number greater than the channel
1400 /// // capacity will return an error
1401 /// let permit = tx.try_reserve_many(3);
1402 /// assert!(permit.is_err());
1403 ///
1404 /// // Trying to call try_reserve_many on a closed channel will return an error
1405 /// drop(rx);
1406 /// let permit = tx.try_reserve_many(1);
1407 /// assert!(permit.is_err());
1408 ///
1409 /// let permit = tx.try_reserve_many(0);
1410 /// assert!(permit.is_err());
1411 /// # }
1412 /// ```
1413 pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1414 if n > self.max_capacity() {
1415 return Err(TrySendError::Full(()));
1416 }
1417
1418 match self.chan.semaphore().semaphore.try_acquire(n) {
1419 Ok(()) => {}
1420 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1421 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1422 }
1423
1424 Ok(PermitIterator {
1425 chan: &self.chan,
1426 n,
1427 })
1428 }
1429
1430 /// Tries to acquire a slot in the channel without waiting for the slot to become
1431 /// available, returning an owned permit.
1432 ///
1433 /// This moves the sender _by value_, and returns an owned permit that can
1434 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1435 /// this method may be used in cases where the permit must be valid for the
1436 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1437 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1438 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1439 /// moved, it can be cloned prior to calling `try_reserve_owned`.
1440 ///
1441 /// If the channel is full this function will return a [`TrySendError`].
1442 /// Since the sender is taken by value, the `TrySendError` returned in this
1443 /// case contains the sender, so that it may be used again. Otherwise, if
1444 /// there is a slot available, this method will return an [`OwnedPermit`]
1445 /// that can then be used to [`send`] on the channel with a guaranteed slot.
1446 /// This function is similar to [`reserve_owned`] except it does not await
1447 /// for the slot to become available.
1448 ///
1449 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1450 /// to the channel.
1451 ///
1452 /// [`OwnedPermit`]: OwnedPermit
1453 /// [`send`]: OwnedPermit::send
1454 /// [`reserve_owned`]: Sender::reserve_owned
1455 /// [`Arc::clone`]: std::sync::Arc::clone
1456 ///
1457 /// # Examples
1458 ///
1459 /// ```
1460 /// use tokio::sync::mpsc;
1461 ///
1462 /// # #[tokio::main(flavor = "current_thread")]
1463 /// # async fn main() {
1464 /// let (tx, mut rx) = mpsc::channel(1);
1465 ///
1466 /// // Reserve capacity
1467 /// let permit = tx.clone().try_reserve_owned().unwrap();
1468 ///
1469 /// // Trying to send directly on the `tx` will fail due to no
1470 /// // available capacity.
1471 /// assert!(tx.try_send(123).is_err());
1472 ///
1473 /// // Trying to reserve an additional slot on the `tx` will
1474 /// // fail because there is no capacity.
1475 /// assert!(tx.try_reserve().is_err());
1476 ///
1477 /// // Sending on the permit succeeds
1478 /// permit.send(456);
1479 ///
1480 /// // The value sent on the permit is received
1481 /// assert_eq!(rx.recv().await.unwrap(), 456);
1482 ///
1483 /// # }
1484 /// ```
1485 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1486 match self.chan.semaphore().semaphore.try_acquire(1) {
1487 Ok(()) => {}
1488 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1489 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1490 }
1491
1492 Ok(OwnedPermit {
1493 chan: Some(self.chan),
1494 })
1495 }
1496
1497 /// Returns `true` if senders belong to the same channel.
1498 ///
1499 /// # Examples
1500 ///
1501 /// ```
1502 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1503 /// let tx2 = tx.clone();
1504 /// assert!(tx.same_channel(&tx2));
1505 ///
1506 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1507 /// assert!(!tx3.same_channel(&tx2));
1508 /// ```
1509 pub fn same_channel(&self, other: &Self) -> bool {
1510 self.chan.same_channel(&other.chan)
1511 }
1512
1513 /// Returns the current number of reservations which can immediately be
1514 /// returned from the channel.
1515 ///
1516 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1517 /// with [`reserve`].
1518 ///
1519 /// The capacity goes up when values are received, unless there are
1520 /// existing, non-cancelled calls to [`Sender::send`] or [`Sender::reserve`]
1521 /// which have returned [`Poll::Pending`]. While those calls exist, reading
1522 /// values from the [`Receiver`] transfers access to that capacity directly to
1523 /// those callers, in FIFO order.
1524 ///
1525 /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1526 /// specified when calling [`channel`].
1527 ///
1528 /// # Examples
1529 ///
1530 /// ```
1531 /// use tokio::sync::mpsc;
1532 ///
1533 /// # #[tokio::main(flavor = "current_thread")]
1534 /// # async fn main() {
1535 /// let (tx, mut rx) = mpsc::channel::<()>(5);
1536 ///
1537 /// assert_eq!(tx.capacity(), 5);
1538 ///
1539 /// // Making a reservation drops the capacity by one.
1540 /// let permit = tx.reserve().await.unwrap();
1541 /// assert_eq!(tx.capacity(), 4);
1542 ///
1543 /// // Sending and receiving a value increases the capacity by one.
1544 /// permit.send(());
1545 /// rx.recv().await.unwrap();
1546 /// assert_eq!(tx.capacity(), 5);
1547 /// # }
1548 /// ```
1549 ///
1550 /// [`send`]: Sender::send
1551 /// [`reserve`]: Sender::reserve
1552 /// [`channel`]: channel
1553 /// [`max_capacity`]: Sender::max_capacity
1554 pub fn capacity(&self) -> usize {
1555 self.chan.semaphore().semaphore.available_permits()
1556 }
1557
1558 /// Converts the `Sender` to a [`WeakSender`] that does not count
1559 /// towards RAII semantics, i.e. if all `Sender` instances of the
1560 /// channel were dropped and only `WeakSender` instances remain,
1561 /// the channel is closed.
1562 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1563 pub fn downgrade(&self) -> WeakSender<T> {
1564 WeakSender {
1565 chan: self.chan.downgrade(),
1566 }
1567 }
1568
1569 /// Returns the maximum buffer capacity of the channel.
1570 ///
1571 /// The maximum capacity is the buffer capacity initially specified when calling
1572 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1573 /// available buffer capacity: as messages are sent and received, the
1574 /// value returned by [`capacity`] will go up or down, whereas the value
1575 /// returned by [`max_capacity`] will remain constant.
1576 ///
1577 /// # Examples
1578 ///
1579 /// ```
1580 /// use tokio::sync::mpsc;
1581 ///
1582 /// # #[tokio::main(flavor = "current_thread")]
1583 /// # async fn main() {
1584 /// let (tx, _rx) = mpsc::channel::<()>(5);
1585 ///
1586 /// // both max capacity and capacity are the same at first
1587 /// assert_eq!(tx.max_capacity(), 5);
1588 /// assert_eq!(tx.capacity(), 5);
1589 ///
1590 /// // Making a reservation doesn't change the max capacity.
1591 /// let permit = tx.reserve().await.unwrap();
1592 /// assert_eq!(tx.max_capacity(), 5);
1593 /// // but drops the capacity by one
1594 /// assert_eq!(tx.capacity(), 4);
1595 /// # }
1596 /// ```
1597 ///
1598 /// [`channel`]: channel
1599 /// [`max_capacity`]: Sender::max_capacity
1600 /// [`capacity`]: Sender::capacity
1601 pub fn max_capacity(&self) -> usize {
1602 self.chan.semaphore().bound
1603 }
1604
1605 /// Returns the number of [`Sender`] handles.
1606 pub fn strong_count(&self) -> usize {
1607 self.chan.strong_count()
1608 }
1609
1610 /// Returns the number of [`WeakSender`] handles.
1611 pub fn weak_count(&self) -> usize {
1612 self.chan.weak_count()
1613 }
1614}
1615
1616impl<T> Clone for Sender<T> {
1617 fn clone(&self) -> Self {
1618 Sender {
1619 chan: self.chan.clone(),
1620 }
1621 }
1622}
1623
1624impl<T> fmt::Debug for Sender<T> {
1625 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1626 fmt.debug_struct("Sender")
1627 .field("chan", &self.chan)
1628 .finish()
1629 }
1630}
1631
1632impl<T> Clone for WeakSender<T> {
1633 fn clone(&self) -> Self {
1634 self.chan.increment_weak_count();
1635
1636 WeakSender {
1637 chan: self.chan.clone(),
1638 }
1639 }
1640}
1641
1642impl<T> Drop for WeakSender<T> {
1643 fn drop(&mut self) {
1644 self.chan.decrement_weak_count();
1645 }
1646}
1647
1648impl<T> WeakSender<T> {
1649 /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1650 /// if there are other `Sender` instances alive and the channel wasn't
1651 /// previously dropped, otherwise `None` is returned.
1652 pub fn upgrade(&self) -> Option<Sender<T>> {
1653 chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1654 }
1655
1656 /// Returns the number of [`Sender`] handles.
1657 pub fn strong_count(&self) -> usize {
1658 self.chan.strong_count()
1659 }
1660
1661 /// Returns the number of [`WeakSender`] handles.
1662 pub fn weak_count(&self) -> usize {
1663 self.chan.weak_count()
1664 }
1665}
1666
1667impl<T> fmt::Debug for WeakSender<T> {
1668 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1669 fmt.debug_struct("WeakSender").finish()
1670 }
1671}
1672
1673// ===== impl Permit =====
1674
1675impl<T> Permit<'_, T> {
1676 /// Sends a value using the reserved capacity.
1677 ///
1678 /// Capacity for the message has already been reserved. The message is sent
1679 /// to the receiver and the permit is consumed. The operation will succeed
1680 /// even if the receiver half has been closed. See [`Receiver::close`] for
1681 /// more details on performing a clean shutdown.
1682 ///
1683 /// [`Receiver::close`]: Receiver::close
1684 ///
1685 /// # Examples
1686 ///
1687 /// ```
1688 /// use tokio::sync::mpsc;
1689 ///
1690 /// # #[tokio::main(flavor = "current_thread")]
1691 /// # async fn main() {
1692 /// let (tx, mut rx) = mpsc::channel(1);
1693 ///
1694 /// // Reserve capacity
1695 /// let permit = tx.reserve().await.unwrap();
1696 ///
1697 /// // Trying to send directly on the `tx` will fail due to no
1698 /// // available capacity.
1699 /// assert!(tx.try_send(123).is_err());
1700 ///
1701 /// // Send a message on the permit
1702 /// permit.send(456);
1703 ///
1704 /// // The value sent on the permit is received
1705 /// assert_eq!(rx.recv().await.unwrap(), 456);
1706 /// # }
1707 /// ```
1708 pub fn send(self, value: T) {
1709 use std::mem;
1710
1711 self.chan.send(value);
1712
1713 // Avoid the drop logic
1714 mem::forget(self);
1715 }
1716}
1717
1718impl<T> Drop for Permit<'_, T> {
1719 fn drop(&mut self) {
1720 use chan::Semaphore;
1721
1722 let semaphore = self.chan.semaphore();
1723
1724 // Add the permit back to the semaphore
1725 semaphore.add_permit();
1726
1727 // If this is the last sender for this channel, wake the receiver so
1728 // that it can be notified that the channel is closed.
1729 if semaphore.is_closed() && semaphore.is_idle() {
1730 self.chan.wake_rx();
1731 }
1732 }
1733}
1734
1735impl<T> fmt::Debug for Permit<'_, T> {
1736 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1737 fmt.debug_struct("Permit")
1738 .field("chan", &self.chan)
1739 .finish()
1740 }
1741}
1742
1743// ===== impl PermitIterator =====
1744
1745impl<'a, T> Iterator for PermitIterator<'a, T> {
1746 type Item = Permit<'a, T>;
1747
1748 fn next(&mut self) -> Option<Self::Item> {
1749 if self.n == 0 {
1750 return None;
1751 }
1752
1753 self.n -= 1;
1754 Some(Permit { chan: self.chan })
1755 }
1756
1757 fn size_hint(&self) -> (usize, Option<usize>) {
1758 let n = self.n;
1759 (n, Some(n))
1760 }
1761}
1762impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1763impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1764
1765impl<T> Drop for PermitIterator<'_, T> {
1766 fn drop(&mut self) {
1767 use chan::Semaphore;
1768
1769 if self.n == 0 {
1770 return;
1771 }
1772
1773 let semaphore = self.chan.semaphore();
1774
1775 // Add the remaining permits back to the semaphore
1776 semaphore.add_permits(self.n);
1777
1778 // If this is the last sender for this channel, wake the receiver so
1779 // that it can be notified that the channel is closed.
1780 if semaphore.is_closed() && semaphore.is_idle() {
1781 self.chan.wake_rx();
1782 }
1783 }
1784}
1785
1786impl<T> fmt::Debug for PermitIterator<'_, T> {
1787 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1788 fmt.debug_struct("PermitIterator")
1789 .field("chan", &self.chan)
1790 .field("capacity", &self.n)
1791 .finish()
1792 }
1793}
1794
// ===== impl OwnedPermit =====
1796
1797impl<T> OwnedPermit<T> {
1798 /// Sends a value using the reserved capacity.
1799 ///
1800 /// Capacity for the message has already been reserved. The message is sent
1801 /// to the receiver and the permit is consumed. The operation will succeed
1802 /// even if the receiver half has been closed. See [`Receiver::close`] for
1803 /// more details on performing a clean shutdown.
1804 ///
1805 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1806 /// the `OwnedPermit` was reserved.
1807 ///
1808 /// [`Receiver::close`]: Receiver::close
1809 ///
1810 /// # Examples
1811 ///
1812 /// ```
1813 /// use tokio::sync::mpsc;
1814 ///
1815 /// # #[tokio::main(flavor = "current_thread")]
1816 /// # async fn main() {
1817 /// let (tx, mut rx) = mpsc::channel(1);
1818 ///
1819 /// // Reserve capacity
1820 /// let permit = tx.reserve_owned().await.unwrap();
1821 ///
1822 /// // Send a message on the permit, returning the sender.
1823 /// let tx = permit.send(456);
1824 ///
1825 /// // The value sent on the permit is received
1826 /// assert_eq!(rx.recv().await.unwrap(), 456);
1827 ///
1828 /// // We may now reuse `tx` to send another message.
1829 /// tx.send(789).await.unwrap();
1830 /// # }
1831 /// ```
1832 pub fn send(mut self, value: T) -> Sender<T> {
1833 let chan = self.chan.take().unwrap_or_else(|| {
1834 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1835 });
1836 chan.send(value);
1837
1838 Sender { chan }
1839 }
1840
1841 /// Releases the reserved capacity *without* sending a message, returning the
1842 /// [`Sender`].
1843 ///
1844 /// # Examples
1845 ///
1846 /// ```
1847 /// use tokio::sync::mpsc;
1848 ///
1849 /// # #[tokio::main(flavor = "current_thread")]
1850 /// # async fn main() {
1851 /// let (tx, rx) = mpsc::channel(1);
1852 ///
1853 /// // Clone the sender and reserve capacity
1854 /// let permit = tx.clone().reserve_owned().await.unwrap();
1855 ///
1856 /// // Trying to send on the original `tx` will fail, since the `permit`
1857 /// // has reserved all the available capacity.
1858 /// assert!(tx.try_send(123).is_err());
1859 ///
1860 /// // Release the permit without sending a message, returning the clone
1861 /// // of the sender.
1862 /// let tx2 = permit.release();
1863 ///
1864 /// // We may now reuse `tx` to send another message.
1865 /// tx.send(789).await.unwrap();
1866 /// # drop(rx); drop(tx2);
1867 /// # }
1868 /// ```
1869 ///
1870 /// [`Sender`]: Sender
1871 pub fn release(mut self) -> Sender<T> {
1872 use chan::Semaphore;
1873
1874 let chan = self.chan.take().unwrap_or_else(|| {
1875 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1876 });
1877
1878 // Add the permit back to the semaphore
1879 chan.semaphore().add_permit();
1880 Sender { chan }
1881 }
1882
1883 /// Returns `true` if permits belong to the same channel.
1884 ///
1885 /// # Examples
1886 ///
1887 /// ```
1888 /// use tokio::sync::mpsc;
1889 ///
1890 /// # #[tokio::main(flavor = "current_thread")]
1891 /// # async fn main() {
1892 /// let (tx, rx) = mpsc::channel::<()>(2);
1893 ///
1894 /// let permit1 = tx.clone().reserve_owned().await.unwrap();
1895 /// let permit2 = tx.clone().reserve_owned().await.unwrap();
1896 /// assert!(permit1.same_channel(&permit2));
1897 ///
1898 /// let (tx2, rx2) = mpsc::channel::<()>(1);
1899 ///
1900 /// let permit3 = tx2.clone().reserve_owned().await.unwrap();
1901 /// assert!(!permit3.same_channel(&permit2));
1902 /// # }
1903 /// ```
1904 pub fn same_channel(&self, other: &Self) -> bool {
1905 self.chan
1906 .as_ref()
1907 .zip(other.chan.as_ref())
1908 .is_some_and(|(a, b)| a.same_channel(b))
1909 }
1910
1911 /// Returns `true` if this permit belongs to the same channel as the given [`Sender`].
1912 ///
1913 /// # Examples
1914 ///
1915 /// ```
1916 /// use tokio::sync::mpsc;
1917 ///
1918 /// # #[tokio::main(flavor = "current_thread")]
1919 /// # async fn main() {
1920 /// let (tx, rx) = mpsc::channel::<()>(1);
1921 ///
1922 /// let permit = tx.clone().reserve_owned().await.unwrap();
1923 /// assert!(permit.same_channel_as_sender(&tx));
1924 ///
1925 /// let (tx2, rx2) = mpsc::channel::<()>(1);
1926 /// assert!(!permit.same_channel_as_sender(&tx2));
1927 /// # }
1928 /// ```
1929 pub fn same_channel_as_sender(&self, sender: &Sender<T>) -> bool {
1930 self.chan
1931 .as_ref()
1932 .is_some_and(|chan| chan.same_channel(&sender.chan))
1933 }
1934}
1935
1936impl<T> Drop for OwnedPermit<T> {
1937 fn drop(&mut self) {
1938 use chan::Semaphore;
1939
1940 // Are we still holding onto the sender?
1941 if let Some(chan) = self.chan.take() {
1942 let semaphore = chan.semaphore();
1943
1944 // Add the permit back to the semaphore
1945 semaphore.add_permit();
1946
1947 // If this `OwnedPermit` is holding the last sender for this
1948 // channel, wake the receiver so that it can be notified that the
1949 // channel is closed.
1950 if semaphore.is_closed() && semaphore.is_idle() {
1951 chan.wake_rx();
1952 }
1953 }
1954
1955 // Otherwise, do nothing.
1956 }
1957}
1958
1959impl<T> fmt::Debug for OwnedPermit<T> {
1960 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1961 fmt.debug_struct("OwnedPermit")
1962 .field("chan", &self.chan)
1963 .finish()
1964 }
1965}