tokio_util/codec/
framed_read.rs

1use crate::codec::framed_impl::{FramedImpl, ReadFrame};
2use crate::codec::Decoder;
3
4use futures_core::Stream;
5use tokio::io::AsyncRead;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use super::FramedParts;
15
16pin_project! {
17    /// A [`Stream`] of messages decoded from an [`AsyncRead`].
18    ///
19    /// For examples of how to use `FramedRead` with a codec, see the
20    /// examples on the [`codec`] module.
21    ///
22    /// # Cancellation safety
23    /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned
24    /// future only holds onto a reference to the underlying stream, so dropping it will
25    /// never lose a value.
26    ///
27    /// [`Stream`]: futures_core::Stream
28    /// [`AsyncRead`]: tokio::io::AsyncRead
29    /// [`codec`]: crate::codec
30    /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next
31    pub struct FramedRead<T, D> {
32        #[pin]
33        inner: FramedImpl<T, D, ReadFrame>,
34    }
35}
36
37// ===== impl FramedRead =====
38
39impl<T, D> FramedRead<T, D>
40where
41    T: AsyncRead,
42    D: Decoder,
43{
44    /// Creates a new `FramedRead` with the given `decoder`.
45    pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
46        FramedRead {
47            inner: FramedImpl {
48                inner,
49                codec: decoder,
50                state: Default::default(),
51            },
52        }
53    }
54
55    /// Creates a new `FramedRead` with the given `decoder` and a buffer of `capacity`
56    /// initial size.
57    pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
58        FramedRead {
59            inner: FramedImpl {
60                inner,
61                codec: decoder,
62                state: ReadFrame {
63                    eof: false,
64                    is_readable: false,
65                    buffer: BytesMut::with_capacity(capacity),
66                    has_errored: false,
67                },
68            },
69        }
70    }
71}
72
73impl<T, D> FramedRead<T, D> {
74    /// Returns a reference to the underlying I/O stream wrapped by
75    /// `FramedRead`.
76    ///
77    /// Note that care should be taken to not tamper with the underlying stream
78    /// of data coming in as it may corrupt the stream of frames otherwise
79    /// being worked with.
80    pub fn get_ref(&self) -> &T {
81        &self.inner.inner
82    }
83
84    /// Returns a mutable reference to the underlying I/O stream wrapped by
85    /// `FramedRead`.
86    ///
87    /// Note that care should be taken to not tamper with the underlying stream
88    /// of data coming in as it may corrupt the stream of frames otherwise
89    /// being worked with.
90    pub fn get_mut(&mut self) -> &mut T {
91        &mut self.inner.inner
92    }
93
94    /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
95    /// `FramedRead`.
96    ///
97    /// Note that care should be taken to not tamper with the underlying stream
98    /// of data coming in as it may corrupt the stream of frames otherwise
99    /// being worked with.
100    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
101        self.project().inner.project().inner
102    }
103
104    /// Consumes the `FramedRead`, returning its underlying I/O stream.
105    ///
106    /// Note that care should be taken to not tamper with the underlying stream
107    /// of data coming in as it may corrupt the stream of frames otherwise
108    /// being worked with.
109    pub fn into_inner(self) -> T {
110        self.inner.inner
111    }
112
113    /// Returns a reference to the underlying decoder.
114    pub fn decoder(&self) -> &D {
115        &self.inner.codec
116    }
117
118    /// Returns a mutable reference to the underlying decoder.
119    pub fn decoder_mut(&mut self) -> &mut D {
120        &mut self.inner.codec
121    }
122
123    /// Maps the decoder `D` to `C`, preserving the read buffer
124    /// wrapped by `Framed`.
125    pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
126    where
127        F: FnOnce(D) -> C,
128    {
129        // This could be potentially simplified once rust-lang/rust#86555 hits stable
130        let FramedImpl {
131            inner,
132            state,
133            codec,
134        } = self.inner;
135        FramedRead {
136            inner: FramedImpl {
137                inner,
138                state,
139                codec: map(codec),
140            },
141        }
142    }
143
144    /// Returns a mutable reference to the underlying decoder.
145    pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
146        self.project().inner.project().codec
147    }
148
149    /// Returns a reference to the read buffer.
150    pub fn read_buffer(&self) -> &BytesMut {
151        &self.inner.state.buffer
152    }
153
154    /// Returns a mutable reference to the read buffer.
155    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
156        &mut self.inner.state.buffer
157    }
158
159    /// Consumes the `FramedRead`, returning its underlying I/O stream, the buffer
160    /// with unprocessed data, and the codec.
161    pub fn into_parts(self) -> FramedParts<T, D> {
162        FramedParts {
163            io: self.inner.inner,
164            codec: self.inner.codec,
165            read_buf: self.inner.state.buffer,
166            write_buf: BytesMut::new(),
167            _priv: (),
168        }
169    }
170}
171
172// This impl just defers to the underlying FramedImpl
173impl<T, D> Stream for FramedRead<T, D>
174where
175    T: AsyncRead,
176    D: Decoder,
177{
178    type Item = Result<D::Item, D::Error>;
179
180    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181        self.project().inner.poll_next(cx)
182    }
183}
184
185// This impl just defers to the underlying T: Sink
186impl<T, I, D> Sink<I> for FramedRead<T, D>
187where
188    T: Sink<I>,
189{
190    type Error = T::Error;
191
192    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
193        self.project().inner.project().inner.poll_ready(cx)
194    }
195
196    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
197        self.project().inner.project().inner.start_send(item)
198    }
199
200    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
201        self.project().inner.project().inner.poll_flush(cx)
202    }
203
204    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
205        self.project().inner.project().inner.poll_close(cx)
206    }
207}
208
209impl<T, D> fmt::Debug for FramedRead<T, D>
210where
211    T: fmt::Debug,
212    D: fmt::Debug,
213{
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        f.debug_struct("FramedRead")
216            .field("inner", &self.get_ref())
217            .field("decoder", &self.decoder())
218            .field("eof", &self.inner.state.eof)
219            .field("is_readable", &self.inner.state.is_readable)
220            .field("buffer", &self.read_buffer())
221            .finish()
222    }
223}