Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{
34    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
35};
36use crate::forward_handle::ForwardRef;
37#[cfg(stageleft_runtime)]
38use crate::forward_handle::{CycleCollection, ForwardHandle};
39use crate::live_collections::boundedness::{Bounded, Unbounded};
40use crate::live_collections::keyed_stream::KeyedStream;
41use crate::live_collections::singleton::Singleton;
42use crate::live_collections::stream::{
43    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
44};
45use crate::location::dynamic::LocationId;
46use crate::location::external_process::{
47    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
48};
49use crate::nondet::NonDet;
50#[cfg(feature = "sim")]
51use crate::sim::SimSender;
52use crate::staging_util::get_this_crate;
53
54pub mod dynamic;
55
56pub mod external_process;
57pub use external_process::External;
58
59pub mod process;
60pub use process::Process;
61
62pub mod cluster;
63pub use cluster::Cluster;
64
65pub mod member_id;
66pub use member_id::{MemberId, TaglessMemberId};
67
68pub mod tick;
69pub use tick::{Atomic, NoTick, Tick};
70
71/// An event indicating a change in membership status of a location in a group
72/// (e.g. a node in a [`Cluster`] or an external client connection).
73#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
74pub enum MembershipEvent {
75    /// The member has joined the group and is now active.
76    Joined,
77    /// The member has left the group and is no longer active.
78    Left,
79}
80
81/// A hint for configuring the network transport used by an external connection.
82///
83/// This controls how the underlying TCP listener is set up when binding
84/// external client connections via methods like [`Location::bind_single_client`]
85/// or [`Location::bidi_external_many_bytes`].
86#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
87pub enum NetworkHint {
88    /// Automatically select the network configuration (e.g. an ephemeral port).
89    Auto,
90    /// Use a TCP port, optionally specifying a fixed port number.
91    ///
92    /// If `None`, an available port will be chosen automatically.
93    /// If `Some(port)`, the given port number will be used.
94    TcpPort(Option<u16>),
95}
96
97pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
98    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
99}
100
101#[stageleft::export(LocationKey)]
102new_key_type! {
103    /// A unique identifier for a clock tick.
104    pub struct LocationKey;
105}
106
107impl std::fmt::Display for LocationKey {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
110    }
111}
112
113/// This is used for the ECS membership stream.
114/// TODO(mingwei): Make this more robust?
115impl std::str::FromStr for LocationKey {
116    type Err = Option<ParseIntError>;
117
118    fn from_str(s: &str) -> Result<Self, Self::Err> {
119        let nvn = s.strip_prefix("loc").ok_or(None)?;
120        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
121        let idx: u64 = idx.parse()?;
122        let ver: u64 = ver.parse()?;
123        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
124    }
125}
126
127impl LocationKey {
128    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
129    /// The first location key, used by the simulator as the default external location.
130    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
131
132    /// A key for testing with index 1.
133    #[cfg(test)]
134    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); // `1v255`
135
136    /// A key for testing with index 2.
137    #[cfg(test)]
138    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); // `2v255`
139}
140
141/// This is used within `q!` code in docker and ECS.
142impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
143    type O = LocationKey;
144
145    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
146    where
147        Self: Sized,
148    {
149        let root = get_this_crate();
150        let n = Key::data(&self).as_ffi();
151        (
152            QuoteTokens {
153                prelude: None,
154                expr: Some(quote! {
155                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
156                }),
157            },
158            (),
159        )
160    }
161}
162
163/// A simple enum for the type of a root location.
164#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
165pub enum LocationType {
166    /// A process (single node).
167    Process,
168    /// A cluster (multiple nodes).
169    Cluster,
170    /// An external client.
171    External,
172}
173
174/// A location where data can be materialized and computation can be executed.
175///
176/// Hydro is a **global**, **distributed** programming model. This means that the data
177/// and computation in a Hydro program can be spread across multiple machines, data
178/// centers, and even continents. To achieve this, Hydro uses the concept of
179/// **locations** to keep track of _where_ data is located and computation is executed.
180///
181/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
182/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
183/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
184/// to allow live collections to be _moved_ between locations via network send/receive.
185///
186/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
187#[expect(
188    private_bounds,
189    reason = "only internal Hydro code can define location types"
190)]
191pub trait Location<'a>: dynamic::DynLocation {
192    /// The root location type for this location.
193    ///
194    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
195    /// For nested locations like [`Tick`], this is the root location that contains it.
196    type Root: Location<'a>;
197
198    /// Returns the root location for this location.
199    ///
200    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
201    /// For nested locations like [`Tick`], this returns the root location that contains it.
202    fn root(&self) -> Self::Root;
203
204    /// Attempts to create a new [`Tick`] clock domain at this location.
205    ///
206    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
207    /// or `None` if this location is already inside a tick (nested ticks are not supported).
208    ///
209    /// Prefer using [`Location::tick`] when you know the location is top-level.
210    fn try_tick(&self) -> Option<Tick<Self>> {
211        if Self::is_top_level() {
212            let id = self.flow_state().borrow_mut().next_clock_id();
213            Some(Tick {
214                id,
215                l: self.clone(),
216            })
217        } else {
218            None
219        }
220    }
221
222    /// Returns the unique identifier for this location.
223    fn id(&self) -> LocationId {
224        dynamic::DynLocation::id(self)
225    }
226
227    /// Creates a new [`Tick`] clock domain at this location.
228    ///
229    /// A tick represents a logical clock that can be used to batch streaming data
230    /// into discrete time steps. This is useful for implementing iterative algorithms
231    /// or for synchronizing data across multiple streams.
232    ///
233    /// # Example
234    /// ```rust
235    /// # #[cfg(feature = "deploy")] {
236    /// # use hydro_lang::prelude::*;
237    /// # use futures::StreamExt;
238    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
239    /// let tick = process.tick();
240    /// let inside_tick = process
241    ///     .source_iter(q!(vec![1, 2, 3, 4]))
242    ///     .batch(&tick, nondet!(/** test */));
243    /// inside_tick.all_ticks()
244    /// # }, |mut stream| async move {
245    /// // 1, 2, 3, 4
246    /// # for w in vec![1, 2, 3, 4] {
247    /// #     assert_eq!(stream.next().await.unwrap(), w);
248    /// # }
249    /// # }));
250    /// # }
251    /// ```
252    fn tick(&self) -> Tick<Self>
253    where
254        Self: NoTick,
255    {
256        let id = self.flow_state().borrow_mut().next_clock_id();
257        Tick {
258            id,
259            l: self.clone(),
260        }
261    }
262
263    /// Creates an unbounded stream that continuously emits unit values `()`.
264    ///
265    /// This is useful for driving computations that need to run continuously,
266    /// such as polling or heartbeat mechanisms.
267    ///
268    /// # Example
269    /// ```rust
270    /// # #[cfg(feature = "deploy")] {
271    /// # use hydro_lang::prelude::*;
272    /// # use futures::StreamExt;
273    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
274    /// let tick = process.tick();
275    /// process.spin()
276    ///     .batch(&tick, nondet!(/** test */))
277    ///     .map(q!(|_| 42))
278    ///     .all_ticks()
279    /// # }, |mut stream| async move {
280    /// // 42, 42, 42, ...
281    /// # assert_eq!(stream.next().await.unwrap(), 42);
282    /// # assert_eq!(stream.next().await.unwrap(), 42);
283    /// # assert_eq!(stream.next().await.unwrap(), 42);
284    /// # }));
285    /// # }
286    /// ```
287    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
288    where
289        Self: Sized + NoTick,
290    {
291        Stream::new(
292            self.clone(),
293            HydroNode::Source {
294                source: HydroSource::Spin(),
295                metadata: self.new_node_metadata(Stream::<
296                    (),
297                    Self,
298                    Unbounded,
299                    TotalOrder,
300                    ExactlyOnce,
301                >::collection_kind()),
302            },
303        )
304    }
305
306    /// Creates a stream from an async [`FuturesStream`].
307    ///
308    /// This is useful for integrating with external async data sources,
309    /// such as network connections or file readers.
310    ///
311    /// # Example
312    /// ```rust
313    /// # #[cfg(feature = "deploy")] {
314    /// # use hydro_lang::prelude::*;
315    /// # use futures::StreamExt;
316    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
317    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
318    /// # }, |mut stream| async move {
319    /// // 1, 2, 3
320    /// # for w in vec![1, 2, 3] {
321    /// #     assert_eq!(stream.next().await.unwrap(), w);
322    /// # }
323    /// # }));
324    /// # }
325    /// ```
326    fn source_stream<T, E>(
327        &self,
328        e: impl QuotedWithContext<'a, E, Self>,
329    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
330    where
331        E: FuturesStream<Item = T> + Unpin,
332        Self: Sized + NoTick,
333    {
334        let e = e.splice_untyped_ctx(self);
335
336        Stream::new(
337            self.clone(),
338            HydroNode::Source {
339                source: HydroSource::Stream(e.into()),
340                metadata: self.new_node_metadata(Stream::<
341                    T,
342                    Self,
343                    Unbounded,
344                    TotalOrder,
345                    ExactlyOnce,
346                >::collection_kind()),
347            },
348        )
349    }
350
351    /// Creates a bounded stream from an iterator.
352    ///
353    /// The iterator is evaluated once at runtime, and all elements are emitted
354    /// in order. This is useful for creating streams from static data or
355    /// for testing.
356    ///
357    /// # Example
358    /// ```rust
359    /// # #[cfg(feature = "deploy")] {
360    /// # use hydro_lang::prelude::*;
361    /// # use futures::StreamExt;
362    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
363    /// process.source_iter(q!(vec![1, 2, 3, 4]))
364    /// # }, |mut stream| async move {
365    /// // 1, 2, 3, 4
366    /// # for w in vec![1, 2, 3, 4] {
367    /// #     assert_eq!(stream.next().await.unwrap(), w);
368    /// # }
369    /// # }));
370    /// # }
371    /// ```
372    fn source_iter<T, E>(
373        &self,
374        e: impl QuotedWithContext<'a, E, Self>,
375    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
376    where
377        E: IntoIterator<Item = T>,
378        Self: Sized + NoTick,
379    {
380        let e = e.splice_typed_ctx(self);
381
382        Stream::new(
383            self.clone(),
384            HydroNode::Source {
385                source: HydroSource::Iter(e.into()),
386                metadata: self.new_node_metadata(
387                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
388                ),
389            },
390        )
391    }
392
393    /// Creates a stream of membership events for a cluster.
394    ///
395    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
396    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
397    /// keyed by the [`MemberId`] of the cluster member.
398    ///
399    /// This is useful for implementing protocols that need to track cluster membership,
400    /// such as broadcasting to all members or detecting failures.
401    ///
402    /// # Example
403    /// ```rust
404    /// # #[cfg(feature = "deploy")] {
405    /// # use hydro_lang::prelude::*;
406    /// # use futures::StreamExt;
407    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
408    /// let p1 = flow.process::<()>();
409    /// let workers: Cluster<()> = flow.cluster::<()>();
410    /// # // do nothing on each worker
411    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
412    /// let cluster_members = p1.source_cluster_members(&workers);
413    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
414    /// // if there are 4 members in the cluster, we would see a join event for each
415    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
416    /// # }, |mut stream| async move {
417    /// # let mut results = Vec::new();
418    /// # for w in 0..4 {
419    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
420    /// # }
421    /// # results.sort();
422    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
423    /// # }));
424    /// # }
425    /// ```
426    fn source_cluster_members<C: 'a>(
427        &self,
428        cluster: &Cluster<'a, C>,
429    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
430    where
431        Self: Sized + NoTick,
432    {
433        Stream::new(
434            self.clone(),
435            HydroNode::Source {
436                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
437                metadata: self.new_node_metadata(Stream::<
438                    (TaglessMemberId, MembershipEvent),
439                    Self,
440                    Unbounded,
441                    TotalOrder,
442                    ExactlyOnce,
443                >::collection_kind()),
444            },
445        )
446        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
447        .into_keyed()
448    }
449
450    /// Creates a one-way connection from an external process to receive raw bytes.
451    ///
452    /// Returns a port handle for the external process to connect to, and a stream
453    /// of received byte buffers.
454    ///
455    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
456    /// or [`Location::source_external_bincode`].
457    fn source_external_bytes<L>(
458        &self,
459        from: &External<L>,
460    ) -> (
461        ExternalBytesPort,
462        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
463    )
464    where
465        Self: Sized + NoTick,
466    {
467        let (port, stream, sink) =
468            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
469
470        sink.complete(self.source_iter(q!([])));
471
472        (port, stream)
473    }
474
475    /// Creates a one-way connection from an external process to receive bincode-serialized data.
476    ///
477    /// Returns a sink handle for the external process to send data to, and a stream
478    /// of received values.
479    ///
480    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
481    #[expect(clippy::type_complexity, reason = "stream markers")]
482    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
483        &self,
484        from: &External<L>,
485    ) -> (
486        ExternalBincodeSink<T, NotMany, O, R>,
487        Stream<T, Self, Unbounded, O, R>,
488    )
489    where
490        Self: Sized + NoTick,
491        T: Serialize + DeserializeOwned,
492    {
493        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
494        sink.complete(self.source_iter(q!([])));
495
496        (
497            ExternalBincodeSink {
498                process_key: from.key,
499                port_id: port.port_id,
500                _phantom: PhantomData,
501            },
502            stream.weaken_ordering().weaken_retries(),
503        )
504    }
505
506    /// Sets up a simulated input port on this location for testing.
507    ///
508    /// Returns a handle to send messages to the location as well as a stream
509    /// of received messages. This is only available when the `sim` feature is enabled.
510    #[cfg(feature = "sim")]
511    #[expect(clippy::type_complexity, reason = "stream markers")]
512    fn sim_input<T, O: Ordering, R: Retries>(
513        &self,
514    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
515    where
516        Self: Sized + NoTick,
517        T: Serialize + DeserializeOwned,
518    {
519        let external_location: External<'a, ()> = External {
520            key: LocationKey::FIRST,
521            flow_state: self.flow_state().clone(),
522            _phantom: PhantomData,
523        };
524
525        let (external, stream) = self.source_external_bincode(&external_location);
526
527        (SimSender(external.port_id, PhantomData), stream)
528    }
529
530    /// Creates an external input stream for embedded deployment mode.
531    ///
532    /// The `name` parameter specifies the name of the generated function parameter
533    /// that will supply data to this stream at runtime. The generated function will
534    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
535    fn embedded_input<T>(
536        &self,
537        name: impl Into<String>,
538    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
539    where
540        Self: Sized + NoTick,
541    {
542        let ident = syn::Ident::new(&name.into(), Span::call_site());
543
544        Stream::new(
545            self.clone(),
546            HydroNode::Source {
547                source: HydroSource::Embedded(ident),
548                metadata: self.new_node_metadata(Stream::<
549                    T,
550                    Self,
551                    Unbounded,
552                    TotalOrder,
553                    ExactlyOnce,
554                >::collection_kind()),
555            },
556        )
557    }
558
559    /// Establishes a server on this location to receive a bidirectional connection from a single
560    /// client, identified by the given `External` handle. Returns a port handle for the external
561    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
562    /// messages.
563    ///
564    /// # Example
565    /// ```rust
566    /// # #[cfg(feature = "deploy")] {
567    /// # use hydro_lang::prelude::*;
568    /// # use hydro_deploy::Deployment;
569    /// # use futures::{SinkExt, StreamExt};
570    /// # tokio_test::block_on(async {
571    /// # use bytes::Bytes;
572    /// # use hydro_lang::location::NetworkHint;
573    /// # use tokio_util::codec::LengthDelimitedCodec;
574    /// # let mut flow = FlowBuilder::new();
575    /// let node = flow.process::<()>();
576    /// let external = flow.external::<()>();
577    /// let (port, incoming, outgoing) =
578    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
579    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
580    ///     let mut resp: Vec<u8> = data.into();
581    ///     resp.push(42);
582    ///     resp.into() // : Bytes
583    /// })));
584    ///
585    /// # let mut deployment = Deployment::new();
586    /// let nodes = flow // ... with_process and with_external
587    /// #     .with_process(&node, deployment.Localhost())
588    /// #     .with_external(&external, deployment.Localhost())
589    /// #     .deploy(&mut deployment);
590    ///
591    /// deployment.deploy().await.unwrap();
592    /// deployment.start().await.unwrap();
593    ///
594    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
595    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
596    /// assert_eq!(
597    ///     external_out.next().await.unwrap().unwrap(),
598    ///     vec![1, 2, 3, 42]
599    /// );
600    /// # });
601    /// # }
602    /// ```
603    #[expect(clippy::type_complexity, reason = "stream markers")]
604    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
605        &self,
606        from: &External<L>,
607        port_hint: NetworkHint,
608    ) -> (
609        ExternalBytesPort<NotMany>,
610        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
611        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
612    )
613    where
614        Self: Sized + NoTick,
615    {
616        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
617
618        let (fwd_ref, to_sink) =
619            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
620        let mut flow_state_borrow = self.flow_state().borrow_mut();
621
622        flow_state_borrow.push_root(HydroRoot::SendExternal {
623            to_external_key: from.key,
624            to_port_id: next_external_port_id,
625            to_many: false,
626            unpaired: false,
627            serialize_fn: None,
628            instantiate_fn: DebugInstantiate::Building,
629            input: Box::new(to_sink.ir_node.into_inner()),
630            op_metadata: HydroIrOpMetadata::new(),
631        });
632
633        let raw_stream: Stream<
634            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
635            Self,
636            Unbounded,
637            TotalOrder,
638            ExactlyOnce,
639        > = Stream::new(
640            self.clone(),
641            HydroNode::ExternalInput {
642                from_external_key: from.key,
643                from_port_id: next_external_port_id,
644                from_many: false,
645                codec_type: quote_type::<Codec>().into(),
646                port_hint,
647                instantiate_fn: DebugInstantiate::Building,
648                deserialize_fn: None,
649                metadata: self.new_node_metadata(Stream::<
650                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
651                    Self,
652                    Unbounded,
653                    TotalOrder,
654                    ExactlyOnce,
655                >::collection_kind()),
656            },
657        );
658
659        (
660            ExternalBytesPort {
661                process_key: from.key,
662                port_id: next_external_port_id,
663                _phantom: PhantomData,
664            },
665            raw_stream.flatten_ordered(),
666            fwd_ref,
667        )
668    }
669
670    /// Establishes a bidirectional connection from a single external client using bincode serialization.
671    ///
672    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
673    /// and a handle to send outgoing messages. This is a convenience wrapper around
674    /// [`Location::bind_single_client`] that uses bincode for serialization.
675    ///
676    /// # Type Parameters
677    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
678    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
679    #[expect(clippy::type_complexity, reason = "stream markers")]
680    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
681        &self,
682        from: &External<L>,
683    ) -> (
684        ExternalBincodeBidi<InT, OutT, NotMany>,
685        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
686        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
687    )
688    where
689        Self: Sized + NoTick,
690    {
691        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
692
693        let (fwd_ref, to_sink) =
694            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
695        let mut flow_state_borrow = self.flow_state().borrow_mut();
696
697        let root = get_this_crate();
698
699        let out_t_type = quote_type::<OutT>();
700        let ser_fn: syn::Expr = syn::parse_quote! {
701            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
702                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
703            )
704        };
705
706        flow_state_borrow.push_root(HydroRoot::SendExternal {
707            to_external_key: from.key,
708            to_port_id: next_external_port_id,
709            to_many: false,
710            unpaired: false,
711            serialize_fn: Some(ser_fn.into()),
712            instantiate_fn: DebugInstantiate::Building,
713            input: Box::new(to_sink.ir_node.into_inner()),
714            op_metadata: HydroIrOpMetadata::new(),
715        });
716
717        let in_t_type = quote_type::<InT>();
718
719        let deser_fn: syn::Expr = syn::parse_quote! {
720            |res| {
721                let b = res.unwrap();
722                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
723            }
724        };
725
726        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
727            self.clone(),
728            HydroNode::ExternalInput {
729                from_external_key: from.key,
730                from_port_id: next_external_port_id,
731                from_many: false,
732                codec_type: quote_type::<LengthDelimitedCodec>().into(),
733                port_hint: NetworkHint::Auto,
734                instantiate_fn: DebugInstantiate::Building,
735                deserialize_fn: Some(deser_fn.into()),
736                metadata: self.new_node_metadata(Stream::<
737                    InT,
738                    Self,
739                    Unbounded,
740                    TotalOrder,
741                    ExactlyOnce,
742                >::collection_kind()),
743            },
744        );
745
746        (
747            ExternalBincodeBidi {
748                process_key: from.key,
749                port_id: next_external_port_id,
750                _phantom: PhantomData,
751            },
752            raw_stream,
753            fwd_ref,
754        )
755    }
756
757    /// Establishes a server on this location to receive bidirectional connections from multiple
758    /// external clients using raw bytes.
759    ///
760    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
761    /// connections. Each client is assigned a unique `u64` identifier.
762    ///
763    /// Returns:
764    /// - A port handle for external processes to connect to
765    /// - A keyed stream of incoming messages, keyed by client ID
766    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
767    /// - A handle to send outgoing messages, keyed by client ID
768    #[expect(clippy::type_complexity, reason = "stream markers")]
769    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
770        &self,
771        from: &External<L>,
772        port_hint: NetworkHint,
773    ) -> (
774        ExternalBytesPort<Many>,
775        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
776        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
777        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
778    )
779    where
780        Self: Sized + NoTick,
781    {
782        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
783
784        let (fwd_ref, to_sink) =
785            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
786        let mut flow_state_borrow = self.flow_state().borrow_mut();
787
788        flow_state_borrow.push_root(HydroRoot::SendExternal {
789            to_external_key: from.key,
790            to_port_id: next_external_port_id,
791            to_many: true,
792            unpaired: false,
793            serialize_fn: None,
794            instantiate_fn: DebugInstantiate::Building,
795            input: Box::new(to_sink.entries().ir_node.into_inner()),
796            op_metadata: HydroIrOpMetadata::new(),
797        });
798
799        let raw_stream: Stream<
800            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
801            Self,
802            Unbounded,
803            TotalOrder,
804            ExactlyOnce,
805        > = Stream::new(
806            self.clone(),
807            HydroNode::ExternalInput {
808                from_external_key: from.key,
809                from_port_id: next_external_port_id,
810                from_many: true,
811                codec_type: quote_type::<Codec>().into(),
812                port_hint,
813                instantiate_fn: DebugInstantiate::Building,
814                deserialize_fn: None,
815                metadata: self.new_node_metadata(Stream::<
816                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
817                    Self,
818                    Unbounded,
819                    TotalOrder,
820                    ExactlyOnce,
821                >::collection_kind()),
822            },
823        );
824
825        let membership_stream_ident = syn::Ident::new(
826            &format!(
827                "__hydro_deploy_many_{}_{}_membership",
828                from.key, next_external_port_id
829            ),
830            Span::call_site(),
831        );
832        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
833        let raw_membership_stream: KeyedStream<
834            u64,
835            bool,
836            Self,
837            Unbounded,
838            TotalOrder,
839            ExactlyOnce,
840        > = KeyedStream::new(
841            self.clone(),
842            HydroNode::Source {
843                source: HydroSource::Stream(membership_stream_expr.into()),
844                metadata: self.new_node_metadata(KeyedStream::<
845                    u64,
846                    bool,
847                    Self,
848                    Unbounded,
849                    TotalOrder,
850                    ExactlyOnce,
851                >::collection_kind()),
852            },
853        );
854
855        (
856            ExternalBytesPort {
857                process_key: from.key,
858                port_id: next_external_port_id,
859                _phantom: PhantomData,
860            },
861            raw_stream
862                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
863                .into_keyed(),
864            raw_membership_stream.map(q!(|join| {
865                if join {
866                    MembershipEvent::Joined
867                } else {
868                    MembershipEvent::Left
869                }
870            })),
871            fwd_ref,
872        )
873    }
874
875    /// Establishes a server on this location to receive bidirectional connections from multiple
876    /// external clients using bincode serialization.
877    ///
878    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
879    /// client connections. Each client is assigned a unique `u64` identifier.
880    ///
881    /// Returns:
882    /// - A port handle for external processes to connect to
883    /// - A keyed stream of incoming messages, keyed by client ID
884    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
885    /// - A handle to send outgoing messages, keyed by client ID
886    ///
887    /// # Type Parameters
888    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
889    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
890    #[expect(clippy::type_complexity, reason = "stream markers")]
891    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
892        &self,
893        from: &External<L>,
894    ) -> (
895        ExternalBincodeBidi<InT, OutT, Many>,
896        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
897        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
898        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
899    )
900    where
901        Self: Sized + NoTick,
902    {
903        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
904
905        let (fwd_ref, to_sink) =
906            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
907        let mut flow_state_borrow = self.flow_state().borrow_mut();
908
909        let root = get_this_crate();
910
911        let out_t_type = quote_type::<OutT>();
912        let ser_fn: syn::Expr = syn::parse_quote! {
913            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
914                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
915            )
916        };
917
918        flow_state_borrow.push_root(HydroRoot::SendExternal {
919            to_external_key: from.key,
920            to_port_id: next_external_port_id,
921            to_many: true,
922            unpaired: false,
923            serialize_fn: Some(ser_fn.into()),
924            instantiate_fn: DebugInstantiate::Building,
925            input: Box::new(to_sink.entries().ir_node.into_inner()),
926            op_metadata: HydroIrOpMetadata::new(),
927        });
928
929        let in_t_type = quote_type::<InT>();
930
931        let deser_fn: syn::Expr = syn::parse_quote! {
932            |res| {
933                let (id, b) = res.unwrap();
934                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
935            }
936        };
937
938        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
939            KeyedStream::new(
940                self.clone(),
941                HydroNode::ExternalInput {
942                    from_external_key: from.key,
943                    from_port_id: next_external_port_id,
944                    from_many: true,
945                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
946                    port_hint: NetworkHint::Auto,
947                    instantiate_fn: DebugInstantiate::Building,
948                    deserialize_fn: Some(deser_fn.into()),
949                    metadata: self.new_node_metadata(KeyedStream::<
950                        u64,
951                        InT,
952                        Self,
953                        Unbounded,
954                        TotalOrder,
955                        ExactlyOnce,
956                    >::collection_kind()),
957                },
958            );
959
960        let membership_stream_ident = syn::Ident::new(
961            &format!(
962                "__hydro_deploy_many_{}_{}_membership",
963                from.key, next_external_port_id
964            ),
965            Span::call_site(),
966        );
967        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
968        let raw_membership_stream: KeyedStream<
969            u64,
970            bool,
971            Self,
972            Unbounded,
973            TotalOrder,
974            ExactlyOnce,
975        > = KeyedStream::new(
976            self.clone(),
977            HydroNode::Source {
978                source: HydroSource::Stream(membership_stream_expr.into()),
979                metadata: self.new_node_metadata(KeyedStream::<
980                    u64,
981                    bool,
982                    Self,
983                    Unbounded,
984                    TotalOrder,
985                    ExactlyOnce,
986                >::collection_kind()),
987            },
988        );
989
990        (
991            ExternalBincodeBidi {
992                process_key: from.key,
993                port_id: next_external_port_id,
994                _phantom: PhantomData,
995            },
996            raw_stream,
997            raw_membership_stream.map(q!(|join| {
998                if join {
999                    MembershipEvent::Joined
1000                } else {
1001                    MembershipEvent::Left
1002                }
1003            })),
1004            fwd_ref,
1005        )
1006    }
1007
1008    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1009    ///
1010    /// # Example
1011    /// ```rust
1012    /// # #[cfg(feature = "deploy")] {
1013    /// # use hydro_lang::prelude::*;
1014    /// # use futures::StreamExt;
1015    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1016    /// let tick = process.tick();
1017    /// let singleton = tick.singleton(q!(5));
1018    /// # singleton.all_ticks()
1019    /// # }, |mut stream| async move {
1020    /// // 5
1021    /// # assert_eq!(stream.next().await.unwrap(), 5);
1022    /// # }));
1023    /// # }
1024    /// ```
1025    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1026    where
1027        T: Clone,
1028        Self: Sized,
1029    {
1030        let e = e.splice_untyped_ctx(self);
1031
1032        Singleton::new(
1033            self.clone(),
1034            HydroNode::SingletonSource {
1035                value: e.into(),
1036                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1037            },
1038        )
1039    }
1040
1041    /// Generates a stream with values emitted at a fixed interval, with
1042    /// each value being the current time (as an [`tokio::time::Instant`]).
1043    ///
1044    /// The clock source used is monotonic, so elements will be emitted in
1045    /// increasing order.
1046    ///
1047    /// # Non-Determinism
1048    /// Because this stream is generated by an OS timer, it will be
1049    /// non-deterministic because each timestamp will be arbitrary.
1050    fn source_interval(
1051        &self,
1052        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1053        _nondet: NonDet,
1054    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1055    where
1056        Self: Sized + NoTick,
1057    {
1058        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1059            tokio::time::interval(interval)
1060        )))
1061    }
1062
1063    /// Generates a stream with values emitted at a fixed interval (with an
1064    /// initial delay), with each value being the current time
1065    /// (as an [`tokio::time::Instant`]).
1066    ///
1067    /// The clock source used is monotonic, so elements will be emitted in
1068    /// increasing order.
1069    ///
1070    /// # Non-Determinism
1071    /// Because this stream is generated by an OS timer, it will be
1072    /// non-deterministic because each timestamp will be arbitrary.
1073    fn source_interval_delayed(
1074        &self,
1075        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1076        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1077        _nondet: NonDet,
1078    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1079    where
1080        Self: Sized + NoTick,
1081    {
1082        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1083            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1084        )))
1085    }
1086
1087    /// Creates a forward reference for defining recursive or mutually-dependent dataflows.
1088    ///
1089    /// Returns a handle that must be completed with the actual stream, and a placeholder
1090    /// stream that can be used in the dataflow graph before the actual stream is defined.
1091    ///
1092    /// This is useful for implementing feedback loops or recursive computations where
1093    /// a stream depends on its own output.
1094    ///
1095    /// # Example
1096    /// ```rust
1097    /// # #[cfg(feature = "deploy")] {
1098    /// # use hydro_lang::prelude::*;
1099    /// # use hydro_lang::live_collections::stream::NoOrder;
1100    /// # use futures::StreamExt;
1101    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1102    /// // Create a forward reference for the feedback stream
1103    /// let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1104    ///
1105    /// // Combine initial input with feedback, then increment
1106    /// let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
1107    /// let output: Stream<_, _, _, NoOrder> = input.interleave(feedback).map(q!(|x| x + 1));
1108    ///
1109    /// // Complete the forward reference with the output
1110    /// complete.complete(output.clone());
1111    /// output
1112    /// # }, |mut stream| async move {
1113    /// // 2, 3, 4, 5, ...
1114    /// # assert_eq!(stream.next().await.unwrap(), 2);
1115    /// # assert_eq!(stream.next().await.unwrap(), 3);
1116    /// # assert_eq!(stream.next().await.unwrap(), 4);
1117    /// # }));
1118    /// # }
1119    /// ```
1120    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1121    where
1122        S: CycleCollection<'a, ForwardRef, Location = Self>,
1123    {
1124        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1125        (
1126            ForwardHandle::new(cycle_id, Location::id(self)),
1127            S::create_source(cycle_id, self.clone()),
1128        )
1129    }
1130}
1131
1132#[cfg(feature = "deploy")]
1133#[cfg(test)]
1134mod tests {
1135    use std::collections::HashSet;
1136
1137    use futures::{SinkExt, StreamExt};
1138    use hydro_deploy::Deployment;
1139    use stageleft::q;
1140    use tokio_util::codec::LengthDelimitedCodec;
1141
1142    use crate::compile::builder::FlowBuilder;
1143    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1144    use crate::location::{Location, NetworkHint};
1145    use crate::nondet::nondet;
1146
1147    #[tokio::test]
1148    async fn top_level_singleton_replay_cardinality() {
1149        let mut deployment = Deployment::new();
1150
1151        let mut flow = FlowBuilder::new();
1152        let node = flow.process::<()>();
1153        let external = flow.external::<()>();
1154
1155        let (in_port, input) =
1156            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1157        let singleton = node.singleton(q!(123));
1158        let tick = node.tick();
1159        let out = input
1160            .batch(&tick, nondet!(/** test */))
1161            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1162            .cross_singleton(
1163                singleton
1164                    .snapshot(&tick, nondet!(/** test */))
1165                    .into_stream()
1166                    .count(),
1167            )
1168            .all_ticks()
1169            .send_bincode_external(&external);
1170
1171        let nodes = flow
1172            .with_process(&node, deployment.Localhost())
1173            .with_external(&external, deployment.Localhost())
1174            .deploy(&mut deployment);
1175
1176        deployment.deploy().await.unwrap();
1177
1178        let mut external_in = nodes.connect(in_port).await;
1179        let mut external_out = nodes.connect(out).await;
1180
1181        deployment.start().await.unwrap();
1182
1183        external_in.send(1).await.unwrap();
1184        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1185
1186        external_in.send(2).await.unwrap();
1187        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1188    }
1189
1190    #[tokio::test]
1191    async fn tick_singleton_replay_cardinality() {
1192        let mut deployment = Deployment::new();
1193
1194        let mut flow = FlowBuilder::new();
1195        let node = flow.process::<()>();
1196        let external = flow.external::<()>();
1197
1198        let (in_port, input) =
1199            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1200        let tick = node.tick();
1201        let singleton = tick.singleton(q!(123));
1202        let out = input
1203            .batch(&tick, nondet!(/** test */))
1204            .cross_singleton(singleton.clone())
1205            .cross_singleton(singleton.into_stream().count())
1206            .all_ticks()
1207            .send_bincode_external(&external);
1208
1209        let nodes = flow
1210            .with_process(&node, deployment.Localhost())
1211            .with_external(&external, deployment.Localhost())
1212            .deploy(&mut deployment);
1213
1214        deployment.deploy().await.unwrap();
1215
1216        let mut external_in = nodes.connect(in_port).await;
1217        let mut external_out = nodes.connect(out).await;
1218
1219        deployment.start().await.unwrap();
1220
1221        external_in.send(1).await.unwrap();
1222        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1223
1224        external_in.send(2).await.unwrap();
1225        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1226    }
1227
1228    #[tokio::test]
1229    async fn external_bytes() {
1230        let mut deployment = Deployment::new();
1231
1232        let mut flow = FlowBuilder::new();
1233        let first_node = flow.process::<()>();
1234        let external = flow.external::<()>();
1235
1236        let (in_port, input) = first_node.source_external_bytes(&external);
1237        let out = input.send_bincode_external(&external);
1238
1239        let nodes = flow
1240            .with_process(&first_node, deployment.Localhost())
1241            .with_external(&external, deployment.Localhost())
1242            .deploy(&mut deployment);
1243
1244        deployment.deploy().await.unwrap();
1245
1246        let mut external_in = nodes.connect(in_port).await.1;
1247        let mut external_out = nodes.connect(out).await;
1248
1249        deployment.start().await.unwrap();
1250
1251        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1252
1253        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1254    }
1255
1256    #[tokio::test]
1257    async fn multi_external_source() {
1258        let mut deployment = Deployment::new();
1259
1260        let mut flow = FlowBuilder::new();
1261        let first_node = flow.process::<()>();
1262        let external = flow.external::<()>();
1263
1264        let (in_port, input, _membership, complete_sink) =
1265            first_node.bidi_external_many_bincode(&external);
1266        let out = input.entries().send_bincode_external(&external);
1267        complete_sink.complete(
1268            first_node
1269                .source_iter::<(u64, ()), _>(q!([]))
1270                .into_keyed()
1271                .weaken_ordering(),
1272        );
1273
1274        let nodes = flow
1275            .with_process(&first_node, deployment.Localhost())
1276            .with_external(&external, deployment.Localhost())
1277            .deploy(&mut deployment);
1278
1279        deployment.deploy().await.unwrap();
1280
1281        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1282        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1283        let external_out = nodes.connect(out).await;
1284
1285        deployment.start().await.unwrap();
1286
1287        external_in_1.send(123).await.unwrap();
1288        external_in_2.send(456).await.unwrap();
1289
1290        assert_eq!(
1291            external_out.take(2).collect::<HashSet<_>>().await,
1292            vec![(0, 123), (1, 456)].into_iter().collect()
1293        );
1294    }
1295
1296    #[tokio::test]
1297    async fn second_connection_only_multi_source() {
1298        let mut deployment = Deployment::new();
1299
1300        let mut flow = FlowBuilder::new();
1301        let first_node = flow.process::<()>();
1302        let external = flow.external::<()>();
1303
1304        let (in_port, input, _membership, complete_sink) =
1305            first_node.bidi_external_many_bincode(&external);
1306        let out = input.entries().send_bincode_external(&external);
1307        complete_sink.complete(
1308            first_node
1309                .source_iter::<(u64, ()), _>(q!([]))
1310                .into_keyed()
1311                .weaken_ordering(),
1312        );
1313
1314        let nodes = flow
1315            .with_process(&first_node, deployment.Localhost())
1316            .with_external(&external, deployment.Localhost())
1317            .deploy(&mut deployment);
1318
1319        deployment.deploy().await.unwrap();
1320
1321        // intentionally skipped to test stream waking logic
1322        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1323        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1324        let mut external_out = nodes.connect(out).await;
1325
1326        deployment.start().await.unwrap();
1327
1328        external_in_2.send(456).await.unwrap();
1329
1330        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1331    }
1332
1333    #[tokio::test]
1334    async fn multi_external_bytes() {
1335        let mut deployment = Deployment::new();
1336
1337        let mut flow = FlowBuilder::new();
1338        let first_node = flow.process::<()>();
1339        let external = flow.external::<()>();
1340
1341        let (in_port, input, _membership, complete_sink) = first_node
1342            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1343        let out = input.entries().send_bincode_external(&external);
1344        complete_sink.complete(
1345            first_node
1346                .source_iter(q!([]))
1347                .into_keyed()
1348                .weaken_ordering(),
1349        );
1350
1351        let nodes = flow
1352            .with_process(&first_node, deployment.Localhost())
1353            .with_external(&external, deployment.Localhost())
1354            .deploy(&mut deployment);
1355
1356        deployment.deploy().await.unwrap();
1357
1358        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1359        let mut external_in_2 = nodes.connect(in_port).await.1;
1360        let external_out = nodes.connect(out).await;
1361
1362        deployment.start().await.unwrap();
1363
1364        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1365        external_in_2.send(vec![4, 5].into()).await.unwrap();
1366
1367        assert_eq!(
1368            external_out.take(2).collect::<HashSet<_>>().await,
1369            vec![
1370                (0, (&[1u8, 2, 3] as &[u8]).into()),
1371                (1, (&[4u8, 5] as &[u8]).into())
1372            ]
1373            .into_iter()
1374            .collect()
1375        );
1376    }
1377
1378    #[tokio::test]
1379    async fn single_client_external_bytes() {
1380        let mut deployment = Deployment::new();
1381        let mut flow = FlowBuilder::new();
1382        let first_node = flow.process::<()>();
1383        let external = flow.external::<()>();
1384        let (port, input, complete_sink) = first_node
1385            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1386        complete_sink.complete(input.map(q!(|data| {
1387            let mut resp: Vec<u8> = data.into();
1388            resp.push(42);
1389            resp.into() // : Bytes
1390        })));
1391
1392        let nodes = flow
1393            .with_process(&first_node, deployment.Localhost())
1394            .with_external(&external, deployment.Localhost())
1395            .deploy(&mut deployment);
1396
1397        deployment.deploy().await.unwrap();
1398        deployment.start().await.unwrap();
1399
1400        let (mut external_out, mut external_in) = nodes.connect(port).await;
1401
1402        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1403        assert_eq!(
1404            external_out.next().await.unwrap().unwrap(),
1405            vec![1, 2, 3, 42]
1406        );
1407    }
1408
1409    #[tokio::test]
1410    async fn echo_external_bytes() {
1411        let mut deployment = Deployment::new();
1412
1413        let mut flow = FlowBuilder::new();
1414        let first_node = flow.process::<()>();
1415        let external = flow.external::<()>();
1416
1417        let (port, input, _membership, complete_sink) = first_node
1418            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1419        complete_sink
1420            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1421
1422        let nodes = flow
1423            .with_process(&first_node, deployment.Localhost())
1424            .with_external(&external, deployment.Localhost())
1425            .deploy(&mut deployment);
1426
1427        deployment.deploy().await.unwrap();
1428
1429        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1430        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1431
1432        deployment.start().await.unwrap();
1433
1434        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1435        external_in_2.send(vec![4, 5].into()).await.unwrap();
1436
1437        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1438        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1439    }
1440
1441    #[tokio::test]
1442    async fn echo_external_bincode() {
1443        let mut deployment = Deployment::new();
1444
1445        let mut flow = FlowBuilder::new();
1446        let first_node = flow.process::<()>();
1447        let external = flow.external::<()>();
1448
1449        let (port, input, _membership, complete_sink) =
1450            first_node.bidi_external_many_bincode(&external);
1451        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1452
1453        let nodes = flow
1454            .with_process(&first_node, deployment.Localhost())
1455            .with_external(&external, deployment.Localhost())
1456            .deploy(&mut deployment);
1457
1458        deployment.deploy().await.unwrap();
1459
1460        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1461        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1462
1463        deployment.start().await.unwrap();
1464
1465        external_in_1.send("hi".to_owned()).await.unwrap();
1466        external_in_2.send("hello".to_owned()).await.unwrap();
1467
1468        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1469        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1470    }
1471}