1use core::ops::{Deref, DerefMut};
2use std::io::{BufRead, IoSlice, Read, Result, Write};
3
4use crate::conn::{ConnectionCommon, SideData};
5
/// Couples a mutably borrowed connection object with a mutably borrowed
/// transport so the pair can be used through the standard I/O traits.
///
/// Both halves are only borrowed for `'a`; the caller keeps ownership and
/// regains access to them once the `Stream` is dropped.
#[derive(Debug)]
pub struct Stream<'a, C, T>
where
    C: 'a + ?Sized,
    T: 'a + Read + Write + ?Sized,
{
    /// The connection state that is driven by this stream.
    pub conn: &'a mut C,

    /// The transport that `conn` performs its I/O on.
    pub sock: &'a mut T,
}
18
19impl<'a, C, T, S> Stream<'a, C, T>
20where
21 C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
22 T: 'a + Read + Write,
23 S: SideData,
24{
25 pub fn new(conn: &'a mut C, sock: &'a mut T) -> Self {
28 Self { conn, sock }
29 }
30
31 fn complete_prior_io(&mut self) -> Result<()> {
34 if self.conn.is_handshaking() {
35 self.conn.complete_io(self.sock)?;
36 }
37
38 if self.conn.wants_write() {
39 self.conn.complete_io(self.sock)?;
40 }
41
42 Ok(())
43 }
44
45 fn prepare_read(&mut self) -> Result<()> {
46 self.complete_prior_io()?;
47
48 while self.conn.wants_read() {
53 if self.conn.complete_io(self.sock)?.0 == 0 {
54 break;
55 }
56 }
57
58 Ok(())
59 }
60
61 fn fill_buf(mut self) -> Result<&'a [u8]>
63 where
64 S: 'a,
65 {
66 self.prepare_read()?;
67 self.conn.reader().into_first_chunk()
68 }
69}
70
71impl<'a, C, T, S> Read for Stream<'a, C, T>
72where
73 C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
74 T: 'a + Read + Write,
75 S: SideData,
76{
77 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
78 self.prepare_read()?;
79 self.conn.reader().read(buf)
80 }
81}
82
83impl<'a, C, T, S> BufRead for Stream<'a, C, T>
84where
85 C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
86 T: 'a + Read + Write,
87 S: 'a + SideData,
88{
89 fn fill_buf(&mut self) -> Result<&[u8]> {
90 Stream {
92 conn: self.conn,
93 sock: self.sock,
94 }
95 .fill_buf()
96 }
97
98 fn consume(&mut self, amt: usize) {
99 self.conn.reader().consume(amt)
100 }
101}
102
103impl<'a, C, T, S> Write for Stream<'a, C, T>
104where
105 C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
106 T: 'a + Read + Write,
107 S: SideData,
108{
109 fn write(&mut self, buf: &[u8]) -> Result<usize> {
110 self.complete_prior_io()?;
111
112 let len = self.conn.writer().write(buf)?;
113
114 let _ = self.conn.complete_io(self.sock);
118
119 Ok(len)
120 }
121
122 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
123 self.complete_prior_io()?;
124
125 let len = self
126 .conn
127 .writer()
128 .write_vectored(bufs)?;
129
130 let _ = self.conn.complete_io(self.sock);
134
135 Ok(len)
136 }
137
138 fn flush(&mut self) -> Result<()> {
139 self.complete_prior_io()?;
140
141 self.conn.writer().flush()?;
142 if self.conn.wants_write() {
143 self.conn.complete_io(self.sock)?;
144 }
145 Ok(())
146 }
147}
148
/// Owning counterpart of `Stream`: holds the connection object and the
/// transport by value instead of borrowing them.
///
/// Both fields are public, and `into_parts` returns them to the caller.
#[derive(Debug)]
pub struct StreamOwned<C, T>
where
    C: Sized,
    T: Read + Write + Sized,
{
    /// The connection state that is driven by this stream.
    pub conn: C,

    /// The transport that `conn` performs its I/O on.
    pub sock: T,
}
162
163impl<C, T, S> StreamOwned<C, T>
164where
165 C: DerefMut + Deref<Target = ConnectionCommon<S>>,
166 T: Read + Write,
167 S: SideData,
168{
169 pub fn new(conn: C, sock: T) -> Self {
175 Self { conn, sock }
176 }
177
178 pub fn get_ref(&self) -> &T {
180 &self.sock
181 }
182
183 pub fn get_mut(&mut self) -> &mut T {
185 &mut self.sock
186 }
187
188 pub fn into_parts(self) -> (C, T) {
190 (self.conn, self.sock)
191 }
192}
193
194impl<'a, C, T, S> StreamOwned<C, T>
195where
196 C: DerefMut + Deref<Target = ConnectionCommon<S>>,
197 T: Read + Write,
198 S: SideData,
199{
200 fn as_stream(&'a mut self) -> Stream<'a, C, T> {
201 Stream {
202 conn: &mut self.conn,
203 sock: &mut self.sock,
204 }
205 }
206}
207
208impl<C, T, S> Read for StreamOwned<C, T>
209where
210 C: DerefMut + Deref<Target = ConnectionCommon<S>>,
211 T: Read + Write,
212 S: SideData,
213{
214 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
215 self.as_stream().read(buf)
216 }
217}
218
219impl<C, T, S> BufRead for StreamOwned<C, T>
220where
221 C: DerefMut + Deref<Target = ConnectionCommon<S>>,
222 T: Read + Write,
223 S: 'static + SideData,
224{
225 fn fill_buf(&mut self) -> Result<&[u8]> {
226 self.as_stream().fill_buf()
227 }
228
229 fn consume(&mut self, amt: usize) {
230 self.as_stream().consume(amt)
231 }
232}
233
234impl<C, T, S> Write for StreamOwned<C, T>
235where
236 C: DerefMut + Deref<Target = ConnectionCommon<S>>,
237 T: Read + Write,
238 S: SideData,
239{
240 fn write(&mut self, buf: &[u8]) -> Result<usize> {
241 self.as_stream().write(buf)
242 }
243
244 fn flush(&mut self) -> Result<()> {
245 self.as_stream().flush()
246 }
247}
248
#[cfg(test)]
mod tests {
    use std::net::TcpStream;

    use super::{Stream, StreamOwned};
    use crate::{client::ClientConnection, server::ServerConnection};

    // Each test only names a concrete instantiation through a local type
    // alias; nothing is constructed or executed at runtime.

    #[test]
    fn stream_can_be_created_for_connection_and_tcpstream() {
        type _BorrowedClient<'a> = Stream<'a, ClientConnection, TcpStream>;
    }

    #[test]
    fn streamowned_can_be_created_for_client_and_tcpstream() {
        type _OwnedClient = StreamOwned<ClientConnection, TcpStream>;
    }

    #[test]
    fn streamowned_can_be_created_for_server_and_tcpstream() {
        type _OwnedServer = StreamOwned<ServerConnection, TcpStream>;
    }
}