hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
37use crate::location::cluster::ClusterIds;
38use crate::location::dynamic::LocationId;
39use crate::location::external_process::{
40    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many,
41};
42use crate::nondet::{NonDet, nondet};
43use crate::staging_util::get_this_crate;
44
45pub mod dynamic;
46
47#[expect(missing_docs, reason = "TODO")]
48pub mod external_process;
49pub use external_process::External;
50
51#[expect(missing_docs, reason = "TODO")]
52pub mod process;
53pub use process::Process;
54
55#[expect(missing_docs, reason = "TODO")]
56pub mod cluster;
57pub use cluster::Cluster;
58
59#[expect(missing_docs, reason = "TODO")]
60pub mod member_id;
61pub use member_id::MemberId;
62
63#[expect(missing_docs, reason = "TODO")]
64pub mod tick;
65pub use tick::{Atomic, NoTick, Tick};
66
67#[expect(missing_docs, reason = "TODO")]
68#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
69pub enum MembershipEvent {
70    Joined,
71    Left,
72}
73
74#[expect(missing_docs, reason = "TODO")]
75#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
76pub enum NetworkHint {
77    Auto,
78    TcpPort(Option<u16>),
79}
80
81pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
82    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
83}
84
85#[expect(missing_docs, reason = "TODO")]
86#[expect(
87    private_bounds,
88    reason = "only internal Hydro code can define location types"
89)]
90pub trait Location<'a>: dynamic::DynLocation {
91    type Root: Location<'a>;
92
93    fn root(&self) -> Self::Root;
94
95    fn id(&self) -> LocationId {
96        dynamic::DynLocation::id(self)
97    }
98
99    fn tick(&self) -> Tick<Self>
100    where
101        Self: NoTick,
102    {
103        let next_id = self.flow_state().borrow_mut().next_clock_id;
104        self.flow_state().borrow_mut().next_clock_id += 1;
105        Tick {
106            id: next_id,
107            l: self.clone(),
108        }
109    }
110
111    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
112    where
113        Self: Sized + NoTick,
114    {
115        Stream::new(
116            self.clone(),
117            HydroNode::Persist {
118                inner: Box::new(HydroNode::Source {
119                    source: HydroSource::Spin(),
120                    metadata: self.new_node_metadata::<()>(),
121                }),
122                metadata: self.new_node_metadata::<()>(),
123            },
124        )
125    }
126
127    fn source_stream<T, E>(
128        &self,
129        e: impl QuotedWithContext<'a, E, Self>,
130    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
131    where
132        E: FuturesStream<Item = T> + Unpin,
133        Self: Sized + NoTick,
134    {
135        let e = e.splice_untyped_ctx(self);
136
137        Stream::new(
138            self.clone(),
139            HydroNode::Persist {
140                inner: Box::new(HydroNode::Source {
141                    source: HydroSource::Stream(e.into()),
142                    metadata: self.new_node_metadata::<T>(),
143                }),
144                metadata: self.new_node_metadata::<T>(),
145            },
146        )
147    }
148
149    fn source_iter<T, E>(
150        &self,
151        e: impl QuotedWithContext<'a, E, Self>,
152    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
153    where
154        E: IntoIterator<Item = T>,
155        Self: Sized + NoTick,
156    {
157        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
158        // for bounded top-level streams, and this is the only way to generate one
159        let e = e.splice_untyped_ctx(self);
160
161        Stream::new(
162            self.clone(),
163            HydroNode::Persist {
164                inner: Box::new(HydroNode::Source {
165                    source: HydroSource::Iter(e.into()),
166                    metadata: self.new_node_metadata::<T>(),
167                }),
168                metadata: self.new_node_metadata::<T>(),
169            },
170        )
171    }
172
173    fn source_cluster_members<C: 'a>(
174        &self,
175        cluster: &Cluster<'a, C>,
176    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
177    where
178        Self: Sized + NoTick,
179    {
180        let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
181            id: cluster.id,
182            _phantom: PhantomData,
183        };
184
185        self.source_iter(q!(underlying_memberids))
186            .map(q!(|id| (*id, MembershipEvent::Joined)))
187            .into_keyed()
188    }
189
190    fn source_external_bytes<L>(
191        &self,
192        from: &External<L>,
193    ) -> (
194        ExternalBytesPort,
195        Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
196    )
197    where
198        Self: Sized + NoTick,
199    {
200        let next_external_port_id = {
201            let mut flow_state = from.flow_state.borrow_mut();
202            let id = flow_state.next_external_out;
203            flow_state.next_external_out += 1;
204            id
205        };
206
207        (
208            ExternalBytesPort {
209                process_id: from.id,
210                port_id: next_external_port_id,
211                _phantom: Default::default(),
212            },
213            Stream::new(
214                self.clone(),
215                HydroNode::Persist {
216                    inner: Box::new(HydroNode::ExternalInput {
217                        from_external_id: from.id,
218                        from_key: next_external_port_id,
219                        from_many: false,
220                        codec_type: quote_type::<LengthDelimitedCodec>().into(),
221                        port_hint: NetworkHint::Auto,
222                        instantiate_fn: DebugInstantiate::Building,
223                        deserialize_fn: None,
224                        metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
225                    }),
226                    metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
227                },
228            ),
229        )
230    }
231
232    fn source_external_bincode<L, T>(
233        &self,
234        from: &External<L>,
235    ) -> (
236        ExternalBincodeSink<T>,
237        Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>,
238    )
239    where
240        Self: Sized + NoTick,
241        T: Serialize + DeserializeOwned,
242    {
243        let next_external_port_id = {
244            let mut flow_state = from.flow_state.borrow_mut();
245            let id = flow_state.next_external_out;
246            flow_state.next_external_out += 1;
247            id
248        };
249
250        (
251            ExternalBincodeSink {
252                process_id: from.id,
253                port_id: next_external_port_id,
254                _phantom: PhantomData,
255            },
256            Stream::new(
257                self.clone(),
258                HydroNode::Persist {
259                    inner: Box::new(HydroNode::ExternalInput {
260                        from_external_id: from.id,
261                        from_key: next_external_port_id,
262                        from_many: false,
263                        codec_type: quote_type::<LengthDelimitedCodec>().into(),
264                        port_hint: NetworkHint::Auto,
265                        instantiate_fn: DebugInstantiate::Building,
266                        deserialize_fn: Some(
267                            crate::live_collections::stream::networking::deserialize_bincode::<T>(
268                                None,
269                            )
270                            .into(),
271                        ),
272                        metadata: self.new_node_metadata::<T>(),
273                    }),
274                    metadata: self.new_node_metadata::<T>(),
275                },
276            ),
277        )
278    }
279
280    #[expect(clippy::type_complexity, reason = "stream markers")]
281    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
282        &self,
283        from: &External<L>,
284        port_hint: NetworkHint,
285    ) -> (
286        ExternalBytesPort<Many>,
287        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
288        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
289        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
290    )
291    where
292        Self: Sized + NoTick,
293    {
294        let next_external_port_id = {
295            let mut flow_state = from.flow_state.borrow_mut();
296            let id = flow_state.next_external_out;
297            flow_state.next_external_out += 1;
298            id
299        };
300
301        let (fwd_ref, to_sink) =
302            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
303        let mut flow_state_borrow = self.flow_state().borrow_mut();
304
305        flow_state_borrow.push_root(HydroRoot::SendExternal {
306            to_external_id: from.id,
307            to_key: next_external_port_id,
308            to_many: true,
309            serialize_fn: None,
310            instantiate_fn: DebugInstantiate::Building,
311            input: Box::new(HydroNode::Unpersist {
312                inner: Box::new(to_sink.entries().ir_node.into_inner()),
313                metadata: self.new_node_metadata::<(u64, T)>(),
314            }),
315            op_metadata: HydroIrOpMetadata::new(),
316        });
317
318        let raw_stream: Stream<
319            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
320            Self,
321            Unbounded,
322            NoOrder,
323            ExactlyOnce,
324        > = Stream::new(
325            self.clone(),
326            HydroNode::Persist {
327                inner: Box::new(HydroNode::ExternalInput {
328                    from_external_id: from.id,
329                    from_key: next_external_port_id,
330                    from_many: true,
331                    codec_type: quote_type::<Codec>().into(),
332                    port_hint,
333                    instantiate_fn: DebugInstantiate::Building,
334                    deserialize_fn: None,
335                    metadata: self
336                        .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
337                }),
338                metadata: self
339                    .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
340            },
341        );
342
343        let membership_stream_ident = syn::Ident::new(
344            &format!(
345                "__hydro_deploy_many_{}_{}_membership",
346                from.id, next_external_port_id
347            ),
348            Span::call_site(),
349        );
350        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
351        let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, TotalOrder, ExactlyOnce> =
352            Stream::new(
353                self.clone(),
354                HydroNode::Persist {
355                    inner: Box::new(HydroNode::Source {
356                        source: HydroSource::Stream(membership_stream_expr.into()),
357                        metadata: self.new_node_metadata::<(u64, bool)>(),
358                    }),
359                    metadata: self.new_node_metadata::<(u64, bool)>(),
360                },
361            );
362
363        (
364            ExternalBytesPort {
365                process_id: from.id,
366                port_id: next_external_port_id,
367                _phantom: PhantomData,
368            },
369            raw_stream
370                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
371                .into_keyed()
372                .assume_ordering::<TotalOrder>(
373                    nondet!(/** order of messages is deterministic within each key due to TCP */)
374                ),
375            raw_membership_stream
376                .into_keyed()
377                .assume_ordering::<TotalOrder>(
378                    nondet!(/** membership events are ordered within each key */),
379                )
380                .map(q!(|join| {
381                    if join {
382                        MembershipEvent::Joined
383                    } else {
384                        MembershipEvent::Left
385                    }
386                })),
387            fwd_ref,
388        )
389    }
390
391    #[expect(clippy::type_complexity, reason = "stream markers")]
392    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
393        &self,
394        from: &External<L>,
395    ) -> (
396        ExternalBincodeBidi<InT, OutT, Many>,
397        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
398        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
399        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
400    )
401    where
402        Self: Sized + NoTick,
403    {
404        let next_external_port_id = {
405            let mut flow_state = from.flow_state.borrow_mut();
406            let id = flow_state.next_external_out;
407            flow_state.next_external_out += 1;
408            id
409        };
410
411        let root = get_this_crate();
412
413        let (fwd_ref, to_sink) =
414            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
415        let mut flow_state_borrow = self.flow_state().borrow_mut();
416
417        let out_t_type = quote_type::<OutT>();
418        let ser_fn: syn::Expr = syn::parse_quote! {
419            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
420                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
421            )
422        };
423
424        flow_state_borrow.push_root(HydroRoot::SendExternal {
425            to_external_id: from.id,
426            to_key: next_external_port_id,
427            to_many: true,
428            serialize_fn: Some(ser_fn.into()),
429            instantiate_fn: DebugInstantiate::Building,
430            input: Box::new(HydroNode::Unpersist {
431                inner: Box::new(to_sink.entries().ir_node.into_inner()),
432                metadata: self.new_node_metadata::<(u64, Bytes)>(),
433            }),
434            op_metadata: HydroIrOpMetadata::new(),
435        });
436
437        let in_t_type = quote_type::<InT>();
438
439        let deser_fn: syn::Expr = syn::parse_quote! {
440            |res| {
441                let (id, b) = res.unwrap();
442                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
443            }
444        };
445
446        let raw_stream: Stream<(u64, InT), Self, Unbounded, NoOrder, ExactlyOnce> = Stream::new(
447            self.clone(),
448            HydroNode::Persist {
449                inner: Box::new(HydroNode::ExternalInput {
450                    from_external_id: from.id,
451                    from_key: next_external_port_id,
452                    from_many: true,
453                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
454                    port_hint: NetworkHint::Auto,
455                    instantiate_fn: DebugInstantiate::Building,
456                    deserialize_fn: Some(deser_fn.into()),
457                    metadata: self.new_node_metadata::<(u64, InT)>(),
458                }),
459                metadata: self.new_node_metadata::<(u64, InT)>(),
460            },
461        );
462
463        let membership_stream_ident = syn::Ident::new(
464            &format!(
465                "__hydro_deploy_many_{}_{}_membership",
466                from.id, next_external_port_id
467            ),
468            Span::call_site(),
469        );
470        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
471        let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, NoOrder, ExactlyOnce> =
472            Stream::new(
473                self.clone(),
474                HydroNode::Persist {
475                    inner: Box::new(HydroNode::Source {
476                        source: HydroSource::Stream(membership_stream_expr.into()),
477                        metadata: self.new_node_metadata::<(u64, bool)>(),
478                    }),
479                    metadata: self.new_node_metadata::<(u64, bool)>(),
480                },
481            );
482
483        (
484            ExternalBincodeBidi {
485                process_id: from.id,
486                port_id: next_external_port_id,
487                _phantom: PhantomData,
488            },
489            raw_stream.into_keyed().assume_ordering::<TotalOrder>(
490                nondet!(/** order of messages is deterministic within each key due to TCP */),
491            ),
492            raw_membership_stream
493                .into_keyed()
494                .assume_ordering::<TotalOrder>(
495                    nondet!(/** membership events are ordered within each key */),
496                )
497                .map(q!(|join| {
498                    if join {
499                        MembershipEvent::Joined
500                    } else {
501                        MembershipEvent::Left
502                    }
503                })),
504            fwd_ref,
505        )
506    }
507
508    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
509    where
510        T: Clone,
511        Self: Sized + NoTick,
512    {
513        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
514        // for bounded top-level singletons, and this is the only way to generate one
515
516        let e_arr = q!([e]);
517        let e = e_arr.splice_untyped_ctx(self);
518
519        // we do a double persist here because if the singleton shows up on every tick,
520        // we first persist the source so that we store that value and then persist again
521        // so that it grows every tick
522        Singleton::new(
523            self.clone(),
524            HydroNode::Persist {
525                inner: Box::new(HydroNode::Persist {
526                    inner: Box::new(HydroNode::Source {
527                        source: HydroSource::Iter(e.into()),
528                        metadata: self.new_node_metadata::<T>(),
529                    }),
530                    metadata: self.new_node_metadata::<T>(),
531                }),
532                metadata: self.new_node_metadata::<T>(),
533            },
534        )
535    }
536
537    /// Generates a stream with values emitted at a fixed interval, with
538    /// each value being the current time (as an [`tokio::time::Instant`]).
539    ///
540    /// The clock source used is monotonic, so elements will be emitted in
541    /// increasing order.
542    ///
543    /// # Non-Determinism
544    /// Because this stream is generated by an OS timer, it will be
545    /// non-deterministic because each timestamp will be arbitrary.
546    fn source_interval(
547        &self,
548        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
549        _nondet: NonDet,
550    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
551    where
552        Self: Sized + NoTick,
553    {
554        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
555            tokio::time::interval(interval)
556        )))
557    }
558
559    /// Generates a stream with values emitted at a fixed interval (with an
560    /// initial delay), with each value being the current time
561    /// (as an [`tokio::time::Instant`]).
562    ///
563    /// The clock source used is monotonic, so elements will be emitted in
564    /// increasing order.
565    ///
566    /// # Non-Determinism
567    /// Because this stream is generated by an OS timer, it will be
568    /// non-deterministic because each timestamp will be arbitrary.
569    fn source_interval_delayed(
570        &self,
571        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
572        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
573        _nondet: NonDet,
574    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
575    where
576        Self: Sized + NoTick,
577    {
578        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
579            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
580        )))
581    }
582
583    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
584    where
585        S: CycleCollection<'a, ForwardRef, Location = Self>,
586        Self: NoTick,
587    {
588        let next_id = self.flow_state().borrow_mut().next_cycle_id();
589        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
590
591        (
592            ForwardHandle {
593                completed: false,
594                ident: ident.clone(),
595                expected_location: Location::id(self),
596                _phantom: PhantomData,
597            },
598            S::create_source(ident, self.clone()),
599        )
600    }
601}
602
603#[cfg(test)]
604mod tests {
605    use std::collections::HashSet;
606
607    use futures::{SinkExt, StreamExt};
608    use hydro_deploy::Deployment;
609    use stageleft::q;
610    use tokio_util::codec::LengthDelimitedCodec;
611
612    use crate::compile::builder::FlowBuilder;
613    use crate::location::{Location, NetworkHint};
614
615    #[tokio::test]
616    async fn external_bytes() {
617        let mut deployment = Deployment::new();
618
619        let flow = FlowBuilder::new();
620        let first_node = flow.process::<()>();
621        let external = flow.external::<()>();
622
623        let (in_port, input) = first_node.source_external_bytes(&external);
624        let out = input
625            .map(q!(|r| r.unwrap()))
626            .send_bincode_external(&external);
627
628        let nodes = flow
629            .with_process(&first_node, deployment.Localhost())
630            .with_external(&external, deployment.Localhost())
631            .deploy(&mut deployment);
632
633        deployment.deploy().await.unwrap();
634
635        let mut external_in = nodes.connect_sink_bytes(in_port).await;
636        let mut external_out = nodes.connect_source_bincode(out).await;
637
638        deployment.start().await.unwrap();
639
640        external_in.send(vec![1, 2, 3].into()).await.unwrap();
641
642        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
643    }
644
645    #[tokio::test]
646    async fn multi_external_source() {
647        let mut deployment = Deployment::new();
648
649        let flow = FlowBuilder::new();
650        let first_node = flow.process::<()>();
651        let external = flow.external::<()>();
652
653        let (in_port, input, _membership, complete_sink) =
654            first_node.bidi_external_many_bincode(&external);
655        let out = input.entries().send_bincode_external(&external);
656        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
657
658        let nodes = flow
659            .with_process(&first_node, deployment.Localhost())
660            .with_external(&external, deployment.Localhost())
661            .deploy(&mut deployment);
662
663        deployment.deploy().await.unwrap();
664
665        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
666        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
667        let external_out = nodes.connect_source_bincode(out).await;
668
669        deployment.start().await.unwrap();
670
671        external_in_1.send(123).await.unwrap();
672        external_in_2.send(456).await.unwrap();
673
674        assert_eq!(
675            external_out.take(2).collect::<HashSet<_>>().await,
676            vec![(0, 123), (1, 456)].into_iter().collect()
677        );
678    }
679
680    #[tokio::test]
681    async fn second_connection_only_multi_source() {
682        let mut deployment = Deployment::new();
683
684        let flow = FlowBuilder::new();
685        let first_node = flow.process::<()>();
686        let external = flow.external::<()>();
687
688        let (in_port, input, _membership, complete_sink) =
689            first_node.bidi_external_many_bincode(&external);
690        let out = input.entries().send_bincode_external(&external);
691        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
692
693        let nodes = flow
694            .with_process(&first_node, deployment.Localhost())
695            .with_external(&external, deployment.Localhost())
696            .deploy(&mut deployment);
697
698        deployment.deploy().await.unwrap();
699
700        // intentionally skipped to test stream waking logic
701        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
702        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
703        let mut external_out = nodes.connect_source_bincode(out).await;
704
705        deployment.start().await.unwrap();
706
707        external_in_2.send(456).await.unwrap();
708
709        assert_eq!(external_out.next().await.unwrap(), (1, 456));
710    }
711
712    #[tokio::test]
713    async fn multi_external_bytes() {
714        let mut deployment = Deployment::new();
715
716        let flow = FlowBuilder::new();
717        let first_node = flow.process::<()>();
718        let external = flow.external::<()>();
719
720        let (in_port, input, _membership, complete_sink) = first_node
721            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
722        let out = input.entries().send_bincode_external(&external);
723        complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
724
725        let nodes = flow
726            .with_process(&first_node, deployment.Localhost())
727            .with_external(&external, deployment.Localhost())
728            .deploy(&mut deployment);
729
730        deployment.deploy().await.unwrap();
731
732        let mut external_in_1 = nodes.connect_sink_bytes(in_port.clone()).await;
733        let mut external_in_2 = nodes.connect_sink_bytes(in_port).await;
734        let external_out = nodes.connect_source_bincode(out).await;
735
736        deployment.start().await.unwrap();
737
738        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
739        external_in_2.send(vec![4, 5].into()).await.unwrap();
740
741        assert_eq!(
742            external_out.take(2).collect::<HashSet<_>>().await,
743            vec![
744                (0, (&[1u8, 2, 3] as &[u8]).into()),
745                (1, (&[4u8, 5] as &[u8]).into())
746            ]
747            .into_iter()
748            .collect()
749        );
750    }
751
752    #[tokio::test]
753    async fn echo_external_bytes() {
754        let mut deployment = Deployment::new();
755
756        let flow = FlowBuilder::new();
757        let first_node = flow.process::<()>();
758        let external = flow.external::<()>();
759
760        let (port, input, _membership, complete_sink) = first_node
761            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
762        complete_sink
763            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
764
765        let nodes = flow
766            .with_process(&first_node, deployment.Localhost())
767            .with_external(&external, deployment.Localhost())
768            .deploy(&mut deployment);
769
770        deployment.deploy().await.unwrap();
771
772        let (mut external_out_1, mut external_in_1) = nodes.connect_bytes(port.clone()).await;
773        let (mut external_out_2, mut external_in_2) = nodes.connect_bytes(port).await;
774
775        deployment.start().await.unwrap();
776
777        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
778        external_in_2.send(vec![4, 5].into()).await.unwrap();
779
780        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
781        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
782    }
783
784    #[tokio::test]
785    async fn echo_external_bincode() {
786        let mut deployment = Deployment::new();
787
788        let flow = FlowBuilder::new();
789        let first_node = flow.process::<()>();
790        let external = flow.external::<()>();
791
792        let (port, input, _membership, complete_sink) =
793            first_node.bidi_external_many_bincode(&external);
794        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
795
796        let nodes = flow
797            .with_process(&first_node, deployment.Localhost())
798            .with_external(&external, deployment.Localhost())
799            .deploy(&mut deployment);
800
801        deployment.deploy().await.unwrap();
802
803        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
804        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
805
806        deployment.start().await.unwrap();
807
808        external_in_1.send("hi".to_string()).await.unwrap();
809        external_in_2.send("hello".to_string()).await.unwrap();
810
811        assert_eq!(external_out_1.next().await.unwrap(), "HI");
812        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
813    }
814}