1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::marker::PhantomData;
4use std::net::SocketAddr;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use async_recursion::async_recursion;
11use async_trait::async_trait;
12use bytes::{Bytes, BytesMut};
13use futures::sink::Buffer;
14use futures::{Future, Sink, SinkExt, Stream, ready, stream};
15use pin_project::pin_project;
16use serde::{Deserialize, Serialize};
17use tokio::io;
18use tokio::net::{TcpListener, TcpStream};
19#[cfg(unix)]
20use tokio::net::{UnixListener, UnixStream};
21use tokio::task::JoinHandle;
22use tokio_stream::StreamExt;
23use tokio_stream::wrappers::TcpListenerStream;
24use tokio_util::codec::{Framed, LengthDelimitedCodec};
25
/// Initial configuration tuple: a map from a string key to the [`ServerBindConfig`] that
/// should be bound for it, plus an optional string.
// NOTE(review): the keys are presumably port names and the meaning of the optional
// `String` is not visible in this file — confirm against the callers.
pub type InitConfig = (HashMap<String, ServerBindConfig>, Option<String>);
27
/// A collection of named [`Connection`]s together with caller-defined metadata.
///
/// The map lives behind a [`RefCell`] so that a connection can be taken out by value
/// through a shared reference (see [`DeployPorts::port`]).
pub struct DeployPorts<T = Option<()>> {
    /// Connections keyed by name; an entry is removed when it is taken via `port`.
    pub ports: RefCell<HashMap<String, Connection>>,
    /// Arbitrary metadata stored alongside the ports.
    pub meta: T,
}
34
35impl<T> DeployPorts<T> {
36 pub fn port(&self, name: &str) -> Connection {
37 self.ports
38 .try_borrow_mut()
39 .unwrap()
40 .remove(name)
41 .unwrap_or_else(|| panic!("port {} not found", name))
42 }
43}
44
/// Stand-in for `tokio::net::UnixStream` on non-Unix targets so that types mentioning it
/// still compile; `Infallible` has no values, so such a stream can never exist.
#[cfg(not(unix))]
type UnixStream = std::convert::Infallible;

