hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::dynamic::LocationId;
40use crate::location::external_process::{
41    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
42};
43use crate::nondet::NonDet;
44#[cfg(feature = "sim")]
45use crate::sim::SimSender;
46use crate::staging_util::get_this_crate;
47
48pub mod dynamic;
49
50#[expect(missing_docs, reason = "TODO")]
51pub mod external_process;
52pub use external_process::External;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod process;
56pub use process::Process;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod cluster;
60pub use cluster::Cluster;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod member_id;
64pub use member_id::{MemberId, TaglessMemberId};
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72    Joined,
73    Left,
74}
75
/// Hint describing how the network endpoint for an external connection should be set up.
///
/// [`Location::bind_single_client`] and [`Location::bidi_external_many_bytes`] accept a
/// hint from the caller and forward it unchanged as the `port_hint` of the
/// `HydroNode::ExternalInput` they create; the bincode flavours of those methods always
/// use [`NetworkHint::Auto`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// Expresses no preference; the choice is left to whatever consumes the hint.
    Auto,
    /// Requests a TCP endpoint.
    ///
    /// NOTE(review): the payload is presumably the port to listen on, with `None`
    /// meaning "any free port" — confirm against the deployment backends.
    TcpPort(Option<u16>),
}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89    private_bounds,
90    reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
93    type Root: Location<'a>;
94
95    fn root(&self) -> Self::Root;
96
97    fn try_tick(&self) -> Option<Tick<Self>> {
98        if Self::is_top_level() {
99            let next_id = self.flow_state().borrow_mut().next_clock_id;
100            self.flow_state().borrow_mut().next_clock_id += 1;
101            Some(Tick {
102                id: next_id,
103                l: self.clone(),
104            })
105        } else {
106            None
107        }
108    }
109
110    fn id(&self) -> LocationId {
111        dynamic::DynLocation::id(self)
112    }
113
114    fn tick(&self) -> Tick<Self>
115    where
116        Self: NoTick,
117    {
118        let next_id = self.flow_state().borrow_mut().next_clock_id;
119        self.flow_state().borrow_mut().next_clock_id += 1;
120        Tick {
121            id: next_id,
122            l: self.clone(),
123        }
124    }
125
126    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127    where
128        Self: Sized + NoTick,
129    {
130        Stream::new(
131            self.clone(),
132            HydroNode::Source {
133                source: HydroSource::Spin(),
134                metadata: self.new_node_metadata(Stream::<
135                    (),
136                    Self,
137                    Unbounded,
138                    TotalOrder,
139                    ExactlyOnce,
140                >::collection_kind()),
141            },
142        )
143    }
144
145    fn source_stream<T, E>(
146        &self,
147        e: impl QuotedWithContext<'a, E, Self>,
148    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149    where
150        E: FuturesStream<Item = T> + Unpin,
151        Self: Sized + NoTick,
152    {
153        let e = e.splice_untyped_ctx(self);
154
155        Stream::new(
156            self.clone(),
157            HydroNode::Source {
158                source: HydroSource::Stream(e.into()),
159                metadata: self.new_node_metadata(Stream::<
160                    T,
161                    Self,
162                    Unbounded,
163                    TotalOrder,
164                    ExactlyOnce,
165                >::collection_kind()),
166            },
167        )
168    }
169
170    fn source_iter<T, E>(
171        &self,
172        e: impl QuotedWithContext<'a, E, Self>,
173    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
174    where
175        E: IntoIterator<Item = T>,
176        Self: Sized + NoTick,
177    {
178        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
179        // for bounded top-level streams, and this is the only way to generate one
180        let e = e.splice_typed_ctx(self);
181
182        Stream::new(
183            self.clone(),
184            HydroNode::Source {
185                source: HydroSource::Iter(e.into()),
186                metadata: self.new_node_metadata(Stream::<
187                    T,
188                    Self,
189                    Unbounded,
190                    TotalOrder,
191                    ExactlyOnce,
192                >::collection_kind()),
193            },
194        )
195    }
196
197    fn source_cluster_members<C: 'a>(
198        &self,
199        cluster: &Cluster<'a, C>,
200    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
201    where
202        Self: Sized + NoTick,
203    {
204        Stream::new(
205            self.clone(),
206            HydroNode::Source {
207                source: HydroSource::ClusterMembers(cluster.id()),
208                metadata: self.new_node_metadata(Stream::<
209                    (TaglessMemberId, MembershipEvent),
210                    Self,
211                    Unbounded,
212                    TotalOrder,
213                    ExactlyOnce,
214                >::collection_kind()),
215            },
216        )
217        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
218        .into_keyed()
219    }
220
221    fn source_external_bytes<L>(
222        &self,
223        from: &External<L>,
224    ) -> (
225        ExternalBytesPort,
226        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
227    )
228    where
229        Self: Sized + NoTick,
230    {
231        let (port, stream, sink) =
232            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
233
234        sink.complete(self.source_iter(q!([])));
235
236        (port, stream)
237    }
238
239    #[expect(clippy::type_complexity, reason = "stream markers")]
240    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
241        &self,
242        from: &External<L>,
243    ) -> (
244        ExternalBincodeSink<T, NotMany, O, R>,
245        Stream<T, Self, Unbounded, O, R>,
246    )
247    where
248        Self: Sized + NoTick,
249        T: Serialize + DeserializeOwned,
250    {
251        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
252        sink.complete(self.source_iter(q!([])));
253
254        (
255            ExternalBincodeSink {
256                process_id: from.id,
257                port_id: port.port_id,
258                _phantom: PhantomData,
259            },
260            stream.weaken_ordering().weaken_retries(),
261        )
262    }
263
264    #[cfg(feature = "sim")]
265    #[expect(clippy::type_complexity, reason = "stream markers")]
266    /// Sets up a simulated input port on this location, returning a handle to send messages to
267    /// the location as well as a stream of received messages.
268    fn sim_input<T, O: Ordering, R: Retries>(
269        &self,
270    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
271    where
272        Self: Sized + NoTick,
273        T: Serialize + DeserializeOwned,
274    {
275        let external_location: External<'a, ()> = External {
276            id: 0,
277            flow_state: self.flow_state().clone(),
278            _phantom: PhantomData,
279        };
280
281        let (external, stream) = self.source_external_bincode(&external_location);
282
283        (SimSender(external.port_id, PhantomData), stream)
284    }
285
286    /// Establishes a server on this location to receive a bidirectional connection from a single
287    /// client, identified by the given `External` handle. Returns a port handle for the external
288    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
289    /// messages.
290    ///
291    /// # Example
292    /// ```rust
293    /// # #[cfg(feature = "deploy")] {
294    /// # use hydro_lang::prelude::*;
295    /// # use hydro_deploy::Deployment;
296    /// # use futures::{SinkExt, StreamExt};
297    /// # tokio_test::block_on(async {
298    /// # use bytes::Bytes;
299    /// # use hydro_lang::location::NetworkHint;
300    /// # use tokio_util::codec::LengthDelimitedCodec;
301    /// # let flow = FlowBuilder::new();
302    /// let node = flow.process::<()>();
303    /// let external = flow.external::<()>();
304    /// let (port, incoming, outgoing) =
305    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
306    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
307    ///     let mut resp: Vec<u8> = data.into();
308    ///     resp.push(42);
309    ///     resp.into() // : Bytes
310    /// })));
311    ///
312    /// # let mut deployment = Deployment::new();
313    /// let nodes = flow // ... with_process and with_external
314    /// #     .with_process(&node, deployment.Localhost())
315    /// #     .with_external(&external, deployment.Localhost())
316    /// #     .deploy(&mut deployment);
317    ///
318    /// deployment.deploy().await.unwrap();
319    /// deployment.start().await.unwrap();
320    ///
321    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
322    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
323    /// assert_eq!(
324    ///     external_out.next().await.unwrap().unwrap(),
325    ///     vec![1, 2, 3, 42]
326    /// );
327    /// # });
328    /// # }
329    /// ```
330    #[expect(clippy::type_complexity, reason = "stream markers")]
331    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
332        &self,
333        from: &External<L>,
334        port_hint: NetworkHint,
335    ) -> (
336        ExternalBytesPort<NotMany>,
337        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
338        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
339    )
340    where
341        Self: Sized + NoTick,
342    {
343        let next_external_port_id = {
344            let mut flow_state = from.flow_state.borrow_mut();
345            let id = flow_state.next_external_out;
346            flow_state.next_external_out += 1;
347            id
348        };
349
350        let (fwd_ref, to_sink) =
351            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
352        let mut flow_state_borrow = self.flow_state().borrow_mut();
353
354        flow_state_borrow.push_root(HydroRoot::SendExternal {
355            to_external_id: from.id,
356            to_key: next_external_port_id,
357            to_many: false,
358            unpaired: false,
359            serialize_fn: None,
360            instantiate_fn: DebugInstantiate::Building,
361            input: Box::new(to_sink.ir_node.into_inner()),
362            op_metadata: HydroIrOpMetadata::new(),
363        });
364
365        let raw_stream: Stream<
366            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
367            Self,
368            Unbounded,
369            TotalOrder,
370            ExactlyOnce,
371        > = Stream::new(
372            self.clone(),
373            HydroNode::ExternalInput {
374                from_external_id: from.id,
375                from_key: next_external_port_id,
376                from_many: false,
377                codec_type: quote_type::<Codec>().into(),
378                port_hint,
379                instantiate_fn: DebugInstantiate::Building,
380                deserialize_fn: None,
381                metadata: self.new_node_metadata(Stream::<
382                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
383                    Self,
384                    Unbounded,
385                    TotalOrder,
386                    ExactlyOnce,
387                >::collection_kind()),
388            },
389        );
390
391        (
392            ExternalBytesPort {
393                process_id: from.id,
394                port_id: next_external_port_id,
395                _phantom: PhantomData,
396            },
397            raw_stream.flatten_ordered(),
398            fwd_ref,
399        )
400    }
401
402    #[expect(clippy::type_complexity, reason = "stream markers")]
403    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
404        &self,
405        from: &External<L>,
406    ) -> (
407        ExternalBincodeBidi<InT, OutT, NotMany>,
408        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
409        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
410    )
411    where
412        Self: Sized + NoTick,
413    {
414        let next_external_port_id = {
415            let mut flow_state = from.flow_state.borrow_mut();
416            let id = flow_state.next_external_out;
417            flow_state.next_external_out += 1;
418            id
419        };
420
421        let (fwd_ref, to_sink) =
422            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
423        let mut flow_state_borrow = self.flow_state().borrow_mut();
424
425        let root = get_this_crate();
426
427        let out_t_type = quote_type::<OutT>();
428        let ser_fn: syn::Expr = syn::parse_quote! {
429            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
430                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
431            )
432        };
433
434        flow_state_borrow.push_root(HydroRoot::SendExternal {
435            to_external_id: from.id,
436            to_key: next_external_port_id,
437            to_many: false,
438            unpaired: false,
439            serialize_fn: Some(ser_fn.into()),
440            instantiate_fn: DebugInstantiate::Building,
441            input: Box::new(to_sink.ir_node.into_inner()),
442            op_metadata: HydroIrOpMetadata::new(),
443        });
444
445        let in_t_type = quote_type::<InT>();
446
447        let deser_fn: syn::Expr = syn::parse_quote! {
448            |res| {
449                let b = res.unwrap();
450                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
451            }
452        };
453
454        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
455            self.clone(),
456            HydroNode::ExternalInput {
457                from_external_id: from.id,
458                from_key: next_external_port_id,
459                from_many: false,
460                codec_type: quote_type::<LengthDelimitedCodec>().into(),
461                port_hint: NetworkHint::Auto,
462                instantiate_fn: DebugInstantiate::Building,
463                deserialize_fn: Some(deser_fn.into()),
464                metadata: self.new_node_metadata(Stream::<
465                    InT,
466                    Self,
467                    Unbounded,
468                    TotalOrder,
469                    ExactlyOnce,
470                >::collection_kind()),
471            },
472        );
473
474        (
475            ExternalBincodeBidi {
476                process_id: from.id,
477                port_id: next_external_port_id,
478                _phantom: PhantomData,
479            },
480            raw_stream,
481            fwd_ref,
482        )
483    }
484
485    #[expect(clippy::type_complexity, reason = "stream markers")]
486    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
487        &self,
488        from: &External<L>,
489        port_hint: NetworkHint,
490    ) -> (
491        ExternalBytesPort<Many>,
492        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
493        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
494        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
495    )
496    where
497        Self: Sized + NoTick,
498    {
499        let next_external_port_id = {
500            let mut flow_state = from.flow_state.borrow_mut();
501            let id = flow_state.next_external_out;
502            flow_state.next_external_out += 1;
503            id
504        };
505
506        let (fwd_ref, to_sink) =
507            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
508        let mut flow_state_borrow = self.flow_state().borrow_mut();
509
510        flow_state_borrow.push_root(HydroRoot::SendExternal {
511            to_external_id: from.id,
512            to_key: next_external_port_id,
513            to_many: true,
514            unpaired: false,
515            serialize_fn: None,
516            instantiate_fn: DebugInstantiate::Building,
517            input: Box::new(to_sink.entries().ir_node.into_inner()),
518            op_metadata: HydroIrOpMetadata::new(),
519        });
520
521        let raw_stream: Stream<
522            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
523            Self,
524            Unbounded,
525            TotalOrder,
526            ExactlyOnce,
527        > = Stream::new(
528            self.clone(),
529            HydroNode::ExternalInput {
530                from_external_id: from.id,
531                from_key: next_external_port_id,
532                from_many: true,
533                codec_type: quote_type::<Codec>().into(),
534                port_hint,
535                instantiate_fn: DebugInstantiate::Building,
536                deserialize_fn: None,
537                metadata: self.new_node_metadata(Stream::<
538                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
539                    Self,
540                    Unbounded,
541                    TotalOrder,
542                    ExactlyOnce,
543                >::collection_kind()),
544            },
545        );
546
547        let membership_stream_ident = syn::Ident::new(
548            &format!(
549                "__hydro_deploy_many_{}_{}_membership",
550                from.id, next_external_port_id
551            ),
552            Span::call_site(),
553        );
554        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
555        let raw_membership_stream: KeyedStream<
556            u64,
557            bool,
558            Self,
559            Unbounded,
560            TotalOrder,
561            ExactlyOnce,
562        > = KeyedStream::new(
563            self.clone(),
564            HydroNode::Source {
565                source: HydroSource::Stream(membership_stream_expr.into()),
566                metadata: self.new_node_metadata(KeyedStream::<
567                    u64,
568                    bool,
569                    Self,
570                    Unbounded,
571                    TotalOrder,
572                    ExactlyOnce,
573                >::collection_kind()),
574            },
575        );
576
577        (
578            ExternalBytesPort {
579                process_id: from.id,
580                port_id: next_external_port_id,
581                _phantom: PhantomData,
582            },
583            raw_stream
584                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
585                .into_keyed(),
586            raw_membership_stream.map(q!(|join| {
587                if join {
588                    MembershipEvent::Joined
589                } else {
590                    MembershipEvent::Left
591                }
592            })),
593            fwd_ref,
594        )
595    }
596
597    #[expect(clippy::type_complexity, reason = "stream markers")]
598    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
599        &self,
600        from: &External<L>,
601    ) -> (
602        ExternalBincodeBidi<InT, OutT, Many>,
603        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
604        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
605        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
606    )
607    where
608        Self: Sized + NoTick,
609    {
610        let next_external_port_id = {
611            let mut flow_state = from.flow_state.borrow_mut();
612            let id = flow_state.next_external_out;
613            flow_state.next_external_out += 1;
614            id
615        };
616
617        let root = get_this_crate();
618
619        let (fwd_ref, to_sink) =
620            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
621        let mut flow_state_borrow = self.flow_state().borrow_mut();
622
623        let out_t_type = quote_type::<OutT>();
624        let ser_fn: syn::Expr = syn::parse_quote! {
625            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
626                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
627            )
628        };
629
630        flow_state_borrow.push_root(HydroRoot::SendExternal {
631            to_external_id: from.id,
632            to_key: next_external_port_id,
633            to_many: true,
634            unpaired: false,
635            serialize_fn: Some(ser_fn.into()),
636            instantiate_fn: DebugInstantiate::Building,
637            input: Box::new(to_sink.entries().ir_node.into_inner()),
638            op_metadata: HydroIrOpMetadata::new(),
639        });
640
641        let in_t_type = quote_type::<InT>();
642
643        let deser_fn: syn::Expr = syn::parse_quote! {
644            |res| {
645                let (id, b) = res.unwrap();
646                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
647            }
648        };
649
650        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
651            KeyedStream::new(
652                self.clone(),
653                HydroNode::ExternalInput {
654                    from_external_id: from.id,
655                    from_key: next_external_port_id,
656                    from_many: true,
657                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
658                    port_hint: NetworkHint::Auto,
659                    instantiate_fn: DebugInstantiate::Building,
660                    deserialize_fn: Some(deser_fn.into()),
661                    metadata: self.new_node_metadata(KeyedStream::<
662                        u64,
663                        InT,
664                        Self,
665                        Unbounded,
666                        TotalOrder,
667                        ExactlyOnce,
668                    >::collection_kind()),
669                },
670            );
671
672        let membership_stream_ident = syn::Ident::new(
673            &format!(
674                "__hydro_deploy_many_{}_{}_membership",
675                from.id, next_external_port_id
676            ),
677            Span::call_site(),
678        );
679        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
680        let raw_membership_stream: KeyedStream<
681            u64,
682            bool,
683            Self,
684            Unbounded,
685            TotalOrder,
686            ExactlyOnce,
687        > = KeyedStream::new(
688            self.clone(),
689            HydroNode::Source {
690                source: HydroSource::Stream(membership_stream_expr.into()),
691                metadata: self.new_node_metadata(KeyedStream::<
692                    u64,
693                    bool,
694                    Self,
695                    Unbounded,
696                    TotalOrder,
697                    ExactlyOnce,
698                >::collection_kind()),
699            },
700        );
701
702        (
703            ExternalBincodeBidi {
704                process_id: from.id,
705                port_id: next_external_port_id,
706                _phantom: PhantomData,
707            },
708            raw_stream,
709            raw_membership_stream.map(q!(|join| {
710                if join {
711                    MembershipEvent::Joined
712                } else {
713                    MembershipEvent::Left
714                }
715            })),
716            fwd_ref,
717        )
718    }
719
720    /// Constructs a [`Singleton`] materialized at this location with the given static value.
721    ///
722    /// # Example
723    /// ```rust
724    /// # #[cfg(feature = "deploy")] {
725    /// # use hydro_lang::prelude::*;
726    /// # use futures::StreamExt;
727    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
728    /// let tick = process.tick();
729    /// let singleton = tick.singleton(q!(5));
730    /// # singleton.all_ticks()
731    /// # }, |mut stream| async move {
732    /// // 5
733    /// # assert_eq!(stream.next().await.unwrap(), 5);
734    /// # }));
735    /// # }
736    /// ```
737    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
738    where
739        T: Clone,
740        Self: Sized,
741    {
742        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
743        // for bounded top-level singletons, and this is the only way to generate one
744
745        let e = e.splice_untyped_ctx(self);
746
747        Singleton::new(
748            self.clone(),
749            HydroNode::SingletonSource {
750                value: e.into(),
751                metadata: self
752                    .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
753            },
754        )
755    }
756
    /// Generates a stream with values emitted at a fixed interval, with
    /// each value being the current time (as a [`tokio::time::Instant`]).
    ///
    /// The clock source used is monotonic, so elements will be emitted in
    /// increasing order.
    ///
    /// `interval` is a quoted [`Duration`] that is handed to `tokio::time::interval`;
    /// the resulting timer is wrapped in a `tokio_stream::wrappers::IntervalStream` and
    /// used as the source of the stream (see [`Location::source_stream`]).
    ///
    /// # Non-Determinism
    /// Because this stream is generated by an OS timer, it will be
    /// non-deterministic because each timestamp will be arbitrary.
    fn source_interval(
        &self,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        // Not read by the body. NOTE(review): presumably a marker by which the caller
        // acknowledges the non-determinism described above — confirm.
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval(interval)
        )))
    }
