hydro_lang/location/
mod.rs

1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use bytes::{Bytes, BytesMut};
6use futures::stream::Stream as FuturesStream;
7use proc_macro2::Span;
8use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};
10use stageleft::{QuotedWithContext, q, quote_type};
11use syn::parse_quote;
12use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
13
14use super::builder::FlowState;
15use crate::backtrace::get_backtrace;
16use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
17use crate::ir::{DebugInstantiate, HydroIrMetadata, HydroLeaf, HydroNode, HydroSource};
18use crate::keyed_stream::KeyedStream;
19use crate::location::cluster::ClusterIds;
20use crate::location::external_process::{
21    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many,
22};
23use crate::staging_util::get_this_crate;
24use crate::stream::ExactlyOnce;
25use crate::unsafety::NonDet;
26use crate::{NoOrder, Singleton, Stream, TotalOrder, Unbounded, nondet};
27
28pub mod external_process;
29pub use external_process::External;
30
31pub mod process;
32pub use process::Process;
33
34pub mod cluster;
35pub use cluster::Cluster;
36
37pub mod member_id;
38pub use member_id::MemberId;
39
40pub mod tick;
41pub use tick::{Atomic, NoTick, Tick};
42
/// Identifies a location: either a root location (a process or a cluster) or a
/// tick nested inside another location.
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum LocationId {
    /// A process, carrying its raw numeric id (see [`LocationId::raw_id`]).
    Process(usize),
    /// A cluster, carrying its raw numeric id (see [`LocationId::raw_id`]).
    Cluster(usize),
    /// A tick: a numeric value (presumably the tick's own id — confirm against
    /// the code that constructs this variant) plus the id of the location that
    /// encloses it, which may itself be a tick.
    Tick(usize, Box<LocationId>),
}
49
/// A change in membership, emitted as the value of the keyed membership streams
/// produced by the `Location` source helpers in this module.
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// The member identified by the stream key has joined.
    Joined,
    /// The member identified by the stream key has left.
    Left,
}
55
56impl LocationId {
57    pub fn root(&self) -> &LocationId {
58        match self {
59            LocationId::Process(_) => self,
60            LocationId::Cluster(_) => self,
61            LocationId::Tick(_, id) => id.root(),
62        }
63    }
64
65    pub fn is_root(&self) -> bool {
66        match self {
67            LocationId::Process(_) | LocationId::Cluster(_) => true,
68            LocationId::Tick(_, _) => false,
69        }
70    }
71
72    pub fn raw_id(&self) -> usize {
73        match self {
74            LocationId::Process(id) => *id,
75            LocationId::Cluster(id) => *id,
76            LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
77        }
78    }
79
80    pub fn swap_root(&mut self, new_root: LocationId) {
81        match self {
82            LocationId::Tick(_, id) => {
83                id.swap_root(new_root);
84            }
85            _ => {
86                assert!(new_root.is_root());
87                *self = new_root;
88            }
89        }
90    }
91}
92
93pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
94    assert_eq!(l1.id(), l2.id(), "locations do not match");
95}
96
/// Hint attached to an external input as its `port_hint`.
///
/// `Location::bidi_external_many_bytes` takes it from the caller; the other
/// external source helpers in this module always pass [`NetworkHint::Auto`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No specific request.
    Auto,
    /// Request a TCP port.
    // NOTE(review): presumably `Some(port)` asks for that exact port and `None`
    // lets one be chosen — confirm against the deployment backend.
    TcpPort(Option<u16>),
}
102
103pub trait Location<'a>: Clone {
    /// The location type returned by [`Location::root`].
    type Root: Location<'a>;

    /// Returns this location's root, of type [`Location::Root`].
    fn root(&self) -> Self::Root;

    /// Returns the [`LocationId`] identifying this location.
    fn id(&self) -> LocationId;

    /// Returns the flow state backing this location. The provided methods of this
    /// trait mutably borrow it to allocate clock, node and cycle ids and to
    /// register leaves.
    fn flow_state(&self) -> &FlowState;

    /// Reports whether this location type is top-level.
    // NOTE(review): presumably `false` for tick-nested locations — not used in
    // this file, confirm against the implementors.
    fn is_top_level() -> bool;