/// Stand-in for `tokio::net::UnixListener` on non-Unix targets.
#[cfg(not(unix))]
#[expect(dead_code, reason = "conditional compilation placeholder")]
type UnixListener = std::convert::Infallible;
51
/// Serializable description of an endpoint that a client can connect to.
///
/// Values are produced by `BoundServer::server_port` and turned into pending
/// connections by [`ServerPort::connect`].
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ServerPort {
    /// Filesystem path of a Unix domain socket.
    UnixSocket(PathBuf),
    /// Address of a TCP listener.
    TcpPort(SocketAddr),
    /// Several ports keyed by a `u32`; each one is connected separately.
    Demux(HashMap<u32, ServerPort>),
    /// Several ports that are all connected.
    Merge(Vec<ServerPort>),
    /// A port paired with a `u32` tag.
    Tagged(Box<ServerPort>, u32),
    /// A placeholder port with no underlying socket.
    Null,
}
62
63impl ServerPort {
64 pub fn connect(&self) -> ClientConnection {
65 match self {
66 ServerPort::UnixSocket(path) => {
67 #[cfg(unix)]
68 {
69 let bound = UnixStream::connect(path.clone());
70 ClientConnection::UnixSocket(tokio::spawn(bound))
71 }
72
73 #[cfg(not(unix))]
74 {
75 let _ = path;
76 panic!("Unix sockets are not supported on this platform")
77 }
78 }
79 ServerPort::TcpPort(addr) => {
80 let addr_clone = *addr;
81 let bound = async_retry(
82 move || TcpStream::connect(addr_clone),
83 10,
84 Duration::from_secs(1),
85 );
86 ClientConnection::TcpPort(tokio::spawn(bound))
87 }
88 ServerPort::Demux(bindings) => {
89 ClientConnection::Demux(bindings.iter().map(|(k, v)| (*k, v.connect())).collect())
90 }
91 ServerPort::Merge(ports) => {
92 ClientConnection::Merge(ports.iter().map(|p| p.connect()).collect())
93 }
94 ServerPort::Tagged(port, tag) => {
95 ClientConnection::Tagged(Box::new(port.as_ref().connect()), *tag)
96 }
97 ServerPort::Null => ClientConnection::Null,
98 }
99 }
100
101 pub fn instantiate(&self) -> Connection {
102 Connection::AsClient(self.connect())
103 }
104}
105
/// The client side of a connection whose socket connects may still be in progress.
///
/// Mirrors the shape of [`ServerPort`]; produced by [`ServerPort::connect`].
#[derive(Debug)]
pub enum ClientConnection {
    /// Handle to the task connecting a Unix domain socket.
    UnixSocket(JoinHandle<io::Result<UnixStream>>),
    /// Handle to the task connecting a TCP socket.
    TcpPort(JoinHandle<io::Result<TcpStream>>),
    /// Pending connections keyed by a `u32`.
    Demux(HashMap<u32, ClientConnection>),
    /// Several pending connections.
    Merge(Vec<ClientConnection>),
    /// A pending connection paired with a `u32` tag.
    Tagged(Box<ClientConnection>, u32),
    /// A placeholder connection with no underlying socket.
    Null,
}
115
/// Serializable request describing what kind of server endpoint to bind.
///
/// Turned into a live [`BoundServer`] by [`ServerBindConfig::bind`].
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ServerBindConfig {
    /// Bind a Unix domain socket (the path is chosen at bind time).
    UnixSocket,
    /// Bind a TCP listener on an OS-assigned port (port `0` is requested at bind time).
    TcpPort(
        /// Host to bind the listener on.
        String,
    ),
    /// Bind several endpoints keyed by a `u32`.
    Demux(HashMap<u32, ServerBindConfig>),
    /// Bind several endpoints.
    Merge(Vec<ServerBindConfig>),
    /// Bind an endpoint and pair it with a `u32` tag.
    Tagged(Box<ServerBindConfig>, u32),
    /// Bind nothing.
    Null,
}
128
129impl ServerBindConfig {
130 #[async_recursion]
131 pub async fn bind(self) -> BoundServer {
132 match self {
133 ServerBindConfig::UnixSocket => {
134 #[cfg(unix)]
135 {
136 let dir = tempfile::tempdir().unwrap();
137 let socket_path = dir.path().join("socket");
138 let bound = UnixListener::bind(socket_path).unwrap();
139 BoundServer::UnixSocket(
140 tokio::spawn(async move { Ok(bound.accept().await?.0) }),
141 dir,
142 )
143 }
144
145 #[cfg(not(unix))]
146 {
147 panic!("Unix sockets are not supported on this platform")
148 }
149 }
150 ServerBindConfig::TcpPort(host) => {
151 let listener = TcpListener::bind((host, 0)).await.unwrap();
152 let addr = listener.local_addr().unwrap();
153 BoundServer::TcpPort(TcpListenerStream::new(listener), addr)
154 }
155 ServerBindConfig::Demux(bindings) => {
156 let mut demux = HashMap::new();
157 for (key, bind) in bindings {
158 demux.insert(key, bind.bind().await);
159 }
160 BoundServer::Demux(demux)
161 }
162 ServerBindConfig::Merge(bindings) => {
163 let mut merge = Vec::new();
164 for bind in bindings {
165 merge.push(bind.bind().await);
166 }
167 BoundServer::Merge(merge)
168 }
169 ServerBindConfig::Tagged(underlying, id) => {
170 BoundServer::Tagged(Box::new(underlying.bind().await), id)
171 }
172 ServerBindConfig::Null => BoundServer::Null,
173 }
174 }
175}
176
/// One end of a connection, either the connecting or the accepting side.
#[derive(Debug)]
pub enum Connection {
    /// This end connects out to a remote server.
    AsClient(ClientConnection),
    /// This end has bound an endpoint and accepts incoming connections.
    AsServer(BoundServer),
}
182
183impl Connection {
184 pub async fn connect<T: Connected>(self) -> T {
185 T::from_defn(self).await
186 }
187
188 pub fn connect_local_blocking<T: Connected>(self) -> T {
189 let handle = tokio::runtime::Handle::current();
190 let _guard = handle.enter();
191 futures::executor::block_on(T::from_defn(self))
192 }
193
194 pub async fn accept_tcp(&mut self) -> TcpStream {
195 if let Connection::AsServer(BoundServer::TcpPort(handle, _)) = self {
196 handle.next().await.unwrap().unwrap()
197 } else {
198 panic!("Not a TCP port")
199 }
200 }
201}
202
/// A pinned, boxed, type-erased stream of received byte frames (or I/O errors).
pub type DynStream = Pin<Box<dyn Stream<Item = Result<BytesMut, io::Error>> + Send + Sync>>;

