hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::{Bounded, Unbounded};
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::dynamic::LocationId;
40use crate::location::external_process::{
41    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
42};
43use crate::nondet::NonDet;
44#[cfg(feature = "sim")]
45use crate::sim::SimSender;
46use crate::staging_util::get_this_crate;
47
48pub mod dynamic;
49
50#[expect(missing_docs, reason = "TODO")]
51pub mod external_process;
52pub use external_process::External;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod process;
56pub use process::Process;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod cluster;
60pub use cluster::Cluster;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod member_id;
64pub use member_id::{MemberId, TaglessMemberId};
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72    Joined,
73    Left,
74}
75
76#[expect(missing_docs, reason = "TODO")]
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
78pub enum NetworkHint {
79    Auto,
80    TcpPort(Option<u16>),
81}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89    private_bounds,
90    reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
93    type Root: Location<'a>;
94
95    fn root(&self) -> Self::Root;
96
97    fn try_tick(&self) -> Option<Tick<Self>> {
98        if Self::is_top_level() {
99            let next_id = self.flow_state().borrow_mut().next_clock_id;
100            self.flow_state().borrow_mut().next_clock_id += 1;
101            Some(Tick {
102                id: next_id,
103                l: self.clone(),
104            })
105        } else {
106            None
107        }
108    }
109
110    fn id(&self) -> LocationId {
111        dynamic::DynLocation::id(self)
112    }
113
114    fn tick(&self) -> Tick<Self>
115    where
116        Self: NoTick,
117    {
118        let next_id = self.flow_state().borrow_mut().next_clock_id;
119        self.flow_state().borrow_mut().next_clock_id += 1;
120        Tick {
121            id: next_id,
122            l: self.clone(),
123        }
124    }
125
126    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127    where
128        Self: Sized + NoTick,
129    {
130        Stream::new(
131            self.clone(),
132            HydroNode::Source {
133                source: HydroSource::Spin(),
134                metadata: self.new_node_metadata(Stream::<
135                    (),
136                    Self,
137                    Unbounded,
138                    TotalOrder,
139                    ExactlyOnce,
140                >::collection_kind()),
141            },
142        )
143    }
144
145    fn source_stream<T, E>(
146        &self,
147        e: impl QuotedWithContext<'a, E, Self>,
148    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149    where
150        E: FuturesStream<Item = T> + Unpin,
151        Self: Sized + NoTick,
152    {
153        let e = e.splice_untyped_ctx(self);
154
155        Stream::new(
156            self.clone(),
157            HydroNode::Source {
158                source: HydroSource::Stream(e.into()),
159                metadata: self.new_node_metadata(Stream::<
160                    T,
161                    Self,
162                    Unbounded,
163                    TotalOrder,
164                    ExactlyOnce,
165                >::collection_kind()),
166            },
167        )
168    }
169
170    fn source_iter<T, E>(
171        &self,
172        e: impl QuotedWithContext<'a, E, Self>,
173    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
174    where
175        E: IntoIterator<Item = T>,
176        Self: Sized + NoTick,
177    {
178        let e = e.splice_typed_ctx(self);
179
180        Stream::new(
181            self.clone(),
182            HydroNode::Source {
183                source: HydroSource::Iter(e.into()),
184                metadata: self.new_node_metadata(
185                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
186                ),
187            },
188        )
189    }
190
191    fn source_cluster_members<C: 'a>(
192        &self,
193        cluster: &Cluster<'a, C>,
194    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
195    where
196        Self: Sized + NoTick,
197    {
198        Stream::new(
199            self.clone(),
200            HydroNode::Source {
201                source: HydroSource::ClusterMembers(cluster.id()),
202                metadata: self.new_node_metadata(Stream::<
203                    (TaglessMemberId, MembershipEvent),
204                    Self,
205                    Unbounded,
206                    TotalOrder,
207                    ExactlyOnce,
208                >::collection_kind()),
209            },
210        )
211        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
212        .into_keyed()
213    }
214
215    fn source_external_bytes<L>(
216        &self,
217        from: &External<L>,
218    ) -> (
219        ExternalBytesPort,
220        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
221    )
222    where
223        Self: Sized + NoTick,
224    {
225        let (port, stream, sink) =
226            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
227
228        sink.complete(self.source_iter(q!([])));
229
230        (port, stream)
231    }
232
233    #[expect(clippy::type_complexity, reason = "stream markers")]
234    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
235        &self,
236        from: &External<L>,
237    ) -> (
238        ExternalBincodeSink<T, NotMany, O, R>,
239        Stream<T, Self, Unbounded, O, R>,
240    )
241    where
242        Self: Sized + NoTick,
243        T: Serialize + DeserializeOwned,
244    {
245        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
246        sink.complete(self.source_iter(q!([])));
247
248        (
249            ExternalBincodeSink {
250                process_id: from.id,
251                port_id: port.port_id,
252                _phantom: PhantomData,
253            },
254            stream.weaken_ordering().weaken_retries(),
255        )
256    }
257
258    #[cfg(feature = "sim")]
259    #[expect(clippy::type_complexity, reason = "stream markers")]
260    /// Sets up a simulated input port on this location, returning a handle to send messages to
261    /// the location as well as a stream of received messages.
262    fn sim_input<T, O: Ordering, R: Retries>(
263        &self,
264    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
265    where
266        Self: Sized + NoTick,
267        T: Serialize + DeserializeOwned,
268    {
269        let external_location: External<'a, ()> = External {
270            id: 0,
271            flow_state: self.flow_state().clone(),
272            _phantom: PhantomData,
273        };
274
275        let (external, stream) = self.source_external_bincode(&external_location);
276
277        (SimSender(external.port_id, PhantomData), stream)
278    }
279
280    /// Establishes a server on this location to receive a bidirectional connection from a single
281    /// client, identified by the given `External` handle. Returns a port handle for the external
282    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
283    /// messages.
284    ///
285    /// # Example
286    /// ```rust
287    /// # #[cfg(feature = "deploy")] {
288    /// # use hydro_lang::prelude::*;
289    /// # use hydro_deploy::Deployment;
290    /// # use futures::{SinkExt, StreamExt};
291    /// # tokio_test::block_on(async {
292    /// # use bytes::Bytes;
293    /// # use hydro_lang::location::NetworkHint;
294    /// # use tokio_util::codec::LengthDelimitedCodec;
295    /// # let flow = FlowBuilder::new();
296    /// let node = flow.process::<()>();
297    /// let external = flow.external::<()>();
298    /// let (port, incoming, outgoing) =
299    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
300    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
301    ///     let mut resp: Vec<u8> = data.into();
302    ///     resp.push(42);
303    ///     resp.into() // : Bytes
304    /// })));
305    ///
306    /// # let mut deployment = Deployment::new();
307    /// let nodes = flow // ... with_process and with_external
308    /// #     .with_process(&node, deployment.Localhost())
309    /// #     .with_external(&external, deployment.Localhost())
310    /// #     .deploy(&mut deployment);
311    ///
312    /// deployment.deploy().await.unwrap();
313    /// deployment.start().await.unwrap();
314    ///
315    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
316    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
317    /// assert_eq!(
318    ///     external_out.next().await.unwrap().unwrap(),
319    ///     vec![1, 2, 3, 42]
320    /// );
321    /// # });
322    /// # }
323    /// ```
324    #[expect(clippy::type_complexity, reason = "stream markers")]
325    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
326        &self,
327        from: &External<L>,
328        port_hint: NetworkHint,
329    ) -> (
330        ExternalBytesPort<NotMany>,
331        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
332        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
333    )
334    where
335        Self: Sized + NoTick,
336    {
337        let next_external_port_id = from
338            .flow_state
339            .borrow_mut()
340            .next_external_port
341            .get_and_increment();
342
343        let (fwd_ref, to_sink) =
344            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
345        let mut flow_state_borrow = self.flow_state().borrow_mut();
346
347        flow_state_borrow.push_root(HydroRoot::SendExternal {
348            to_external_id: from.id,
349            to_port_id: next_external_port_id,
350            to_many: false,
351            unpaired: false,
352            serialize_fn: None,
353            instantiate_fn: DebugInstantiate::Building,
354            input: Box::new(to_sink.ir_node.into_inner()),
355            op_metadata: HydroIrOpMetadata::new(),
356        });
357
358        let raw_stream: Stream<
359            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
360            Self,
361            Unbounded,
362            TotalOrder,
363            ExactlyOnce,
364        > = Stream::new(
365            self.clone(),
366            HydroNode::ExternalInput {
367                from_external_id: from.id,
368                from_port_id: next_external_port_id,
369                from_many: false,
370                codec_type: quote_type::<Codec>().into(),
371                port_hint,
372                instantiate_fn: DebugInstantiate::Building,
373                deserialize_fn: None,
374                metadata: self.new_node_metadata(Stream::<
375                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
376                    Self,
377                    Unbounded,
378                    TotalOrder,
379                    ExactlyOnce,
380                >::collection_kind()),
381            },
382        );
383
384        (
385            ExternalBytesPort {
386                process_id: from.id,
387                port_id: next_external_port_id,
388                _phantom: PhantomData,
389            },
390            raw_stream.flatten_ordered(),
391            fwd_ref,
392        )
393    }
394
395    #[expect(clippy::type_complexity, reason = "stream markers")]
396    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
397        &self,
398        from: &External<L>,
399    ) -> (
400        ExternalBincodeBidi<InT, OutT, NotMany>,
401        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
402        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
403    )
404    where
405        Self: Sized + NoTick,
406    {
407        let next_external_port_id = from
408            .flow_state
409            .borrow_mut()
410            .next_external_port
411            .get_and_increment();
412
413        let (fwd_ref, to_sink) =
414            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
415        let mut flow_state_borrow = self.flow_state().borrow_mut();
416
417        let root = get_this_crate();
418
419        let out_t_type = quote_type::<OutT>();
420        let ser_fn: syn::Expr = syn::parse_quote! {
421            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
422                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
423            )
424        };
425
426        flow_state_borrow.push_root(HydroRoot::SendExternal {
427            to_external_id: from.id,
428            to_port_id: next_external_port_id,
429            to_many: false,
430            unpaired: false,
431            serialize_fn: Some(ser_fn.into()),
432            instantiate_fn: DebugInstantiate::Building,
433            input: Box::new(to_sink.ir_node.into_inner()),
434            op_metadata: HydroIrOpMetadata::new(),
435        });
436
437        let in_t_type = quote_type::<InT>();
438
439        let deser_fn: syn::Expr = syn::parse_quote! {
440            |res| {
441                let b = res.unwrap();
442                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
443            }
444        };
445
446        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
447            self.clone(),
448            HydroNode::ExternalInput {
449                from_external_id: from.id,
450                from_port_id: next_external_port_id,
451                from_many: false,
452                codec_type: quote_type::<LengthDelimitedCodec>().into(),
453                port_hint: NetworkHint::Auto,
454                instantiate_fn: DebugInstantiate::Building,
455                deserialize_fn: Some(deser_fn.into()),
456                metadata: self.new_node_metadata(Stream::<
457                    InT,
458                    Self,
459                    Unbounded,
460                    TotalOrder,
461                    ExactlyOnce,
462                >::collection_kind()),
463            },
464        );
465
466        (
467            ExternalBincodeBidi {
468                process_id: from.id,
469                port_id: next_external_port_id,
470                _phantom: PhantomData,
471            },
472            raw_stream,
473            fwd_ref,
474        )
475    }
476
477    #[expect(clippy::type_complexity, reason = "stream markers")]
478    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
479        &self,
480        from: &External<L>,
481        port_hint: NetworkHint,
482    ) -> (
483        ExternalBytesPort<Many>,
484        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
485        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
486        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
487    )
488    where
489        Self: Sized + NoTick,
490    {
491        let next_external_port_id = from
492            .flow_state
493            .borrow_mut()
494            .next_external_port
495            .get_and_increment();
496
497        let (fwd_ref, to_sink) =
498            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
499        let mut flow_state_borrow = self.flow_state().borrow_mut();
500
501        flow_state_borrow.push_root(HydroRoot::SendExternal {
502            to_external_id: from.id,
503            to_port_id: next_external_port_id,
504            to_many: true,
505            unpaired: false,
506            serialize_fn: None,
507            instantiate_fn: DebugInstantiate::Building,
508            input: Box::new(to_sink.entries().ir_node.into_inner()),
509            op_metadata: HydroIrOpMetadata::new(),
510        });
511
512        let raw_stream: Stream<
513            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
514            Self,
515            Unbounded,
516            TotalOrder,
517            ExactlyOnce,
518        > = Stream::new(
519            self.clone(),
520            HydroNode::ExternalInput {
521                from_external_id: from.id,
522                from_port_id: next_external_port_id,
523                from_many: true,
524                codec_type: quote_type::<Codec>().into(),
525                port_hint,
526                instantiate_fn: DebugInstantiate::Building,
527                deserialize_fn: None,
528                metadata: self.new_node_metadata(Stream::<
529                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
530                    Self,
531                    Unbounded,
532                    TotalOrder,
533                    ExactlyOnce,
534                >::collection_kind()),
535            },
536        );
537
538        let membership_stream_ident = syn::Ident::new(
539            &format!(
540                "__hydro_deploy_many_{}_{}_membership",
541                from.id, next_external_port_id
542            ),
543            Span::call_site(),
544        );
545        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
546        let raw_membership_stream: KeyedStream<
547            u64,
548            bool,
549            Self,
550            Unbounded,
551            TotalOrder,
552            ExactlyOnce,
553        > = KeyedStream::new(
554            self.clone(),
555            HydroNode::Source {
556                source: HydroSource::Stream(membership_stream_expr.into()),
557                metadata: self.new_node_metadata(KeyedStream::<
558                    u64,
559                    bool,
560                    Self,
561                    Unbounded,
562                    TotalOrder,
563                    ExactlyOnce,
564                >::collection_kind()),
565            },
566        );
567
568        (
569            ExternalBytesPort {
570                process_id: from.id,
571                port_id: next_external_port_id,
572                _phantom: PhantomData,
573            },
574            raw_stream
575                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
576                .into_keyed(),
577            raw_membership_stream.map(q!(|join| {
578                if join {
579                    MembershipEvent::Joined
580                } else {
581                    MembershipEvent::Left
582                }
583            })),
584            fwd_ref,
585        )
586    }
587
588    #[expect(clippy::type_complexity, reason = "stream markers")]
589    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
590        &self,
591        from: &External<L>,
592    ) -> (
593        ExternalBincodeBidi<InT, OutT, Many>,
594        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
595        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
596        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
597    )
598    where
599        Self: Sized + NoTick,
600    {
601        let next_external_port_id = from
602            .flow_state
603            .borrow_mut()
604            .next_external_port
605            .get_and_increment();
606
607        let (fwd_ref, to_sink) =
608            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
609        let mut flow_state_borrow = self.flow_state().borrow_mut();
610
611        let root = get_this_crate();
612
613        let out_t_type = quote_type::<OutT>();
614        let ser_fn: syn::Expr = syn::parse_quote! {
615            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
616                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
617            )
618        };
619
620        flow_state_borrow.push_root(HydroRoot::SendExternal {
621            to_external_id: from.id,
622            to_port_id: next_external_port_id,
623            to_many: true,
624            unpaired: false,
625            serialize_fn: Some(ser_fn.into()),
626            instantiate_fn: DebugInstantiate::Building,
627            input: Box::new(to_sink.entries().ir_node.into_inner()),
628            op_metadata: HydroIrOpMetadata::new(),
629        });
630
631        let in_t_type = quote_type::<InT>();
632
633        let deser_fn: syn::Expr = syn::parse_quote! {
634            |res| {
635                let (id, b) = res.unwrap();
636                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
637            }
638        };
639
640        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
641            KeyedStream::new(
642                self.clone(),
643                HydroNode::ExternalInput {
644                    from_external_id: from.id,
645                    from_port_id: next_external_port_id,
646                    from_many: true,
647                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
648                    port_hint: NetworkHint::Auto,
649                    instantiate_fn: DebugInstantiate::Building,
650                    deserialize_fn: Some(deser_fn.into()),
651                    metadata: self.new_node_metadata(KeyedStream::<
652                        u64,
653                        InT,
654                        Self,
655                        Unbounded,
656                        TotalOrder,
657                        ExactlyOnce,
658                    >::collection_kind()),
659                },
660            );
661
662        let membership_stream_ident = syn::Ident::new(
663            &format!(
664                "__hydro_deploy_many_{}_{}_membership",
665                from.id, next_external_port_id
666            ),
667            Span::call_site(),
668        );
669        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
670        let raw_membership_stream: KeyedStream<
671            u64,
672            bool,
673            Self,
674            Unbounded,
675            TotalOrder,
676            ExactlyOnce,
677        > = KeyedStream::new(
678            self.clone(),
679            HydroNode::Source {
680                source: HydroSource::Stream(membership_stream_expr.into()),
681                metadata: self.new_node_metadata(KeyedStream::<
682                    u64,
683                    bool,
684                    Self,
685                    Unbounded,
686                    TotalOrder,
687                    ExactlyOnce,
688                >::collection_kind()),
689            },
690        );
691
692        (
693            ExternalBincodeBidi {
694                process_id: from.id,
695                port_id: next_external_port_id,
696                _phantom: PhantomData,
697            },
698            raw_stream,
699            raw_membership_stream.map(q!(|join| {
700                if join {
701                    MembershipEvent::Joined
702                } else {
703                    MembershipEvent::Left
704                }
705            })),
706            fwd_ref,
707        )
708    }
709
710    /// Constructs a [`Singleton`] materialized at this location with the given static value.
711    ///
712    /// # Example
713    /// ```rust
714    /// # #[cfg(feature = "deploy")] {
715    /// # use hydro_lang::prelude::*;
716    /// # use futures::StreamExt;
717    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
718    /// let tick = process.tick();
719    /// let singleton = tick.singleton(q!(5));
720    /// # singleton.all_ticks()
721    /// # }, |mut stream| async move {
722    /// // 5
723    /// # assert_eq!(stream.next().await.unwrap(), 5);
724    /// # }));
725    /// # }
726    /// ```
727    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
728    where
729        T: Clone,
730        Self: Sized,
731    {
732        let e = e.splice_untyped_ctx(self);
733
734        Singleton::new(
735            self.clone(),
736            HydroNode::SingletonSource {
737                value: e.into(),
738                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
739            },
740        )
741    }
742
743    /// Generates a stream with values emitted at a fixed interval, with
744    /// each value being the current time (as an [`tokio::time::Instant`]).
745    ///
746    /// The clock source used is monotonic, so elements will be emitted in
747    /// increasing order.
748    ///
749    /// # Non-Determinism
750    /// Because this stream is generated by an OS timer, it will be
751    /// non-deterministic because each timestamp will be arbitrary.
752    fn source_interval(
753        &self,
754        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
755        _nondet: NonDet,
756    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
757    where
758        Self: Sized + NoTick,
759    {
760        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
761            tokio::time::interval(interval)
762        )))
763    }
764
765    /// Generates a stream with values emitted at a fixed interval (with an
766    /// initial delay), with each value being the current time
767    /// (as an [`tokio::time::Instant`]).
768    ///
769    /// The clock source used is monotonic, so elements will be emitted in
770    /// increasing order.
771    ///
772    /// # Non-Determinism
773    /// Because this stream is generated by an OS timer, it will be
774    /// non-deterministic because each timestamp will be arbitrary.
775    fn source_interval_delayed(
776        &self,
777        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
778        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
779        _nondet: NonDet,
780    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
781    where
782        Self: Sized + NoTick,
783    {
784        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
785            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
786        )))
787    }
788
789    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
790    where
791        S: CycleCollection<'a, ForwardRef, Location = Self>,
792    {
793        let next_id = self.flow_state().borrow_mut().next_cycle_id();
794        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
795
796        (
797            ForwardHandle {
798                completed: false,
799                ident: ident.clone(),
800                expected_location: Location::id(self),
801                _phantom: PhantomData,
802            },
803            S::create_source(ident, self.clone()),
804        )
805    }
806}
807
808#[cfg(feature = "deploy")]
809#[cfg(test)]
810mod tests {
811    use std::collections::HashSet;
812
813    use futures::{SinkExt, StreamExt};
814    use hydro_deploy::Deployment;
815    use stageleft::q;
816    use tokio_util::codec::LengthDelimitedCodec;
817
818    use crate::compile::builder::FlowBuilder;
819    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
820    use crate::location::{Location, NetworkHint};
821    use crate::nondet::nondet;
822
823    #[tokio::test]
824    async fn top_level_singleton_replay_cardinality() {
825        let mut deployment = Deployment::new();
826
827        let flow = FlowBuilder::new();
828        let node = flow.process::<()>();
829        let external = flow.external::<()>();
830
831        let (in_port, input) =
832            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
833        let singleton = node.singleton(q!(123));
834        let tick = node.tick();
835        let out = input
836            .batch(&tick, nondet!(/** test */))
837            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
838            .cross_singleton(
839                singleton
840                    .snapshot(&tick, nondet!(/** test */))
841                    .into_stream()
842                    .count(),
843            )
844            .all_ticks()
845            .send_bincode_external(&external);
846
847        let nodes = flow
848            .with_process(&node, deployment.Localhost())
849            .with_external(&external, deployment.Localhost())
850            .deploy(&mut deployment);
851
852        deployment.deploy().await.unwrap();
853
854        let mut external_in = nodes.connect(in_port).await;
855        let mut external_out = nodes.connect(out).await;
856
857        deployment.start().await.unwrap();
858
859        external_in.send(1).await.unwrap();
860        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
861
862        external_in.send(2).await.unwrap();
863        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
864    }
865
866    #[tokio::test]
867    async fn tick_singleton_replay_cardinality() {
868        let mut deployment = Deployment::new();
869
870        let flow = FlowBuilder::new();
871        let node = flow.process::<()>();
872        let external = flow.external::<()>();
873
874        let (in_port, input) =
875            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
876        let tick = node.tick();
877        let singleton = tick.singleton(q!(123));
878        let out = input
879            .batch(&tick, nondet!(/** test */))
880            .cross_singleton(singleton.clone())
881            .cross_singleton(singleton.into_stream().count())
882            .all_ticks()
883            .send_bincode_external(&external);
884
885        let nodes = flow
886            .with_process(&node, deployment.Localhost())
887            .with_external(&external, deployment.Localhost())
888            .deploy(&mut deployment);
889
890        deployment.deploy().await.unwrap();
891
892        let mut external_in = nodes.connect(in_port).await;
893        let mut external_out = nodes.connect(out).await;
894
895        deployment.start().await.unwrap();
896
897        external_in.send(1).await.unwrap();
898        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
899
900        external_in.send(2).await.unwrap();
901        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
902    }
903
904    #[tokio::test]
905    async fn external_bytes() {
906        let mut deployment = Deployment::new();
907
908        let flow = FlowBuilder::new();
909        let first_node = flow.process::<()>();
910        let external = flow.external::<()>();
911
912        let (in_port, input) = first_node.source_external_bytes(&external);
913        let out = input.send_bincode_external(&external);
914
915        let nodes = flow
916            .with_process(&first_node, deployment.Localhost())
917            .with_external(&external, deployment.Localhost())
918            .deploy(&mut deployment);
919
920        deployment.deploy().await.unwrap();
921
922        let mut external_in = nodes.connect(in_port).await.1;
923        let mut external_out = nodes.connect(out).await;
924
925        deployment.start().await.unwrap();
926
927        external_in.send(vec![1, 2, 3].into()).await.unwrap();
928
929        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
930    }
931
932    #[tokio::test]
933    async fn multi_external_source() {
934        let mut deployment = Deployment::new();
935
936        let flow = FlowBuilder::new();
937        let first_node = flow.process::<()>();
938        let external = flow.external::<()>();
939
940        let (in_port, input, _membership, complete_sink) =
941            first_node.bidi_external_many_bincode(&external);
942        let out = input.entries().send_bincode_external(&external);
943        complete_sink.complete(
944            first_node
945                .source_iter::<(u64, ()), _>(q!([]))
946                .into_keyed()
947                .weakest_ordering(),
948        );
949
950        let nodes = flow
951            .with_process(&first_node, deployment.Localhost())
952            .with_external(&external, deployment.Localhost())
953            .deploy(&mut deployment);
954
955        deployment.deploy().await.unwrap();
956
957        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
958        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
959        let external_out = nodes.connect(out).await;
960
961        deployment.start().await.unwrap();
962
963        external_in_1.send(123).await.unwrap();
964        external_in_2.send(456).await.unwrap();
965
966        assert_eq!(
967            external_out.take(2).collect::<HashSet<_>>().await,
968            vec![(0, 123), (1, 456)].into_iter().collect()
969        );
970    }
971
972    #[tokio::test]
973    async fn second_connection_only_multi_source() {
974        let mut deployment = Deployment::new();
975
976        let flow = FlowBuilder::new();
977        let first_node = flow.process::<()>();
978        let external = flow.external::<()>();
979
980        let (in_port, input, _membership, complete_sink) =
981            first_node.bidi_external_many_bincode(&external);
982        let out = input.entries().send_bincode_external(&external);
983        complete_sink.complete(
984            first_node
985                .source_iter::<(u64, ()), _>(q!([]))
986                .into_keyed()
987                .weakest_ordering(),
988        );
989
990        let nodes = flow
991            .with_process(&first_node, deployment.Localhost())
992            .with_external(&external, deployment.Localhost())
993            .deploy(&mut deployment);
994
995        deployment.deploy().await.unwrap();
996
997        // intentionally skipped to test stream waking logic
998        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
999        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1000        let mut external_out = nodes.connect(out).await;
1001
1002        deployment.start().await.unwrap();
1003
1004        external_in_2.send(456).await.unwrap();
1005
1006        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1007    }
1008
1009    #[tokio::test]
1010    async fn multi_external_bytes() {
1011        let mut deployment = Deployment::new();
1012
1013        let flow = FlowBuilder::new();
1014        let first_node = flow.process::<()>();
1015        let external = flow.external::<()>();
1016
1017        let (in_port, input, _membership, complete_sink) = first_node
1018            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1019        let out = input.entries().send_bincode_external(&external);
1020        complete_sink.complete(
1021            first_node
1022                .source_iter(q!([]))
1023                .into_keyed()
1024                .weakest_ordering(),
1025        );
1026
1027        let nodes = flow
1028            .with_process(&first_node, deployment.Localhost())
1029            .with_external(&external, deployment.Localhost())
1030            .deploy(&mut deployment);
1031
1032        deployment.deploy().await.unwrap();
1033
1034        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1035        let mut external_in_2 = nodes.connect(in_port).await.1;
1036        let external_out = nodes.connect(out).await;
1037
1038        deployment.start().await.unwrap();
1039
1040        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1041        external_in_2.send(vec![4, 5].into()).await.unwrap();
1042
1043        assert_eq!(
1044            external_out.take(2).collect::<HashSet<_>>().await,
1045            vec![
1046                (0, (&[1u8, 2, 3] as &[u8]).into()),
1047                (1, (&[4u8, 5] as &[u8]).into())
1048            ]
1049            .into_iter()
1050            .collect()
1051        );
1052    }
1053
1054    #[tokio::test]
1055    async fn single_client_external_bytes() {
1056        let mut deployment = Deployment::new();
1057        let flow = FlowBuilder::new();
1058        let first_node = flow.process::<()>();
1059        let external = flow.external::<()>();
1060        let (port, input, complete_sink) = first_node
1061            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1062        complete_sink.complete(input.map(q!(|data| {
1063            let mut resp: Vec<u8> = data.into();
1064            resp.push(42);
1065            resp.into() // : Bytes
1066        })));
1067
1068        let nodes = flow
1069            .with_process(&first_node, deployment.Localhost())
1070            .with_external(&external, deployment.Localhost())
1071            .deploy(&mut deployment);
1072
1073        deployment.deploy().await.unwrap();
1074        deployment.start().await.unwrap();
1075
1076        let (mut external_out, mut external_in) = nodes.connect(port).await;
1077
1078        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1079        assert_eq!(
1080            external_out.next().await.unwrap().unwrap(),
1081            vec![1, 2, 3, 42]
1082        );
1083    }
1084
1085    #[tokio::test]
1086    async fn echo_external_bytes() {
1087        let mut deployment = Deployment::new();
1088
1089        let flow = FlowBuilder::new();
1090        let first_node = flow.process::<()>();
1091        let external = flow.external::<()>();
1092
1093        let (port, input, _membership, complete_sink) = first_node
1094            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1095        complete_sink
1096            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1097
1098        let nodes = flow
1099            .with_process(&first_node, deployment.Localhost())
1100            .with_external(&external, deployment.Localhost())
1101            .deploy(&mut deployment);
1102
1103        deployment.deploy().await.unwrap();
1104
1105        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1106        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1107
1108        deployment.start().await.unwrap();
1109
1110        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1111        external_in_2.send(vec![4, 5].into()).await.unwrap();
1112
1113        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1114        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1115    }
1116
1117    #[tokio::test]
1118    async fn echo_external_bincode() {
1119        let mut deployment = Deployment::new();
1120
1121        let flow = FlowBuilder::new();
1122        let first_node = flow.process::<()>();
1123        let external = flow.external::<()>();
1124
1125        let (port, input, _membership, complete_sink) =
1126            first_node.bidi_external_many_bincode(&external);
1127        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1128
1129        let nodes = flow
1130            .with_process(&first_node, deployment.Localhost())
1131            .with_external(&external, deployment.Localhost())
1132            .deploy(&mut deployment);
1133
1134        deployment.deploy().await.unwrap();
1135
1136        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1137        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1138
1139        deployment.start().await.unwrap();
1140
1141        external_in_1.send("hi".to_string()).await.unwrap();
1142        external_in_2.send("hello".to_string()).await.unwrap();
1143
1144        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1145        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1146    }
1147}