tokio_util/codec/
framed_write.rs

1use crate::codec::encoder::Encoder;
2use crate::codec::framed_impl::{FramedImpl, WriteFrame};
3
4use futures_core::Stream;
5use tokio::io::AsyncWrite;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15use super::FramedParts;
16
17pin_project! {
18    /// A [`Sink`] of frames encoded to an `AsyncWrite`.
19    ///
20    /// For examples of how to use `FramedWrite` with a codec, see the
21    /// examples on the [`codec`] module.
22    ///
23    /// # Cancellation safety
24    ///
25    /// * [`futures_util::sink::SinkExt::send`]: if send is used as the event in a
26    /// `tokio::select!` statement and some other branch completes first, then it is
27    /// guaranteed that the message was not sent, but the message itself is lost.
28    ///
29    /// [`Sink`]: futures_sink::Sink
30    /// [`codec`]: crate::codec
31    /// [`futures_util::sink::SinkExt::send`]: futures_util::sink::SinkExt::send
32    pub struct FramedWrite<T, E> {
33        #[pin]
34        inner: FramedImpl<T, E, WriteFrame>,
35    }
36}
37
38impl<T, E> FramedWrite<T, E>
39where
40    T: AsyncWrite,
41{
42    /// Creates a new `FramedWrite` with the given `encoder`.
43    pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
44        FramedWrite {
45            inner: FramedImpl {
46                inner,
47                codec: encoder,
48                state: WriteFrame::default(),
49            },
50        }
51    }
52
53    /// Creates a new `FramedWrite` with the given `encoder` and a buffer of `capacity`
54    /// initial size.
55    pub fn with_capacity(inner: T, encoder: E, capacity: usize) -> FramedWrite<T, E> {
56        FramedWrite {
57            inner: FramedImpl {
58                inner,
59                codec: encoder,
60                state: WriteFrame {
61                    buffer: BytesMut::with_capacity(capacity),
62                    backpressure_boundary: capacity,
63                },
64            },
65        }
66    }
67}
68
69impl<T, E> FramedWrite<T, E> {
70    /// Returns a reference to the underlying I/O stream wrapped by
71    /// `FramedWrite`.
72    ///
73    /// Note that care should be taken to not tamper with the underlying stream
74    /// of data coming in as it may corrupt the stream of frames otherwise
75    /// being worked with.
76    pub fn get_ref(&self) -> &T {
77        &self.inner.inner
78    }
79
80    /// Returns a mutable reference to the underlying I/O stream wrapped by
81    /// `FramedWrite`.
82    ///
83    /// Note that care should be taken to not tamper with the underlying stream
84    /// of data coming in as it may corrupt the stream of frames otherwise
85    /// being worked with.
86    pub fn get_mut(&mut self) -> &mut T {
87        &mut self.inner.inner
88    }
89
90    /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
91    /// `FramedWrite`.
92    ///
93    /// Note that care should be taken to not tamper with the underlying stream
94    /// of data coming in as it may corrupt the stream of frames otherwise
95    /// being worked with.
96    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
97        self.project().inner.project().inner
98    }
99
100    /// Consumes the `FramedWrite`, returning its underlying I/O stream.
101    ///
102    /// Note that care should be taken to not tamper with the underlying stream
103    /// of data coming in as it may corrupt the stream of frames otherwise
104    /// being worked with.
105    pub fn into_inner(self) -> T {
106        self.inner.inner
107    }
108
109    /// Returns a reference to the underlying encoder.
110    pub fn encoder(&self) -> &E {
111        &self.inner.codec
112    }
113
114    /// Returns a mutable reference to the underlying encoder.
115    pub fn encoder_mut(&mut self) -> &mut E {
116        &mut self.inner.codec
117    }
118
119    /// Maps the encoder `E` to `C`, preserving the write buffer
120    /// wrapped by `Framed`.
121    pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
122    where
123        F: FnOnce(E) -> C,
124    {
125        // This could be potentially simplified once rust-lang/rust#86555 hits stable
126        let FramedImpl {
127            inner,
128            state,
129            codec,
130        } = self.inner;
131        FramedWrite {
132            inner: FramedImpl {
133                inner,
134                state,
135                codec: map(codec),
136            },
137        }
138    }
139
140    /// Returns a mutable reference to the underlying encoder.
141    pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
142        self.project().inner.project().codec
143    }
144
145    /// Returns a reference to the write buffer.
146    pub fn write_buffer(&self) -> &BytesMut {
147        &self.inner.state.buffer
148    }
149
150    /// Returns a mutable reference to the write buffer.
151    pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
152        &mut self.inner.state.buffer
153    }
154
155    /// Returns backpressure boundary
156    pub fn backpressure_boundary(&self) -> usize {
157        self.inner.state.backpressure_boundary
158    }
159
160    /// Updates backpressure boundary
161    pub fn set_backpressure_boundary(&mut self, boundary: usize) {
162        self.inner.state.backpressure_boundary = boundary;
163    }
164
165    /// Consumes the `FramedWrite`, returning its underlying I/O stream, the buffer
166    /// with unprocessed data, and the codec.
167    pub fn into_parts(self) -> FramedParts<T, E> {
168        FramedParts {
169            io: self.inner.inner,
170            codec: self.inner.codec,
171            read_buf: BytesMut::new(),
172            write_buf: self.inner.state.buffer,
173            _priv: (),
174        }
175    }
176}
177
178// This impl just defers to the underlying FramedImpl
179impl<T, I, E> Sink<I> for FramedWrite<T, E>
180where
181    T: AsyncWrite,
182    E: Encoder<I>,
183    E::Error: From<io::Error>,
184{
185    type Error = E::Error;
186
187    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
188        self.project().inner.poll_ready(cx)
189    }
190
191    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
192        self.project().inner.start_send(item)
193    }
194
195    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
196        self.project().inner.poll_flush(cx)
197    }
198
199    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
200        self.project().inner.poll_close(cx)
201    }
202}
203
204// This impl just defers to the underlying T: Stream
205impl<T, D> Stream for FramedWrite<T, D>
206where
207    T: Stream,
208{
209    type Item = T::Item;
210
211    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
212        self.project().inner.project().inner.poll_next(cx)
213    }
214}
215
216impl<T, U> fmt::Debug for FramedWrite<T, U>
217where
218    T: fmt::Debug,
219    U: fmt::Debug,
220{
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        f.debug_struct("FramedWrite")
223            .field("inner", &self.get_ref())
224            .field("encoder", &self.encoder())
225            .field("buffer", &self.inner.state.buffer)
226            .finish()
227    }
228}