778
    /// Generates a stream with values emitted at a fixed interval (with an
    /// initial delay), with each value being the current time
    /// (as a [`tokio::time::Instant`]).
    ///
    /// The clock source used is monotonic, so elements will be emitted in
    /// increasing order.
    ///
    /// `delay` and `interval` are quoted [`Duration`]s: the timer is created with
    /// `tokio::time::interval_at`, starting at `tokio::time::Instant::now() + delay`
    /// (evaluated in the generated code) and repeating every `interval`. It is wrapped in
    /// a `tokio_stream::wrappers::IntervalStream` and used as the source of the stream
    /// (see [`Location::source_stream`]).
    ///
    /// # Non-Determinism
    /// Because this stream is generated by an OS timer, it will be
    /// non-deterministic because each timestamp will be arbitrary.
    fn source_interval_delayed(
        &self,
        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        // Not read by the body. NOTE(review): presumably a marker by which the caller
        // acknowledges the non-determinism described above — confirm.
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
        )))
    }
802
803    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
804    where
805        S: CycleCollection<'a, ForwardRef, Location = Self>,
806        Self: NoTick,
807    {
808        let next_id = self.flow_state().borrow_mut().next_cycle_id();
809        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
810
811        (
812            ForwardHandle {
813                completed: false,
814                ident: ident.clone(),
815                expected_location: Location::id(self),
816                _phantom: PhantomData,
817            },
818            S::create_source(ident, self.clone()),
819        )
820    }
821}
822
823#[cfg(feature = "deploy")]
824#[cfg(test)]
825mod tests {
826    use std::collections::HashSet;
827
828    use futures::{SinkExt, StreamExt};
829    use hydro_deploy::Deployment;
830    use stageleft::q;
831    use tokio_util::codec::LengthDelimitedCodec;
832
833    use crate::compile::builder::FlowBuilder;
834    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
835    use crate::location::{Location, NetworkHint};
836    use crate::nondet::nondet;
837
    // Checks that a singleton created on the process itself (outside any tick) shows up
    // exactly once per tick when snapshotted: every input is paired with the value 123
    // and with a count of 1, for the first and for the second input alike.
    #[tokio::test]
    async fn top_level_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        // The singleton is defined at the top level, before the tick is created.
        let singleton = node.singleton(q!(123));
        let tick = node.tick();
        // Pair each batched input with (a) the snapshot value and (b) the number of
        // elements the snapshot yields when viewed as a stream.
        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
            .cross_singleton(
                singleton
                    .snapshot(&tick, nondet!(/** test */))
                    .into_stream()
                    .count(),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Both external endpoints are connected before the deployment is started.
        let mut external_in = nodes.connect(in_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));

        // A second input must again see a count of exactly 1.
        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
    }
