tokio/sync/mpsc/
bounded.rs

1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7    use crate::sync::mpsc::error::SendTimeoutError;
8    use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
22pub struct Sender<T> {
23    chan: chan::Tx<T, Semaphore>,
24}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// # #[tokio::main(flavor = "current_thread")]
44/// # async fn main() {
45/// let (tx, _rx) = channel::<i32>(15);
46/// let tx_weak = tx.downgrade();
47///
48/// // Upgrading will succeed because `tx` still exists.
49/// assert!(tx_weak.upgrade().is_some());
50///
51/// // If we drop `tx`, then it will fail.
52/// drop(tx);
53/// assert!(tx_weak.clone().upgrade().is_none());
54/// # }
55/// ```
56pub struct WeakSender<T> {
57    chan: Arc<chan::Chan<T, Semaphore>>,
58}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
67pub struct Permit<'a, T> {
68    chan: &'a chan::Tx<T, Semaphore>,
69}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
78pub struct PermitIterator<'a, T> {
79    chan: &'a chan::Tx<T, Semaphore>,
80    n: usize,
81}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
95pub struct OwnedPermit<T> {
96    chan: Option<chan::Tx<T, Semaphore>>,
97}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
106pub struct Receiver<T> {
107    /// The channel receiver.
108    chan: chan::Rx<T, Semaphore>,
109}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages.  Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0, or too large. Currently the maximum
131/// capacity is [`Semaphore::MAX_PERMITS`].
132///
133/// [`Semaphore::MAX_PERMITS`]: crate::sync::Semaphore::MAX_PERMITS
134///
135/// # Examples
136///
137/// ```rust
138/// use tokio::sync::mpsc;
139///
140/// # #[tokio::main(flavor = "current_thread")]
141/// # async fn main() {
142/// let (tx, mut rx) = mpsc::channel(100);
143///
144/// tokio::spawn(async move {
145///     for i in 0..10 {
146///         if let Err(_) = tx.send(i).await {
147///             println!("receiver dropped");
148///             return;
149///         }
150///     }
151/// });
152///
153/// while let Some(i) = rx.recv().await {
154///      println!("got = {}", i);
155/// }
156/// # }
157/// ```
158#[track_caller]
159pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
160    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
161    let semaphore = Semaphore {
162        semaphore: semaphore::Semaphore::new(buffer),
163        bound: buffer,
164    };
165    let (tx, rx) = chan::channel(semaphore);
166
167    let tx = Sender::new(tx);
168    let rx = Receiver::new(rx);
169
170    (tx, rx)
171}
172
173/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
174/// representing the channel bound.
175#[derive(Debug)]
176pub(crate) struct Semaphore {
177    pub(crate) semaphore: semaphore::Semaphore,
178    pub(crate) bound: usize,
179}
180
181impl<T> Receiver<T> {
182    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
183        Receiver { chan }
184    }
185
186    /// Receives the next value for this receiver.
187    ///
188    /// This method returns `None` if the channel has been closed and there are
189    /// no remaining messages in the channel's buffer. This indicates that no
190    /// further values can ever be received from this `Receiver`. The channel is
191    /// closed when all senders have been dropped, or when [`close`] is called.
192    ///
193    /// If there are no messages in the channel's buffer, but the channel has
194    /// not yet been closed, this method will sleep until a message is sent or
195    /// the channel is closed.  Note that if [`close`] is called, but there are
196    /// still outstanding [`Permits`] from before it was closed, the channel is
197    /// not considered closed by `recv` until the permits are released.
198    ///
199    /// # Cancel safety
200    ///
201    /// This method is cancel safe. If `recv` is used as the event in a
202    /// [`tokio::select!`](crate::select) statement and some other branch
203    /// completes first, it is guaranteed that no messages were received on this
204    /// channel.
205    ///
206    /// [`close`]: Self::close
207    /// [`Permits`]: struct@crate::sync::mpsc::Permit
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// use tokio::sync::mpsc;
213    ///
214    /// # #[tokio::main(flavor = "current_thread")]
215    /// # async fn main() {
216    /// let (tx, mut rx) = mpsc::channel(100);
217    ///
218    /// tokio::spawn(async move {
219    ///     tx.send("hello").await.unwrap();
220    /// });
221    ///
222    /// assert_eq!(Some("hello"), rx.recv().await);
223    /// assert_eq!(None, rx.recv().await);
224    /// # }
225    /// ```
226    ///
227    /// Values are buffered:
228    ///
229    /// ```
230    /// use tokio::sync::mpsc;
231    ///
232    /// # #[tokio::main(flavor = "current_thread")]
233    /// # async fn main() {
234    /// let (tx, mut rx) = mpsc::channel(100);
235    ///
236    /// tx.send("hello").await.unwrap();
237    /// tx.send("world").await.unwrap();
238    ///
239    /// assert_eq!(Some("hello"), rx.recv().await);
240    /// assert_eq!(Some("world"), rx.recv().await);
241    /// # }
242    /// ```
243    pub async fn recv(&mut self) -> Option<T> {
244        use std::future::poll_fn;
245        poll_fn(|cx| self.chan.recv(cx)).await
246    }
247
248    /// Receives the next values for this receiver and extends `buffer`.
249    ///
250    /// This method extends `buffer` by no more than a fixed number of values
251    /// as specified by `limit`. If `limit` is zero, the function immediately
252    /// returns `0`. The return value is the number of values added to `buffer`.
253    ///
254    /// For `limit > 0`, if there are no messages in the channel's queue, but
255    /// the channel has not yet been closed, this method will sleep until a
256    /// message is sent or the channel is closed. Note that if [`close`] is
257    /// called, but there are still outstanding [`Permits`] from before it was
258    /// closed, the channel is not considered closed by `recv_many` until the
259    /// permits are released.
260    ///
261    /// For non-zero values of `limit`, this method will never return `0` unless
262    /// the channel has been closed and there are no remaining messages in the
263    /// channel's queue. This indicates that no further values can ever be
264    /// received from this `Receiver`. The channel is closed when all senders
265    /// have been dropped, or when [`close`] is called.
266    ///
267    /// The capacity of `buffer` is increased as needed.
268    ///
269    /// # Cancel safety
270    ///
271    /// This method is cancel safe. If `recv_many` is used as the event in a
272    /// [`tokio::select!`](crate::select) statement and some other branch
273    /// completes first, it is guaranteed that no messages were received on this
274    /// channel.
275    ///
276    /// [`close`]: Self::close
277    /// [`Permits`]: struct@crate::sync::mpsc::Permit
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// use tokio::sync::mpsc;
283    ///
284    /// # #[tokio::main(flavor = "current_thread")]
285    /// # async fn main() {
286    /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
287    /// let limit = 2;
288    /// let (tx, mut rx) = mpsc::channel(100);
289    /// let tx2 = tx.clone();
290    /// tx2.send("first").await.unwrap();
291    /// tx2.send("second").await.unwrap();
292    /// tx2.send("third").await.unwrap();
293    ///
294    /// // Call `recv_many` to receive up to `limit` (2) values.
295    /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
296    /// assert_eq!(vec!["first", "second"], buffer);
297    ///
298    /// // If the buffer is full, the next call to `recv_many`
299    /// // reserves additional capacity.
300    /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
301    ///
302    /// tokio::spawn(async move {
303    ///     tx.send("fourth").await.unwrap();
304    /// });
305    ///
306    /// // 'tx' is dropped, but `recv_many`
307    /// // is guaranteed not to return 0 as the channel
308    /// // is not yet closed.
309    /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
310    /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
311    ///
312    /// // Once the last sender is dropped, the channel is
313    /// // closed and `recv_many` returns 0, capacity unchanged.
314    /// drop(tx2);
315    /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
316    /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
317    /// # }
318    /// ```
319    pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
320        use std::future::poll_fn;
321        poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
322    }
323
324    /// Tries to receive the next value for this receiver.
325    ///
326    /// This method returns the [`Empty`] error if the channel is currently
327    /// empty, but there are still outstanding [senders] or [permits].
328    ///
329    /// This method returns the [`Disconnected`] error if the channel is
330    /// currently empty, and there are no outstanding [senders] or [permits].
331    ///
332    /// Unlike the [`poll_recv`] method, this method will never return an
333    /// [`Empty`] error spuriously.
334    ///
335    /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
336    /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
337    /// [`poll_recv`]: Self::poll_recv
338    /// [senders]: crate::sync::mpsc::Sender
339    /// [permits]: crate::sync::mpsc::Permit
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use tokio::sync::mpsc;
345    /// use tokio::sync::mpsc::error::TryRecvError;
346    ///
347    /// # #[tokio::main(flavor = "current_thread")]
348    /// # async fn main() {
349    /// let (tx, mut rx) = mpsc::channel(100);
350    ///
351    /// tx.send("hello").await.unwrap();
352    ///
353    /// assert_eq!(Ok("hello"), rx.try_recv());
354    /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
355    ///
356    /// tx.send("hello").await.unwrap();
357    /// // Drop the last sender, closing the channel.
358    /// drop(tx);
359    ///
360    /// assert_eq!(Ok("hello"), rx.try_recv());
361    /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
362    /// # }
363    /// ```
364    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
365        self.chan.try_recv()
366    }
367
368    /// Blocking receive to call outside of asynchronous contexts.
369    ///
370    /// This method returns `None` if the channel has been closed and there are
371    /// no remaining messages in the channel's buffer. This indicates that no
372    /// further values can ever be received from this `Receiver`. The channel is
373    /// closed when all senders have been dropped, or when [`close`] is called.
374    ///
375    /// If there are no messages in the channel's buffer, but the channel has
376    /// not yet been closed, this method will block until a message is sent or
377    /// the channel is closed.
378    ///
379    /// This method is intended for use cases where you are sending from
380    /// asynchronous code to synchronous code, and will work even if the sender
381    /// is not using [`blocking_send`] to send the message.
382    ///
383    /// Note that if [`close`] is called, but there are still outstanding
384    /// [`Permits`] from before it was closed, the channel is not considered
385    /// closed by `blocking_recv` until the permits are released.
386    ///
387    /// [`close`]: Self::close
388    /// [`Permits`]: struct@crate::sync::mpsc::Permit
389    /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
390    ///
391    /// # Panics
392    ///
393    /// This function panics if called within an asynchronous execution
394    /// context.
395    ///
396    /// # Examples
397    ///
398    /// ```
399    /// # #[cfg(not(target_family = "wasm"))]
400    /// # {
401    /// use std::thread;
402    /// use tokio::runtime::Runtime;
403    /// use tokio::sync::mpsc;
404    ///
405    /// fn main() {
406    ///     let (tx, mut rx) = mpsc::channel::<u8>(10);
407    ///
408    ///     let sync_code = thread::spawn(move || {
409    ///         assert_eq!(Some(10), rx.blocking_recv());
410    ///     });
411    ///
412    ///     Runtime::new()
413    ///         .unwrap()
414    ///         .block_on(async move {
415    ///             let _ = tx.send(10).await;
416    ///         });
417    ///     sync_code.join().unwrap()
418    /// }
419    /// # }
420    /// ```
421    #[track_caller]
422    #[cfg(feature = "sync")]
423    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
424    pub fn blocking_recv(&mut self) -> Option<T> {
425        crate::future::block_on(self.recv())
426    }
427
428    /// Variant of [`Self::recv_many`] for blocking contexts.
429    ///
430    /// The same conditions as in [`Self::blocking_recv`] apply.
431    #[track_caller]
432    #[cfg(feature = "sync")]
433    #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
434    pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
435        crate::future::block_on(self.recv_many(buffer, limit))
436    }
437
438    /// Closes the receiving half of a channel without dropping it.
439    ///
440    /// This prevents any further messages from being sent on the channel while
441    /// still enabling the receiver to drain messages that are buffered. Any
442    /// outstanding [`Permit`] values will still be able to send messages.
443    ///
444    /// To guarantee that no messages are dropped, after calling `close()`,
445    /// `recv()` must be called until `None` is returned. If there are
446    /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
447    /// not return `None` until those are released.
448    ///
449    /// [`Permit`]: Permit
450    /// [`OwnedPermit`]: OwnedPermit
451    ///
452    /// # Examples
453    ///
454    /// ```
455    /// use tokio::sync::mpsc;
456    ///
457    /// # #[tokio::main(flavor = "current_thread")]
458    /// # async fn main() {
459    /// let (tx, mut rx) = mpsc::channel(20);
460    ///
461    /// tokio::spawn(async move {
462    ///     let mut i = 0;
463    ///     while let Ok(permit) = tx.reserve().await {
464    ///         permit.send(i);
465    ///         i += 1;
466    ///     }
467    /// });
468    ///
469    /// rx.close();
470    ///
471    /// while let Some(msg) = rx.recv().await {
472    ///     println!("got {}", msg);
473    /// }
474    ///
475    /// // Channel closed and no messages are lost.
476    /// # }
477    /// ```
478    pub fn close(&mut self) {
479        self.chan.close();
480    }
481
482    /// Checks if a channel is closed.
483    ///
484    /// This method returns `true` if the channel has been closed. The channel is closed
485    /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
486    ///
487    /// [`Sender`]: crate::sync::mpsc::Sender
488    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
489    ///
490    /// # Examples
491    /// ```
492    /// use tokio::sync::mpsc;
493    ///
494    /// # #[tokio::main(flavor = "current_thread")]
495    /// # async fn main() {
496    /// let (_tx, mut rx) = mpsc::channel::<()>(10);
497    /// assert!(!rx.is_closed());
498    ///
499    /// rx.close();
500    ///
501    /// assert!(rx.is_closed());
502    /// # }
503    /// ```
504    pub fn is_closed(&self) -> bool {
505        self.chan.is_closed()
506    }
507
508    /// Checks if a channel is empty.
509    ///
510    /// This method returns `true` if the channel has no messages.
511    ///
512    /// # Examples
513    /// ```
514    /// use tokio::sync::mpsc;
515    ///
516    /// # #[tokio::main(flavor = "current_thread")]
517    /// # async fn main() {
518    /// let (tx, rx) = mpsc::channel(10);
519    /// assert!(rx.is_empty());
520    ///
521    /// tx.send(0).await.unwrap();
522    /// assert!(!rx.is_empty());
523    /// # }
524    ///
525    /// ```
526    pub fn is_empty(&self) -> bool {
527        self.chan.is_empty()
528    }
529
530    /// Returns the number of messages in the channel.
531    ///
532    /// # Examples
533    /// ```
534    /// use tokio::sync::mpsc;
535    ///
536    /// # #[tokio::main(flavor = "current_thread")]
537    /// # async fn main() {
538    /// let (tx, rx) = mpsc::channel(10);
539    /// assert_eq!(0, rx.len());
540    ///
541    /// tx.send(0).await.unwrap();
542    /// assert_eq!(1, rx.len());
543    /// # }
544    /// ```
545    pub fn len(&self) -> usize {
546        self.chan.len()
547    }
548
549    /// Returns the current number of reservations which can immediately be
550    /// returned from the channel.
551    ///
552    /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
553    /// capacity with [`Sender::reserve`].
554    ///
555    /// The capacity goes up when values are received, unless there are
556    /// existing, non-cancelled calls to [`Sender::send`] or [`Sender::reserve`]
557    /// which have returned [`Poll::Pending`]. While those calls exist, reading
558    /// values from the [`Receiver`] transfers access to that capacity directly to
559    /// those callers, in FIFO order.
560    ///
561    /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
562    /// specified when calling [`channel`].
563    ///
564    /// # Examples
565    ///
566    /// ```
567    /// use tokio::sync::mpsc;
568    ///
569    /// # #[tokio::main(flavor = "current_thread")]
570    /// # async fn main() {
571    /// let (tx, mut rx) = mpsc::channel::<()>(5);
572    ///
573    /// assert_eq!(rx.capacity(), 5);
574    ///
575    /// // Making a reservation drops the capacity by one.
576    /// let permit = tx.reserve().await.unwrap();
577    /// assert_eq!(rx.capacity(), 4);
578    /// assert_eq!(rx.len(), 0);
579    ///
580    /// // Sending and receiving a value increases the capacity by one.
581    /// permit.send(());
582    /// assert_eq!(rx.len(), 1);
583    /// rx.recv().await.unwrap();
584    /// assert_eq!(rx.capacity(), 5);
585    ///
586    /// // Directly sending a message drops the capacity by one.
587    /// tx.send(()).await.unwrap();
588    /// assert_eq!(rx.capacity(), 4);
589    /// assert_eq!(rx.len(), 1);
590    ///
591    /// // Receiving the message increases the capacity by one.
592    /// rx.recv().await.unwrap();
593    /// assert_eq!(rx.capacity(), 5);
594    /// assert_eq!(rx.len(), 0);
595    /// # }
596    /// ```
597    /// [`capacity`]: Receiver::capacity
598    /// [`max_capacity`]: Receiver::max_capacity
599    pub fn capacity(&self) -> usize {
600        self.chan.semaphore().semaphore.available_permits()
601    }
602
603    /// Returns the maximum buffer capacity of the channel.
604    ///
605    /// The maximum capacity is the buffer capacity initially specified when calling
606    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
607    /// available buffer capacity: as messages are sent and received, the value
608    /// returned by [`capacity`] will go up or down, whereas the value
609    /// returned by [`max_capacity`] will remain constant.
610    ///
611    /// # Examples
612    ///
613    /// ```
614    /// use tokio::sync::mpsc;
615    ///
616    /// # #[tokio::main(flavor = "current_thread")]
617    /// # async fn main() {
618    /// let (tx, rx) = mpsc::channel::<()>(5);
619    ///
620    /// // both max capacity and capacity are the same at first
621    /// assert_eq!(rx.max_capacity(), 5);
622    /// assert_eq!(rx.capacity(), 5);
623    ///
624    /// // Making a reservation doesn't change the max capacity.
625    /// let permit = tx.reserve().await.unwrap();
626    /// assert_eq!(rx.max_capacity(), 5);
627    /// // but drops the capacity by one
628    /// assert_eq!(rx.capacity(), 4);
629    /// # }
630    /// ```
631    /// [`capacity`]: Receiver::capacity
632    /// [`max_capacity`]: Receiver::max_capacity
633    pub fn max_capacity(&self) -> usize {
634        self.chan.semaphore().bound
635    }
636
637    /// Polls to receive the next message on this channel.
638    ///
639    /// This method returns:
640    ///
641    ///  * `Poll::Pending` if no messages are available but the channel is not
642    ///    closed, or if a spurious failure happens.
643    ///  * `Poll::Ready(Some(message))` if a message is available.
644    ///  * `Poll::Ready(None)` if the channel has been closed and all messages
645    ///    sent before it was closed have been received.
646    ///
647    /// When the method returns `Poll::Pending`, the `Waker` in the provided
648    /// `Context` is scheduled to receive a wakeup when a message is sent on any
649    /// receiver, or when the channel is closed.  Note that on multiple calls to
650    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
651    /// passed to the most recent call is scheduled to receive a wakeup.
652    ///
653    /// If this method returns `Poll::Pending` due to a spurious failure, then
654    /// the `Waker` will be notified when the situation causing the spurious
655    /// failure has been resolved. Note that receiving such a wakeup does not
656    /// guarantee that the next call will succeed — it could fail with another
657    /// spurious failure.
658    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
659        self.chan.recv(cx)
660    }
661
662    /// Polls to receive multiple messages on this channel, extending the provided buffer.
663    ///
664    /// This method returns:
665    /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
666    ///   spurious failure happens.
667    /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
668    ///   stored in `buffer`. This can be less than, or equal to, `limit`.
669    /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
670    ///
671    /// When the method returns `Poll::Pending`, the `Waker` in the provided
672    /// `Context` is scheduled to receive a wakeup when a message is sent on any
673    /// receiver, or when the channel is closed.  Note that on multiple calls to
674    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
675    /// passed to the most recent call is scheduled to receive a wakeup.
676    ///
677    /// Note that this method does not guarantee that exactly `limit` messages
678    /// are received. Rather, if at least one message is available, it returns
679    /// as many messages as it can up to the given limit. This method returns
680    /// zero only if the channel is closed (or if `limit` is zero).
681    ///
682    /// # Examples
683    ///
684    /// ```
685    /// use std::task::{Context, Poll};
686    /// use std::pin::Pin;
687    /// use tokio::sync::mpsc;
688    /// use futures::Future;
689    ///
690    /// struct MyReceiverFuture<'a> {
691    ///     receiver: mpsc::Receiver<i32>,
692    ///     buffer: &'a mut Vec<i32>,
693    ///     limit: usize,
694    /// }
695    ///
696    /// impl<'a> Future for MyReceiverFuture<'a> {
697    ///     type Output = usize; // Number of messages received
698    ///
699    ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
700    ///         let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
701    ///
702    ///         // Now `receiver` and `buffer` are mutable references, and `limit` is copied
703    ///         match receiver.poll_recv_many(cx, *buffer, *limit) {
704    ///             Poll::Pending => Poll::Pending,
705    ///             Poll::Ready(count) => Poll::Ready(count),
706    ///         }
707    ///     }
708    /// }
709    ///
710    /// # #[tokio::main(flavor = "current_thread")]
711    /// # async fn main() {
712    /// let (tx, rx) = mpsc::channel(32);
713    /// let mut buffer = Vec::new();
714    ///
715    /// let my_receiver_future = MyReceiverFuture {
716    ///     receiver: rx,
717    ///     buffer: &mut buffer,
718    ///     limit: 3,
719    /// };
720    ///
721    /// for i in 0..10 {
722    ///     tx.send(i).await.unwrap();
723    /// }
724    ///
725    /// let count = my_receiver_future.await;
726    /// assert_eq!(count, 3);
727    /// assert_eq!(buffer, vec![0,1,2])
728    /// # }
729    /// ```
730    pub fn poll_recv_many(
731        &mut self,
732        cx: &mut Context<'_>,
733        buffer: &mut Vec<T>,
734        limit: usize,
735    ) -> Poll<usize> {
736        self.chan.recv_many(cx, buffer, limit)
737    }
738
739    /// Returns the number of [`Sender`] handles.
740    pub fn sender_strong_count(&self) -> usize {
741        self.chan.sender_strong_count()
742    }
743
744    /// Returns the number of [`WeakSender`] handles.
745    pub fn sender_weak_count(&self) -> usize {
746        self.chan.sender_weak_count()
747    }
748}
749
750impl<T> fmt::Debug for Receiver<T> {
751    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
752        fmt.debug_struct("Receiver")
753            .field("chan", &self.chan)
754            .finish()
755    }
756}
757
758impl<T> Unpin for Receiver<T> {}
759
760impl<T> Sender<T> {
761    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
762        Sender { chan }
763    }
764
765    /// Sends a value, waiting until there is capacity.
766    ///
767    /// A successful send occurs when it is determined that the other end of the
768    /// channel has not hung up already. An unsuccessful send would be one where
769    /// the corresponding receiver has already been closed. Note that a return
770    /// value of `Err` means that the data will never be received, but a return
771    /// value of `Ok` does not mean that the data will be received. It is
772    /// possible for the corresponding receiver to hang up immediately after
773    /// this function returns `Ok`.
774    ///
775    /// # Errors
776    ///
777    /// If the receive half of the channel is closed, either due to [`close`]
778    /// being called or the [`Receiver`] handle dropping, the function returns
779    /// an error. The error includes the value passed to `send`.
780    ///
781    /// [`close`]: Receiver::close
782    /// [`Receiver`]: Receiver
783    ///
784    /// # Cancel safety
785    ///
786    /// If `send` is used as the event in a [`tokio::select!`](crate::select)
787    /// statement and some other branch completes first, then it is guaranteed
788    /// that the message was not sent. **However, in that case, the message
789    /// is dropped and will be lost.**
790    ///
791    /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
792    /// capacity, then use the returned [`Permit`] to send the message.
793    ///
794    /// This channel uses a queue to ensure that calls to `send` and `reserve`
795    /// are granted capacity in the order they were requested. Cancelling a
796    /// call to `send` makes you lose your place in the queue.
797    ///
798    /// # Examples
799    ///
800    /// In the following example, each call to `send` will block until the
801    /// previously sent value was received.
802    ///
803    /// ```rust
804    /// use tokio::sync::mpsc;
805    ///
806    /// # #[tokio::main(flavor = "current_thread")]
807    /// # async fn main() {
808    /// let (tx, mut rx) = mpsc::channel(1);
809    ///
810    /// tokio::spawn(async move {
811    ///     for i in 0..10 {
812    ///         if let Err(_) = tx.send(i).await {
813    ///             println!("receiver dropped");
814    ///             return;
815    ///         }
816    ///     }
817    /// });
818    ///
819    /// while let Some(i) = rx.recv().await {
820    ///     println!("got = {}", i);
821    /// }
822    /// # }
823    /// ```
824    pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
825        match self.reserve().await {
826            Ok(permit) => {
827                permit.send(value);
828                Ok(())
829            }
830            Err(_) => Err(SendError(value)),
831        }
832    }
833
834    /// Completes when the receiver has dropped.
835    ///
836    /// This allows the producers to get notified when interest in the produced
837    /// values is canceled and immediately stop doing work.
838    ///
839    /// # Cancel safety
840    ///
841    /// This method is cancel safe. Once the channel is closed, it stays closed
842    /// forever and all future calls to `closed` will return immediately.
843    ///
844    /// # Examples
845    ///
846    /// ```
847    /// use tokio::sync::mpsc;
848    ///
849    /// # #[tokio::main(flavor = "current_thread")]
850    /// # async fn main() {
851    /// let (tx1, rx) = mpsc::channel::<()>(1);
852    /// let tx2 = tx1.clone();
853    /// let tx3 = tx1.clone();
854    /// let tx4 = tx1.clone();
855    /// let tx5 = tx1.clone();
856    /// tokio::spawn(async move {
857    ///     drop(rx);
858    /// });
859    ///
860    /// futures::join!(
861    ///     tx1.closed(),
862    ///     tx2.closed(),
863    ///     tx3.closed(),
864    ///     tx4.closed(),
865    ///     tx5.closed()
866    /// );
867    /// println!("Receiver dropped");
868    /// # }
869    /// ```
870    pub async fn closed(&self) {
871        self.chan.closed().await;
872    }
873
874    /// Attempts to immediately send a message on this `Sender`
875    ///
876    /// This method differs from [`send`] by returning immediately if the channel's
877    /// buffer is full or no receiver is waiting to acquire some data. Compared
878    /// with [`send`], this function has two failure cases instead of one (one for
879    /// disconnection, one for a full buffer).
880    ///
881    /// # Errors
882    ///
883    /// If the channel capacity has been reached, i.e., the channel has `n`
884    /// buffered values where `n` is the argument passed to [`channel`], then an
885    /// error is returned.
886    ///
887    /// If the receive half of the channel is closed, either due to [`close`]
888    /// being called or the [`Receiver`] handle dropping, the function returns
889    /// an error. The error includes the value passed to `send`.
890    ///
891    /// [`send`]: Sender::send
892    /// [`channel`]: channel
893    /// [`close`]: Receiver::close
894    ///
895    /// # Examples
896    ///
897    /// ```
898    /// use tokio::sync::mpsc;
899    ///
900    /// # #[tokio::main(flavor = "current_thread")]
901    /// # async fn main() {
902    /// // Create a channel with buffer size 1
903    /// let (tx1, mut rx) = mpsc::channel(1);
904    /// let tx2 = tx1.clone();
905    ///
906    /// tokio::spawn(async move {
907    ///     tx1.send(1).await.unwrap();
908    ///     tx1.send(2).await.unwrap();
909    ///     // task waits until the receiver receives a value.
910    /// });
911    ///
912    /// tokio::spawn(async move {
913    ///     // This will return an error and send
914    ///     // no message if the buffer is full
915    ///     let _ = tx2.try_send(3);
916    /// });
917    ///
918    /// let mut msg;
919    /// msg = rx.recv().await.unwrap();
920    /// println!("message {} received", msg);
921    ///
922    /// msg = rx.recv().await.unwrap();
923    /// println!("message {} received", msg);
924    ///
925    /// // Third message may have never been sent
926    /// match rx.recv().await {
927    ///     Some(msg) => println!("message {} received", msg),
928    ///     None => println!("the third message was never sent"),
929    /// }
930    /// # }
931    /// ```
932    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
933        match self.chan.semaphore().semaphore.try_acquire(1) {
934            Ok(()) => {}
935            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
936            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
937        }
938
939        // Send the message
940        self.chan.send(message);
941        Ok(())
942    }
943
944    /// Sends a value, waiting until there is capacity, but only for a limited time.
945    ///
946    /// Shares the same success and error conditions as [`send`], adding one more
947    /// condition for an unsuccessful send, which is when the provided timeout has
948    /// elapsed, and there is no capacity available.
949    ///
950    /// [`send`]: Sender::send
951    ///
952    /// # Errors
953    ///
954    /// If the receive half of the channel is closed, either due to [`close`]
955    /// being called or the [`Receiver`] having been dropped,
956    /// the function returns an error. The error includes the value passed to `send`.
957    ///
958    /// [`close`]: Receiver::close
959    /// [`Receiver`]: Receiver
960    ///
961    /// # Panics
962    ///
963    /// This function panics if it is called outside the context of a Tokio
964    /// runtime [with time enabled](crate::runtime::Builder::enable_time).
965    ///
966    /// # Examples
967    ///
968    /// In the following example, each call to `send_timeout` will block until the
969    /// previously sent value was received, unless the timeout has elapsed.
970    ///
971    /// ```rust
972    /// use tokio::sync::mpsc;
973    /// use tokio::time::{sleep, Duration};
974    ///
975    /// # #[tokio::main(flavor = "current_thread")]
976    /// # async fn main() {
977    /// let (tx, mut rx) = mpsc::channel(1);
978    ///
979    /// tokio::spawn(async move {
980    ///     for i in 0..10 {
981    ///         if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
982    ///             println!("send error: #{:?}", e);
983    ///             return;
984    ///         }
985    ///     }
986    /// });
987    ///
988    /// while let Some(i) = rx.recv().await {
989    ///     println!("got = {}", i);
990    ///     sleep(Duration::from_millis(200)).await;
991    /// }
992    /// # }
993    /// ```
994    #[cfg(feature = "time")]
995    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
996    pub async fn send_timeout(
997        &self,
998        value: T,
999        timeout: Duration,
1000    ) -> Result<(), SendTimeoutError<T>> {
1001        let permit = match crate::time::timeout(timeout, self.reserve()).await {
1002            Err(_) => {
1003                return Err(SendTimeoutError::Timeout(value));
1004            }
1005            Ok(Err(_)) => {
1006                return Err(SendTimeoutError::Closed(value));
1007            }
1008            Ok(Ok(permit)) => permit,
1009        };
1010
1011        permit.send(value);
1012        Ok(())
1013    }
1014
1015    /// Blocking send to call outside of asynchronous contexts.
1016    ///
1017    /// This method is intended for use cases where you are sending from
1018    /// synchronous code to asynchronous code, and will work even if the
1019    /// receiver is not using [`blocking_recv`] to receive the message.
1020    ///
1021    /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
1022    ///
1023    /// # Panics
1024    ///
1025    /// This function panics if called within an asynchronous execution
1026    /// context.
1027    ///
1028    /// # Examples
1029    ///
1030    /// ```
1031    /// # #[cfg(not(target_family = "wasm"))]
1032    /// # {
1033    /// use std::thread;
1034    /// use tokio::runtime::Runtime;
1035    /// use tokio::sync::mpsc;
1036    ///
1037    /// fn main() {
1038    ///     let (tx, mut rx) = mpsc::channel::<u8>(1);
1039    ///
1040    ///     let sync_code = thread::spawn(move || {
1041    ///         tx.blocking_send(10).unwrap();
1042    ///     });
1043    ///
1044    ///     Runtime::new().unwrap().block_on(async move {
1045    ///         assert_eq!(Some(10), rx.recv().await);
1046    ///     });
1047    ///     sync_code.join().unwrap()
1048    /// }
1049    /// # }
1050    /// ```
1051    #[track_caller]
1052    #[cfg(feature = "sync")]
1053    #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
1054    pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
1055        crate::future::block_on(self.send(value))
1056    }
1057
1058    /// Checks if the channel has been closed. This happens when the
1059    /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1060    /// called.
1061    ///
1062    /// [`Receiver`]: crate::sync::mpsc::Receiver
1063    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1064    ///
1065    /// ```
1066    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1067    /// assert!(!tx.is_closed());
1068    ///
1069    /// let tx2 = tx.clone();
1070    /// assert!(!tx2.is_closed());
1071    ///
1072    /// drop(rx);
1073    /// assert!(tx.is_closed());
1074    /// assert!(tx2.is_closed());
1075    /// ```
1076    pub fn is_closed(&self) -> bool {
1077        self.chan.is_closed()
1078    }
1079
1080    /// Waits for channel capacity. Once capacity to send one message is
1081    /// available, it is reserved for the caller.
1082    ///
1083    /// If the channel is full, the function waits for the number of unreceived
1084    /// messages to become less than the channel capacity. Capacity to send one
1085    /// message is reserved for the caller. A [`Permit`] is returned to track
1086    /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1087    /// reserved capacity.
1088    ///
1089    /// Dropping [`Permit`] without sending a message releases the capacity back
1090    /// to the channel.
1091    ///
1092    /// [`Permit`]: Permit
1093    /// [`send`]: Permit::send
1094    ///
1095    /// # Cancel safety
1096    ///
1097    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1098    /// are granted capacity in the order they were requested. Cancelling a
1099    /// call to `reserve` makes you lose your place in the queue.
1100    ///
1101    /// # Examples
1102    ///
1103    /// ```
1104    /// use tokio::sync::mpsc;
1105    ///
1106    /// # #[tokio::main(flavor = "current_thread")]
1107    /// # async fn main() {
1108    /// let (tx, mut rx) = mpsc::channel(1);
1109    ///
1110    /// // Reserve capacity
1111    /// let permit = tx.reserve().await.unwrap();
1112    ///
1113    /// // Trying to send directly on the `tx` will fail due to no
1114    /// // available capacity.
1115    /// assert!(tx.try_send(123).is_err());
1116    ///
1117    /// // Sending on the permit succeeds
1118    /// permit.send(456);
1119    ///
1120    /// // The value sent on the permit is received
1121    /// assert_eq!(rx.recv().await.unwrap(), 456);
1122    /// # }
1123    /// ```
1124    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1125        self.reserve_inner(1).await?;
1126        Ok(Permit { chan: &self.chan })
1127    }
1128
1129    /// Waits for channel capacity. Once capacity to send `n` messages is
1130    /// available, it is reserved for the caller.
1131    ///
1132    /// If the channel is full or if there are fewer than `n` permits available, the function waits
1133    /// for the number of unreceived messages to become `n` less than the channel capacity.
1134    /// Capacity to send `n` message is then reserved for the caller.
1135    ///
1136    /// A [`PermitIterator`] is returned to track the reserved capacity.
1137    /// You can call this [`Iterator`] until it is exhausted to
1138    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1139    /// [`try_reserve_many`] except it awaits for the slots to become available.
1140    ///
1141    /// If the channel is closed, the function returns a [`SendError`].
1142    ///
1143    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1144    /// permits back to the channel.
1145    ///
1146    /// [`PermitIterator`]: PermitIterator
1147    /// [`Permit`]: Permit
1148    /// [`send`]: Permit::send
1149    /// [`try_reserve_many`]: Sender::try_reserve_many
1150    ///
1151    /// # Cancel safety
1152    ///
1153    /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1154    /// are granted capacity in the order they were requested. Cancelling a call to
1155    /// `reserve_many` makes you lose your place in the queue.
1156    ///
1157    /// # Examples
1158    ///
1159    /// ```
1160    /// use tokio::sync::mpsc;
1161    ///
1162    /// # #[tokio::main(flavor = "current_thread")]
1163    /// # async fn main() {
1164    /// let (tx, mut rx) = mpsc::channel(2);
1165    ///
1166    /// // Reserve capacity
1167    /// let mut permit = tx.reserve_many(2).await.unwrap();
1168    ///
1169    /// // Trying to send directly on the `tx` will fail due to no
1170    /// // available capacity.
1171    /// assert!(tx.try_send(123).is_err());
1172    ///
1173    /// // Sending with the permit iterator succeeds
1174    /// permit.next().unwrap().send(456);
1175    /// permit.next().unwrap().send(457);
1176    ///
1177    /// // The iterator should now be exhausted
1178    /// assert!(permit.next().is_none());
1179    ///
1180    /// // The value sent on the permit is received
1181    /// assert_eq!(rx.recv().await.unwrap(), 456);
1182    /// assert_eq!(rx.recv().await.unwrap(), 457);
1183    /// # }
1184    /// ```
1185    pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1186        self.reserve_inner(n).await?;
1187        Ok(PermitIterator {
1188            chan: &self.chan,
1189            n,
1190        })
1191    }
1192
1193    /// Waits for channel capacity, moving the `Sender` and returning an owned
1194    /// permit. Once capacity to send one message is available, it is reserved
1195    /// for the caller.
1196    ///
1197    /// This moves the sender _by value_, and returns an owned permit that can
1198    /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1199    /// this method may be used in cases where the permit must be valid for the
1200    /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1201    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1202    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1203    /// moved, it can be cloned prior to calling `reserve_owned`.
1204    ///
1205    /// If the channel is full, the function waits for the number of unreceived
1206    /// messages to become less than the channel capacity. Capacity to send one
1207    /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1208    /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1209    /// consumes the reserved capacity.
1210    ///
1211    /// Dropping the [`OwnedPermit`] without sending a message releases the
1212    /// capacity back to the channel.
1213    ///
1214    /// # Cancel safety
1215    ///
1216    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1217    /// are granted capacity in the order they were requested. Cancelling a
1218    /// call to `reserve_owned` makes you lose your place in the queue.
1219    ///
1220    /// # Examples
1221    /// Sending a message using an [`OwnedPermit`]:
1222    /// ```
1223    /// use tokio::sync::mpsc;
1224    ///
1225    /// # #[tokio::main(flavor = "current_thread")]
1226    /// # async fn main() {
1227    /// let (tx, mut rx) = mpsc::channel(1);
1228    ///
1229    /// // Reserve capacity, moving the sender.
1230    /// let permit = tx.reserve_owned().await.unwrap();
1231    ///
1232    /// // Send a message, consuming the permit and returning
1233    /// // the moved sender.
1234    /// let tx = permit.send(123);
1235    ///
1236    /// // The value sent on the permit is received.
1237    /// assert_eq!(rx.recv().await.unwrap(), 123);
1238    ///
1239    /// // The sender can now be used again.
1240    /// tx.send(456).await.unwrap();
1241    /// # }
1242    /// ```
1243    ///
1244    /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1245    /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1246    ///
1247    /// ```
1248    /// use tokio::sync::mpsc;
1249    ///
1250    /// # #[tokio::main(flavor = "current_thread")]
1251    /// # async fn main() {
1252    /// let (tx, mut rx) = mpsc::channel(1);
1253    ///
1254    /// // Clone the sender and reserve capacity.
1255    /// let permit = tx.clone().reserve_owned().await.unwrap();
1256    ///
1257    /// // Trying to send directly on the `tx` will fail due to no
1258    /// // available capacity.
1259    /// assert!(tx.try_send(123).is_err());
1260    ///
1261    /// // Sending on the permit succeeds.
1262    /// permit.send(456);
1263    ///
1264    /// // The value sent on the permit is received
1265    /// assert_eq!(rx.recv().await.unwrap(), 456);
1266    /// # }
1267    /// ```
1268    ///
1269    /// [`Sender::reserve`]: Sender::reserve
1270    /// [`OwnedPermit`]: OwnedPermit
1271    /// [`send`]: OwnedPermit::send
1272    /// [`Arc::clone`]: std::sync::Arc::clone
1273    pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1274        self.reserve_inner(1).await?;
1275        Ok(OwnedPermit {
1276            chan: Some(self.chan),
1277        })
1278    }
1279
1280    async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1281        crate::trace::async_trace_leaf().await;
1282
1283        if n > self.max_capacity() {
1284            return Err(SendError(()));
1285        }
1286        match self.chan.semaphore().semaphore.acquire(n).await {
1287            Ok(()) => Ok(()),
1288            Err(_) => Err(SendError(())),
1289        }
1290    }
1291
1292    /// Tries to acquire a slot in the channel without waiting for the slot to become
1293    /// available.
1294    ///
1295    /// If the channel is full this function will return [`TrySendError`], otherwise
1296    /// if there is a slot available it will return a [`Permit`] that will then allow you
1297    /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1298    /// [`reserve`] except it does not await for the slot to become available.
1299    ///
1300    /// Dropping [`Permit`] without sending a message releases the capacity back
1301    /// to the channel.
1302    ///
1303    /// [`Permit`]: Permit
1304    /// [`send`]: Permit::send
1305    /// [`reserve`]: Sender::reserve
1306    ///
1307    /// # Examples
1308    ///
1309    /// ```
1310    /// use tokio::sync::mpsc;
1311    ///
1312    /// # #[tokio::main(flavor = "current_thread")]
1313    /// # async fn main() {
1314    /// let (tx, mut rx) = mpsc::channel(1);
1315    ///
1316    /// // Reserve capacity
1317    /// let permit = tx.try_reserve().unwrap();
1318    ///
1319    /// // Trying to send directly on the `tx` will fail due to no
1320    /// // available capacity.
1321    /// assert!(tx.try_send(123).is_err());
1322    ///
1323    /// // Trying to reserve an additional slot on the `tx` will
1324    /// // fail because there is no capacity.
1325    /// assert!(tx.try_reserve().is_err());
1326    ///
1327    /// // Sending on the permit succeeds
1328    /// permit.send(456);
1329    ///
1330    /// // The value sent on the permit is received
1331    /// assert_eq!(rx.recv().await.unwrap(), 456);
1332    ///
1333    /// # }
1334    /// ```
1335    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1336        match self.chan.semaphore().semaphore.try_acquire(1) {
1337            Ok(()) => {}
1338            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1339            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1340        }
1341
1342        Ok(Permit { chan: &self.chan })
1343    }
1344
1345    /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1346    /// available.
1347    ///
1348    /// A [`PermitIterator`] is returned to track the reserved capacity.
1349    /// You can call this [`Iterator`] until it is exhausted to
1350    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1351    /// [`reserve_many`] except it does not await for the slots to become available.
1352    ///
1353    /// If there are fewer than `n` permits available on the channel, then
1354    /// this function will return a [`TrySendError::Full`]. If the channel is closed
1355    /// this function will return a [`TrySendError::Closed`].
1356    ///
1357    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1358    /// permits back to the channel.
1359    ///
1360    /// [`PermitIterator`]: PermitIterator
1361    /// [`send`]: Permit::send
1362    /// [`reserve_many`]: Sender::reserve_many
1363    ///
1364    /// # Examples
1365    ///
1366    /// ```
1367    /// use tokio::sync::mpsc;
1368    ///
1369    /// # #[tokio::main(flavor = "current_thread")]
1370    /// # async fn main() {
1371    /// let (tx, mut rx) = mpsc::channel(2);
1372    ///
1373    /// // Reserve capacity
1374    /// let mut permit = tx.try_reserve_many(2).unwrap();
1375    ///
1376    /// // Trying to send directly on the `tx` will fail due to no
1377    /// // available capacity.
1378    /// assert!(tx.try_send(123).is_err());
1379    ///
1380    /// // Trying to reserve an additional slot on the `tx` will
1381    /// // fail because there is no capacity.
1382    /// assert!(tx.try_reserve().is_err());
1383    ///
1384    /// // Sending with the permit iterator succeeds
1385    /// permit.next().unwrap().send(456);
1386    /// permit.next().unwrap().send(457);
1387    ///
1388    /// // The iterator should now be exhausted
1389    /// assert!(permit.next().is_none());
1390    ///
1391    /// // The value sent on the permit is received
1392    /// assert_eq!(rx.recv().await.unwrap(), 456);
1393    /// assert_eq!(rx.recv().await.unwrap(), 457);
1394    ///
1395    /// // Trying to call try_reserve_many with 0 will return an empty iterator
1396    /// let mut permit = tx.try_reserve_many(0).unwrap();
1397    /// assert!(permit.next().is_none());
1398    ///
1399    /// // Trying to call try_reserve_many with a number greater than the channel
1400    /// // capacity will return an error
1401    /// let permit = tx.try_reserve_many(3);
1402    /// assert!(permit.is_err());
1403    ///
1404    /// // Trying to call try_reserve_many on a closed channel will return an error
1405    /// drop(rx);
1406    /// let permit = tx.try_reserve_many(1);
1407    /// assert!(permit.is_err());
1408    ///
1409    /// let permit = tx.try_reserve_many(0);
1410    /// assert!(permit.is_err());
1411    /// # }
1412    /// ```
1413    pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1414        if n > self.max_capacity() {
1415            return Err(TrySendError::Full(()));
1416        }
1417
1418        match self.chan.semaphore().semaphore.try_acquire(n) {
1419            Ok(()) => {}
1420            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1421            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1422        }
1423
1424        Ok(PermitIterator {
1425            chan: &self.chan,
1426            n,
1427        })
1428    }
1429
1430    /// Tries to acquire a slot in the channel without waiting for the slot to become
1431    /// available, returning an owned permit.
1432    ///
1433    /// This moves the sender _by value_, and returns an owned permit that can
1434    /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1435    /// this method may be used in cases where the permit must be valid for the
1436    /// `'static` lifetime.  `Sender`s may be cloned cheaply (`Sender::clone` is
1437    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1438    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1439    /// moved, it can be cloned prior to calling `try_reserve_owned`.
1440    ///
1441    /// If the channel is full this function will return a [`TrySendError`].
1442    /// Since the sender is taken by value, the `TrySendError` returned in this
1443    /// case contains the sender, so that it may be used again. Otherwise, if
1444    /// there is a slot available, this method will return an [`OwnedPermit`]
1445    /// that can then be used to [`send`] on the channel with a guaranteed slot.
1446    /// This function is similar to  [`reserve_owned`] except it does not await
1447    /// for the slot to become available.
1448    ///
1449    /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1450    /// to the channel.
1451    ///
1452    /// [`OwnedPermit`]: OwnedPermit
1453    /// [`send`]: OwnedPermit::send
1454    /// [`reserve_owned`]: Sender::reserve_owned
1455    /// [`Arc::clone`]: std::sync::Arc::clone
1456    ///
1457    /// # Examples
1458    ///
1459    /// ```
1460    /// use tokio::sync::mpsc;
1461    ///
1462    /// # #[tokio::main(flavor = "current_thread")]
1463    /// # async fn main() {
1464    /// let (tx, mut rx) = mpsc::channel(1);
1465    ///
1466    /// // Reserve capacity
1467    /// let permit = tx.clone().try_reserve_owned().unwrap();
1468    ///
1469    /// // Trying to send directly on the `tx` will fail due to no
1470    /// // available capacity.
1471    /// assert!(tx.try_send(123).is_err());
1472    ///
1473    /// // Trying to reserve an additional slot on the `tx` will
1474    /// // fail because there is no capacity.
1475    /// assert!(tx.try_reserve().is_err());
1476    ///
1477    /// // Sending on the permit succeeds
1478    /// permit.send(456);
1479    ///
1480    /// // The value sent on the permit is received
1481    /// assert_eq!(rx.recv().await.unwrap(), 456);
1482    ///
1483    /// # }
1484    /// ```
1485    pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1486        match self.chan.semaphore().semaphore.try_acquire(1) {
1487            Ok(()) => {}
1488            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1489            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1490        }
1491
1492        Ok(OwnedPermit {
1493            chan: Some(self.chan),
1494        })
1495    }
1496
1497    /// Returns `true` if senders belong to the same channel.
1498    ///
1499    /// # Examples
1500    ///
1501    /// ```
1502    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1503    /// let  tx2 = tx.clone();
1504    /// assert!(tx.same_channel(&tx2));
1505    ///
1506    /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1507    /// assert!(!tx3.same_channel(&tx2));
1508    /// ```
1509    pub fn same_channel(&self, other: &Self) -> bool {
1510        self.chan.same_channel(&other.chan)
1511    }
1512
1513    /// Returns the current number of reservations which can immediately be
1514    /// returned from the channel.
1515    ///
1516    /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1517    /// with [`reserve`].
1518    ///
1519    /// The capacity goes up when values are received, unless there are
1520    /// existing, non-cancelled calls to [`Sender::send`] or [`Sender::reserve`]
1521    /// which have returned [`Poll::Pending`]. While those calls exist, reading
1522    /// values from the [`Receiver`] transfers access to that capacity directly to
1523    /// those callers, in FIFO order.
1524    ///
1525    /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1526    /// specified when calling [`channel`].
1527    ///
1528    /// # Examples
1529    ///
1530    /// ```
1531    /// use tokio::sync::mpsc;
1532    ///
1533    /// # #[tokio::main(flavor = "current_thread")]
1534    /// # async fn main() {
1535    /// let (tx, mut rx) = mpsc::channel::<()>(5);
1536    ///
1537    /// assert_eq!(tx.capacity(), 5);
1538    ///
1539    /// // Making a reservation drops the capacity by one.
1540    /// let permit = tx.reserve().await.unwrap();
1541    /// assert_eq!(tx.capacity(), 4);
1542    ///
1543    /// // Sending and receiving a value increases the capacity by one.
1544    /// permit.send(());
1545    /// rx.recv().await.unwrap();
1546    /// assert_eq!(tx.capacity(), 5);
1547    /// # }
1548    /// ```
1549    ///
1550    /// [`send`]: Sender::send
1551    /// [`reserve`]: Sender::reserve
1552    /// [`channel`]: channel
1553    /// [`max_capacity`]: Sender::max_capacity
1554    pub fn capacity(&self) -> usize {
1555        self.chan.semaphore().semaphore.available_permits()
1556    }
1557
1558    /// Converts the `Sender` to a [`WeakSender`] that does not count
1559    /// towards RAII semantics, i.e. if all `Sender` instances of the
1560    /// channel were dropped and only `WeakSender` instances remain,
1561    /// the channel is closed.
1562    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1563    pub fn downgrade(&self) -> WeakSender<T> {
1564        WeakSender {
1565            chan: self.chan.downgrade(),
1566        }
1567    }
1568
1569    /// Returns the maximum buffer capacity of the channel.
1570    ///
1571    /// The maximum capacity is the buffer capacity initially specified when calling
1572    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1573    /// available buffer capacity: as messages are sent and received, the
1574    /// value returned by [`capacity`] will go up or down, whereas the value
1575    /// returned by [`max_capacity`] will remain constant.
1576    ///
1577    /// # Examples
1578    ///
1579    /// ```
1580    /// use tokio::sync::mpsc;
1581    ///
1582    /// # #[tokio::main(flavor = "current_thread")]
1583    /// # async fn main() {
1584    /// let (tx, _rx) = mpsc::channel::<()>(5);
1585    ///
1586    /// // both max capacity and capacity are the same at first
1587    /// assert_eq!(tx.max_capacity(), 5);
1588    /// assert_eq!(tx.capacity(), 5);
1589    ///
1590    /// // Making a reservation doesn't change the max capacity.
1591    /// let permit = tx.reserve().await.unwrap();
1592    /// assert_eq!(tx.max_capacity(), 5);
1593    /// // but drops the capacity by one
1594    /// assert_eq!(tx.capacity(), 4);
1595    /// # }
1596    /// ```
1597    ///
1598    /// [`channel`]: channel
1599    /// [`max_capacity`]: Sender::max_capacity
1600    /// [`capacity`]: Sender::capacity
1601    pub fn max_capacity(&self) -> usize {
1602        self.chan.semaphore().bound
1603    }
1604
1605    /// Returns the number of [`Sender`] handles.
1606    pub fn strong_count(&self) -> usize {
1607        self.chan.strong_count()
1608    }
1609
1610    /// Returns the number of [`WeakSender`] handles.
1611    pub fn weak_count(&self) -> usize {
1612        self.chan.weak_count()
1613    }
1614}
1615
1616impl<T> Clone for Sender<T> {
1617    fn clone(&self) -> Self {
1618        Sender {
1619            chan: self.chan.clone(),
1620        }
1621    }
1622}
1623
1624impl<T> fmt::Debug for Sender<T> {
1625    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1626        fmt.debug_struct("Sender")
1627            .field("chan", &self.chan)
1628            .finish()
1629    }
1630}
1631
1632impl<T> Clone for WeakSender<T> {
1633    fn clone(&self) -> Self {
1634        self.chan.increment_weak_count();
1635
1636        WeakSender {
1637            chan: self.chan.clone(),
1638        }
1639    }
1640}
1641
1642impl<T> Drop for WeakSender<T> {
1643    fn drop(&mut self) {
1644        self.chan.decrement_weak_count();
1645    }
1646}
1647
1648impl<T> WeakSender<T> {
1649    /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1650    /// if there are other `Sender` instances alive and the channel wasn't
1651    /// previously dropped, otherwise `None` is returned.
1652    pub fn upgrade(&self) -> Option<Sender<T>> {
1653        chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1654    }
1655
1656    /// Returns the number of [`Sender`] handles.
1657    pub fn strong_count(&self) -> usize {
1658        self.chan.strong_count()
1659    }
1660
1661    /// Returns the number of [`WeakSender`] handles.
1662    pub fn weak_count(&self) -> usize {
1663        self.chan.weak_count()
1664    }
1665}
1666
1667impl<T> fmt::Debug for WeakSender<T> {
1668    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1669        fmt.debug_struct("WeakSender").finish()
1670    }
1671}
1672
1673// ===== impl Permit =====
1674
1675impl<T> Permit<'_, T> {
1676    /// Sends a value using the reserved capacity.
1677    ///
1678    /// Capacity for the message has already been reserved. The message is sent
1679    /// to the receiver and the permit is consumed. The operation will succeed
1680    /// even if the receiver half has been closed. See [`Receiver::close`] for
1681    /// more details on performing a clean shutdown.
1682    ///
1683    /// [`Receiver::close`]: Receiver::close
1684    ///
1685    /// # Examples
1686    ///
1687    /// ```
1688    /// use tokio::sync::mpsc;
1689    ///
1690    /// # #[tokio::main(flavor = "current_thread")]
1691    /// # async fn main() {
1692    /// let (tx, mut rx) = mpsc::channel(1);
1693    ///
1694    /// // Reserve capacity
1695    /// let permit = tx.reserve().await.unwrap();
1696    ///
1697    /// // Trying to send directly on the `tx` will fail due to no
1698    /// // available capacity.
1699    /// assert!(tx.try_send(123).is_err());
1700    ///
1701    /// // Send a message on the permit
1702    /// permit.send(456);
1703    ///
1704    /// // The value sent on the permit is received
1705    /// assert_eq!(rx.recv().await.unwrap(), 456);
1706    /// # }
1707    /// ```
1708    pub fn send(self, value: T) {
1709        use std::mem;
1710
1711        self.chan.send(value);
1712
1713        // Avoid the drop logic
1714        mem::forget(self);
1715    }
1716}
1717
1718impl<T> Drop for Permit<'_, T> {
1719    fn drop(&mut self) {
1720        use chan::Semaphore;
1721
1722        let semaphore = self.chan.semaphore();
1723
1724        // Add the permit back to the semaphore
1725        semaphore.add_permit();
1726
1727        // If this is the last sender for this channel, wake the receiver so
1728        // that it can be notified that the channel is closed.
1729        if semaphore.is_closed() && semaphore.is_idle() {
1730            self.chan.wake_rx();
1731        }
1732    }
1733}
1734
1735impl<T> fmt::Debug for Permit<'_, T> {
1736    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1737        fmt.debug_struct("Permit")
1738            .field("chan", &self.chan)
1739            .finish()
1740    }
1741}
1742
1743// ===== impl PermitIterator =====
1744
1745impl<'a, T> Iterator for PermitIterator<'a, T> {
1746    type Item = Permit<'a, T>;
1747
1748    fn next(&mut self) -> Option<Self::Item> {
1749        if self.n == 0 {
1750            return None;
1751        }
1752
1753        self.n -= 1;
1754        Some(Permit { chan: self.chan })
1755    }
1756
1757    fn size_hint(&self) -> (usize, Option<usize>) {
1758        let n = self.n;
1759        (n, Some(n))
1760    }
1761}
1762impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1763impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1764
1765impl<T> Drop for PermitIterator<'_, T> {
1766    fn drop(&mut self) {
1767        use chan::Semaphore;
1768
1769        if self.n == 0 {
1770            return;
1771        }
1772
1773        let semaphore = self.chan.semaphore();
1774
1775        // Add the remaining permits back to the semaphore
1776        semaphore.add_permits(self.n);
1777
1778        // If this is the last sender for this channel, wake the receiver so
1779        // that it can be notified that the channel is closed.
1780        if semaphore.is_closed() && semaphore.is_idle() {
1781            self.chan.wake_rx();
1782        }
1783    }
1784}
1785
1786impl<T> fmt::Debug for PermitIterator<'_, T> {
1787    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1788        fmt.debug_struct("PermitIterator")
1789            .field("chan", &self.chan)
1790            .field("capacity", &self.n)
1791            .finish()
1792    }
1793}
1794
1795// ===== impl Permit =====
1796
1797impl<T> OwnedPermit<T> {
1798    /// Sends a value using the reserved capacity.
1799    ///
1800    /// Capacity for the message has already been reserved. The message is sent
1801    /// to the receiver and the permit is consumed. The operation will succeed
1802    /// even if the receiver half has been closed. See [`Receiver::close`] for
1803    /// more details on performing a clean shutdown.
1804    ///
1805    /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1806    /// the `OwnedPermit` was reserved.
1807    ///
1808    /// [`Receiver::close`]: Receiver::close
1809    ///
1810    /// # Examples
1811    ///
1812    /// ```
1813    /// use tokio::sync::mpsc;
1814    ///
1815    /// # #[tokio::main(flavor = "current_thread")]
1816    /// # async fn main() {
1817    /// let (tx, mut rx) = mpsc::channel(1);
1818    ///
1819    /// // Reserve capacity
1820    /// let permit = tx.reserve_owned().await.unwrap();
1821    ///
1822    /// // Send a message on the permit, returning the sender.
1823    /// let tx = permit.send(456);
1824    ///
1825    /// // The value sent on the permit is received
1826    /// assert_eq!(rx.recv().await.unwrap(), 456);
1827    ///
1828    /// // We may now reuse `tx` to send another message.
1829    /// tx.send(789).await.unwrap();
1830    /// # }
1831    /// ```
1832    pub fn send(mut self, value: T) -> Sender<T> {
1833        let chan = self.chan.take().unwrap_or_else(|| {
1834            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1835        });
1836        chan.send(value);
1837
1838        Sender { chan }
1839    }
1840
1841    /// Releases the reserved capacity *without* sending a message, returning the
1842    /// [`Sender`].
1843    ///
1844    /// # Examples
1845    ///
1846    /// ```
1847    /// use tokio::sync::mpsc;
1848    ///
1849    /// # #[tokio::main(flavor = "current_thread")]
1850    /// # async fn main() {
1851    /// let (tx, rx) = mpsc::channel(1);
1852    ///
1853    /// // Clone the sender and reserve capacity
1854    /// let permit = tx.clone().reserve_owned().await.unwrap();
1855    ///
1856    /// // Trying to send on the original `tx` will fail, since the `permit`
1857    /// // has reserved all the available capacity.
1858    /// assert!(tx.try_send(123).is_err());
1859    ///
1860    /// // Release the permit without sending a message, returning the clone
1861    /// // of the sender.
1862    /// let tx2 = permit.release();
1863    ///
1864    /// // We may now reuse `tx` to send another message.
1865    /// tx.send(789).await.unwrap();
1866    /// # drop(rx); drop(tx2);
1867    /// # }
1868    /// ```
1869    ///
1870    /// [`Sender`]: Sender
1871    pub fn release(mut self) -> Sender<T> {
1872        use chan::Semaphore;
1873
1874        let chan = self.chan.take().unwrap_or_else(|| {
1875            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1876        });
1877
1878        // Add the permit back to the semaphore
1879        chan.semaphore().add_permit();
1880        Sender { chan }
1881    }
1882
1883    /// Returns `true` if permits belong to the same channel.
1884    ///
1885    /// # Examples
1886    ///
1887    /// ```
1888    /// use tokio::sync::mpsc;
1889    ///
1890    /// # #[tokio::main(flavor = "current_thread")]
1891    /// # async fn main() {
1892    /// let (tx, rx) = mpsc::channel::<()>(2);
1893    ///
1894    /// let permit1 = tx.clone().reserve_owned().await.unwrap();
1895    /// let permit2 = tx.clone().reserve_owned().await.unwrap();
1896    /// assert!(permit1.same_channel(&permit2));
1897    ///
1898    /// let (tx2, rx2) = mpsc::channel::<()>(1);
1899    ///
1900    /// let permit3 = tx2.clone().reserve_owned().await.unwrap();
1901    /// assert!(!permit3.same_channel(&permit2));
1902    /// # }
1903    /// ```
1904    pub fn same_channel(&self, other: &Self) -> bool {
1905        self.chan
1906            .as_ref()
1907            .zip(other.chan.as_ref())
1908            .is_some_and(|(a, b)| a.same_channel(b))
1909    }
1910
1911    /// Returns `true` if this permit belongs to the same channel as the given [`Sender`].
1912    ///
1913    /// # Examples
1914    ///
1915    /// ```
1916    /// use tokio::sync::mpsc;
1917    ///
1918    /// # #[tokio::main(flavor = "current_thread")]
1919    /// # async fn main() {
1920    /// let (tx, rx) = mpsc::channel::<()>(1);
1921    ///
1922    /// let permit = tx.clone().reserve_owned().await.unwrap();
1923    /// assert!(permit.same_channel_as_sender(&tx));
1924    ///
1925    /// let (tx2, rx2) = mpsc::channel::<()>(1);
1926    /// assert!(!permit.same_channel_as_sender(&tx2));
1927    /// # }
1928    /// ```
1929    pub fn same_channel_as_sender(&self, sender: &Sender<T>) -> bool {
1930        self.chan
1931            .as_ref()
1932            .is_some_and(|chan| chan.same_channel(&sender.chan))
1933    }
1934}
1935
1936impl<T> Drop for OwnedPermit<T> {
1937    fn drop(&mut self) {
1938        use chan::Semaphore;
1939
1940        // Are we still holding onto the sender?
1941        if let Some(chan) = self.chan.take() {
1942            let semaphore = chan.semaphore();
1943
1944            // Add the permit back to the semaphore
1945            semaphore.add_permit();
1946
1947            // If this `OwnedPermit` is holding the last sender for this
1948            // channel, wake the receiver so that it can be notified that the
1949            // channel is closed.
1950            if semaphore.is_closed() && semaphore.is_idle() {
1951                chan.wake_rx();
1952            }
1953        }
1954
1955        // Otherwise, do nothing.
1956    }
1957}
1958
1959impl<T> fmt::Debug for OwnedPermit<T> {
1960    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1961        fmt.debug_struct("OwnedPermit")
1962            .field("chan", &self.chan)
1963            .finish()
1964    }
1965}