tokio_util/codec/
framed_read.rs1use crate::codec::framed_impl::{FramedImpl, ReadFrame};
2use crate::codec::Decoder;
3
4use futures_core::Stream;
5use tokio::io::AsyncRead;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use super::FramedParts;
15
16pin_project! {
17 pub struct FramedRead<T, D> {
32 #[pin]
33 inner: FramedImpl<T, D, ReadFrame>,
34 }
35}
36
37impl<T, D> FramedRead<T, D>
40where
41 T: AsyncRead,
42 D: Decoder,
43{
44 pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
46 FramedRead {
47 inner: FramedImpl {
48 inner,
49 codec: decoder,
50 state: Default::default(),
51 },
52 }
53 }
54
55 pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
58 FramedRead {
59 inner: FramedImpl {
60 inner,
61 codec: decoder,
62 state: ReadFrame {
63 eof: false,
64 is_readable: false,
65 buffer: BytesMut::with_capacity(capacity),
66 has_errored: false,
67 },
68 },
69 }
70 }
71}
72
73impl<T, D> FramedRead<T, D> {
74 pub fn get_ref(&self) -> &T {
81 &self.inner.inner
82 }
83
84 pub fn get_mut(&mut self) -> &mut T {
91 &mut self.inner.inner
92 }
93
94 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
101 self.project().inner.project().inner
102 }
103
104 pub fn into_inner(self) -> T {
110 self.inner.inner
111 }
112
113 pub fn decoder(&self) -> &D {
115 &self.inner.codec
116 }
117
118 pub fn decoder_mut(&mut self) -> &mut D {
120 &mut self.inner.codec
121 }
122
123 pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
126 where
127 F: FnOnce(D) -> C,
128 {
129 let FramedImpl {
131 inner,
132 state,
133 codec,
134 } = self.inner;
135 FramedRead {
136 inner: FramedImpl {
137 inner,
138 state,
139 codec: map(codec),
140 },
141 }
142 }
143
144 pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
146 self.project().inner.project().codec
147 }
148
149 pub fn read_buffer(&self) -> &BytesMut {
151 &self.inner.state.buffer
152 }
153
154 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
156 &mut self.inner.state.buffer
157 }
158
159 pub fn into_parts(self) -> FramedParts<T, D> {
162 FramedParts {
163 io: self.inner.inner,
164 codec: self.inner.codec,
165 read_buf: self.inner.state.buffer,
166 write_buf: BytesMut::new(),
167 _priv: (),
168 }
169 }
170}
171
172impl<T, D> Stream for FramedRead<T, D>
174where
175 T: AsyncRead,
176 D: Decoder,
177{
178 type Item = Result<D::Item, D::Error>;
179
180 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181 self.project().inner.poll_next(cx)
182 }
183}
184
185impl<T, I, D> Sink<I> for FramedRead<T, D>
187where
188 T: Sink<I>,
189{
190 type Error = T::Error;
191
192 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
193 self.project().inner.project().inner.poll_ready(cx)
194 }
195
196 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
197 self.project().inner.project().inner.start_send(item)
198 }
199
200 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
201 self.project().inner.project().inner.poll_flush(cx)
202 }
203
204 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
205 self.project().inner.project().inner.poll_close(cx)
206 }
207}
208
209impl<T, D> fmt::Debug for FramedRead<T, D>
210where
211 T: fmt::Debug,
212 D: fmt::Debug,
213{
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 f.debug_struct("FramedRead")
216 .field("inner", &self.get_ref())
217 .field("decoder", &self.decoder())
218 .field("eof", &self.inner.state.eof)
219 .field("is_readable", &self.inner.state.is_readable)
220 .field("buffer", &self.read_buffer())
221 .finish()
222 }
223}