hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::BytesMut;
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::cluster::ClusterIds;
40use crate::location::dynamic::LocationId;
41use crate::location::external_process::{
42    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
43};
44use crate::nondet::NonDet;
45use crate::staging_util::get_this_crate;
46
47pub mod dynamic;
48
49#[expect(missing_docs, reason = "TODO")]
50pub mod external_process;
51pub use external_process::External;
52
53#[expect(missing_docs, reason = "TODO")]
54pub mod process;
55pub use process::Process;
56
57#[expect(missing_docs, reason = "TODO")]
58pub mod cluster;
59pub use cluster::Cluster;
60
61#[expect(missing_docs, reason = "TODO")]
62pub mod member_id;
63pub use member_id::MemberId;
64
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72    Joined,
73    Left,
74}
75
76#[expect(missing_docs, reason = "TODO")]
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
78pub enum NetworkHint {
79    Auto,
80    TcpPort(Option<u16>),
81}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89    private_bounds,
90    reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
93    type Root: Location<'a>;
94
95    fn root(&self) -> Self::Root;
96
97    fn try_tick(&self) -> Option<Tick<Self>> {
98        if Self::is_top_level() {
99            let next_id = self.flow_state().borrow_mut().next_clock_id;
100            self.flow_state().borrow_mut().next_clock_id += 1;
101            Some(Tick {
102                id: next_id,
103                l: self.clone(),
104            })
105        } else {
106            None
107        }
108    }
109
110    fn id(&self) -> LocationId {
111        dynamic::DynLocation::id(self)
112    }
113
114    fn tick(&self) -> Tick<Self>
115    where
116        Self: NoTick,
117    {
118        let next_id = self.flow_state().borrow_mut().next_clock_id;
119        self.flow_state().borrow_mut().next_clock_id += 1;
120        Tick {
121            id: next_id,
122            l: self.clone(),
123        }
124    }
125
126    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127    where
128        Self: Sized + NoTick,
129    {
130        Stream::new(
131            self.clone(),
132            HydroNode::Source {
133                source: HydroSource::Spin(),
134                metadata: self.new_node_metadata(Stream::<
135                    (),
136                    Self,
137                    Unbounded,
138                    TotalOrder,
139                    ExactlyOnce,
140                >::collection_kind()),
141            },
142        )
143    }
144
145    fn source_stream<T, E>(
146        &self,
147        e: impl QuotedWithContext<'a, E, Self>,
148    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149    where
150        E: FuturesStream<Item = T> + Unpin,
151        Self: Sized + NoTick,
152    {
153        let e = e.splice_untyped_ctx(self);
154
155        Stream::new(
156            self.clone(),
157            HydroNode::Source {
158                source: HydroSource::Stream(e.into()),
159                metadata: self.new_node_metadata(Stream::<
160                    T,
161                    Self,
162                    Unbounded,
163                    TotalOrder,
164                    ExactlyOnce,
165                >::collection_kind()),
166            },
167        )
168    }
169
170    fn source_iter<T, E>(
171        &self,
172        e: impl QuotedWithContext<'a, E, Self>,
173    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
174    where
175        E: IntoIterator<Item = T>,
176        Self: Sized + NoTick,
177    {
178        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
179        // for bounded top-level streams, and this is the only way to generate one
180        let e = e.splice_untyped_ctx(self);
181
182        Stream::new(
183            self.clone(),
184            HydroNode::Source {
185                source: HydroSource::Iter(e.into()),
186                metadata: self.new_node_metadata(Stream::<
187                    T,
188                    Self,
189                    Unbounded,
190                    TotalOrder,
191                    ExactlyOnce,
192                >::collection_kind()),
193            },
194        )
195    }
196
197    fn source_cluster_members<C: 'a>(
198        &self,
199        cluster: &Cluster<'a, C>,
200    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
201    where
202        Self: Sized + NoTick,
203    {
204        let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
205            id: cluster.id,
206            _phantom: PhantomData,
207        };
208
209        self.source_iter(q!(underlying_memberids))
210            .map(q!(|id| (*id, MembershipEvent::Joined)))
211            .into_keyed()
212    }
213
214    fn source_external_bytes<L>(
215        &self,
216        from: &External<L>,
217    ) -> (
218        ExternalBytesPort,
219        Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
220    )
221    where
222        Self: Sized + NoTick,
223    {
224        let next_external_port_id = {
225            let mut flow_state = from.flow_state.borrow_mut();
226            let id = flow_state.next_external_out;
227            flow_state.next_external_out += 1;
228            id
229        };
230
231        (
232            ExternalBytesPort {
233                process_id: from.id,
234                port_id: next_external_port_id,
235                _phantom: Default::default(),
236            },
237            Stream::new(
238                self.clone(),
239                HydroNode::ExternalInput {
240                    from_external_id: from.id,
241                    from_key: next_external_port_id,
242                    from_many: false,
243                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
244                    port_hint: NetworkHint::Auto,
245                    instantiate_fn: DebugInstantiate::Building,
246                    deserialize_fn: None,
247                    metadata: self.new_node_metadata(Stream::<
248                        std::io::Result<BytesMut>,
249                        Self,
250                        Unbounded,
251                        TotalOrder,
252                        ExactlyOnce,
253                    >::collection_kind()),
254                },
255            ),
256        )
257    }
258
259    #[expect(clippy::type_complexity, reason = "stream markers")]
260    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
261        &self,
262        from: &External<L>,
263    ) -> (
264        ExternalBincodeSink<T, NotMany, O, R>,
265        Stream<T, Self, Unbounded, O, R>,
266    )
267    where
268        Self: Sized + NoTick,
269        T: Serialize + DeserializeOwned,
270    {
271        let next_external_port_id = {
272            let mut flow_state = from.flow_state.borrow_mut();
273            let id = flow_state.next_external_out;
274            flow_state.next_external_out += 1;
275            id
276        };
277
278        (
279            ExternalBincodeSink {
280                process_id: from.id,
281                port_id: next_external_port_id,
282                _phantom: PhantomData,
283            },
284            Stream::new(
285                self.clone(),
286                HydroNode::ExternalInput {
287                    from_external_id: from.id,
288                    from_key: next_external_port_id,
289                    from_many: false,
290                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
291                    port_hint: NetworkHint::Auto,
292                    instantiate_fn: DebugInstantiate::Building,
293                    deserialize_fn: Some(
294                        crate::live_collections::stream::networking::deserialize_bincode::<T>(None)
295                            .into(),
296                    ),
297                    metadata: self
298                        .new_node_metadata(Stream::<T, Self, Unbounded, O, R>::collection_kind()),
299                },
300            ),
301        )
302    }
303
304    #[expect(clippy::type_complexity, reason = "stream markers")]
305    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
306        &self,
307        from: &External<L>,
308        port_hint: NetworkHint,
309    ) -> (
310        ExternalBytesPort<Many>,
311        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
312        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
313        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
314    )
315    where
316        Self: Sized + NoTick,
317    {
318        let next_external_port_id = {
319            let mut flow_state = from.flow_state.borrow_mut();
320            let id = flow_state.next_external_out;
321            flow_state.next_external_out += 1;
322            id
323        };
324
325        let (fwd_ref, to_sink) =
326            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
327        let mut flow_state_borrow = self.flow_state().borrow_mut();
328
329        flow_state_borrow.push_root(HydroRoot::SendExternal {
330            to_external_id: from.id,
331            to_key: next_external_port_id,
332            to_many: true,
333            serialize_fn: None,
334            instantiate_fn: DebugInstantiate::Building,
335            input: Box::new(to_sink.entries().ir_node.into_inner()),
336            op_metadata: HydroIrOpMetadata::new(),
337        });
338
339        let raw_stream: Stream<
340            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
341            Self,
342            Unbounded,
343            TotalOrder,
344            ExactlyOnce,
345        > = Stream::new(
346            self.clone(),
347            HydroNode::ExternalInput {
348                from_external_id: from.id,
349                from_key: next_external_port_id,
350                from_many: true,
351                codec_type: quote_type::<Codec>().into(),
352                port_hint,
353                instantiate_fn: DebugInstantiate::Building,
354                deserialize_fn: None,
355                metadata: self.new_node_metadata(Stream::<
356                    std::io::Result<(u64, <Codec as Decoder>::Item)>,
357                    Self,
358                    Unbounded,
359                    TotalOrder,
360                    ExactlyOnce,
361                >::collection_kind()),
362            },
363        );
364
365        let membership_stream_ident = syn::Ident::new(
366            &format!(
367                "__hydro_deploy_many_{}_{}_membership",
368                from.id, next_external_port_id
369            ),
370            Span::call_site(),
371        );
372        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
373        let raw_membership_stream: KeyedStream<
374            u64,
375            bool,
376            Self,
377            Unbounded,
378            TotalOrder,
379            ExactlyOnce,
380        > = KeyedStream::new(
381            self.clone(),
382            HydroNode::Source {
383                source: HydroSource::Stream(membership_stream_expr.into()),
384                metadata: self.new_node_metadata(KeyedStream::<
385                    u64,
386                    bool,
387                    Self,
388                    Unbounded,
389                    TotalOrder,
390                    ExactlyOnce,
391                >::collection_kind()),
392            },
393        );
394
395        (
396            ExternalBytesPort {
397                process_id: from.id,
398                port_id: next_external_port_id,
399                _phantom: PhantomData,
400            },
401            raw_stream
402                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
403                .into_keyed(),
404            raw_membership_stream.map(q!(|join| {
405                if join {
406                    MembershipEvent::Joined
407                } else {
408                    MembershipEvent::Left
409                }
410            })),
411            fwd_ref,
412        )
413    }
414
415    #[expect(clippy::type_complexity, reason = "stream markers")]
416    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
417        &self,
418        from: &External<L>,
419    ) -> (
420        ExternalBincodeBidi<InT, OutT, Many>,
421        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
422        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
423        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
424    )
425    where
426        Self: Sized + NoTick,
427    {
428        let next_external_port_id = {
429            let mut flow_state = from.flow_state.borrow_mut();
430            let id = flow_state.next_external_out;
431            flow_state.next_external_out += 1;
432            id
433        };
434
435        let root = get_this_crate();
436
437        let (fwd_ref, to_sink) =
438            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
439        let mut flow_state_borrow = self.flow_state().borrow_mut();
440
441        let out_t_type = quote_type::<OutT>();
442        let ser_fn: syn::Expr = syn::parse_quote! {
443            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
444                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
445            )
446        };
447
448        flow_state_borrow.push_root(HydroRoot::SendExternal {
449            to_external_id: from.id,
450            to_key: next_external_port_id,
451            to_many: true,
452            serialize_fn: Some(ser_fn.into()),
453            instantiate_fn: DebugInstantiate::Building,
454            input: Box::new(to_sink.entries().ir_node.into_inner()),
455            op_metadata: HydroIrOpMetadata::new(),
456        });
457
458        let in_t_type = quote_type::<InT>();
459
460        let deser_fn: syn::Expr = syn::parse_quote! {
461            |res| {
462                let (id, b) = res.unwrap();
463                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
464            }
465        };
466
467        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
468            KeyedStream::new(
469                self.clone(),
470                HydroNode::ExternalInput {
471                    from_external_id: from.id,
472                    from_key: next_external_port_id,
473                    from_many: true,
474                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
475                    port_hint: NetworkHint::Auto,
476                    instantiate_fn: DebugInstantiate::Building,
477                    deserialize_fn: Some(deser_fn.into()),
478                    metadata: self.new_node_metadata(KeyedStream::<
479                        u64,
480                        InT,
481                        Self,
482                        Unbounded,
483                        TotalOrder,
484                        ExactlyOnce,
485                    >::collection_kind()),
486                },
487            );
488
489        let membership_stream_ident = syn::Ident::new(
490            &format!(
491                "__hydro_deploy_many_{}_{}_membership",
492                from.id, next_external_port_id
493            ),
494            Span::call_site(),
495        );
496        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
497        let raw_membership_stream: KeyedStream<
498            u64,
499            bool,
500            Self,
501            Unbounded,
502            TotalOrder,
503            ExactlyOnce,
504        > = KeyedStream::new(
505            self.clone(),
506            HydroNode::Source {
507                source: HydroSource::Stream(membership_stream_expr.into()),
508                metadata: self.new_node_metadata(KeyedStream::<
509                    u64,
510                    bool,
511                    Self,
512                    Unbounded,
513                    TotalOrder,
514                    ExactlyOnce,
515                >::collection_kind()),
516            },
517        );
518
519        (
520            ExternalBincodeBidi {
521                process_id: from.id,
522                port_id: next_external_port_id,
523                _phantom: PhantomData,
524            },
525            raw_stream,
526            raw_membership_stream.map(q!(|join| {
527                if join {
528                    MembershipEvent::Joined
529                } else {
530                    MembershipEvent::Left
531                }
532            })),
533            fwd_ref,
534        )
535    }
536
537    /// Constructs a [`Singleton`] materialized at this location with the given static value.
538    ///
539    /// # Example
540    /// ```rust
541    /// # use hydro_lang::prelude::*;
542    /// # use futures::StreamExt;
543    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
544    /// let tick = process.tick();
545    /// let singleton = tick.singleton(q!(5));
546    /// # singleton.all_ticks()
547    /// # }, |mut stream| async move {
548    /// // 5
549    /// # assert_eq!(stream.next().await.unwrap(), 5);
550    /// # }));
551    /// ```
552    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
553    where
554        T: Clone,
555        Self: Sized,
556    {
557        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
558        // for bounded top-level singletons, and this is the only way to generate one
559
560        let e_arr = q!([e]);
561        let e = e_arr.splice_untyped_ctx(self);
562
563        if Self::is_top_level() {
564            Singleton::new(
565                self.clone(),
566                HydroNode::Persist {
567                    inner: Box::new(HydroNode::Source {
568                        source: HydroSource::Iter(e.into()),
569                        metadata: self.new_node_metadata(Stream::<
570                            T,
571                            Self,
572                            Unbounded,
573                            TotalOrder,
574                            ExactlyOnce,
575                        >::collection_kind(
576                        )),
577                    }),
578                    metadata: self
579                        .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
580                },
581            )
582        } else {
583            Singleton::new(
584                self.clone(),
585                HydroNode::Source {
586                    source: HydroSource::Iter(e.into()),
587                    metadata: self
588                        .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
589                },
590            )
591        }
592    }
593
594    /// Generates a stream with values emitted at a fixed interval, with
595    /// each value being the current time (as an [`tokio::time::Instant`]).
596    ///
597    /// The clock source used is monotonic, so elements will be emitted in
598    /// increasing order.
599    ///
600    /// # Non-Determinism
601    /// Because this stream is generated by an OS timer, it will be
602    /// non-deterministic because each timestamp will be arbitrary.
603    fn source_interval(
604        &self,
605        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
606        _nondet: NonDet,
607    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
608    where
609        Self: Sized + NoTick,
610    {
611        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
612            tokio::time::interval(interval)
613        )))
614    }
615
616    /// Generates a stream with values emitted at a fixed interval (with an
617    /// initial delay), with each value being the current time
618    /// (as an [`tokio::time::Instant`]).
619    ///
620    /// The clock source used is monotonic, so elements will be emitted in
621    /// increasing order.
622    ///
623    /// # Non-Determinism
624    /// Because this stream is generated by an OS timer, it will be
625    /// non-deterministic because each timestamp will be arbitrary.
626    fn source_interval_delayed(
627        &self,
628        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
629        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
630        _nondet: NonDet,
631    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
632    where
633        Self: Sized + NoTick,
634    {
635        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
636            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
637        )))
638    }
639
640    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
641    where
642        S: CycleCollection<'a, ForwardRef, Location = Self>,
643        Self: NoTick,
644    {
645        let next_id = self.flow_state().borrow_mut().next_cycle_id();
646        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
647
648        (
649            ForwardHandle {
650                completed: false,
651                ident: ident.clone(),
652                expected_location: Location::id(self),
653                _phantom: PhantomData,
654            },
655            S::create_source(ident, self.clone()),
656        )
657    }
658}
659
660#[cfg(test)]
661mod tests {
662    use std::collections::HashSet;
663
664    use futures::{SinkExt, StreamExt};
665    use hydro_deploy::Deployment;
666    use stageleft::q;
667    use tokio_util::codec::LengthDelimitedCodec;
668
669    use crate::compile::builder::FlowBuilder;
670    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
671    use crate::location::{Location, NetworkHint};
672    use crate::nondet::nondet;
673
674    #[tokio::test]
675    async fn top_level_singleton_replay_cardinality() {
676        let mut deployment = Deployment::new();
677
678        let flow = FlowBuilder::new();
679        let node = flow.process::<()>();
680        let external = flow.external::<()>();
681
682        let (in_port, input) =
683            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
684        let singleton = node.singleton(q!(123));
685        let tick = node.tick();
686        let out = input
687            .batch(&tick, nondet!(/** test */))
688            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
689            .cross_singleton(
690                singleton
691                    .snapshot(&tick, nondet!(/** test */))
692                    .into_stream()
693                    .count(),
694            )
695            .all_ticks()
696            .send_bincode_external(&external);
697
698        let nodes = flow
699            .with_process(&node, deployment.Localhost())
700            .with_external(&external, deployment.Localhost())
701            .deploy(&mut deployment);
702
703        deployment.deploy().await.unwrap();
704
705        let mut external_in = nodes.connect(in_port).await;
706        let mut external_out = nodes.connect(out).await;
707
708        deployment.start().await.unwrap();
709
710        external_in.send(1).await.unwrap();
711        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
712
713        external_in.send(2).await.unwrap();
714        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
715    }
716
717    #[tokio::test]
718    async fn tick_singleton_replay_cardinality() {
719        let mut deployment = Deployment::new();
720
721        let flow = FlowBuilder::new();
722        let node = flow.process::<()>();
723        let external = flow.external::<()>();
724
725        let (in_port, input) =
726            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
727        let tick = node.tick();
728        let singleton = tick.singleton(q!(123));
729        let out = input
730            .batch(&tick, nondet!(/** test */))
731            .cross_singleton(singleton.clone())
732            .cross_singleton(singleton.into_stream().count())
733            .all_ticks()
734            .send_bincode_external(&external);
735
736        let nodes = flow
737            .with_process(&node, deployment.Localhost())
738            .with_external(&external, deployment.Localhost())
739            .deploy(&mut deployment);
740
741        deployment.deploy().await.unwrap();
742
743        let mut external_in = nodes.connect(in_port).await;
744        let mut external_out = nodes.connect(out).await;
745
746        deployment.start().await.unwrap();
747
748        external_in.send(1).await.unwrap();
749        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
750
751        external_in.send(2).await.unwrap();
752        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
753    }
754
755    #[tokio::test]
756    async fn external_bytes() {
757        let mut deployment = Deployment::new();
758
759        let flow = FlowBuilder::new();
760        let first_node = flow.process::<()>();
761        let external = flow.external::<()>();
762
763        let (in_port, input) = first_node.source_external_bytes(&external);
764        let out = input
765            .map(q!(|r| r.unwrap()))
766            .send_bincode_external(&external);
767
768        let nodes = flow
769            .with_process(&first_node, deployment.Localhost())
770            .with_external(&external, deployment.Localhost())
771            .deploy(&mut deployment);
772
773        deployment.deploy().await.unwrap();
774
775        let mut external_in = nodes.connect(in_port).await.1;
776        let mut external_out = nodes.connect(out).await;
777
778        deployment.start().await.unwrap();
779
780        external_in.send(vec![1, 2, 3].into()).await.unwrap();
781
782        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
783    }
784
785    #[tokio::test]
786    async fn multi_external_source() {
787        let mut deployment = Deployment::new();
788
789        let flow = FlowBuilder::new();
790        let first_node = flow.process::<()>();
791        let external = flow.external::<()>();
792
793        let (in_port, input, _membership, complete_sink) =
794            first_node.bidi_external_many_bincode(&external);
795        let out = input.entries().send_bincode_external(&external);
796        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
797
798        let nodes = flow
799            .with_process(&first_node, deployment.Localhost())
800            .with_external(&external, deployment.Localhost())
801            .deploy(&mut deployment);
802
803        deployment.deploy().await.unwrap();
804
805        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
806        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
807        let external_out = nodes.connect(out).await;
808
809        deployment.start().await.unwrap();
810
811        external_in_1.send(123).await.unwrap();
812        external_in_2.send(456).await.unwrap();
813
814        assert_eq!(
815            external_out.take(2).collect::<HashSet<_>>().await,
816            vec![(0, 123), (1, 456)].into_iter().collect()
817        );
818    }
819
820    #[tokio::test]
821    async fn second_connection_only_multi_source() {
822        let mut deployment = Deployment::new();
823
824        let flow = FlowBuilder::new();
825        let first_node = flow.process::<()>();
826        let external = flow.external::<()>();
827
828        let (in_port, input, _membership, complete_sink) =
829            first_node.bidi_external_many_bincode(&external);
830        let out = input.entries().send_bincode_external(&external);
831        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
832
833        let nodes = flow
834            .with_process(&first_node, deployment.Localhost())
835            .with_external(&external, deployment.Localhost())
836            .deploy(&mut deployment);
837
838        deployment.deploy().await.unwrap();
839
840        // intentionally skipped to test stream waking logic
841        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
842        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
843        let mut external_out = nodes.connect(out).await;
844
845        deployment.start().await.unwrap();
846
847        external_in_2.send(456).await.unwrap();
848
849        assert_eq!(external_out.next().await.unwrap(), (1, 456));
850    }
851
852    #[tokio::test]
853    async fn multi_external_bytes() {
854        let mut deployment = Deployment::new();
855
856        let flow = FlowBuilder::new();
857        let first_node = flow.process::<()>();
858        let external = flow.external::<()>();
859
860        let (in_port, input, _membership, complete_sink) = first_node
861            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
862        let out = input.entries().send_bincode_external(&external);
863        complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
864
865        let nodes = flow
866            .with_process(&first_node, deployment.Localhost())
867            .with_external(&external, deployment.Localhost())
868            .deploy(&mut deployment);
869
870        deployment.deploy().await.unwrap();
871
872        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
873        let mut external_in_2 = nodes.connect(in_port).await.1;
874        let external_out = nodes.connect(out).await;
875
876        deployment.start().await.unwrap();
877
878        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
879        external_in_2.send(vec![4, 5].into()).await.unwrap();
880
881        assert_eq!(
882            external_out.take(2).collect::<HashSet<_>>().await,
883            vec![
884                (0, (&[1u8, 2, 3] as &[u8]).into()),
885                (1, (&[4u8, 5] as &[u8]).into())
886            ]
887            .into_iter()
888            .collect()
889        );
890    }
891
892    #[tokio::test]
893    async fn echo_external_bytes() {
894        let mut deployment = Deployment::new();
895
896        let flow = FlowBuilder::new();
897        let first_node = flow.process::<()>();
898        let external = flow.external::<()>();
899
900        let (port, input, _membership, complete_sink) = first_node
901            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
902        complete_sink
903            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
904
905        let nodes = flow
906            .with_process(&first_node, deployment.Localhost())
907            .with_external(&external, deployment.Localhost())
908            .deploy(&mut deployment);
909
910        deployment.deploy().await.unwrap();
911
912        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
913        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
914
915        deployment.start().await.unwrap();
916
917        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
918        external_in_2.send(vec![4, 5].into()).await.unwrap();
919
920        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
921        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
922    }
923
924    #[tokio::test]
925    async fn echo_external_bincode() {
926        let mut deployment = Deployment::new();
927
928        let flow = FlowBuilder::new();
929        let first_node = flow.process::<()>();
930        let external = flow.external::<()>();
931
932        let (port, input, _membership, complete_sink) =
933            first_node.bidi_external_many_bincode(&external);
934        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
935
936        let nodes = flow
937            .with_process(&first_node, deployment.Localhost())
938            .with_external(&external, deployment.Localhost())
939            .deploy(&mut deployment);
940
941        deployment.deploy().await.unwrap();
942
943        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
944        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
945
946        deployment.start().await.unwrap();
947
948        external_in_1.send("hi".to_string()).await.unwrap();
949        external_in_2.send("hello".to_string()).await.unwrap();
950
951        assert_eq!(external_out_1.next().await.unwrap(), "HI");
952        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
953    }
954}