880
881    #[tokio::test]
882    async fn tick_singleton_replay_cardinality() {
883        let mut deployment = Deployment::new();
884
885        let flow = FlowBuilder::new();
886        let node = flow.process::<()>();
887        let external = flow.external::<()>();
888
889        let (in_port, input) =
890            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
891        let tick = node.tick();
892        let singleton = tick.singleton(q!(123));
893        let out = input
894            .batch(&tick, nondet!(/** test */))
895            .cross_singleton(singleton.clone())
896            .cross_singleton(singleton.into_stream().count())
897            .all_ticks()
898            .send_bincode_external(&external);
899
900        let nodes = flow
901            .with_process(&node, deployment.Localhost())
902            .with_external(&external, deployment.Localhost())
903            .deploy(&mut deployment);
904
905        deployment.deploy().await.unwrap();
906
907        let mut external_in = nodes.connect(in_port).await;
908        let mut external_out = nodes.connect(out).await;
909
910        deployment.start().await.unwrap();
911
912        external_in.send(1).await.unwrap();
913        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
914
915        external_in.send(2).await.unwrap();
916        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
917    }
918
919    #[tokio::test]
920    async fn external_bytes() {
921        let mut deployment = Deployment::new();
922
923        let flow = FlowBuilder::new();
924        let first_node = flow.process::<()>();
925        let external = flow.external::<()>();
926
927        let (in_port, input) = first_node.source_external_bytes(&external);
928        let out = input.send_bincode_external(&external);
929
930        let nodes = flow
931            .with_process(&first_node, deployment.Localhost())
932            .with_external(&external, deployment.Localhost())
933            .deploy(&mut deployment);
934
935        deployment.deploy().await.unwrap();
936
937        let mut external_in = nodes.connect(in_port).await.1;
938        let mut external_out = nodes.connect(out).await;
939
940        deployment.start().await.unwrap();
941
942        external_in.send(vec![1, 2, 3].into()).await.unwrap();
943
944        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
945    }
946
947    #[tokio::test]
948    async fn multi_external_source() {
949        let mut deployment = Deployment::new();
950
951        let flow = FlowBuilder::new();
952        let first_node = flow.process::<()>();
953        let external = flow.external::<()>();
954
955        let (in_port, input, _membership, complete_sink) =
956            first_node.bidi_external_many_bincode(&external);
957        let out = input.entries().send_bincode_external(&external);
958        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
959
960        let nodes = flow
961            .with_process(&first_node, deployment.Localhost())
962            .with_external(&external, deployment.Localhost())
963            .deploy(&mut deployment);
964
965        deployment.deploy().await.unwrap();
966
967        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
968        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
969        let external_out = nodes.connect(out).await;
970
971        deployment.start().await.unwrap();
972
973        external_in_1.send(123).await.unwrap();
974        external_in_2.send(456).await.unwrap();
975
976        assert_eq!(
977            external_out.take(2).collect::<HashSet<_>>().await,
978            vec![(0, 123), (1, 456)].into_iter().collect()
979        );
980    }
981
982    #[tokio::test]
983    async fn second_connection_only_multi_source() {
984        let mut deployment = Deployment::new();
985
986        let flow = FlowBuilder::new();
987        let first_node = flow.process::<()>();
988        let external = flow.external::<()>();
989
990        let (in_port, input, _membership, complete_sink) =
991            first_node.bidi_external_many_bincode(&external);
992        let out = input.entries().send_bincode_external(&external);
993        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
994
995        let nodes = flow
996            .with_process(&first_node, deployment.Localhost())
997            .with_external(&external, deployment.Localhost())
998            .deploy(&mut deployment);
999
1000        deployment.deploy().await.unwrap();
1001
1002        // intentionally skipped to test stream waking logic
1003        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1004        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1005        let mut external_out = nodes.connect(out).await;
1006
1007        deployment.start().await.unwrap();
1008
1009        external_in_2.send(456).await.unwrap();
1010
1011        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1012    }
1013
1014    #[tokio::test]
1015    async fn multi_external_bytes() {
1016        let mut deployment = Deployment::new();
1017
1018        let flow = FlowBuilder::new();
1019        let first_node = flow.process::<()>();
1020        let external = flow.external::<()>();
1021
1022        let (in_port, input, _membership, complete_sink) = first_node
1023            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1024        let out = input.entries().send_bincode_external(&external);
1025        complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
1026
1027        let nodes = flow
1028            .with_process(&first_node, deployment.Localhost())
1029            .with_external(&external, deployment.Localhost())
1030            .deploy(&mut deployment);
1031
1032        deployment.deploy().await.unwrap();
1033
1034        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1035        let mut external_in_2 = nodes.connect(in_port).await.1;
1036        let external_out = nodes.connect(out).await;
1037
1038        deployment.start().await.unwrap();
1039
1040        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1041        external_in_2.send(vec![4, 5].into()).await.unwrap();
1042
1043        assert_eq!(
1044            external_out.take(2).collect::<HashSet<_>>().await,
1045            vec![
1046                (0, (&[1u8, 2, 3] as &[u8]).into()),
1047                (1, (&[4u8, 5] as &[u8]).into())
1048            ]
1049            .into_iter()
1050            .collect()
1051        );
1052    }
1053
1054    #[tokio::test]
1055    async fn single_client_external_bytes() {
1056        let mut deployment = Deployment::new();
1057        let flow = FlowBuilder::new();
1058        let first_node = flow.process::<()>();
1059        let external = flow.external::<()>();
1060        let (port, input, complete_sink) = first_node
1061            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1062        complete_sink.complete(input.map(q!(|data| {
1063            let mut resp: Vec<u8> = data.into();
1064            resp.push(42);
1065            resp.into() // : Bytes
1066        })));
1067
1068        let nodes = flow
1069            .with_process(&first_node, deployment.Localhost())
1070            .with_external(&external, deployment.Localhost())
1071            .deploy(&mut deployment);
1072
1073        deployment.deploy().await.unwrap();
1074        deployment.start().await.unwrap();
1075
1076        let (mut external_out, mut external_in) = nodes.connect(port).await;
1077
1078        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1079        assert_eq!(
1080            external_out.next().await.unwrap().unwrap(),
1081            vec![1, 2, 3, 42]
1082        );
1083    }
1084
1085    #[tokio::test]
1086    async fn echo_external_bytes() {
1087        let mut deployment = Deployment::new();
1088
1089        let flow = FlowBuilder::new();
1090        let first_node = flow.process::<()>();
1091        let external = flow.external::<()>();
1092
1093        let (port, input, _membership, complete_sink) = first_node
1094            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1095        complete_sink
1096            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1097
1098        let nodes = flow
1099            .with_process(&first_node, deployment.Localhost())
1100            .with_external(&external, deployment.Localhost())
1101            .deploy(&mut deployment);
1102
1103        deployment.deploy().await.unwrap();
1104
1105        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1106        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1107
1108        deployment.start().await.unwrap();
1109
1110        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1111        external_in_2.send(vec![4, 5].into()).await.unwrap();
1112
1113        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1114        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1115    }
1116
1117    #[tokio::test]
1118    async fn echo_external_bincode() {
1119        let mut deployment = Deployment::new();
1120
1121        let flow = FlowBuilder::new();
1122        let first_node = flow.process::<()>();
1123        let external = flow.external::<()>();
1124
1125        let (port, input, _membership, complete_sink) =
1126            first_node.bidi_external_many_bincode(&external);
1127        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1128
1129        let nodes = flow
1130            .with_process(&first_node, deployment.Localhost())
1131            .with_external(&external, deployment.Localhost())
1132            .deploy(&mut deployment);
1133
1134        deployment.deploy().await.unwrap();
1135
1136        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1137        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1138
1139        deployment.start().await.unwrap();
1140
1141        external_in_1.send("hi".to_string()).await.unwrap();
1142        external_in_2.send("hello".to_string()).await.unwrap();
1143
1144        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1145        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1146    }
1147}