/// A pinned, boxed, type-erased sink accepting `Input` items and failing with `io::Error`.
pub type DynSink<Input> = Pin<Box<dyn Sink<Input, Error = io::Error> + Send + Sync>>;
206
207pub trait StreamSink:
208 Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error>
209{
210}
211impl<T: Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error>> StreamSink
212 for T
213{
214}
215
/// A pinned, boxed, type-erased [`StreamSink`].
pub type DynStreamSink = Pin<Box<dyn StreamSink + Send + Sync>>;
217
/// A connected endpoint type that can be built asynchronously from a [`Connection`].
#[async_trait]
pub trait Connected: Send {
    /// Completes the connection described by `pipe` and wraps it as `Self`.
    async fn from_defn(pipe: Connection) -> Self;
}
222
/// A connected endpoint that can be consumed to obtain its sending half.
pub trait ConnectedSink {
    /// Item type accepted by the sink.
    type Input: Send;
    /// Concrete sink type; its error type is always `io::Error`.
    type Sink: Sink<Self::Input, Error = io::Error> + Send + Sync;

    /// Consumes the endpoint and returns its sink.
    fn into_sink(self) -> Self::Sink;
}
229
/// A connected endpoint that can be consumed to obtain its receiving half.
pub trait ConnectedSource {
    /// Item type produced by the stream on success.
    type Output: Send;
    /// Concrete stream type; each item is either an `Output` or an `io::Error`.
    type Stream: Stream<Item = Result<Self::Output, io::Error>> + Send + Sync;
    /// Consumes the endpoint and returns its stream.
    fn into_source(self) -> Self::Stream;
}
235
/// A server endpoint that has been bound and is waiting for a peer.
///
/// Produced by [`ServerBindConfig::bind`]; mirrors the shape of [`ServerBindConfig`].
#[derive(Debug)]
pub enum BoundServer {
    /// Handle to the task accepting the first Unix socket connection, plus the temporary
    /// directory that contains the socket file.
    UnixSocket(JoinHandle<io::Result<UnixStream>>, tempfile::TempDir),
    /// Stream of incoming TCP connections and the local address of the listener.
    TcpPort(TcpListenerStream, SocketAddr),
    /// Bound endpoints keyed by a `u32`.
    Demux(HashMap<u32, BoundServer>),
    /// Several bound endpoints.
    Merge(Vec<BoundServer>),
    /// A bound endpoint paired with a `u32` tag.
    Tagged(Box<BoundServer>, u32),
    /// Nothing was bound.
    Null,
}
245
246impl BoundServer {
247 pub fn server_port(&self) -> ServerPort {
248 match self {
249 BoundServer::UnixSocket(_, tempdir) => {
250 #[cfg(unix)]
251 {
252 ServerPort::UnixSocket(tempdir.path().join("socket"))
253 }
254
255 #[cfg(not(unix))]
256 {
257 let _ = tempdir;
258 panic!("Unix sockets are not supported on this platform")
259 }
260 }
261 BoundServer::TcpPort(_, addr) => {
262 ServerPort::TcpPort(SocketAddr::new(addr.ip(), addr.port()))
263 }
264
265 BoundServer::Demux(bindings) => {
266 let mut demux = HashMap::new();
267 for (key, bind) in bindings {
268 demux.insert(*key, bind.server_port());
269 }
270 ServerPort::Demux(demux)
271 }
272
273 BoundServer::Merge(bindings) => {
274 let mut merge = Vec::new();
275 for bind in bindings {
276 merge.push(bind.server_port());
277 }
278 ServerPort::Merge(merge)
279 }
280
281 BoundServer::Tagged(underlying, id) => {
282 ServerPort::Tagged(Box::new(underlying.server_port()), *id)
283 }
284
285 BoundServer::Null => ServerPort::Null,
286 }
287 }
288}
289
290#[async_recursion]
291async fn accept(bound: BoundServer) -> ConnectedDirect {
292 match bound {
293 BoundServer::UnixSocket(listener, _) => {
294 #[cfg(unix)]
295 {
296 let stream = listener.await.unwrap().unwrap();
297 ConnectedDirect {
298 stream_sink: Some(Box::pin(unix_bytes(stream))),
299 source_only: None,
300 sink_only: None,
301 }
302 }
303
304 #[cfg(not(unix))]
305 {
306 drop(listener);
307 panic!("Unix sockets are not supported on this platform")
308 }
309 }
310 BoundServer::TcpPort(mut listener, _) => {
311 let stream = listener.next().await.unwrap().unwrap();
312 ConnectedDirect {
313 stream_sink: Some(Box::pin(tcp_bytes(stream))),
314 source_only: None,
315 sink_only: None,
316 }
317 }
318 BoundServer::Merge(merge) => {
319 let mut sources = vec![];
320 for bound in merge {
321 sources.push(accept(bound).await.into_source());
322 }
323
324 let merge_source: DynStream = Box::pin(MergeSource {
325 marker: PhantomData,
326 sources,
327 });
328
329 ConnectedDirect {
330 stream_sink: None,
331 source_only: Some(merge_source),
332 sink_only: None,
333 }
334 }
335 BoundServer::Demux(_) => panic!("Cannot connect to a demux pipe directly"),
336 BoundServer::Tagged(_, _) => panic!("Cannot connect to a tagged pipe directly"),
337 BoundServer::Null => {
338 ConnectedDirect::from_defn(Connection::AsClient(ClientConnection::Null)).await
339 }
340 }
341}
342
343fn tcp_bytes(stream: TcpStream) -> impl StreamSink {
344 Framed::new(stream, LengthDelimitedCodec::new())
345}
346
347#[cfg(unix)]
348fn unix_bytes(stream: UnixStream) -> impl StreamSink {
349 Framed::new(stream, LengthDelimitedCodec::new())
350}
351
/// A sink that accepts and discards every item, with `io::Error` as its nominal error type.
struct IoErrorDrain<T> {
    /// Records the item type without storing any item.
    marker: PhantomData<T>,
}
355
356impl<T> Sink<T> for IoErrorDrain<T> {
357 type Error = io::Error;
358
359 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
360 Poll::Ready(Ok(()))
361 }
362
363 fn start_send(self: Pin<&mut Self>, _item: T) -> Result<(), Self::Error> {
364 Ok(())
365 }
366
367 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
368 Poll::Ready(Ok(()))
369 }
370
371 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
372 Poll::Ready(Ok(()))
373 }
374}
375
376async fn async_retry<T, E, F: Future<Output = Result<T, E>>>(
377 thunk: impl Fn() -> F,
378 count: usize,
379 delay: Duration,
380) -> Result<T, E> {
381 for _ in 1..count {
382 let result = thunk().await;
383 if result.is_ok() {
384 return result;
385 } else {
386 tokio::time::sleep(delay).await;
387 }
388 }
389
390 thunk().await
391}
392
/// A fully established connection exposing a byte-frame source and/or sink.
///
/// When `stream_sink` is set it serves both directions; otherwise `source_only` and
/// `sink_only` hold whichever halves are available.
pub struct ConnectedDirect {
    /// Bidirectional channel, preferred by both `into_source` and `into_sink`.
    stream_sink: Option<DynStreamSink>,
    /// Receive-only half, used by `into_source` when `stream_sink` is absent.
    source_only: Option<DynStream>,
    /// Send-only half, used by `into_sink` when `stream_sink` is absent.
    sink_only: Option<DynSink<Bytes>>,
}
398
399#[async_trait]
400impl Connected for ConnectedDirect {
401 async fn from_defn(pipe: Connection) -> Self {
402 match pipe {
403 Connection::AsClient(ClientConnection::UnixSocket(stream)) => {
404 #[cfg(unix)]
405 {
406 let stream = stream.await.unwrap().unwrap();
407 ConnectedDirect {
408 stream_sink: Some(Box::pin(unix_bytes(stream))),
409 source_only: None,
410 sink_only: None,
411 }
412 }
413
414 #[cfg(not(unix))]
415 {
416 drop(stream);
417 panic!("Unix sockets are not supported on this platform");
418 }
419 }
420 Connection::AsClient(ClientConnection::TcpPort(stream)) => {
421 let stream = stream.await.unwrap().unwrap();
422 stream.set_nodelay(true).unwrap();
423 ConnectedDirect {
424 stream_sink: Some(Box::pin(tcp_bytes(stream))),
425 source_only: None,
426 sink_only: None,
427 }
428 }
429 Connection::AsClient(ClientConnection::Merge(merge)) => {
430 let sources = futures::future::join_all(merge.into_iter().map(|port| async {
431 ConnectedDirect::from_defn(Connection::AsClient(port))
432 .await
433 .into_source()
434 }))
435 .await;
436
437 let merged = MergeSource {
438 marker: PhantomData,
439 sources,
440 };
441
442 ConnectedDirect {
443 stream_sink: None,
444 source_only: Some(Box::pin(merged)),
445 sink_only: None,
446 }
447 }
448 Connection::AsClient(ClientConnection::Demux(_)) => {
449 panic!("Cannot connect to a demux pipe directly")
450 }
451
452 Connection::AsClient(ClientConnection::Tagged(_, _)) => {
453 panic!("Cannot connect to a tagged pipe directly")
454 }
455
456 Connection::AsClient(ClientConnection::Null) => ConnectedDirect {
457 stream_sink: None,
458 source_only: Some(Box::pin(stream::empty())),
459 sink_only: Some(Box::pin(IoErrorDrain {
460 marker: PhantomData,
461 })),
462 },
463
464 Connection::AsServer(bound) => accept(bound).await,
465 }
466 }
467}
468
469impl ConnectedSource for ConnectedDirect {
470 type Output = BytesMut;
471 type Stream = DynStream;
472
473 fn into_source(mut self) -> DynStream {
474 if let Some(s) = self.stream_sink.take() {
475 Box::pin(s)
476 } else {
477 self.source_only.take().unwrap()
478 }
479 }
480}
481
482impl ConnectedSink for ConnectedDirect {
483 type Input = Bytes;
484 type Sink = DynSink<Bytes>;
485
486 fn into_sink(mut self) -> DynSink<Self::Input> {
487 if let Some(s) = self.stream_sink.take() {
488 Box::pin(s)
489 } else {
490 self.sink_only.take().unwrap()
491 }
492 }
493}
494
/// A [`DemuxDrain`] whose per-key sinks are each wrapped in a `futures` [`Buffer`].
pub type BufferedDrain<S, I> = DemuxDrain<I, Buffer<S, I>>;
496
/// A connected demultiplexer: one sink per `u32` key, addressed by sending `(key, item)`.
pub struct ConnectedDemux<T: ConnectedSink>
where
    <T as ConnectedSink>::Input: Sync,
{
    /// Keys for which a sink was connected.
    pub keys: Vec<u32>,
    /// The demultiplexing sink; taken out by `into_sink`.
    sink: Option<BufferedDrain<T::Sink, T::Input>>,
}
504
/// A sink of `(u32, T)` pairs that forwards each item to the inner sink registered under
/// its key.
#[pin_project]
pub struct DemuxDrain<T, S: Sink<T, Error = io::Error> + Send + Sync + ?Sized> {
    /// Records the item type without storing any item.
    marker: PhantomData<T>,
    /// Inner sinks by key.
    #[pin]
    sinks: HashMap<u32, Pin<Box<S>>>,
}
511
512impl<T, S: Sink<T, Error = io::Error> + Send + Sync> Sink<(u32, T)> for DemuxDrain<T, S> {
513 type Error = io::Error;
514
515 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
516 for sink in self.project().sinks.values_mut() {
517 ready!(Sink::poll_ready(sink.as_mut(), _cx))?;
518 }
519
520 Poll::Ready(Ok(()))
521 }
522
523 fn start_send(self: Pin<&mut Self>, item: (u32, T)) -> Result<(), Self::Error> {
524 Sink::start_send(
525 self.project()
526 .sinks
527 .get_mut()
528 .get_mut(&item.0)
529 .unwrap_or_else(|| panic!("No sink in this demux for key {}", item.0))
530 .as_mut(),
531 item.1,
532 )
533 }
534
535 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
536 for sink in self.project().sinks.values_mut() {
537 ready!(Sink::poll_flush(sink.as_mut(), _cx))?;
538 }
539
540 Poll::Ready(Ok(()))
541 }
542
543 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
544 for sink in self.project().sinks.values_mut() {
545 ready!(Sink::poll_close(sink.as_mut(), _cx))?;
546 }
547
548 Poll::Ready(Ok(()))
549 }
550}
551
552#[async_trait]
553impl<T: Connected + ConnectedSink> Connected for ConnectedDemux<T>
554where
555 <T as ConnectedSink>::Input: 'static + Sync,
556{
557 async fn from_defn(pipe: Connection) -> Self {
558 match pipe {
559 Connection::AsClient(ClientConnection::Demux(demux)) => {
560 let mut connected_demux = HashMap::new();
561 let keys = demux.keys().cloned().collect();
562 for (id, pipe) in demux {
563 connected_demux.insert(
564 id,
565 Box::pin(
566 T::from_defn(Connection::AsClient(pipe))
567 .await
568 .into_sink()
569 .buffer(1024),
570 ),
571 );
572 }
573
574 let demuxer = DemuxDrain {
575 marker: PhantomData,
576 sinks: connected_demux,
577 };
578
579 ConnectedDemux {
580 keys,
581 sink: Some(demuxer),
582 }
583 }
584
585 Connection::AsServer(BoundServer::Demux(demux)) => {
586 let mut connected_demux = HashMap::new();
587 let keys = demux.keys().cloned().collect();
588 for (id, bound) in demux {
589 connected_demux.insert(
590 id,
591 Box::pin(
592 T::from_defn(Connection::AsServer(bound))
593 .await
594 .into_sink()
595 .buffer(1024),
596 ),
597 );
598 }
599
600 let demuxer = DemuxDrain {
601 marker: PhantomData,
602 sinks: connected_demux,
603 };
604
605 ConnectedDemux {
606 keys,
607 sink: Some(demuxer),
608 }
609 }
610 _ => panic!("Cannot connect to a non-demux pipe as a demux"),
611 }
612 }
613}
614
615impl<T: ConnectedSink> ConnectedSink for ConnectedDemux<T>
616where
617 <T as ConnectedSink>::Input: 'static + Sync,
618{
619 type Input = (u32, T::Input);
620 type Sink = BufferedDrain<T::Sink, T::Input>;
621
622 fn into_sink(mut self) -> Self::Sink {
623 self.sink.take().unwrap()
624 }
625}
626
/// A stream that yields items from several inner streams of the same type.
///
/// Finished inner streams are dropped; the merged stream ends once all of them have ended.
// NOTE(review): sources are always polled starting from the first one, so a
// continuously ready earlier source is served before later ones.
pub struct MergeSource<T: Unpin, S: Stream<Item = T> + Send + Sync + ?Sized> {
    /// Records the item type without storing any item.
    marker: PhantomData<T>,
    /// Inner streams that have not finished yet.
    sources: Vec<Pin<Box<S>>>,
}
631
632impl<T: Unpin, S: Stream<Item = T> + Send + Sync + ?Sized> Stream for MergeSource<T, S> {
633 type Item = T;
634
635 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
636 let sources = &mut self.get_mut().sources;
637 let mut next = None;
638
639 let mut i = 0;
640 while i < sources.len() {
641 match sources[i].as_mut().poll_next(cx) {
642 Poll::Ready(Some(v)) => {
643 next = Some(v);
644 break;
645 }
646 Poll::Ready(None) => {
647 sources.remove(i);
649 }
650 Poll::Pending => {
651 i += 1;
652 }
653 }
654 }
655
656 if sources.is_empty() {
657 Poll::Ready(None)
658 } else if next.is_none() {
659 Poll::Pending
660 } else {
661 Poll::Ready(next)
662 }
663 }
664}
665
/// A stream adapter that pairs every successfully received item with a fixed `u32` id.
pub struct TaggedSource<T: Unpin, S: Stream<Item = Result<T, io::Error>> + Send + Sync + ?Sized> {
    /// Records the item type without storing any item.
    marker: PhantomData<T>,
    /// Id attached to every item.
    id: u32,
    /// The underlying stream.
    source: Pin<Box<S>>,
}
671
672impl<T: Unpin, S: Stream<Item = Result<T, io::Error>> + Send + Sync + ?Sized> Stream
673 for TaggedSource<T, S>
674{
675 type Item = Result<(u32, T), io::Error>;
676
677 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
678 let id = self.as_ref().id;
679 let source = &mut self.get_mut().source;
680 match source.as_mut().poll_next(cx) {
681 Poll::Ready(Some(v)) => Poll::Ready(Some(v.map(|d| (id, d)))),
682 Poll::Ready(None) => Poll::Ready(None),
683 Poll::Pending => Poll::Pending,
684 }
685 }
686}
687
/// A merge of [`TaggedSource`]s over `T`'s stream, yielding `(id, output)` pairs or I/O
/// errors.
type MergedMux<T> = MergeSource<
    Result<(u32, <T as ConnectedSource>::Output), io::Error>,
    TaggedSource<<T as ConnectedSource>::Output, <T as ConnectedSource>::Stream>,