113
114    fn tick(&self) -> Tick<Self>
115    where
116        Self: NoTick,
117    {
118        let next_id = self.flow_state().borrow_mut().next_clock_id;
119        self.flow_state().borrow_mut().next_clock_id += 1;
120        Tick {
121            id: next_id,
122            l: self.clone(),
123        }
124    }
125
126    fn next_node_id(&self) -> usize {
127        let next_id = self.flow_state().borrow_mut().next_node_id;
128        self.flow_state().borrow_mut().next_node_id += 1;
129        next_id
130    }
131
    /// Builds the [`HydroIrMetadata`] for a new IR node at this location whose
    /// output type is `T`.
    ///
    /// The metadata records this location's id, a backtrace, and the quoted type
    /// `T`; the remaining fields (`cardinality`, `cpu_usage`,
    /// `network_recv_cpu_usage`, `id`, `tag`) all start out as `None`.
    // NOTE(review): `#[inline(never)]` combined with `get_backtrace(2)` suggests
    // the captured backtrace relies on this function keeping its own stack frame,
    // so it should be called directly rather than through extra wrappers —
    // confirm against `get_backtrace`.
    #[inline(never)]
    fn new_node_metadata<T>(&self) -> HydroIrMetadata {
        HydroIrMetadata {
            location_kind: self.id(),
            backtrace: get_backtrace(2),
            output_type: Some(quote_type::<T>().into()),
            cardinality: None,
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
            tag: None,
        }
    }
