hydro_deploy_integration/
lib.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::marker::PhantomData;
4use std::net::SocketAddr;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use async_recursion::async_recursion;
11use async_trait::async_trait;
12use bytes::{Bytes, BytesMut};
13use futures::sink::Buffer;
14use futures::{Future, Sink, SinkExt, Stream, ready, stream};
15use pin_project::pin_project;
16use serde::{Deserialize, Serialize};
17use tokio::io;
18use tokio::net::{TcpListener, TcpStream};
19#[cfg(unix)]
20use tokio::net::{UnixListener, UnixStream};
21use tokio::task::JoinHandle;
22use tokio_stream::StreamExt;
23use tokio_stream::wrappers::TcpListenerStream;
24use tokio_util::codec::{Framed, LengthDelimitedCodec};
25
26pub type InitConfig = (HashMap<String, ServerBindConfig>, Option<String>);
27
28/// Contains runtime information passed by Hydro Deploy to a program,
29/// describing how to connect to other services and metadata about them.
30pub struct DeployPorts<T = Option<()>> {
31    pub ports: RefCell<HashMap<String, Connection>>,
32    pub meta: T,
33}
34
35impl<T> DeployPorts<T> {
36    pub fn port(&self, name: &str) -> Connection {
37        self.ports
38            .try_borrow_mut()
39            .unwrap()
40            .remove(name)
41            .unwrap_or_else(|| panic!("port {} not found", name))
42    }
43}
44
45#[cfg(not(unix))]
46type UnixStream = std::convert::Infallible;
47
48#[cfg(not(unix))]
49#[expect(dead_code, reason = "conditional compilation placeholder")]
50type UnixListener = std::convert::Infallible;
51
52/// Describes how to connect to a service which is listening on some port.
53#[derive(Serialize, Deserialize, Clone, Debug)]
54pub enum ServerPort {
55    UnixSocket(PathBuf),
56    TcpPort(SocketAddr),
57    Demux(HashMap<u32, ServerPort>),
58    Merge(Vec<ServerPort>),
59    Tagged(Box<ServerPort>, u32),
60    Null,
61}
62
63impl ServerPort {
64    pub fn connect(&self) -> ClientConnection {
65        match self {
66            ServerPort::UnixSocket(path) => {
67                #[cfg(unix)]
68                {
69                    let bound = UnixStream::connect(path.clone());
70                    ClientConnection::UnixSocket(tokio::spawn(bound))
71                }
72
73                #[cfg(not(unix))]
74                {
75                    let _ = path;
76                    panic!("Unix sockets are not supported on this platform")
77                }
78            }
79            ServerPort::TcpPort(addr) => {
80                let addr_clone = *addr;
81                let bound = async_retry(
82                    move || TcpStream::connect(addr_clone),
83                    10,
84                    Duration::from_secs(1),
85                );
86                ClientConnection::TcpPort(tokio::spawn(bound))
87            }
88            ServerPort::Demux(bindings) => {
89                ClientConnection::Demux(bindings.iter().map(|(k, v)| (*k, v.connect())).collect())
90            }
91            ServerPort::Merge(ports) => {
92                ClientConnection::Merge(ports.iter().map(|p| p.connect()).collect())
93            }
94            ServerPort::Tagged(port, tag) => {
95                ClientConnection::Tagged(Box::new(port.as_ref().connect()), *tag)
96            }
97            ServerPort::Null => ClientConnection::Null,
98        }
99    }
100
101    pub fn instantiate(&self) -> Connection {
102        Connection::AsClient(self.connect())
103    }
104}
105
106#[derive(Debug)]
107pub enum ClientConnection {
108    UnixSocket(JoinHandle<io::Result<UnixStream>>),
109    TcpPort(JoinHandle<io::Result<TcpStream>>),
110    Demux(HashMap<u32, ClientConnection>),
111    Merge(Vec<ClientConnection>),
112    Tagged(Box<ClientConnection>, u32),
113    Null,
114}
115
116#[derive(Serialize, Deserialize, Clone, Debug)]
117pub enum ServerBindConfig {
118    UnixSocket,
119    TcpPort(
120        /// The host the port should be bound on.
121        String,
122    ),
123    Demux(HashMap<u32, ServerBindConfig>),
124    Merge(Vec<ServerBindConfig>),
125    Tagged(Box<ServerBindConfig>, u32),
126    Null,
127}
128
129impl ServerBindConfig {
130    #[async_recursion]
131    pub async fn bind(self) -> BoundServer {
132        match self {
133            ServerBindConfig::UnixSocket => {
134                #[cfg(unix)]
135                {
136                    let dir = tempfile::tempdir().unwrap();
137                    let socket_path = dir.path().join("socket");
138                    let bound = UnixListener::bind(socket_path).unwrap();
139                    BoundServer::UnixSocket(
140                        tokio::spawn(async move { Ok(bound.accept().await?.0) }),
141                        dir,
142                    )
143                }
144
145                #[cfg(not(unix))]
146                {
147                    panic!("Unix sockets are not supported on this platform")
148                }
149            }
150            ServerBindConfig::TcpPort(host) => {
151                let listener = TcpListener::bind((host, 0)).await.unwrap();
152                let addr = listener.local_addr().unwrap();
153                BoundServer::TcpPort(TcpListenerStream::new(listener), addr)
154            }
155            ServerBindConfig::Demux(bindings) => {
156                let mut demux = HashMap::new();
157                for (key, bind) in bindings {
158                    demux.insert(key, bind.bind().await);
159                }
160                BoundServer::Demux(demux)
161            }
162            ServerBindConfig::Merge(bindings) => {
163                let mut merge = Vec::new();
164                for bind in bindings {
165                    merge.push(bind.bind().await);
166                }
167                BoundServer::Merge(merge)
168            }
169            ServerBindConfig::Tagged(underlying, id) => {
170                BoundServer::Tagged(Box::new(underlying.bind().await), id)
171            }
172            ServerBindConfig::Null => BoundServer::Null,
173        }
174    }
175}
176
177#[derive(Debug)]
178pub enum Connection {
179    AsClient(ClientConnection),
180    AsServer(BoundServer),
181}
182
183impl Connection {
184    pub async fn connect<T: Connected>(self) -> T {
185        T::from_defn(self).await
186    }
187
188    pub fn connect_local_blocking<T: Connected>(self) -> T {
189        let handle = tokio::runtime::Handle::current();
190        let _guard = handle.enter();
191        futures::executor::block_on(T::from_defn(self))
192    }
193
194    pub async fn accept_tcp(&mut self) -> TcpStream {
195        if let Connection::AsServer(BoundServer::TcpPort(handle, _)) = self {
196            handle.next().await.unwrap().unwrap()
197        } else {
198            panic!("Not a TCP port")
199        }
200    }
201}
202
203pub type DynStream = Pin<Box<dyn Stream<Item = Result<BytesMut, io::Error>> + Send + Sync>>;
204
205pub type DynSink<Input> = Pin<Box<dyn Sink<Input, Error = io::Error> + Send + Sync>>;
206
207pub trait StreamSink:
208    Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error>
209{
210}
211impl<T: Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error>> StreamSink
212    for T
213{
214}
215
216pub type DynStreamSink = Pin<Box<dyn StreamSink + Send + Sync>>;
217
218#[async_trait]
219pub trait Connected: Send {
220    async fn from_defn(pipe: Connection) -> Self;
221}
222
223pub trait ConnectedSink {
224    type Input: Send;
225    type Sink: Sink<Self::Input, Error = io::Error> + Send + Sync;
226
227    fn into_sink(self) -> Self::Sink;
228}
229
230pub trait ConnectedSource {
231    type Output: Send;
232    type Stream: Stream<Item = Result<Self::Output, io::Error>> + Send + Sync;
233    fn into_source(self) -> Self::Stream;
234}
235
236#[derive(Debug)]
237pub enum BoundServer {
238    UnixSocket(JoinHandle<io::Result<UnixStream>>, tempfile::TempDir),
239    TcpPort(TcpListenerStream, SocketAddr),
240    Demux(HashMap<u32, BoundServer>),
241    Merge(Vec<BoundServer>),
242    Tagged(Box<BoundServer>, u32),
243    Null,
244}
245
246impl BoundServer {
247    pub fn server_port(&self) -> ServerPort {
248        match self {
249            BoundServer::UnixSocket(_, tempdir) => {
250                #[cfg(unix)]
251                {
252                    ServerPort::UnixSocket(tempdir.path().join("socket"))
253                }
254
255                #[cfg(not(unix))]
256                {
257                    let _ = tempdir;
258                    panic!("Unix sockets are not supported on this platform")
259                }
260            }
261            BoundServer::TcpPort(_, addr) => {
262                ServerPort::TcpPort(SocketAddr::new(addr.ip(), addr.port()))
263            }
264
265            BoundServer::Demux(bindings) => {
266                let mut demux = HashMap::new();
267                for (key, bind) in bindings {
268                    demux.insert(*key, bind.server_port());
269                }
270                ServerPort::Demux(demux)
271            }
272
273            BoundServer::Merge(bindings) => {
274                let mut merge = Vec::new();
275                for bind in bindings {
276                    merge.push(bind.server_port());
277                }
278                ServerPort::Merge(merge)
279            }
280
281            BoundServer::Tagged(underlying, id) => {
282                ServerPort::Tagged(Box::new(underlying.server_port()), *id)
283            }
284
285            BoundServer::Null => ServerPort::Null,
286        }
287    }
288}
289
290#[async_recursion]
291async fn accept(bound: BoundServer) -> ConnectedDirect {
292    match bound {
293        BoundServer::UnixSocket(listener, _) => {
294            #[cfg(unix)]
295            {
296                let stream = listener.await.unwrap().unwrap();
297                ConnectedDirect {
298                    stream_sink: Some(Box::pin(unix_bytes(stream))),
299                    source_only: None,
300                    sink_only: None,
301                }
302            }
303
304            #[cfg(not(unix))]
305            {
306                drop(listener);
307                panic!("Unix sockets are not supported on this platform")
308            }
309        }
310        BoundServer::TcpPort(mut listener, _) => {
311            let stream = listener.next().await.unwrap().unwrap();
312            ConnectedDirect {
313                stream_sink: Some(Box::pin(tcp_bytes(stream))),
314                source_only: None,
315                sink_only: None,
316            }
317        }
318        BoundServer::Merge(merge) => {
319            let mut sources = vec![];
320            for bound in merge {
321                sources.push(accept(bound).await.into_source());
322            }
323
324            let merge_source: DynStream = Box::pin(MergeSource {
325                marker: PhantomData,
326                sources,
327            });
328
329            ConnectedDirect {
330                stream_sink: None,
331                source_only: Some(merge_source),
332                sink_only: None,
333            }
334        }
335        BoundServer::Demux(_) => panic!("Cannot connect to a demux pipe directly"),
336        BoundServer::Tagged(_, _) => panic!("Cannot connect to a tagged pipe directly"),
337        BoundServer::Null => {
338            ConnectedDirect::from_defn(Connection::AsClient(ClientConnection::Null)).await
339        }
340    }
341}
342
343fn tcp_bytes(stream: TcpStream) -> impl StreamSink {
344    Framed::new(stream, LengthDelimitedCodec::new())
345}
346
347#[cfg(unix)]
348fn unix_bytes(stream: UnixStream) -> impl StreamSink {
349    Framed::new(stream, LengthDelimitedCodec::new())
350}
351
352struct IoErrorDrain<T> {
353    marker: PhantomData<T>,
354}
355
356impl<T> Sink<T> for IoErrorDrain<T> {
357    type Error = io::Error;
358
359    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
360        Poll::Ready(Ok(()))
361    }
362
363    fn start_send(self: Pin<&mut Self>, _item: T) -> Result<(), Self::Error> {
364        Ok(())
365    }
366
367    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
368        Poll::Ready(Ok(()))
369    }
370
371    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
372        Poll::Ready(Ok(()))
373    }
374}
375
376async fn async_retry<T, E, F: Future<Output = Result<T, E>>>(
377    thunk: impl Fn() -> F,
378    count: usize,
379    delay: Duration,
380) -> Result<T, E> {
381    for _ in 1..count {
382        let result = thunk().await;
383        if result.is_ok() {
384            return result;
385        } else {
386            tokio::time::sleep(delay).await;
387        }
388    }
389
390    thunk().await
391}
392
393pub struct ConnectedDirect {
394    stream_sink: Option<DynStreamSink>,
395    source_only: Option<DynStream>,
396    sink_only: Option<DynSink<Bytes>>,
397}
398
399#[async_trait]
400impl Connected for ConnectedDirect {
401    async fn from_defn(pipe: Connection) -> Self {
402        match pipe {
403            Connection::AsClient(ClientConnection::UnixSocket(stream)) => {
404                #[cfg(unix)]
405                {
406                    let stream = stream.await.unwrap().unwrap();
407                    ConnectedDirect {
408                        stream_sink: Some(Box::pin(unix_bytes(stream))),
409                        source_only: None,
410                        sink_only: None,
411                    }
412                }
413
414                #[cfg(not(unix))]
415                {
416                    drop(stream);
417                    panic!("Unix sockets are not supported on this platform");
418                }
419            }
420            Connection::AsClient(ClientConnection::TcpPort(stream)) => {
421                let stream = stream.await.unwrap().unwrap();
422                stream.set_nodelay(true).unwrap();
423                ConnectedDirect {
424                    stream_sink: Some(Box::pin(tcp_bytes(stream))),
425                    source_only: None,
426                    sink_only: None,
427                }
428            }
429            Connection::AsClient(ClientConnection::Merge(merge)) => {
430                let sources = futures::future::join_all(merge.into_iter().map(|port| async {
431                    ConnectedDirect::from_defn(Connection::AsClient(port))
432                        .await
433                        .into_source()
434                }))
435                .await;
436
437                let merged = MergeSource {
438                    marker: PhantomData,
439                    sources,
440                };
441
442                ConnectedDirect {
443                    stream_sink: None,
444                    source_only: Some(Box::pin(merged)),
445                    sink_only: None,
446                }
447            }
448            Connection::AsClient(ClientConnection::Demux(_)) => {
449                panic!("Cannot connect to a demux pipe directly")
450            }
451
452            Connection::AsClient(ClientConnection::Tagged(_, _)) => {
453                panic!("Cannot connect to a tagged pipe directly")
454            }
455
456            Connection::AsClient(ClientConnection::Null) => ConnectedDirect {
457                stream_sink: None,
458                source_only: Some(Box::pin(stream::empty())),
459                sink_only: Some(Box::pin(IoErrorDrain {
460                    marker: PhantomData,
461                })),
462            },
463
464            Connection::AsServer(bound) => accept(bound).await,
465        }
466    }
467}
468
469impl ConnectedSource for ConnectedDirect {
470    type Output = BytesMut;
471    type Stream = DynStream;
472
473    fn into_source(mut self) -> DynStream {
474        if let Some(s) = self.stream_sink.take() {
475            Box::pin(s)
476        } else {
477            self.source_only.take().unwrap()
478        }
479    }
480}
481
482impl ConnectedSink for ConnectedDirect {
483    type Input = Bytes;
484    type Sink = DynSink<Bytes>;
485
486    fn into_sink(mut self) -> DynSink<Self::Input> {
487        if let Some(s) = self.stream_sink.take() {
488            Box::pin(s)
489        } else {
490            self.sink_only.take().unwrap()
491        }
492    }
493}
494
495pub type BufferedDrain<S, I> = DemuxDrain<I, Buffer<S, I>>;
496
497pub struct ConnectedDemux<T: ConnectedSink>
498where
499    <T as ConnectedSink>::Input: Sync,
500{
501    pub keys: Vec<u32>,
502    sink: Option<BufferedDrain<T::Sink, T::Input>>,
503}
504
505#[pin_project]
506pub struct DemuxDrain<T, S: Sink<T, Error = io::Error> + Send + Sync + ?Sized> {
507    marker: PhantomData<T>,
508    #[pin]
509    sinks: HashMap<u32, Pin<Box<S>>>,
510}
511
512impl<T, S: Sink<T, Error = io::Error> + Send + Sync> Sink<(u32, T)> for DemuxDrain<T, S> {
513    type Error = io::Error;
514
515    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
516        for sink in self.project().sinks.values_mut() {
517            ready!(Sink::poll_ready(sink.as_mut(), _cx))?;
518        }
519
520        Poll::Ready(Ok(()))
521    }
522
523    fn start_send(self: Pin<&mut Self>, item: (u32, T)) -> Result<(), Self::Error> {
524        Sink::start_send(
525            self.project()
526                .sinks
527                .get_mut()
528                .get_mut(&item.0)
529                .unwrap_or_else(|| panic!("No sink in this demux for key {}", item.0))
530                .as_mut(),
531            item.1,
532        )
533    }
534
535    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
536        for sink in self.project().sinks.values_mut() {
537            ready!(Sink::poll_flush(sink.as_mut(), _cx))?;
538        }
539
540        Poll::Ready(Ok(()))
541    }
542
543    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
544        for sink in self.project().sinks.values_mut() {
545            ready!(Sink::poll_close(sink.as_mut(), _cx))?;
546        }
547
548        Poll::Ready(Ok(()))
549    }
550}
551
552#[async_trait]
553impl<T: Connected + ConnectedSink> Connected for ConnectedDemux<T>
554where
555    <T as ConnectedSink>::Input: 'static + Sync,
556{
557    async fn from_defn(pipe: Connection) -> Self {
558        match pipe {
559            Connection::AsClient(ClientConnection::Demux(demux)) => {
560                let mut connected_demux = HashMap::new();
561                let keys = demux.keys().cloned().collect();
562                for (id, pipe) in demux {
563                    connected_demux.insert(
564                        id,
565                        Box::pin(
566                            T::from_defn(Connection::AsClient(pipe))
567                                .await
568                                .into_sink()
569                                .buffer(1024),
570                        ),
571                    );
572                }
573
574                let demuxer = DemuxDrain {
575                    marker: PhantomData,
576                    sinks: connected_demux,
577                };
578
579                ConnectedDemux {
580                    keys,
581                    sink: Some(demuxer),
582                }
583            }
584
585            Connection::AsServer(BoundServer::Demux(demux)) => {
586                let mut connected_demux = HashMap::new();
587                let keys = demux.keys().cloned().collect();
588                for (id, bound) in demux {
589                    connected_demux.insert(
590                        id,
591                        Box::pin(
592                            T::from_defn(Connection::AsServer(bound))
593                                .await
594                                .into_sink()
595                                .buffer(1024),
596                        ),
597                    );
598                }
599
600                let demuxer = DemuxDrain {
601                    marker: PhantomData,
602                    sinks: connected_demux,
603                };
604
605                ConnectedDemux {
606                    keys,
607                    sink: Some(demuxer),
608                }
609            }
610            _ => panic!("Cannot connect to a non-demux pipe as a demux"),
611        }
612    }
613}
614
615impl<T: ConnectedSink> ConnectedSink for ConnectedDemux<T>
616where
617    <T as ConnectedSink>::Input: 'static + Sync,
618{
619    type Input = (u32, T::Input);
620    type Sink = BufferedDrain<T::Sink, T::Input>;
621
622    fn into_sink(mut self) -> Self::Sink {
623        self.sink.take().unwrap()
624    }
625}
626
627pub struct MergeSource<T: Unpin, S: Stream<Item = T> + Send + Sync + ?Sized> {
628    marker: PhantomData<T>,
629    sources: Vec<Pin<Box<S>>>,
630}
631
632impl<T: Unpin, S: Stream<Item = T> + Send + Sync + ?Sized> Stream for MergeSource<T, S> {
633    type Item = T;
634
635    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
636        let sources = &mut self.get_mut().sources;
637        let mut next = None;
638
639        let mut i = 0;
640        while i < sources.len() {
641            match sources[i].as_mut().poll_next(cx) {
642                Poll::Ready(Some(v)) => {
643                    next = Some(v);
644                    break;
645                }
646                Poll::Ready(None) => {
647                    // this happens infrequently, so OK to be O(n)
648                    sources.remove(i);
649                }
650                Poll::Pending => {
651                    i += 1;
652                }
653            }
654        }
655
656        if sources.is_empty() {
657            Poll::Ready(None)
658        } else if next.is_none() {
659            Poll::Pending
660        } else {
661            Poll::Ready(next)
662        }
663    }
664}
665
666pub struct TaggedSource<T: Unpin, S: Stream<Item = Result<T, io::Error>> + Send + Sync + ?Sized> {
667    marker: PhantomData<T>,
668    id: u32,
669    source: Pin<Box<S>>,
670}
671
672impl<T: Unpin, S: Stream<Item = Result<T, io::Error>> + Send + Sync + ?Sized> Stream
673    for TaggedSource<T, S>
674{
675    type Item = Result<(u32, T), io::Error>;
676
677    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
678        let id = self.as_ref().id;
679        let source = &mut self.get_mut().source;
680        match source.as_mut().poll_next(cx) {
681            Poll::Ready(Some(v)) => Poll::Ready(Some(v.map(|d| (id, d)))),
682            Poll::Ready(None) => Poll::Ready(None),
683            Poll::Pending => Poll::Pending,
684        }
685    }
686}
687
688type MergedMux<T> = MergeSource<
689    Result<(u32, <T as ConnectedSource>::Output), io::Error>,
690    TaggedSource<<T as ConnectedSource>::Output, <T as ConnectedSource>::Stream>,
691>;
692
693pub struct ConnectedTagged<T: ConnectedSource>
694where
695    <T as ConnectedSource>::Output: 'static + Sync + Unpin,
696{
697    source: MergedMux<T>,
698}
699
700#[async_trait]
701impl<T: Connected + ConnectedSource> Connected for ConnectedTagged<T>
702where
703    <T as ConnectedSource>::Output: 'static + Sync + Unpin,
704{
705    async fn from_defn(pipe: Connection) -> Self {
706        let sources = match pipe {
707            Connection::AsClient(ClientConnection::Tagged(pipe, id)) => {
708                vec![(
709                    Box::pin(
710                        T::from_defn(Connection::AsClient(*pipe))
711                            .await
712                            .into_source(),
713                    ),
714                    id,
715                )]
716            }
717
718            Connection::AsClient(ClientConnection::Merge(m)) => {
719                let mut sources = Vec::new();
720                for port in m {
721                    if let ClientConnection::Tagged(pipe, id) = port {
722                        sources.push((
723                            Box::pin(
724                                T::from_defn(Connection::AsClient(*pipe))
725                                    .await
726                                    .into_source(),
727                            ),
728                            id,
729                        ));
730                    } else {
731                        panic!("Merge port must be tagged");
732                    }
733                }
734
735                sources
736            }
737
738            Connection::AsServer(BoundServer::Tagged(pipe, id)) => {
739                vec![(
740                    Box::pin(
741                        T::from_defn(Connection::AsServer(*pipe))
742                            .await
743                            .into_source(),
744                    ),
745                    id,
746                )]
747            }
748
749            Connection::AsServer(BoundServer::Merge(m)) => {
750                let mut sources = Vec::new();
751                for port in m {
752                    if let BoundServer::Tagged(pipe, id) = port {
753                        sources.push((
754                            Box::pin(
755                                T::from_defn(Connection::AsServer(*pipe))
756                                    .await
757                                    .into_source(),
758                            ),
759                            id,
760                        ));
761                    } else {
762                        panic!("Merge port must be tagged");
763                    }
764                }
765
766                sources
767            }
768
769            _ => panic!("Cannot connect to a non-tagged pipe as a tagged"),
770        };
771
772        let mut connected_mux = Vec::new();
773        for (pipe, id) in sources {
774            connected_mux.push(Box::pin(TaggedSource {
775                marker: PhantomData,
776                id,
777                source: pipe,
778            }));
779        }
780
781        let muxer = MergeSource {
782            marker: PhantomData,
783            sources: connected_mux,
784        };
785
786        ConnectedTagged { source: muxer }
787    }
788}
789
790impl<T: ConnectedSource> ConnectedSource for ConnectedTagged<T>
791where
792    <T as ConnectedSource>::Output: 'static + Sync + Unpin,
793{
794    type Output = (u32, T::Output);
795    type Stream = MergeSource<Result<Self::Output, io::Error>, TaggedSource<T::Output, T::Stream>>;
796
797    fn into_source(self) -> Self::Stream {
798        self.source
799    }
800}