Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46#[cfg(stageleft_runtime)]
47use crate::location::dynamic::DynLocation;
48use crate::location::dynamic::{ClusterConsistency, LocationId};
49use crate::location::external_process::{
50    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
51};
52use crate::nondet::NonDet;
53use crate::properties::manual_proof;
54#[cfg(feature = "sim")]
55use crate::sim::SimSender;
56use crate::staging_util::get_this_crate;
57
58pub mod dynamic;
59
60pub mod external_process;
61pub use external_process::External;
62
63pub mod process;
64pub use process::Process;
65
66pub mod cluster;
67pub use cluster::Cluster;
68
69pub mod member_id;
70pub use member_id::{MemberId, TaglessMemberId};
71
72pub mod tick;
73pub use tick::{Atomic, Tick};
74
/// An event indicating a change in membership status of a location in a group
/// (e.g. a node in a [`Cluster`] or an external client connection).
///
/// Events of this type are the values of the keyed stream returned by
/// [`Location::source_cluster_membership_stream`].
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// The member has joined the group and is now active.
    Joined,
    /// The member has left the group and is no longer active.
    Left,
}
84
/// A hint for configuring the network transport used by an external connection.
///
/// This controls how the underlying TCP listener is set up when binding
/// external client connections via methods like [`Location::bind_single_client`]
/// or [`Location::bidi_external_many_bytes`].
///
/// It is passed as the `port_hint` argument of [`Location::bind_single_client`];
/// [`Location::source_external_bytes`] always uses [`NetworkHint::Auto`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// Automatically select the network configuration (e.g. an ephemeral port).
    Auto,
    /// Use a TCP port, optionally specifying a fixed port number.
    ///
    /// If `None`, an available port will be chosen automatically.
    /// If `Some(port)`, the given port number will be used.
    TcpPort(Option<u16>),
}
100
101pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
102    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
103}
104
#[stageleft::export(LocationKey)]
new_key_type! {
    /// A unique identifier for a location.
    pub struct LocationKey;
}
110
111impl std::fmt::Display for LocationKey {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
114    }
115}
116
117/// This is used for the ECS membership stream.
118/// TODO(mingwei): Make this more robust?
119impl std::str::FromStr for LocationKey {
120    type Err = Option<ParseIntError>;
121
122    fn from_str(s: &str) -> Result<Self, Self::Err> {
123        let nvn = s.strip_prefix("loc").ok_or(None)?;
124        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
125        let idx: u64 = idx.parse()?;
126        let ver: u64 = ver.parse()?;
127        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
128    }
129}
130
// The hex literals below pack the key version into the high 32 bits and the key
// index into the low 32 bits, matching the `<index>v<version>` trailing comments.
impl LocationKey {
    /// TODO(mingwei): Remove this and avoid magic key for simulator external.
    /// The first location key, used by the simulator as the default external location.
    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`

    /// A key for testing with index 1.
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); // `1v255`

    /// A key for testing with index 2.
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); // `2v255`
}
144
145/// This is used within `q!` code in docker and ECS.
146impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
147    type O = LocationKey;
148
149    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
150    where
151        Self: Sized,
152    {
153        let root = get_this_crate();
154        let n = Key::data(&self).as_ffi();
155        (
156            QuoteTokens {
157                prelude: None,
158                expr: Some(quote! {
159                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
160                }),
161            },
162            (),
163        )
164    }
165}
166
/// A simple enum for the type of a root location.
///
/// The variants carry no data: this describes only the kind of location, not
/// which specific location it is.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
    /// A process (single node).
    Process,
    /// A cluster (multiple nodes).
    Cluster,
    /// An external client.
    External,
}
177
/// A top-level location (i.e. a [`Process`] or [`Cluster`]) that is outside a tick / atomic region.
///
/// This is a marker trait with no methods of its own. Several [`Location`] methods
/// (e.g. [`Location::spin`] and [`Location::source_stream`]) require `Self: TopLevel`.
pub trait TopLevel<'a>: Location<'a> {}
180
181/// A location where data can be materialized and computation can be executed.
182///
183/// Hydro is a **global**, **distributed** programming model. This means that the data
184/// and computation in a Hydro program can be spread across multiple machines, data
185/// centers, and even continents. To achieve this, Hydro uses the concept of
186/// **locations** to keep track of _where_ data is located and computation is executed.
187///
188/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
189/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
190/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
191/// to allow live collections to be _moved_ between locations via network send/receive.
192///
193/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
194#[expect(
195    private_bounds,
196    reason = "only internal Hydro code can define location types"
197)]
198pub trait Location<'a>: DynLocation {
199    /// The root location type for this location.
200    ///
201    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
202    /// For nested locations like [`Tick`], this is the root location that contains it.
203    type Root: Location<'a>;
204
    /// Location type with consistency guarantees dropped for the live collection on it.
    ///
    /// The bound requires that dropping consistency again yields this same type.
    type DropConsistency: Location<'a, DropConsistency = Self::DropConsistency>;
207
208    /// Returns the root location for this location.
209    ///
210    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
211    /// For nested locations like [`Tick`], this returns the root location that contains it.
212    fn root(&self) -> Self::Root;
213
    /// Returns this location, but with consistency guarantees dropped for the live
    /// collection on it (see [`Location::DropConsistency`]).
    fn drop_consistency(&self) -> Self::DropConsistency;
    /// Gets the runtime enum variant for the current consistency level, if this is a cluster.
    ///
    /// NOTE(review): presumably returns `None` for non-cluster locations; confirm
    /// against the implementors.
    fn consistency() -> Option<ClusterConsistency>;
218
219    /// Updates the consistency guarantees to match that of the given location.
220    fn with_consistency_of<L2: Location<'a, DropConsistency = Self::DropConsistency>>(&self) -> L2 {
221        L2::from_drop_consistency(self.drop_consistency())
222    }
223
    /// Rebuilds a location of this type from its consistency-dropped form.
    ///
    /// Called by [`Location::with_consistency_of`] on the target location type.
    #[doc(hidden)]
    fn from_drop_consistency(l2: Self::DropConsistency) -> Self;
226
227    /// Attempts to create a new [`Tick`] clock domain at this location.
228    ///
229    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
230    /// or `None` if this location is already inside a tick (nested ticks are not supported).
231    ///
232    /// Prefer using [`Location::tick`] when you know the location is top-level.
233    fn try_tick(&self) -> Option<Tick<Self>> {
234        if Self::is_top_level() {
235            let id = self.flow_state().borrow_mut().next_clock_id();
236            Some(Tick {
237                id,
238                l: self.clone(),
239            })
240        } else {
241            None
242        }
243    }
244
    /// Returns the unique identifier for this location.
    ///
    /// Delegates to `DynLocation::dyn_id`.
    fn id(&self) -> LocationId {
        DynLocation::dyn_id(self)
    }
249
250    /// Creates a new [`Tick`] clock domain at this location.
251    ///
252    /// A tick represents a logical clock that can be used to batch streaming data
253    /// into discrete time steps. This is useful for implementing iterative algorithms
254    /// or for synchronizing data across multiple streams.
255    ///
256    /// # Example
257    /// ```rust
258    /// # #[cfg(feature = "deploy")] {
259    /// # use hydro_lang::prelude::*;
260    /// # use futures::StreamExt;
261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
262    /// let tick = process.tick();
263    /// let inside_tick = process
264    ///     .source_iter(q!(vec![1, 2, 3, 4]))
265    ///     .batch(&tick, nondet!(/** test */));
266    /// inside_tick.all_ticks()
267    /// # }, |mut stream| async move {
268    /// // 1, 2, 3, 4
269    /// # for w in vec![1, 2, 3, 4] {
270    /// #     assert_eq!(stream.next().await.unwrap(), w);
271    /// # }
272    /// # }));
273    /// # }
274    /// ```
275    fn tick(&self) -> Tick<Self> {
276        if let LocationId::Tick(_, _) = self.id() {
277            panic!("cannot create nested ticks");
278        }
279
280        let id = self.flow_state().borrow_mut().next_clock_id();
281        Tick {
282            id,
283            l: self.clone(),
284        }
285    }
286
287    /// Creates an unbounded stream that continuously emits unit values `()`.
288    ///
289    /// This is useful for driving computations that need to run continuously,
290    /// such as polling or heartbeat mechanisms.
291    ///
292    /// # Example
293    /// ```rust
294    /// # #[cfg(feature = "deploy")] {
295    /// # use hydro_lang::prelude::*;
296    /// # use futures::StreamExt;
297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
298    /// let tick = process.tick();
299    /// process.spin()
300    ///     .batch(&tick, nondet!(/** test */))
301    ///     .map(q!(|_| 42))
302    ///     .all_ticks()
303    /// # }, |mut stream| async move {
304    /// // 42, 42, 42, ...
305    /// # assert_eq!(stream.next().await.unwrap(), 42);
306    /// # assert_eq!(stream.next().await.unwrap(), 42);
307    /// # assert_eq!(stream.next().await.unwrap(), 42);
308    /// # }));
309    /// # }
310    /// ```
311    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
312    where
313        Self: TopLevel<'a> + Sized,
314    {
315        Stream::new(
316            self.clone(),
317            HydroNode::Source {
318                source: HydroSource::Spin(),
319                metadata: self.new_node_metadata(Stream::<
320                    (),
321                    Self,
322                    Unbounded,
323                    TotalOrder,
324                    ExactlyOnce,
325                >::collection_kind()),
326            },
327        )
328    }
329
330    /// Creates a stream from an async [`FuturesStream`].
331    ///
332    /// This is useful for integrating with external async data sources,
333    /// such as network connections or file readers.
334    ///
335    /// # Example
336    /// ```rust
337    /// # #[cfg(feature = "deploy")] {
338    /// # use hydro_lang::prelude::*;
339    /// # use futures::StreamExt;
340    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
342    /// # }, |mut stream| async move {
343    /// // 1, 2, 3
344    /// # for w in vec![1, 2, 3] {
345    /// #     assert_eq!(stream.next().await.unwrap(), w);
346    /// # }
347    /// # }));
348    /// # }
349    /// ```
350    fn source_stream<T, E>(
351        &self,
352        e: impl QuotedWithContext<'a, E, Self>,
353    ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
354    where
355        E: FuturesStream<Item = T> + Unpin,
356        Self: TopLevel<'a> + Sized,
357    {
358        let e = e.splice_untyped_ctx(self);
359
360        let target_location = self.drop_consistency();
361        Stream::new(
362            target_location.clone(),
363            HydroNode::Source {
364                source: HydroSource::Stream(e.into()),
365                metadata: target_location.new_node_metadata(Stream::<
366                    T,
367                    Self::DropConsistency,
368                    Unbounded,
369                    TotalOrder,
370                    ExactlyOnce,
371                >::collection_kind()),
372            },
373        )
374    }
375
376    /// Creates a bounded stream from an iterator.
377    ///
378    /// The iterator is evaluated once at runtime, and all elements are emitted
379    /// in order. This is useful for creating streams from static data or
380    /// for testing.
381    ///
382    /// # Example
383    /// ```rust
384    /// # #[cfg(feature = "deploy")] {
385    /// # use hydro_lang::prelude::*;
386    /// # use futures::StreamExt;
387    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
388    /// process.source_iter(q!(vec![1, 2, 3, 4]))
389    /// # }, |mut stream| async move {
390    /// // 1, 2, 3, 4
391    /// # for w in vec![1, 2, 3, 4] {
392    /// #     assert_eq!(stream.next().await.unwrap(), w);
393    /// # }
394    /// # }));
395    /// # }
396    /// ```
397    fn source_iter<T, E>(
398        &self,
399        e: impl QuotedWithContext<'a, E, Self>,
400    ) -> Stream<T, Self::DropConsistency, Bounded, TotalOrder, ExactlyOnce>
401    where
402        E: IntoIterator<Item = T>,
403        Self: Sized,
404    {
405        let e = e.splice_typed_ctx(self);
406
407        let target_location = self.drop_consistency();
408        Stream::new(
409            target_location.clone(),
410            HydroNode::Source {
411                source: HydroSource::Iter(e.into()),
412                metadata: target_location.new_node_metadata(Stream::<
413                    T,
414                    Self::DropConsistency,
415                    Bounded,
416                    TotalOrder,
417                    ExactlyOnce,
418                >::collection_kind()),
419            },
420        )
421    }
422
    #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
    /// Creates a stream of membership events for a cluster.
    ///
    /// Deprecated alias that forwards to [`Location::source_cluster_membership_stream`].
    ///
    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
    /// keyed by the [`MemberId`] of the cluster member.
    ///
    /// This is useful for implementing protocols that need to track cluster membership,
    /// such as broadcasting to all members or detecting failures.
    ///
    /// # Non-Determinism
    /// This stream is non-deterministic because it depends on the timing of membership
    /// events: for example, if a node leaves before the stream is created, its
    /// membership events may never be received.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
    /// let p1 = flow.process::<()>();
    /// let workers: Cluster<()> = flow.cluster::<()>();
    /// # // do nothing on each worker
    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
    /// let cluster_members = p1.source_cluster_members(&workers, nondet!(/** late joiners may miss events */));
    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
    /// // if there are 4 members in the cluster, we would see a join event for each
    /// // { MemberId::<Worker>(0): [MembershipEvent::Joined], MemberId::<Worker>(2): [MembershipEvent::Joined], ... }
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for w in 0..4 {
    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
    /// # }));
    /// # }
    /// ```
    fn source_cluster_members<C: 'a>(
        &self,
        cluster: &Cluster<'a, C>,
        nondet_start: NonDet,
    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
    where
        Self: TopLevel<'a> + Sized,
    {
        self.source_cluster_membership_stream(cluster, nondet_start)
    }
472
473    /// Creates a stream of membership events for a cluster.
474    ///
475    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
476    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
477    /// keyed by the [`MemberId`] of the cluster member.
478    ///
479    /// This is useful for implementing protocols that need to track cluster membership,
480    /// such as broadcasting to all members or detecting failures.
481    ///
482    /// # Non-Determinism
483    /// This stream is non-deterministic because the timing of membership events, for example
484    /// if a node leaves, the membership event may not be received if the node left before the
485    /// stream was created.
486    ///
487    /// # Example
488    /// ```rust
489    /// # #[cfg(feature = "deploy")] {
490    /// # use hydro_lang::prelude::*;
491    /// # use futures::StreamExt;
492    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
493    /// let p1 = flow.process::<()>();
494    /// let workers: Cluster<()> = flow.cluster::<()>();
495    /// # // do nothing on each worker
496    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
497    /// let cluster_members = p1.source_cluster_membership_stream(&workers, nondet!(/** late joiners may miss events */));
498    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
499    /// // if there are 4 members in the cluster, we would see a join event for each
500    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
501    /// # }, |mut stream| async move {
502    /// # let mut results = Vec::new();
503    /// # for w in 0..4 {
504    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
505    /// # }
506    /// # results.sort();
507    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
508    /// # }));
509    /// # }
510    /// ```
511    fn source_cluster_membership_stream<C: 'a>(
512        &self,
513        cluster: &Cluster<'a, C>,
514        _nondet_start: NonDet,
515    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
516    where
517        Self: TopLevel<'a> + Sized,
518    {
519        let target_consistency = self.drop_consistency();
520        Stream::new(
521            target_consistency.clone(),
522            HydroNode::Source {
523                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
524                metadata: target_consistency.new_node_metadata(Stream::<
525                    (TaglessMemberId, MembershipEvent),
526                    Self,
527                    Unbounded,
528                    TotalOrder,
529                    ExactlyOnce,
530                >::collection_kind(
531                )),
532            },
533        )
534        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
535        .into_keyed()
536    }
537
538    /// Creates a one-way connection from an external process to receive raw bytes.
539    ///
540    /// Returns a port handle for the external process to connect to, and a stream
541    /// of received byte buffers.
542    ///
543    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
544    /// or [`Location::source_external_bincode`].
545    fn source_external_bytes<L>(
546        &self,
547        from: &External<L>,
548    ) -> (
549        ExternalBytesPort,
550        Stream<BytesMut, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
551    )
552    where
553        Self: TopLevel<'a> + Sized,
554    {
555        let (port, stream, sink) =
556            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
557
558        sink.complete(stream.location().source_iter(q!([])));
559
560        (port, stream)
561    }
562
563    /// Creates a one-way connection from an external process to receive bincode-serialized data.
564    ///
565    /// Returns a sink handle for the external process to send data to, and a stream
566    /// of received values.
567    ///
568    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
569    #[expect(clippy::type_complexity, reason = "stream markers")]
570    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
571        &self,
572        from: &External<L>,
573    ) -> (
574        ExternalBincodeSink<T, NotMany, O, R>,
575        Stream<T, Self::DropConsistency, Unbounded, O, R>,
576    )
577    where
578        Self: TopLevel<'a> + Sized,
579        T: Serialize + DeserializeOwned,
580    {
581        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
582        sink.complete(stream.location().source_iter(q!([])));
583
584        (
585            ExternalBincodeSink {
586                process_key: from.key,
587                port_id: port.port_id,
588                _phantom: PhantomData,
589            },
590            stream.weaken_ordering().weaken_retries(),
591        )
592    }
593
594    /// Sets up a simulated input port on this location for testing.
595    ///
596    /// Returns a handle to send messages to the location as well as a stream
597    /// of received messages. This is only available when the `sim` feature is enabled.
598    #[cfg(feature = "sim")]
599    #[expect(clippy::type_complexity, reason = "stream markers")]
600    fn sim_input<T, O: Ordering, R: Retries>(
601        &self,
602    ) -> (
603        SimSender<T, O, R>,
604        Stream<T, Self::DropConsistency, Unbounded, O, R>,
605    )
606    where
607        Self: TopLevel<'a> + Sized,
608        T: Serialize + DeserializeOwned,
609    {
610        let external_location: External<'a, ()> = External {
611            key: LocationKey::FIRST,
612            flow_state: self.flow_state().clone(),
613            _phantom: PhantomData,
614        };
615
616        let (external, stream) = self.source_external_bincode(&external_location);
617
618        (SimSender(external.port_id, PhantomData), stream)
619    }
620
621    /// Creates an external input stream for embedded deployment mode.
622    ///
623    /// The `name` parameter specifies the name of the generated function parameter
624    /// that will supply data to this stream at runtime. The generated function will
625    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
626    fn embedded_input<T>(
627        &self,
628        name: impl Into<String>,
629    ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
630    where
631        Self: TopLevel<'a> + Sized,
632    {
633        let ident = syn::Ident::new(&name.into(), Span::call_site());
634
635        let target_location = self.drop_consistency();
636        Stream::new(
637            target_location.clone(),
638            HydroNode::Source {
639                source: HydroSource::Embedded(ident),
640                metadata: target_location.new_node_metadata(Stream::<
641                    T,
642                    Self,
643                    Unbounded,
644                    TotalOrder,
645                    ExactlyOnce,
646                >::collection_kind()),
647            },
648        )
649    }
650
651    /// Creates an embedded singleton input for embedded deployment mode.
652    ///
653    /// The `name` parameter specifies the name of the generated function parameter
654    /// that will supply data to this singleton at runtime. The generated function will
655    /// accept a plain `T` parameter with this name.
656    fn embedded_singleton_input<T>(
657        &self,
658        name: impl Into<String>,
659    ) -> Singleton<T, Self::DropConsistency, Bounded>
660    where
661        Self: TopLevel<'a> + Sized,
662    {
663        let ident = syn::Ident::new(&name.into(), Span::call_site());
664
665        let target_location = self.drop_consistency();
666        Singleton::new(
667            target_location.clone(),
668            HydroNode::Source {
669                source: HydroSource::EmbeddedSingleton(ident),
670                metadata: target_location
671                    .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
672            },
673        )
674    }
675
676    /// Establishes a server on this location to receive a bidirectional connection from a single
677    /// client, identified by the given `External` handle. Returns a port handle for the external
678    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
679    /// messages.
680    ///
681    /// # Example
682    /// ```rust
683    /// # #[cfg(feature = "deploy")] {
684    /// # use hydro_lang::prelude::*;
685    /// # use hydro_deploy::Deployment;
686    /// # use futures::{SinkExt, StreamExt};
687    /// # tokio_test::block_on(async {
688    /// # use bytes::Bytes;
689    /// # use hydro_lang::location::NetworkHint;
690    /// # use tokio_util::codec::LengthDelimitedCodec;
691    /// # let mut flow = FlowBuilder::new();
692    /// let node = flow.process::<()>();
693    /// let external = flow.external::<()>();
694    /// let (port, incoming, outgoing) =
695    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
696    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
697    ///     let mut resp: Vec<u8> = data.into();
698    ///     resp.push(42);
699    ///     resp.into() // : Bytes
700    /// })));
701    ///
702    /// # let mut deployment = Deployment::new();
703    /// let nodes = flow // ... with_process and with_external
704    /// #     .with_process(&node, deployment.Localhost())
705    /// #     .with_external(&external, deployment.Localhost())
706    /// #     .deploy(&mut deployment);
707    ///
708    /// deployment.deploy().await.unwrap();
709    /// deployment.start().await.unwrap();
710    ///
711    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
712    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
713    /// assert_eq!(
714    ///     external_out.next().await.unwrap().unwrap(),
715    ///     vec![1, 2, 3, 42]
716    /// );
717    /// # });
718    /// # }
719    /// ```
720    #[expect(clippy::type_complexity, reason = "stream markers")]
721    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
722        &self,
723        from: &External<L>,
724        port_hint: NetworkHint,
725    ) -> (
726        ExternalBytesPort<NotMany>,
727        Stream<<Codec as Decoder>::Item, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
728        ForwardHandle<'a, Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
729    )
730    where
731        Self: TopLevel<'a> + Sized,
732    {
733        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
734        let target_consistency = self.drop_consistency();
735
736        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
737            T,
738            Self::DropConsistency,
739            Unbounded,
740            TotalOrder,
741            ExactlyOnce,
742        >>();
743        let mut flow_state_borrow = self.flow_state().borrow_mut();
744
745        flow_state_borrow.push_root(HydroRoot::SendExternal {
746            to_external_key: from.key,
747            to_port_id: next_external_port_id,
748            to_many: false,
749            unpaired: false,
750            serialize_fn: None,
751            instantiate_fn: DebugInstantiate::Building,
752            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
753            op_metadata: HydroIrOpMetadata::new(),
754        });
755
756        let raw_stream: Stream<
757            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
758            Self::DropConsistency,
759            Unbounded,
760            TotalOrder,
761            ExactlyOnce,
762        > = Stream::new(
763            target_consistency.clone(),
764            HydroNode::ExternalInput {
765                from_external_key: from.key,
766                from_port_id: next_external_port_id,
767                from_many: false,
768                codec_type: quote_type::<Codec>().into(),
769                port_hint,
770                instantiate_fn: DebugInstantiate::Building,
771                deserialize_fn: None,
772                metadata: target_consistency.new_node_metadata(Stream::<
773                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
774                    Self::DropConsistency,
775                    Unbounded,
776                    TotalOrder,
777                    ExactlyOnce,
778                >::collection_kind(
779                )),
780            },
781        );
782
783        (
784            ExternalBytesPort {
785                process_key: from.key,
786                port_id: next_external_port_id,
787                _phantom: PhantomData,
788            },
789            raw_stream.flatten_ordered(),
790            fwd_ref,
791        )
792    }
793
794    /// Establishes a bidirectional connection from a single external client using bincode serialization.
795    ///
796    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
797    /// and a handle to send outgoing messages. This is a convenience wrapper around
798    /// [`Location::bind_single_client`] that uses bincode for serialization.
799    ///
800    /// # Type Parameters
801    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
802    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
803    #[expect(clippy::type_complexity, reason = "stream markers")]
804    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
805        &self,
806        from: &External<L>,
807    ) -> (
808        ExternalBincodeBidi<InT, OutT, NotMany>,
809        Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
810        ForwardHandle<'a, Stream<OutT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
811    )
812    where
813        Self: TopLevel<'a> + Sized,
814    {
815        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
816
817        let target_consistency = self.drop_consistency();
818        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
819            OutT,
820            Self::DropConsistency,
821            Unbounded,
822            TotalOrder,
823            ExactlyOnce,
824        >>();
825        let mut flow_state_borrow = self.flow_state().borrow_mut();
826
827        let root = get_this_crate();
828
829        let out_t_type = quote_type::<OutT>();
830        let ser_fn: syn::Expr = syn::parse_quote! {
831            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
832                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
833            )
834        };
835
836        flow_state_borrow.push_root(HydroRoot::SendExternal {
837            to_external_key: from.key,
838            to_port_id: next_external_port_id,
839            to_many: false,
840            unpaired: false,
841            serialize_fn: Some(ser_fn.into()),
842            instantiate_fn: DebugInstantiate::Building,
843            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
844            op_metadata: HydroIrOpMetadata::new(),
845        });
846
847        let in_t_type = quote_type::<InT>();
848
849        let deser_fn: syn::Expr = syn::parse_quote! {
850            |res| {
851                let b = res.unwrap();
852                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
853            }
854        };
855
856        let raw_stream: Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce> =
857            Stream::new(
858                target_consistency.clone(),
859                HydroNode::ExternalInput {
860                    from_external_key: from.key,
861                    from_port_id: next_external_port_id,
862                    from_many: false,
863                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
864                    port_hint: NetworkHint::Auto,
865                    instantiate_fn: DebugInstantiate::Building,
866                    deserialize_fn: Some(deser_fn.into()),
867                    metadata: target_consistency.new_node_metadata(Stream::<
868                        InT,
869                        Self::DropConsistency,
870                        Unbounded,
871                        TotalOrder,
872                        ExactlyOnce,
873                    >::collection_kind(
874                    )),
875                },
876            );
877
878        (
879            ExternalBincodeBidi {
880                process_key: from.key,
881                port_id: next_external_port_id,
882                _phantom: PhantomData,
883            },
884            raw_stream,
885            fwd_ref,
886        )
887    }
888
889    /// Establishes a server on this location to receive bidirectional connections from multiple
890    /// external clients using raw bytes.
891    ///
892    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
893    /// connections. Each client is assigned a unique `u64` identifier.
894    ///
895    /// Returns:
896    /// - A port handle for external processes to connect to
897    /// - A keyed stream of incoming messages, keyed by client ID
898    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
899    /// - A handle to send outgoing messages, keyed by client ID
900    #[expect(clippy::type_complexity, reason = "stream markers")]
901    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
902        &self,
903        from: &External<L>,
904        port_hint: NetworkHint,
905    ) -> (
906        ExternalBytesPort<Many>,
907        KeyedStream<
908            u64,
909            <Codec as Decoder>::Item,
910            Self::DropConsistency,
911            Unbounded,
912            TotalOrder,
913            ExactlyOnce,
914        >,
915        KeyedStream<
916            u64,
917            MembershipEvent,
918            Self::DropConsistency,
919            Unbounded,
920            TotalOrder,
921            ExactlyOnce,
922        >,
923        ForwardHandle<
924            'a,
925            KeyedStream<u64, T, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
926        >,
927    )
928    where
929        Self: TopLevel<'a> + Sized,
930    {
931        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
932
933        let target_consistency = self.drop_consistency();
934        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
935            u64,
936            T,
937            Self::DropConsistency,
938            Unbounded,
939            NoOrder,
940            ExactlyOnce,
941        >>();
942        let mut flow_state_borrow = self.flow_state().borrow_mut();
943
944        flow_state_borrow.push_root(HydroRoot::SendExternal {
945            to_external_key: from.key,
946            to_port_id: next_external_port_id,
947            to_many: true,
948            unpaired: false,
949            serialize_fn: None,
950            instantiate_fn: DebugInstantiate::Building,
951            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
952            op_metadata: HydroIrOpMetadata::new(),
953        });
954
955        let raw_stream: Stream<
956            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
957            Self::DropConsistency,
958            Unbounded,
959            TotalOrder,
960            ExactlyOnce,
961        > = Stream::new(
962            target_consistency.clone(),
963            HydroNode::ExternalInput {
964                from_external_key: from.key,
965                from_port_id: next_external_port_id,
966                from_many: true,
967                codec_type: quote_type::<Codec>().into(),
968                port_hint,
969                instantiate_fn: DebugInstantiate::Building,
970                deserialize_fn: None,
971                metadata: target_consistency.new_node_metadata(Stream::<
972                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
973                    Self::DropConsistency,
974                    Unbounded,
975                    TotalOrder,
976                    ExactlyOnce,
977                >::collection_kind(
978                )),
979            },
980        );
981
982        let membership_stream_ident = syn::Ident::new(
983            &format!(
984                "__hydro_deploy_many_{}_{}_membership",
985                from.key, next_external_port_id
986            ),
987            Span::call_site(),
988        );
989        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
990        let raw_membership_stream: KeyedStream<
991            u64,
992            bool,
993            Self::DropConsistency,
994            Unbounded,
995            TotalOrder,
996            ExactlyOnce,
997        > = KeyedStream::new(
998            target_consistency.clone(),
999            HydroNode::Source {
1000                source: HydroSource::Stream(membership_stream_expr.into()),
1001                metadata: target_consistency.new_node_metadata(KeyedStream::<
1002                    u64,
1003                    bool,
1004                    Self::DropConsistency,
1005                    Unbounded,
1006                    TotalOrder,
1007                    ExactlyOnce,
1008                >::collection_kind(
1009                )),
1010            },
1011        );
1012
1013        (
1014            ExternalBytesPort {
1015                process_key: from.key,
1016                port_id: next_external_port_id,
1017                _phantom: PhantomData,
1018            },
1019            raw_stream
1020                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
1021                .into_keyed(),
1022            raw_membership_stream.map(q!(|join| {
1023                if join {
1024                    MembershipEvent::Joined
1025                } else {
1026                    MembershipEvent::Left
1027                }
1028            })),
1029            fwd_ref,
1030        )
1031    }
1032
1033    /// Establishes a server on this location to receive bidirectional connections from multiple
1034    /// external clients using bincode serialization.
1035    ///
1036    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
1037    /// client connections. Each client is assigned a unique `u64` identifier.
1038    ///
1039    /// Returns:
1040    /// - A port handle for external processes to connect to
1041    /// - A keyed stream of incoming messages, keyed by client ID
1042    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
1043    /// - A handle to send outgoing messages, keyed by client ID
1044    ///
1045    /// # Type Parameters
1046    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
1047    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
1048    #[expect(clippy::type_complexity, reason = "stream markers")]
1049    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
1050        &self,
1051        from: &External<L>,
1052    ) -> (
1053        ExternalBincodeBidi<InT, OutT, Many>,
1054        KeyedStream<u64, InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1055        KeyedStream<
1056            u64,
1057            MembershipEvent,
1058            Self::DropConsistency,
1059            Unbounded,
1060            TotalOrder,
1061            ExactlyOnce,
1062        >,
1063        ForwardHandle<
1064            'a,
1065            KeyedStream<u64, OutT, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
1066        >,
1067    )
1068    where
1069        Self: TopLevel<'a> + Sized,
1070    {
1071        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
1072
1073        let target_consistency = self.drop_consistency();
1074        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
1075            u64,
1076            OutT,
1077            Self::DropConsistency,
1078            Unbounded,
1079            NoOrder,
1080            ExactlyOnce,
1081        >>();
1082        let mut flow_state_borrow = self.flow_state().borrow_mut();
1083
1084        let root = get_this_crate();
1085
1086        let out_t_type = quote_type::<OutT>();
1087        let ser_fn: syn::Expr = syn::parse_quote! {
1088            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
1089                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
1090            )
1091        };
1092
1093        flow_state_borrow.push_root(HydroRoot::SendExternal {
1094            to_external_key: from.key,
1095            to_port_id: next_external_port_id,
1096            to_many: true,
1097            unpaired: false,
1098            serialize_fn: Some(ser_fn.into()),
1099            instantiate_fn: DebugInstantiate::Building,
1100            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
1101            op_metadata: HydroIrOpMetadata::new(),
1102        });
1103
1104        let in_t_type = quote_type::<InT>();
1105
1106        let deser_fn: syn::Expr = syn::parse_quote! {
1107            |res| {
1108                let (id, b) = res.unwrap();
1109                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
1110            }
1111        };
1112
1113        let raw_stream: KeyedStream<
1114            u64,
1115            InT,
1116            Self::DropConsistency,
1117            Unbounded,
1118            TotalOrder,
1119            ExactlyOnce,
1120        > = KeyedStream::new(
1121            target_consistency.clone(),
1122            HydroNode::ExternalInput {
1123                from_external_key: from.key,
1124                from_port_id: next_external_port_id,
1125                from_many: true,
1126                codec_type: quote_type::<LengthDelimitedCodec>().into(),
1127                port_hint: NetworkHint::Auto,
1128                instantiate_fn: DebugInstantiate::Building,
1129                deserialize_fn: Some(deser_fn.into()),
1130                metadata: target_consistency.new_node_metadata(KeyedStream::<
1131                    u64,
1132                    InT,
1133                    Self::DropConsistency,
1134                    Unbounded,
1135                    TotalOrder,
1136                    ExactlyOnce,
1137                >::collection_kind(
1138                )),
1139            },
1140        );
1141
1142        let membership_stream_ident = syn::Ident::new(
1143            &format!(
1144                "__hydro_deploy_many_{}_{}_membership",
1145                from.key, next_external_port_id
1146            ),
1147            Span::call_site(),
1148        );
1149        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1150        let raw_membership_stream: KeyedStream<
1151            u64,
1152            bool,
1153            Self::DropConsistency,
1154            Unbounded,
1155            TotalOrder,
1156            ExactlyOnce,
1157        > = KeyedStream::new(
1158            target_consistency.clone(),
1159            HydroNode::Source {
1160                source: HydroSource::Stream(membership_stream_expr.into()),
1161                metadata: target_consistency.new_node_metadata(KeyedStream::<
1162                    u64,
1163                    bool,
1164                    Self::DropConsistency,
1165                    Unbounded,
1166                    TotalOrder,
1167                    ExactlyOnce,
1168                >::collection_kind(
1169                )),
1170            },
1171        );
1172
1173        (
1174            ExternalBincodeBidi {
1175                process_key: from.key,
1176                port_id: next_external_port_id,
1177                _phantom: PhantomData,
1178            },
1179            raw_stream,
1180            raw_membership_stream.map(q!(|join| {
1181                if join {
1182                    MembershipEvent::Joined
1183                } else {
1184                    MembershipEvent::Left
1185                }
1186            })),
1187            fwd_ref,
1188        )
1189    }
1190
1191    /// Bridges user-owned async code to the dataflow as a **bidirectional sidecar**.
1192    ///
1193    /// The closure is called once at startup and must return a
1194    /// `(Stream<InT>, Sink<OutT>)` pair. The framework reads from the stream
1195    /// (items flowing *into* the dataflow) and writes to the sink (items flowing
1196    /// *out* to the sidecar). The user controls buffering, backpressure, and
1197    /// internal lifecycle — Hydro only sees the stream/sink interface.
1198    ///
1199    /// This will hopefully make it easy to integrate hydro with existing frameworks,
1200    /// for example grpc code generated service endpoints.
1201    ///
1202    /// # Returns
1203    /// - A `Stream<InT>` carrying items from the sidecar into the dataflow.
1204    /// - A [`ForwardHandle`] expecting a `Stream<OutT>` that the user completes
1205    ///   with items destined for the sidecar.
1206    ///
1207    /// # Example
1208    ///
1209    /// ```rust
1210    /// # #[cfg(feature = "deploy")] {
1211    /// # use hydro_lang::prelude::*;
1212    /// # use futures::StreamExt;
1213    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1214    /// // Sidecar that echoes whatever it receives back into the dataflow.
1215    /// let (inbound, response_handle) = process.sidecar_bidi::<String, String, _>(q!(|| {
1216    ///     let (to_df_tx, to_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1217    ///     let (from_df_tx, mut from_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1218    ///
1219    ///     // Spawn the sidecar: echoes items from the dataflow back into it.
1220    ///     tokio::spawn(async move {
1221    ///         while let Some(msg) = from_df_rx.recv().await {
1222    ///             to_df_tx.send(msg).await.ok();
1223    ///         }
1224    ///     });
1225    ///
1226    ///     // Return the framework-facing ends (concrete types, no boxing needed).
1227    ///     let stream = tokio_stream::wrappers::ReceiverStream::new(to_df_rx);
1228    ///     let sink = tokio_util::sync::PollSender::new(from_df_tx);
1229    ///     (stream, sink)
1230    /// }));
1231    ///
1232    /// // Send "hello" into the sidecar via the response channel.
1233    /// let input = process.source_stream(q!(futures::stream::iter(vec!["hello".to_string()])));
1234    /// response_handle.complete(input);
1235    ///
1236    /// // The sidecar echoes it back — assert we get "hello" out.
1237    /// inbound
1238    /// # }, |mut stream| async move {
1239    /// #     assert_eq!(stream.next().await.unwrap(), "hello");
1240    /// # }));
1241    /// # }
1242    /// ```
1243    #[expect(clippy::type_complexity, reason = "stream markers")]
1244    fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
1245        &self,
1246        sidecar: impl QuotedWithContext<'a, F, Self>,
1247    ) -> (
1248        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
1249        ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1250    )
1251    where
1252        Self: Sized + TopLevel<'a>,
1253    {
1254        let location_key = Location::id(self).key();
1255
1256        let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
1257        let (stream_ident, sink_ident) = sidecar_id.idents();
1258
1259        let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
1260        self.flow_state()
1261            .borrow_mut()
1262            .sidecars
1263            .push(crate::compile::builder::Sidecar::Bidi {
1264                location_key,
1265                sidecar_id,
1266                sidecar_closure: Box::new(sidecar_closure),
1267            });
1268
1269        // Inbound stream: reads from the stream returned by the sidecar closure
1270        let source_expr: syn::Expr = parse_quote! {
1271            #stream_ident
1272        };
1273        let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
1274            self.clone(),
1275            HydroNode::Source {
1276                source: HydroSource::Stream(source_expr.into()),
1277                metadata: self.new_node_metadata(Stream::<
1278                    InT,
1279                    Self,
1280                    Unbounded,  // TODO: maybe bounded sidecars are interesting..?
1281                    TotalOrder, // TODO: NoOrder..?
1282                    ExactlyOnce,
1283                >::collection_kind()),
1284            },
1285        );
1286
1287        // Outbound: forward_ref cycle feeding the sink returned by the sidecar closure
1288        let (fwd_ref, to_sink): (
1289            ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1290            Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
1291        ) = self.forward_ref();
1292
1293        let sink_expr: syn::Expr = parse_quote! {
1294            #sink_ident
1295        };
1296
1297        let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
1298        self.flow_state()
1299            .borrow_mut()
1300            .try_push_root(HydroRoot::DestSink {
1301                sink: sink_expr.into(),
1302                input: Box::new(sink_input_ir),
1303                op_metadata: HydroIrOpMetadata::new(),
1304            });
1305
1306        (inbound, fwd_ref)
1307    }
1308
1309    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1310    ///
1311    /// See also: [`Tick::singleton`], for creating a singleton _within_ a tick, which requires
1312    /// `T: Clone`.
1313    ///
1314    /// # Example
1315    /// ```rust
1316    /// # #[cfg(feature = "deploy")] {
1317    /// # use hydro_lang::prelude::*;
1318    /// # use futures::StreamExt;
1319    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1320    /// let singleton = process.singleton(q!(5));
1321    /// # singleton.into_stream()
1322    /// # }, |mut stream| async move {
1323    /// // 5
1324    /// # assert_eq!(stream.next().await.unwrap(), 5);
1325    /// # }));
1326    /// # }
1327    /// ```
1328    fn singleton<T>(
1329        &self,
1330        e: impl QuotedWithContext<'a, T, Self>,
1331    ) -> Singleton<T, Self::DropConsistency, Bounded>
1332    where
1333        Self: Sized,
1334    {
1335        let e = e.splice_untyped_ctx(self);
1336
1337        let target_location = self.drop_consistency();
1338        Singleton::new(
1339            target_location.clone(),
1340            HydroNode::SingletonSource {
1341                value: e.into(),
1342                first_tick_only: false,
1343                metadata: target_location.new_node_metadata(Singleton::<
1344                    T,
1345                    Self::DropConsistency,
1346                    Bounded,
1347                >::collection_kind()),
1348            },
1349        )
1350    }
1351
1352    /// Constructs a [`Singleton`] by resolving an async [`Future`] to completion.
1353    ///
1354    /// This is a convenience method equivalent to
1355    /// `self.singleton(future_expr).resolve_future_blocking()`, which is a common
1356    /// pattern when initializing a singleton from an async computation.
1357    ///
1358    /// # Example
1359    /// ```rust
1360    /// # #[cfg(feature = "deploy")] {
1361    /// # use hydro_lang::prelude::*;
1362    /// # use futures::StreamExt;
1363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1364    /// let singleton = process.singleton_future(q!(async { 42 }));
1365    /// singleton.into_stream()
1366    /// # }, |mut stream| async move {
1367    /// // 42
1368    /// # assert_eq!(stream.next().await.unwrap(), 42);
1369    /// # }));
1370    /// # }
1371    /// ```
1372    ///
1373    /// [`Future`]: std::future::Future
1374    fn singleton_future<F>(
1375        &self,
1376        e: impl QuotedWithContext<'a, F, Self>,
1377    ) -> Singleton<F::Output, Self::DropConsistency, Bounded>
1378    where
1379        F: Future,
1380        Self: Sized,
1381    {
1382        self.singleton(e).resolve_future_blocking()
1383    }
1384
1385    /// Generates a stream that emits `()` at a fixed interval.
1386    ///
1387    /// The first tick completes immediately. Missed ticks will be scheduled
1388    /// as soon as possible.
1389    ///
1390    /// Because this only emits `()`, the non-determinism of *when* events fire
1391    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1392    /// [`NonDet`] guard is required.
1393    fn source_interval(
1394        &self,
1395        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1396    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1397    where
1398        Self: TopLevel<'a> + Sized,
1399    {
1400        self.source_stream(q!(tokio_stream::StreamExt::map(
1401            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1402            |_| ()
1403        )))
1404        .assert_has_consistency_of_trusted(
1405            manual_proof!(/** interval does not reveal timestamps */),
1406        )
1407    }
1408
1409    /// Generates a stream that emits `()` at a fixed interval, after an
1410    /// initial delay.
1411    ///
1412    /// Because this only emits `()`, the non-determinism of *when* events fire
1413    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1414    /// [`NonDet`] guard is required.
1415    fn source_interval_delayed(
1416        &self,
1417        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1418        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1419    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1420    where
1421        Self: TopLevel<'a> + Sized,
1422    {
1423        self.source_stream(q!(tokio_stream::StreamExt::map(
1424            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1425                tokio::time::Instant::now() + delay,
1426                interval,
1427            )),
1428            |_| ()
1429        )))
1430        .assert_has_consistency_of_trusted(
1431            manual_proof!(/** interval does not reveal timestamps */),
1432        )
1433    }
1434
1435    /// Creates a forward reference, allowing a stream to be used before its source is defined.
1436    ///
1437    /// Returns a `(handle, placeholder)` pair. Use the placeholder in the dataflow graph,
1438    /// then call `handle.complete(actual_stream)` to wire in the real source.
1439    ///
1440    /// This is useful for mutually-dependent dataflows or when the definition order
1441    /// doesn't match the data flow direction. For feedback loops, prefer [`Tick::cycle`]
1442    /// instead, which automatically defers values by one tick.
1443    ///
1444    /// # Panics
1445    /// Panics if the forward reference creates a synchronous cycle (i.e., the completed
1446    /// stream transitively depends on the placeholder without a `defer_tick` or network
1447    /// hop in between).
1448    ///
1449    /// # Example
1450    /// ```rust
1451    /// # #[cfg(feature = "deploy")] {
1452    /// # use hydro_lang::prelude::*;
1453    /// # use hydro_lang::live_collections::stream::NoOrder;
1454    /// # use futures::StreamExt;
1455    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1456    /// // Create a forward reference to define a stream that will be completed later
1457    /// let (complete, forward_stream) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1458    ///
1459    /// // Use the forward reference as input to another computation
1460    /// let output: Stream<_, _, _, NoOrder> = forward_stream.map(q!(|x| x * 2));
1461    ///
1462    /// // Complete the forward reference with the actual source
1463    /// let source: Stream<_, _, Unbounded> = process.source_iter(q!([1, 2, 3])).into();
1464    /// complete.complete(source);
1465    /// output
1466    /// # }, |mut stream| async move {
1467    /// // 2, 4, 6
1468    /// # assert_eq!(stream.next().await.unwrap(), 2);
1469    /// # assert_eq!(stream.next().await.unwrap(), 4);
1470    /// # assert_eq!(stream.next().await.unwrap(), 6);
1471    /// # }));
1472    /// # }
1473    /// ```
1474    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1475    where
1476        S: CycleCollection<'a, ForwardRef, Location = Self>,
1477    {
1478        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1479        (
1480            ForwardHandle::new(cycle_id, Location::id(self)),
1481            S::create_source(cycle_id, self.clone()),
1482        )
1483    }
1484}
1485
1486#[cfg(feature = "deploy")]
1487#[cfg(test)]
1488mod tests {
1489    use std::collections::HashSet;
1490
1491    use futures::{SinkExt, StreamExt};
1492    use hydro_deploy::Deployment;
1493    use stageleft::q;
1494    use tokio_util::codec::LengthDelimitedCodec;
1495
1496    use crate::compile::builder::FlowBuilder;
1497    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1498    use crate::location::{Location, NetworkHint};
1499    use crate::nondet::nondet;
1500
1501    #[tokio::test]
1502    async fn top_level_singleton_replay_cardinality() {
1503        let mut deployment = Deployment::new();
1504
1505        let mut flow = FlowBuilder::new();
1506        let node = flow.process::<()>();
1507        let external = flow.external::<()>();
1508
1509        let (in_port, input) =
1510            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1511        let singleton = node.singleton(q!(123));
1512        let tick = node.tick();
1513        let out = input
1514            .batch(&tick, nondet!(/** test */))
1515            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1516            .cross_singleton(
1517                singleton
1518                    .snapshot(&tick, nondet!(/** test */))
1519                    .into_stream()
1520                    .count(),
1521            )
1522            .all_ticks()
1523            .send_bincode_external(&external);
1524
1525        let nodes = flow
1526            .with_process(&node, deployment.Localhost())
1527            .with_external(&external, deployment.Localhost())
1528            .deploy(&mut deployment);
1529
1530        deployment.deploy().await.unwrap();
1531
1532        let mut external_in = nodes.connect(in_port).await;
1533        let mut external_out = nodes.connect(out).await;
1534
1535        deployment.start().await.unwrap();
1536
1537        external_in.send(1).await.unwrap();
1538        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1539
1540        external_in.send(2).await.unwrap();
1541        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1542    }
1543
1544    #[tokio::test]
1545    async fn tick_singleton_replay_cardinality() {
1546        let mut deployment = Deployment::new();
1547
1548        let mut flow = FlowBuilder::new();
1549        let node = flow.process::<()>();
1550        let external = flow.external::<()>();
1551
1552        let (in_port, input) =
1553            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1554        let tick = node.tick();
1555        let singleton = tick.singleton(q!(123));
1556        let out = input
1557            .batch(&tick, nondet!(/** test */))
1558            .cross_singleton(singleton.clone())
1559            .cross_singleton(singleton.into_stream().count())
1560            .all_ticks()
1561            .send_bincode_external(&external);
1562
1563        let nodes = flow
1564            .with_process(&node, deployment.Localhost())
1565            .with_external(&external, deployment.Localhost())
1566            .deploy(&mut deployment);
1567
1568        deployment.deploy().await.unwrap();
1569
1570        let mut external_in = nodes.connect(in_port).await;
1571        let mut external_out = nodes.connect(out).await;
1572
1573        deployment.start().await.unwrap();
1574
1575        external_in.send(1).await.unwrap();
1576        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1577
1578        external_in.send(2).await.unwrap();
1579        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1580    }
1581
1582    #[tokio::test]
1583    async fn external_bytes() {
1584        let mut deployment = Deployment::new();
1585
1586        let mut flow = FlowBuilder::new();
1587        let first_node = flow.process::<()>();
1588        let external = flow.external::<()>();
1589
1590        let (in_port, input) = first_node.source_external_bytes(&external);
1591        let out = input.send_bincode_external(&external);
1592
1593        let nodes = flow
1594            .with_process(&first_node, deployment.Localhost())
1595            .with_external(&external, deployment.Localhost())
1596            .deploy(&mut deployment);
1597
1598        deployment.deploy().await.unwrap();
1599
1600        let mut external_in = nodes.connect(in_port).await.1;
1601        let mut external_out = nodes.connect(out).await;
1602
1603        deployment.start().await.unwrap();
1604
1605        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1606
1607        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1608    }
1609
1610    #[tokio::test]
1611    async fn multi_external_source() {
1612        let mut deployment = Deployment::new();
1613
1614        let mut flow = FlowBuilder::new();
1615        let first_node = flow.process::<()>();
1616        let external = flow.external::<()>();
1617
1618        let (in_port, input, _membership, complete_sink) =
1619            first_node.bidi_external_many_bincode(&external);
1620        let out = input.entries().send_bincode_external(&external);
1621        complete_sink.complete(
1622            first_node
1623                .source_iter::<(u64, ()), _>(q!([]))
1624                .into_keyed()
1625                .weaken_ordering(),
1626        );
1627
1628        let nodes = flow
1629            .with_process(&first_node, deployment.Localhost())
1630            .with_external(&external, deployment.Localhost())
1631            .deploy(&mut deployment);
1632
1633        deployment.deploy().await.unwrap();
1634
1635        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1636        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1637        let external_out = nodes.connect(out).await;
1638
1639        deployment.start().await.unwrap();
1640
1641        external_in_1.send(123).await.unwrap();
1642        external_in_2.send(456).await.unwrap();
1643
1644        assert_eq!(
1645            external_out.take(2).collect::<HashSet<_>>().await,
1646            vec![(0, 123), (1, 456)].into_iter().collect()
1647        );
1648    }
1649
1650    #[tokio::test]
1651    async fn second_connection_only_multi_source() {
1652        let mut deployment = Deployment::new();
1653
1654        let mut flow = FlowBuilder::new();
1655        let first_node = flow.process::<()>();
1656        let external = flow.external::<()>();
1657
1658        let (in_port, input, _membership, complete_sink) =
1659            first_node.bidi_external_many_bincode(&external);
1660        let out = input.entries().send_bincode_external(&external);
1661        complete_sink.complete(
1662            first_node
1663                .source_iter::<(u64, ()), _>(q!([]))
1664                .into_keyed()
1665                .weaken_ordering(),
1666        );
1667
1668        let nodes = flow
1669            .with_process(&first_node, deployment.Localhost())
1670            .with_external(&external, deployment.Localhost())
1671            .deploy(&mut deployment);
1672
1673        deployment.deploy().await.unwrap();
1674
1675        // intentionally skipped to test stream waking logic
1676        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1677        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1678        let mut external_out = nodes.connect(out).await;
1679
1680        deployment.start().await.unwrap();
1681
1682        external_in_2.send(456).await.unwrap();
1683
1684        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1685    }
1686
1687    #[tokio::test]
1688    async fn multi_external_bytes() {
1689        let mut deployment = Deployment::new();
1690
1691        let mut flow = FlowBuilder::new();
1692        let first_node = flow.process::<()>();
1693        let external = flow.external::<()>();
1694
1695        let (in_port, input, _membership, complete_sink) = first_node
1696            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1697        let out = input.entries().send_bincode_external(&external);
1698        complete_sink.complete(
1699            first_node
1700                .source_iter(q!([]))
1701                .into_keyed()
1702                .weaken_ordering(),
1703        );
1704
1705        let nodes = flow
1706            .with_process(&first_node, deployment.Localhost())
1707            .with_external(&external, deployment.Localhost())
1708            .deploy(&mut deployment);
1709
1710        deployment.deploy().await.unwrap();
1711
1712        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1713        let mut external_in_2 = nodes.connect(in_port).await.1;
1714        let external_out = nodes.connect(out).await;
1715
1716        deployment.start().await.unwrap();
1717
1718        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1719        external_in_2.send(vec![4, 5].into()).await.unwrap();
1720
1721        assert_eq!(
1722            external_out.take(2).collect::<HashSet<_>>().await,
1723            vec![
1724                (0, (&[1u8, 2, 3] as &[u8]).into()),
1725                (1, (&[4u8, 5] as &[u8]).into())
1726            ]
1727            .into_iter()
1728            .collect()
1729        );
1730    }
1731
1732    #[tokio::test]
1733    async fn single_client_external_bytes() {
1734        let mut deployment = Deployment::new();
1735        let mut flow = FlowBuilder::new();
1736        let first_node = flow.process::<()>();
1737        let external = flow.external::<()>();
1738        let (port, input, complete_sink) = first_node
1739            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1740        complete_sink.complete(input.map(q!(|data| {
1741            let mut resp: Vec<u8> = data.into();
1742            resp.push(42);
1743            resp.into() // : Bytes
1744        })));
1745
1746        let nodes = flow
1747            .with_process(&first_node, deployment.Localhost())
1748            .with_external(&external, deployment.Localhost())
1749            .deploy(&mut deployment);
1750
1751        deployment.deploy().await.unwrap();
1752        deployment.start().await.unwrap();
1753
1754        let (mut external_out, mut external_in) = nodes.connect(port).await;
1755
1756        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1757        assert_eq!(
1758            external_out.next().await.unwrap().unwrap(),
1759            vec![1, 2, 3, 42]
1760        );
1761    }
1762
1763    #[tokio::test]
1764    async fn echo_external_bytes() {
1765        let mut deployment = Deployment::new();
1766
1767        let mut flow = FlowBuilder::new();
1768        let first_node = flow.process::<()>();
1769        let external = flow.external::<()>();
1770
1771        let (port, input, _membership, complete_sink) = first_node
1772            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1773        complete_sink
1774            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1775
1776        let nodes = flow
1777            .with_process(&first_node, deployment.Localhost())
1778            .with_external(&external, deployment.Localhost())
1779            .deploy(&mut deployment);
1780
1781        deployment.deploy().await.unwrap();
1782
1783        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1784        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1785
1786        deployment.start().await.unwrap();
1787
1788        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1789        external_in_2.send(vec![4, 5].into()).await.unwrap();
1790
1791        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1792        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1793    }
1794
1795    #[tokio::test]
1796    async fn echo_external_bincode() {
1797        let mut deployment = Deployment::new();
1798
1799        let mut flow = FlowBuilder::new();
1800        let first_node = flow.process::<()>();
1801        let external = flow.external::<()>();
1802
1803        let (port, input, _membership, complete_sink) =
1804            first_node.bidi_external_many_bincode(&external);
1805        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1806
1807        let nodes = flow
1808            .with_process(&first_node, deployment.Localhost())
1809            .with_external(&external, deployment.Localhost())
1810            .deploy(&mut deployment);
1811
1812        deployment.deploy().await.unwrap();
1813
1814        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1815        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1816
1817        deployment.start().await.unwrap();
1818
1819        external_in_1.send("hi".to_owned()).await.unwrap();
1820        external_in_2.send("hello".to_owned()).await.unwrap();
1821
1822        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1823        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1824    }
1825
1826    #[tokio::test]
1827    async fn closure_location_name() {
1828        let mut deployment = Deployment::new();
1829        let mut flow = FlowBuilder::new();
1830
1831        enum ClosureProcess {}
1832
1833        let node = flow.process::<ClosureProcess>();
1834        let external = flow.external::<()>();
1835
1836        let (in_port, input) =
1837            node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1838        let out = input.send_bincode_external(&external);
1839
1840        let nodes = flow
1841            .with_process(&node, deployment.Localhost())
1842            .with_external(&external, deployment.Localhost())
1843            .deploy(&mut deployment);
1844
1845        deployment.deploy().await.unwrap();
1846
1847        let mut external_in = nodes.connect(in_port).await;
1848        let mut external_out = nodes.connect(out).await;
1849
1850        deployment.start().await.unwrap();
1851
1852        external_in.send(42).await.unwrap();
1853        assert_eq!(external_out.next().await.unwrap(), 42);
1854    }
1855}