145
146    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
147    where
148        Self: Sized + NoTick,
149    {
150        Stream::new(
151            self.clone(),
152            HydroNode::Persist {
153                inner: Box::new(HydroNode::Source {
154                    source: HydroSource::Spin(),
155                    metadata: self.new_node_metadata::<()>(),
156                }),
157                metadata: self.new_node_metadata::<()>(),
158            },
159        )
160    }
161
162    fn source_stream<T, E>(
163        &self,
164        e: impl QuotedWithContext<'a, E, Self>,
165    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
166    where
167        E: FuturesStream<Item = T> + Unpin,
168        Self: Sized + NoTick,
169    {
170        let e = e.splice_untyped_ctx(self);
171
172        Stream::new(
173            self.clone(),
174            HydroNode::Persist {
175                inner: Box::new(HydroNode::Source {
176                    source: HydroSource::Stream(e.into()),
177                    metadata: self.new_node_metadata::<T>(),
178                }),
179                metadata: self.new_node_metadata::<T>(),
180            },
181        )
182    }
183
184    fn source_iter<T, E>(
185        &self,
186        e: impl QuotedWithContext<'a, E, Self>,
187    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
188    where
189        E: IntoIterator<Item = T>,
190        Self: Sized + NoTick,
191    {
192        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
193        // for bounded top-level streams, and this is the only way to generate one
194        let e = e.splice_untyped_ctx(self);
195
196        Stream::new(
197            self.clone(),
198            HydroNode::Persist {
199                inner: Box::new(HydroNode::Source {
200                    source: HydroSource::Iter(e.into()),
201                    metadata: self.new_node_metadata::<T>(),
202                }),
203                metadata: self.new_node_metadata::<T>(),
204            },
205        )
206    }
207
208    fn source_cluster_members<C: 'a>(
209        &self,
210        cluster: &Cluster<'a, C>,
211    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
212    where
213        Self: Sized + NoTick,
214    {
215        let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
216            id: cluster.id,
217            _phantom: PhantomData,
218        };
219
220        self.source_iter(q!(underlying_memberids))
221            .map(q!(|id| (*id, MembershipEvent::Joined)))
222            .into_keyed()
223    }
224
225    fn source_external_bytes<L>(
226        &self,
227        from: &External<L>,
228    ) -> (
229        ExternalBytesPort,
230        Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
231    )
232    where
233        Self: Sized + NoTick,
234    {
235        let next_external_port_id = {
236            let mut flow_state = from.flow_state.borrow_mut();
237            let id = flow_state.next_external_out;
238            flow_state.next_external_out += 1;
239            id
240        };
241
242        (
243            ExternalBytesPort {
244                process_id: from.id,
245                port_id: next_external_port_id,
246                _phantom: Default::default(),
247            },
248            Stream::new(
249                self.clone(),
250                HydroNode::Persist {
251                    inner: Box::new(HydroNode::ExternalInput {
252                        from_external_id: from.id,
253                        from_key: next_external_port_id,
254                        from_many: false,
255                        codec_type: quote_type::<LengthDelimitedCodec>().into(),
256                        port_hint: NetworkHint::Auto,
257                        instantiate_fn: DebugInstantiate::Building,
258                        deserialize_fn: None,
259                        metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
260                    }),
261                    metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
262                },
263            ),
264        )
265    }
266
267    fn source_external_bincode<L, T>(
268        &self,
269        from: &External<L>,
270    ) -> (
271        ExternalBincodeSink<T>,
272        Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>,
273    )
274    where
275        Self: Sized + NoTick,
276        T: Serialize + DeserializeOwned,
277    {
278        let next_external_port_id = {
279            let mut flow_state = from.flow_state.borrow_mut();
280            let id = flow_state.next_external_out;
281            flow_state.next_external_out += 1;
282            id
283        };
284
285        (
286            ExternalBincodeSink {
287                process_id: from.id,
288                port_id: next_external_port_id,
289                _phantom: PhantomData,
290            },
291            Stream::new(
292                self.clone(),
293                HydroNode::Persist {
294                    inner: Box::new(HydroNode::ExternalInput {
295                        from_external_id: from.id,
296                        from_key: next_external_port_id,
297                        from_many: false,
298                        codec_type: quote_type::<LengthDelimitedCodec>().into(),
299                        port_hint: NetworkHint::Auto,
300                        instantiate_fn: DebugInstantiate::Building,
301                        deserialize_fn: Some(
302                            crate::stream::networking::deserialize_bincode::<T>(None).into(),
303                        ),
304                        metadata: self.new_node_metadata::<T>(),
305                    }),
306                    metadata: self.new_node_metadata::<T>(),
307                },
308            ),
309        )
310    }
311
312    #[expect(clippy::type_complexity, reason = "stream markers")]
313    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
314        &self,
315        from: &External<L>,
316        port_hint: NetworkHint,
317    ) -> (
318        ExternalBytesPort<Many>,
319        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
320        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
321        ForwardRef<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
322    )
323    where
324        Self: Sized + NoTick,
325    {
326        let next_external_port_id = {
327            let mut flow_state = from.flow_state.borrow_mut();
328            let id = flow_state.next_external_out;
329            flow_state.next_external_out += 1;
330            id
331        };
332
333        let (fwd_ref, to_sink) =
334            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
335        let mut flow_state_borrow = self.flow_state().borrow_mut();
336
337        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
338
339        leaves.push(HydroLeaf::SendExternal {
340            to_external_id: from.id,
341            to_key: next_external_port_id,
342            to_many: true,
343            serialize_fn: None,
344            instantiate_fn: DebugInstantiate::Building,
345            input: Box::new(HydroNode::Unpersist {
346                inner: Box::new(to_sink.entries().ir_node.into_inner()),
347                metadata: self.new_node_metadata::<(u64, T)>(),
348            }),
349        });
350
351        let raw_stream: Stream<
352            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
353            Self,
354            Unbounded,
355            NoOrder,
356            ExactlyOnce,
357        > = Stream::new(
358            self.clone(),
359            HydroNode::Persist {
360                inner: Box::new(HydroNode::ExternalInput {
361                    from_external_id: from.id,
362                    from_key: next_external_port_id,
363                    from_many: true,
364                    codec_type: quote_type::<Codec>().into(),
365                    port_hint,
366                    instantiate_fn: DebugInstantiate::Building,
367                    deserialize_fn: None,
368                    metadata: self
369                        .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
370                }),
371                metadata: self
372                    .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
373            },
374        );
375
376        let membership_stream_ident = syn::Ident::new(
377            &format!(
378                "__hydro_deploy_many_{}_{}_membership",
379                from.id, next_external_port_id
380            ),
381            Span::call_site(),
382        );
383        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
384        let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, TotalOrder, ExactlyOnce> =
385            Stream::new(
386                self.clone(),
387                HydroNode::Persist {
388                    inner: Box::new(HydroNode::Source {
389                        source: HydroSource::Stream(membership_stream_expr.into()),
390                        metadata: self.new_node_metadata::<(u64, bool)>(),
391                    }),
392                    metadata: self.new_node_metadata::<(u64, bool)>(),
393                },
394            );
395
396        (
397            ExternalBytesPort {
398                process_id: from.id,
399                port_id: next_external_port_id,
400                _phantom: PhantomData,
401            },
402            raw_stream
403                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
404                .into_keyed()
405                .assume_ordering::<TotalOrder>(
406                    nondet!(/** order of messages is deterministic within each key due to TCP */)
407                ),
408            raw_membership_stream
409                .into_keyed()
410                .assume_ordering::<TotalOrder>(
411                    nondet!(/** membership events are ordered within each key */),
412                )
413                .map(q!(|join| {
414                    if join {
415                        MembershipEvent::Joined
416                    } else {
417                        MembershipEvent::Left
418                    }
419                })),
420            fwd_ref,
421        )
422    }
423
424    #[expect(clippy::type_complexity, reason = "stream markers")]
425    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
426        &self,
427        from: &External<L>,
428    ) -> (
429        ExternalBincodeBidi<InT, OutT, Many>,
430        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
431        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
432        ForwardRef<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
433    )
434    where
435        Self: Sized + NoTick,
436    {
437        let next_external_port_id = {
438            let mut flow_state = from.flow_state.borrow_mut();
439            let id = flow_state.next_external_out;
440            flow_state.next_external_out += 1;
441            id
442        };
443
444        let root = get_this_crate();
445
446        let (fwd_ref, to_sink) =
447            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
448        let mut flow_state_borrow = self.flow_state().borrow_mut();
449
450        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
451
452        let out_t_type = quote_type::<OutT>();
453        let ser_fn: syn::Expr = syn::parse_quote! {
454            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
455                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
456            )
457        };
458
459        leaves.push(HydroLeaf::SendExternal {
460            to_external_id: from.id,
461            to_key: next_external_port_id,
462            to_many: true,
463            serialize_fn: Some(ser_fn.into()),
464            instantiate_fn: DebugInstantiate::Building,
465            input: Box::new(HydroNode::Unpersist {
466                inner: Box::new(to_sink.entries().ir_node.into_inner()),
467                metadata: self.new_node_metadata::<(u64, Bytes)>(),
468            }),
469        });
470
471        let in_t_type = quote_type::<InT>();
472
473        let deser_fn: syn::Expr = syn::parse_quote! {
474            |res| {
475                let (id, b) = res.unwrap();
476                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
477            }
478        };
479
480        let raw_stream: Stream<(u64, InT), Self, Unbounded, NoOrder, ExactlyOnce> = Stream::new(
481            self.clone(),
482            HydroNode::Persist {
483                inner: Box::new(HydroNode::ExternalInput {
484                    from_external_id: from.id,
485                    from_key: next_external_port_id,
486                    from_many: true,
487                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
488                    port_hint: NetworkHint::Auto,
489                    instantiate_fn: DebugInstantiate::Building,
490                    deserialize_fn: Some(deser_fn.into()),
491                    metadata: self.new_node_metadata::<(u64, InT)>(),
492                }),
493                metadata: self.new_node_metadata::<(u64, InT)>(),
494            },
495        );
496
497        let membership_stream_ident = syn::Ident::new(
498            &format!(
499                "__hydro_deploy_many_{}_{}_membership",
500                from.id, next_external_port_id
501            ),
502            Span::call_site(),
503        );
504        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
505        let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, NoOrder, ExactlyOnce> =
506            Stream::new(
507                self.clone(),
508                HydroNode::Persist {
509                    inner: Box::new(HydroNode::Source {
510                        source: HydroSource::Stream(membership_stream_expr.into()),
511                        metadata: self.new_node_metadata::<(u64, bool)>(),
512                    }),
513                    metadata: self.new_node_metadata::<(u64, bool)>(),
514                },
515            );
516
517        (
518            ExternalBincodeBidi {
519                process_id: from.id,
520                port_id: next_external_port_id,
521                _phantom: PhantomData,
522            },
523            raw_stream.into_keyed().assume_ordering::<TotalOrder>(
524                nondet!(/** order of messages is deterministic within each key due to TCP */),
525            ),
526            raw_membership_stream
527                .into_keyed()
528                .assume_ordering::<TotalOrder>(
529                    nondet!(/** membership events are ordered within each key */),
530                )
531                .map(q!(|join| {
532                    if join {
533                        MembershipEvent::Joined
534                    } else {
535                        MembershipEvent::Left
536                    }
537                })),
538            fwd_ref,
539        )
540    }
541
542    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
543    where
544        T: Clone,
545        Self: Sized + NoTick,
546    {
547        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
548        // for bounded top-level singletons, and this is the only way to generate one
549
550        let e_arr = q!([e]);
551        let e = e_arr.splice_untyped_ctx(self);
552
553        // we do a double persist here because if the singleton shows up on every tick,
554        // we first persist the source so that we store that value and then persist again
555        // so that it grows every tick
556        Singleton::new(
557            self.clone(),
558            HydroNode::Persist {
559                inner: Box::new(HydroNode::Persist {
560                    inner: Box::new(HydroNode::Source {
561                        source: HydroSource::Iter(e.into()),
562                        metadata: self.new_node_metadata::<T>(),
563                    }),
564                    metadata: self.new_node_metadata::<T>(),
565                }),
566                metadata: self.new_node_metadata::<T>(),
567            },
568        )
569    }
570
    /// Generates a stream with values emitted at a fixed interval, with
    /// each value being the current time (as a [`tokio::time::Instant`]).
    ///
    /// The stream is built from [`tokio::time::interval`] wrapped in a
    /// `tokio_stream::wrappers::IntervalStream`.
    ///
    /// The clock source used is monotonic, so elements will be emitted in
    /// increasing order.
    ///
    /// # Non-Determinism
    /// Because this stream is generated by an OS timer, it will be
    /// non-deterministic because each timestamp will be arbitrary.
    /// The `_nondet` argument is not otherwise used by this method.
    fn source_interval(
        &self,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval(interval)
        )))
    }