>;
692
/// A connected endpoint whose received items are each labelled with the `u32` tag of the
/// member connection they arrived on.
pub struct ConnectedTagged<T: ConnectedSource>
where
    <T as ConnectedSource>::Output: 'static + Sync + Unpin,
{
    /// Merged stream of tagged items from all member connections.
    source: MergedMux<T>,
}
699
700#[async_trait]
701impl<T: Connected + ConnectedSource> Connected for ConnectedTagged<T>
702where
703 <T as ConnectedSource>::Output: 'static + Sync + Unpin,
704{
705 async fn from_defn(pipe: Connection) -> Self {
706 let sources = match pipe {
707 Connection::AsClient(ClientConnection::Tagged(pipe, id)) => {
708 vec![(
709 Box::pin(
710 T::from_defn(Connection::AsClient(*pipe))
711 .await
712 .into_source(),
713 ),
714 id,
715 )]
716 }
717
718 Connection::AsClient(ClientConnection::Merge(m)) => {
719 let mut sources = Vec::new();
720 for port in m {
721 if let ClientConnection::Tagged(pipe, id) = port {
722 sources.push((
723 Box::pin(
724 T::from_defn(Connection::AsClient(*pipe))
725 .await
726 .into_source(),
727 ),
728 id,
729 ));
730 } else {
731 panic!("Merge port must be tagged");
732 }
733 }
734
735 sources
736 }
737
738 Connection::AsServer(BoundServer::Tagged(pipe, id)) => {
739 vec![(
740 Box::pin(
741 T::from_defn(Connection::AsServer(*pipe))
742 .await
743 .into_source(),
744 ),
745 id,
746 )]
747 }
748
749 Connection::AsServer(BoundServer::Merge(m)) => {
750 let mut sources = Vec::new();
751 for port in m {
752 if let BoundServer::Tagged(pipe, id) = port {
753 sources.push((
754 Box::pin(
755 T::from_defn(Connection::AsServer(*pipe))
756 .await
757 .into_source(),
758 ),
759 id,
760 ));
761 } else {
762 panic!("Merge port must be tagged");
763 }
764 }
765
766 sources
767 }
768
769 _ => panic!("Cannot connect to a non-tagged pipe as a tagged"),
770 };
771
772 let mut connected_mux = Vec::new();
773 for (pipe, id) in sources {
774 connected_mux.push(Box::pin(TaggedSource {
775 marker: PhantomData,
776 id,
777 source: pipe,
778 }));
779 }
780
781 let muxer = MergeSource {
782 marker: PhantomData,
783 sources: connected_mux,
784 };
785
786 ConnectedTagged { source: muxer }
787 }
788}
789
790impl<T: ConnectedSource> ConnectedSource for ConnectedTagged<T>
791where
792 <T as ConnectedSource>::Output: 'static + Sync + Unpin,
793{
794 type Output = (u32, T::Output);
795 type Stream = MergeSource<Result<Self::Output, io::Error>, TaggedSource<T::Output, T::Stream>>;
796
797 fn into_source(self) -> Self::Stream {
798 self.source
799 }
800}