592
    /// Generates a stream with values emitted at a fixed interval (with an
    /// initial delay), with each value being the current time
    /// (as a [`tokio::time::Instant`]).
    ///
    /// The stream is built from [`tokio::time::interval_at`], starting at
    /// `tokio::time::Instant::now() + delay`, wrapped in a
    /// `tokio_stream::wrappers::IntervalStream`.
    ///
    /// The clock source used is monotonic, so elements will be emitted in
    /// increasing order.
    ///
    /// # Non-Determinism
    /// Because this stream is generated by an OS timer, it will be
    /// non-deterministic because each timestamp will be arbitrary.
    /// The `_nondet` argument is not otherwise used by this method.
    fn source_interval_delayed(
        &self,
        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
        )))
    }
616
617    fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
618    where
619        S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
620        Self: NoTick,
621    {
622        let next_id = self.flow_state().borrow_mut().next_cycle_id();
623        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
624
625        (
626            ForwardRef {
627                completed: false,
628                ident: ident.clone(),
629                expected_location: self.id(),
630                _phantom: PhantomData,
631            },
632            S::create_source(ident, self.clone()),
633        )
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use std::collections::HashSet;
640
641    use futures::{SinkExt, StreamExt};
642    use hydro_deploy::Deployment;
643    use stageleft::q;
644    use tokio_util::codec::LengthDelimitedCodec;
645
646    use crate::{FlowBuilder, Location, NetworkHint};
647
    /// Sends raw bytes from an external process into a process through
    /// `source_external_bytes` and checks that the same bytes come back out via
    /// `send_bincode_external`.
    #[tokio::test]
    async fn external_bytes() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        // Dataflow: external bytes in -> unwrap the framing result -> bincode out.
        let (in_port, input) = first_node.source_external_bytes(&external);
        let out = input
            .map(q!(|r| r.unwrap()))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Both ports are connected before the deployment is started.
        let mut external_in = nodes.connect_sink_bytes(in_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in.send(vec![1, 2, 3].into()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
    }
677
    /// Opens two connections to a single many-connection bincode port and checks
    /// that each message arrives tagged with its own connection key (0 and 1).
    #[tokio::test]
    async fn multi_external_source() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back on the bidirectional port: complete it with an empty stream.
        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
        let external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in_1.send(123).await.unwrap();
        external_in_2.send(456).await.unwrap();

        // Collected into a set, so the relative arrival order of the two messages
        // does not matter.
        assert_eq!(
            external_out.take(2).collect::<HashSet<_>>().await,
            vec![(0, 123), (1, 456)].into_iter().collect()
        );
    }
712
    /// Opens two connections to a many-connection bincode port but only sends on
    /// the second one, checking that its message still arrives (with key 1).
    #[tokio::test]
    async fn second_connection_only_multi_source() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back on the bidirectional port: complete it with an empty stream.
        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The first connection is opened but intentionally never sent on, to test
        // the stream waking logic.
        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in_2.send(456).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), (1, 456));
    }
744
    /// Opens two connections to a many-connection bytes port (length-delimited
    /// framing) and checks that each frame arrives tagged with its own connection
    /// key (0 and 1).
    #[tokio::test]
    async fn multi_external_bytes() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) = first_node
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back on the bidirectional port: complete it with an empty stream.
        complete_sink.complete(first_node.source_iter(q!([])).into_keyed());

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in_1 = nodes.connect_sink_bytes(in_port.clone()).await;
        let mut external_in_2 = nodes.connect_sink_bytes(in_port).await;
        let external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
        external_in_2.send(vec![4, 5].into()).await.unwrap();

        // Collected into a set, so the relative arrival order of the two frames
        // does not matter.
        assert_eq!(
            external_out.take(2).collect::<HashSet<_>>().await,
            vec![
                (0, (&[1u8, 2, 3] as &[u8]).into()),
                (1, (&[4u8, 5] as &[u8]).into())
            ]
            .into_iter()
            .collect()
        );
    }
784
    /// Echo test over a many-connection bytes port: every byte of an incoming
    /// frame is incremented by one and the result is sent back on the same
    /// connection it came from.
    #[tokio::test]
    async fn echo_external_bytes() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (port, input, _membership, complete_sink) = first_node
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        // Feed the transformed input straight back as the port's output.
        complete_sink
            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut external_out_1, mut external_in_1) = nodes.connect_bytes(port.clone()).await;
        let (mut external_out_2, mut external_in_2) = nodes.connect_bytes(port).await;

        deployment.start().await.unwrap();

        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
        external_in_2.send(vec![4, 5].into()).await.unwrap();

        // Each connection receives only the reply to its own frame.
        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
    }
816
    /// Echo test over a many-connection bincode port: every incoming string is
    /// upper-cased and sent back on the same connection it came from.
    #[tokio::test]
    async fn echo_external_bincode() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        // Feed the transformed input straight back as the port's output.
        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;

        deployment.start().await.unwrap();

        external_in_1.send("hi".to_string()).await.unwrap();
        external_in_2.send("hello".to_string()).await.unwrap();

        // Each connection receives only the reply to its own message.
        assert_eq!(external_out_1.next().await.unwrap(), "HI");
        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
    }
847}