hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::cluster::ClusterIds;
40use crate::location::dynamic::LocationId;
41use crate::location::external_process::{
42    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
43};
44use crate::nondet::NonDet;
45use crate::staging_util::get_this_crate;
46
47pub mod dynamic;
48
49#[expect(missing_docs, reason = "TODO")]
50pub mod external_process;
51pub use external_process::External;
52
53#[expect(missing_docs, reason = "TODO")]
54pub mod process;
55pub use process::Process;
56
57#[expect(missing_docs, reason = "TODO")]
58pub mod cluster;
59pub use cluster::Cluster;
60
61#[expect(missing_docs, reason = "TODO")]
62pub mod member_id;
63pub use member_id::MemberId;
64
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72    Joined,
73    Left,
74}
75
76#[expect(missing_docs, reason = "TODO")]
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
78pub enum NetworkHint {
79    Auto,
80    TcpPort(Option<u16>),
81}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89    private_bounds,
90    reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
93    type Root: Location<'a>;
94
95    fn root(&self) -> Self::Root;
96
97    fn try_tick(&self) -> Option<Tick<Self>> {
98        if Self::is_top_level() {
99            let next_id = self.flow_state().borrow_mut().next_clock_id;
100            self.flow_state().borrow_mut().next_clock_id += 1;
101            Some(Tick {
102                id: next_id,
103                l: self.clone(),
104            })
105        } else {
106            None
107        }
108    }
109
110    fn id(&self) -> LocationId {
111        dynamic::DynLocation::id(self)
112    }
113
114    fn tick(&self) -> Tick<Self>
115    where
116        Self: NoTick,
117    {
118        let next_id = self.flow_state().borrow_mut().next_clock_id;
119        self.flow_state().borrow_mut().next_clock_id += 1;
120        Tick {
121            id: next_id,
122            l: self.clone(),
123        }
124    }
125
126    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127    where
128        Self: Sized + NoTick,
129    {
130        Stream::new(
131            self.clone(),
132            HydroNode::Source {
133                source: HydroSource::Spin(),
134                metadata: self.new_node_metadata(Stream::<
135                    (),
136                    Self,
137                    Unbounded,
138                    TotalOrder,
139                    ExactlyOnce,
140                >::collection_kind()),
141            },
142        )
143    }
144
145    fn source_stream<T, E>(
146        &self,
147        e: impl QuotedWithContext<'a, E, Self>,
148    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149    where
150        E: FuturesStream<Item = T> + Unpin,
151        Self: Sized + NoTick,
152    {
153        let e = e.splice_untyped_ctx(self);
154
155        Stream::new(
156            self.clone(),
157            HydroNode::Source {
158                source: HydroSource::Stream(e.into()),
159                metadata: self.new_node_metadata(Stream::<
160                    T,
161                    Self,
162                    Unbounded,
163                    TotalOrder,
164                    ExactlyOnce,
165                >::collection_kind()),
166            },
167        )
168    }
169
170    fn source_iter<T, E>(
171        &self,
172        e: impl QuotedWithContext<'a, E, Self>,
173    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
174    where
175        E: IntoIterator<Item = T>,
176        Self: Sized + NoTick,
177    {
178        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
179        // for bounded top-level streams, and this is the only way to generate one
180        let e = e.splice_typed_ctx(self);
181
182        Stream::new(
183            self.clone(),
184            HydroNode::Source {
185                source: HydroSource::Iter(e.into()),
186                metadata: self.new_node_metadata(Stream::<
187                    T,
188                    Self,
189                    Unbounded,
190                    TotalOrder,
191                    ExactlyOnce,
192                >::collection_kind()),
193            },
194        )
195    }
196
197    fn source_cluster_members<C: 'a>(
198        &self,
199        cluster: &Cluster<'a, C>,
200    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
201    where
202        Self: Sized + NoTick,
203    {
204        let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
205            id: cluster.id,
206            _phantom: PhantomData,
207        };
208
209        self.source_iter(q!(underlying_memberids))
210            .map(q!(|id| (*id, MembershipEvent::Joined)))
211            .into_keyed()
212    }
213
214    fn source_external_bytes<L>(
215        &self,
216        from: &External<L>,
217    ) -> (
218        ExternalBytesPort,
219        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
220    )
221    where
222        Self: Sized + NoTick,
223    {
224        let (port, stream, sink) =
225            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
226
227        sink.complete(self.source_iter(q!([])));
228
229        (port, stream)
230    }
231
232    #[expect(clippy::type_complexity, reason = "stream markers")]
233    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
234        &self,
235        from: &External<L>,
236    ) -> (
237        ExternalBincodeSink<T, NotMany, O, R>,
238        Stream<T, Self, Unbounded, O, R>,
239    )
240    where
241        Self: Sized + NoTick,
242        T: Serialize + DeserializeOwned,
243    {
244        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
245        sink.complete(self.source_iter(q!([])));
246
247        (
248            ExternalBincodeSink {
249                process_id: from.id,
250                port_id: port.port_id,
251                _phantom: PhantomData,
252            },
253            stream.weaken_ordering().weaken_retries(),
254        )
255    }
256
257    /// Establishes a server on this location to receive a bidirectional connection from a single
258    /// client, identified by the given `External` handle. Returns a port handle for the external
259    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
260    /// messages.
261    ///
262    /// # Example
263    /// ```rust
264    /// # use hydro_lang::prelude::*;
265    /// # use hydro_deploy::Deployment;
266    /// # use futures::{SinkExt, StreamExt};
267    /// # tokio_test::block_on(async {
268    /// # use bytes::Bytes;
269    /// # use hydro_lang::location::NetworkHint;
270    /// # use tokio_util::codec::LengthDelimitedCodec;
271    /// # let flow = FlowBuilder::new();
272    /// let node = flow.process::<()>();
273    /// let external = flow.external::<()>();
274    /// let (port, incoming, outgoing) =
275    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
276    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
277    ///     let mut resp: Vec<u8> = data.into();
278    ///     resp.push(42);
279    ///     resp.into() // : Bytes
280    /// })));
281    ///
282    /// # let mut deployment = Deployment::new();
283    /// let nodes = flow // ... with_process and with_external
284    /// #     .with_process(&node, deployment.Localhost())
285    /// #     .with_external(&external, deployment.Localhost())
286    /// #     .deploy(&mut deployment);
287    ///
288    /// deployment.deploy().await.unwrap();
289    /// deployment.start().await.unwrap();
290    ///
291    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
292    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
293    /// assert_eq!(
294    ///     external_out.next().await.unwrap().unwrap(),
295    ///     vec![1, 2, 3, 42]
296    /// );
297    /// # });
298    /// ```
299    #[expect(clippy::type_complexity, reason = "stream markers")]
300    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
301        &self,
302        from: &External<L>,
303        port_hint: NetworkHint,
304    ) -> (
305        ExternalBytesPort<NotMany>,
306        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
307        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
308    )
309    where
310        Self: Sized + NoTick,
311    {
312        let next_external_port_id = {
313            let mut flow_state = from.flow_state.borrow_mut();
314            let id = flow_state.next_external_out;
315            flow_state.next_external_out += 1;
316            id
317        };
318
319        let (fwd_ref, to_sink) =
320            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
321        let mut flow_state_borrow = self.flow_state().borrow_mut();
322
323        flow_state_borrow.push_root(HydroRoot::SendExternal {
324            to_external_id: from.id,
325            to_key: next_external_port_id,
326            to_many: false,
327            unpaired: false,
328            serialize_fn: None,
329            instantiate_fn: DebugInstantiate::Building,
330            input: Box::new(to_sink.ir_node.into_inner()),
331            op_metadata: HydroIrOpMetadata::new(),
332        });
333
334        let raw_stream: Stream<
335            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
336            Self,
337            Unbounded,
338            TotalOrder,
339            ExactlyOnce,
340        > = Stream::new(
341            self.clone(),
342            HydroNode::ExternalInput {
343                from_external_id: from.id,
344                from_key: next_external_port_id,
345                from_many: false,
346                codec_type: quote_type::<Codec>().into(),
347                port_hint,
348                instantiate_fn: DebugInstantiate::Building,
349                deserialize_fn: None,
350                metadata: self.new_node_metadata(Stream::<
351                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
352                    Self,
353                    Unbounded,
354                    TotalOrder,
355                    ExactlyOnce,
356                >::collection_kind()),
357            },
358        );
359
360        (
361            ExternalBytesPort {
362                process_id: from.id,
363                port_id: next_external_port_id,
364                _phantom: PhantomData,
365            },
366            raw_stream.flatten_ordered(),
367            fwd_ref,
368        )
369    }
370
371    #[expect(clippy::type_complexity, reason = "stream markers")]
372    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
373        &self,
374        from: &External<L>,
375    ) -> (
376        ExternalBincodeBidi<InT, OutT, NotMany>,
377        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
378        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
379    )
380    where
381        Self: Sized + NoTick,
382    {
383        let next_external_port_id = {
384            let mut flow_state = from.flow_state.borrow_mut();
385            let id = flow_state.next_external_out;
386            flow_state.next_external_out += 1;
387            id
388        };
389
390        let (fwd_ref, to_sink) =
391            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
392        let mut flow_state_borrow = self.flow_state().borrow_mut();
393
394        let root = get_this_crate();
395
396        let out_t_type = quote_type::<OutT>();
397        let ser_fn: syn::Expr = syn::parse_quote! {
398            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
399                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
400            )
401        };
402
403        flow_state_borrow.push_root(HydroRoot::SendExternal {
404            to_external_id: from.id,
405            to_key: next_external_port_id,
406            to_many: false,
407            unpaired: false,
408            serialize_fn: Some(ser_fn.into()),
409            instantiate_fn: DebugInstantiate::Building,
410            input: Box::new(to_sink.ir_node.into_inner()),
411            op_metadata: HydroIrOpMetadata::new(),
412        });
413
414        let in_t_type = quote_type::<InT>();
415
416        let deser_fn: syn::Expr = syn::parse_quote! {
417            |res| {
418                let b = res.unwrap();
419                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
420            }
421        };
422
423        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
424            self.clone(),
425            HydroNode::ExternalInput {
426                from_external_id: from.id,
427                from_key: next_external_port_id,
428                from_many: false,
429                codec_type: quote_type::<LengthDelimitedCodec>().into(),
430                port_hint: NetworkHint::Auto,
431                instantiate_fn: DebugInstantiate::Building,
432                deserialize_fn: Some(deser_fn.into()),
433                metadata: self.new_node_metadata(Stream::<
434                    InT,
435                    Self,
436                    Unbounded,
437                    TotalOrder,
438                    ExactlyOnce,
439                >::collection_kind()),
440            },
441        );
442
443        (
444            ExternalBincodeBidi {
445                process_id: from.id,
446                port_id: next_external_port_id,
447                _phantom: PhantomData,
448            },
449            raw_stream,
450            fwd_ref,
451        )
452    }
453
454    #[expect(clippy::type_complexity, reason = "stream markers")]
455    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
456        &self,
457        from: &External<L>,
458        port_hint: NetworkHint,
459    ) -> (
460        ExternalBytesPort<Many>,
461        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
462        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
463        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
464    )
465    where
466        Self: Sized + NoTick,
467    {
468        let next_external_port_id = {
469            let mut flow_state = from.flow_state.borrow_mut();
470            let id = flow_state.next_external_out;
471            flow_state.next_external_out += 1;
472            id
473        };
474
475        let (fwd_ref, to_sink) =
476            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
477        let mut flow_state_borrow = self.flow_state().borrow_mut();
478
479        flow_state_borrow.push_root(HydroRoot::SendExternal {
480            to_external_id: from.id,
481            to_key: next_external_port_id,
482            to_many: true,
483            unpaired: false,
484            serialize_fn: None,
485            instantiate_fn: DebugInstantiate::Building,
486            input: Box::new(to_sink.entries().ir_node.into_inner()),
487            op_metadata: HydroIrOpMetadata::new(),
488        });
489
490        let raw_stream: Stream<
491            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
492            Self,
493            Unbounded,
494            TotalOrder,
495            ExactlyOnce,
496        > = Stream::new(
497            self.clone(),
498            HydroNode::ExternalInput {
499                from_external_id: from.id,
500                from_key: next_external_port_id,
501                from_many: true,
502                codec_type: quote_type::<Codec>().into(),
503                port_hint,
504                instantiate_fn: DebugInstantiate::Building,
505                deserialize_fn: None,
506                metadata: self.new_node_metadata(Stream::<
507                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
508                    Self,
509                    Unbounded,
510                    TotalOrder,
511                    ExactlyOnce,
512                >::collection_kind()),
513            },
514        );
515
516        let membership_stream_ident = syn::Ident::new(
517            &format!(
518                "__hydro_deploy_many_{}_{}_membership",
519                from.id, next_external_port_id
520            ),
521            Span::call_site(),
522        );
523        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
524        let raw_membership_stream: KeyedStream<
525            u64,
526            bool,
527            Self,
528            Unbounded,
529            TotalOrder,
530            ExactlyOnce,
531        > = KeyedStream::new(
532            self.clone(),
533            HydroNode::Source {
534                source: HydroSource::Stream(membership_stream_expr.into()),
535                metadata: self.new_node_metadata(KeyedStream::<
536                    u64,
537                    bool,
538                    Self,
539                    Unbounded,
540                    TotalOrder,
541                    ExactlyOnce,
542                >::collection_kind()),
543            },
544        );
545
546        (
547            ExternalBytesPort {
548                process_id: from.id,
549                port_id: next_external_port_id,
550                _phantom: PhantomData,
551            },
552            raw_stream
553                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
554                .into_keyed(),
555            raw_membership_stream.map(q!(|join| {
556                if join {
557                    MembershipEvent::Joined
558                } else {
559                    MembershipEvent::Left
560                }
561            })),
562            fwd_ref,
563        )
564    }
565
566    #[expect(clippy::type_complexity, reason = "stream markers")]
567    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
568        &self,
569        from: &External<L>,
570    ) -> (
571        ExternalBincodeBidi<InT, OutT, Many>,
572        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
573        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
574        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
575    )
576    where
577        Self: Sized + NoTick,
578    {
579        let next_external_port_id = {
580            let mut flow_state = from.flow_state.borrow_mut();
581            let id = flow_state.next_external_out;
582            flow_state.next_external_out += 1;
583            id
584        };
585
586        let root = get_this_crate();
587
588        let (fwd_ref, to_sink) =
589            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
590        let mut flow_state_borrow = self.flow_state().borrow_mut();
591
592        let out_t_type = quote_type::<OutT>();
593        let ser_fn: syn::Expr = syn::parse_quote! {
594            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
595                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
596            )
597        };
598
599        flow_state_borrow.push_root(HydroRoot::SendExternal {
600            to_external_id: from.id,
601            to_key: next_external_port_id,
602            to_many: true,
603            unpaired: false,
604            serialize_fn: Some(ser_fn.into()),
605            instantiate_fn: DebugInstantiate::Building,
606            input: Box::new(to_sink.entries().ir_node.into_inner()),
607            op_metadata: HydroIrOpMetadata::new(),
608        });
609
610        let in_t_type = quote_type::<InT>();
611
612        let deser_fn: syn::Expr = syn::parse_quote! {
613            |res| {
614                let (id, b) = res.unwrap();
615                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
616            }
617        };
618
619        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
620            KeyedStream::new(
621                self.clone(),
622                HydroNode::ExternalInput {
623                    from_external_id: from.id,
624                    from_key: next_external_port_id,
625                    from_many: true,
626                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
627                    port_hint: NetworkHint::Auto,
628                    instantiate_fn: DebugInstantiate::Building,
629                    deserialize_fn: Some(deser_fn.into()),
630                    metadata: self.new_node_metadata(KeyedStream::<
631                        u64,
632                        InT,
633                        Self,
634                        Unbounded,
635                        TotalOrder,
636                        ExactlyOnce,
637                    >::collection_kind()),
638                },
639            );
640
641        let membership_stream_ident = syn::Ident::new(
642            &format!(
643                "__hydro_deploy_many_{}_{}_membership",
644                from.id, next_external_port_id
645            ),
646            Span::call_site(),
647        );
648        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
649        let raw_membership_stream: KeyedStream<
650            u64,
651            bool,
652            Self,
653            Unbounded,
654            TotalOrder,
655            ExactlyOnce,
656        > = KeyedStream::new(
657            self.clone(),
658            HydroNode::Source {
659                source: HydroSource::Stream(membership_stream_expr.into()),
660                metadata: self.new_node_metadata(KeyedStream::<
661                    u64,
662                    bool,
663                    Self,
664                    Unbounded,
665                    TotalOrder,
666                    ExactlyOnce,
667                >::collection_kind()),
668            },
669        );
670
671        (
672            ExternalBincodeBidi {
673                process_id: from.id,
674                port_id: next_external_port_id,
675                _phantom: PhantomData,
676            },
677            raw_stream,
678            raw_membership_stream.map(q!(|join| {
679                if join {
680                    MembershipEvent::Joined
681                } else {
682                    MembershipEvent::Left
683                }
684            })),
685            fwd_ref,
686        )
687    }
688
689    /// Constructs a [`Singleton`] materialized at this location with the given static value.
690    ///
691    /// # Example
692    /// ```rust
693    /// # use hydro_lang::prelude::*;
694    /// # use futures::StreamExt;
695    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
696    /// let tick = process.tick();
697    /// let singleton = tick.singleton(q!(5));
698    /// # singleton.all_ticks()
699    /// # }, |mut stream| async move {
700    /// // 5
701    /// # assert_eq!(stream.next().await.unwrap(), 5);
702    /// # }));
703    /// ```
704    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
705    where
706        T: Clone,
707        Self: Sized,
708    {
709        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
710        // for bounded top-level singletons, and this is the only way to generate one
711
712        let e = e.splice_untyped_ctx(self);
713
714        Singleton::new(
715            self.clone(),
716            HydroNode::SingletonSource {
717                value: e.into(),
718                metadata: self
719                    .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
720            },
721        )
722    }
723
724    /// Generates a stream with values emitted at a fixed interval, with
725    /// each value being the current time (as an [`tokio::time::Instant`]).
726    ///
727    /// The clock source used is monotonic, so elements will be emitted in
728    /// increasing order.
729    ///
730    /// # Non-Determinism
731    /// Because this stream is generated by an OS timer, it will be
732    /// non-deterministic because each timestamp will be arbitrary.
733    fn source_interval(
734        &self,
735        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
736        _nondet: NonDet,
737    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
738    where
739        Self: Sized + NoTick,
740    {
741        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
742            tokio::time::interval(interval)
743        )))
744    }
745
746    /// Generates a stream with values emitted at a fixed interval (with an
747    /// initial delay), with each value being the current time
748    /// (as an [`tokio::time::Instant`]).
749    ///
750    /// The clock source used is monotonic, so elements will be emitted in
751    /// increasing order.
752    ///
753    /// # Non-Determinism
754    /// Because this stream is generated by an OS timer, it will be
755    /// non-deterministic because each timestamp will be arbitrary.
756    fn source_interval_delayed(
757        &self,
758        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
759        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
760        _nondet: NonDet,
761    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
762    where
763        Self: Sized + NoTick,
764    {
765        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
766            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
767        )))
768    }
769
770    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
771    where
772        S: CycleCollection<'a, ForwardRef, Location = Self>,
773        Self: NoTick,
774    {
775        let next_id = self.flow_state().borrow_mut().next_cycle_id();
776        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
777
778        (
779            ForwardHandle {
780                completed: false,
781                ident: ident.clone(),
782                expected_location: Location::id(self),
783                _phantom: PhantomData,
784            },
785            S::create_source(ident, self.clone()),
786        )
787    }
788}
789
790#[cfg(feature = "deploy")]
791#[cfg(test)]
792mod tests {
793    use std::collections::HashSet;
794
795    use futures::{SinkExt, StreamExt};
796    use hydro_deploy::Deployment;
797    use stageleft::q;
798    use tokio_util::codec::LengthDelimitedCodec;
799
800    use crate::compile::builder::FlowBuilder;
801    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
802    use crate::location::{Location, NetworkHint};
803    use crate::nondet::nondet;
804
805    #[tokio::test]
806    async fn top_level_singleton_replay_cardinality() {
807        let mut deployment = Deployment::new();
808
809        let flow = FlowBuilder::new();
810        let node = flow.process::<()>();
811        let external = flow.external::<()>();
812
813        let (in_port, input) =
814            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
815        let singleton = node.singleton(q!(123));
816        let tick = node.tick();
817        let out = input
818            .batch(&tick, nondet!(/** test */))
819            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
820            .cross_singleton(
821                singleton
822                    .snapshot(&tick, nondet!(/** test */))
823                    .into_stream()
824                    .count(),
825            )
826            .all_ticks()
827            .send_bincode_external(&external);
828
829        let nodes = flow
830            .with_process(&node, deployment.Localhost())
831            .with_external(&external, deployment.Localhost())
832            .deploy(&mut deployment);
833
834        deployment.deploy().await.unwrap();
835
836        let mut external_in = nodes.connect(in_port).await;
837        let mut external_out = nodes.connect(out).await;
838
839        deployment.start().await.unwrap();
840
841        external_in.send(1).await.unwrap();
842        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
843
844        external_in.send(2).await.unwrap();
845        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
846    }
847
848    #[tokio::test]
849    async fn tick_singleton_replay_cardinality() {
850        let mut deployment = Deployment::new();
851
852        let flow = FlowBuilder::new();
853        let node = flow.process::<()>();
854        let external = flow.external::<()>();
855
856        let (in_port, input) =
857            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
858        let tick = node.tick();
859        let singleton = tick.singleton(q!(123));
860        let out = input
861            .batch(&tick, nondet!(/** test */))
862            .cross_singleton(singleton.clone())
863            .cross_singleton(singleton.into_stream().count())
864            .all_ticks()
865            .send_bincode_external(&external);
866
867        let nodes = flow
868            .with_process(&node, deployment.Localhost())
869            .with_external(&external, deployment.Localhost())
870            .deploy(&mut deployment);
871
872        deployment.deploy().await.unwrap();
873
874        let mut external_in = nodes.connect(in_port).await;
875        let mut external_out = nodes.connect(out).await;
876
877        deployment.start().await.unwrap();
878
879        external_in.send(1).await.unwrap();
880        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
881
882        external_in.send(2).await.unwrap();
883        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
884    }
885
886    #[tokio::test]
887    async fn external_bytes() {
888        let mut deployment = Deployment::new();
889
890        let flow = FlowBuilder::new();
891        let first_node = flow.process::<()>();
892        let external = flow.external::<()>();
893
894        let (in_port, input) = first_node.source_external_bytes(&external);
895        let out = input.send_bincode_external(&external);
896
897        let nodes = flow
898            .with_process(&first_node, deployment.Localhost())
899            .with_external(&external, deployment.Localhost())
900            .deploy(&mut deployment);
901
902        deployment.deploy().await.unwrap();
903
904        let mut external_in = nodes.connect(in_port).await.1;
905        let mut external_out = nodes.connect(out).await;
906
907        deployment.start().await.unwrap();
908
909        external_in.send(vec![1, 2, 3].into()).await.unwrap();
910
911        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
912    }
913
914    #[tokio::test]
915    async fn multi_external_source() {
916        let mut deployment = Deployment::new();
917
918        let flow = FlowBuilder::new();
919        let first_node = flow.process::<()>();
920        let external = flow.external::<()>();
921
922        let (in_port, input, _membership, complete_sink) =
923            first_node.bidi_external_many_bincode(&external);
924        let out = input.entries().send_bincode_external(&external);
925        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
926
927        let nodes = flow
928            .with_process(&first_node, deployment.Localhost())
929            .with_external(&external, deployment.Localhost())
930            .deploy(&mut deployment);
931
932        deployment.deploy().await.unwrap();
933
934        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
935        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
936        let external_out = nodes.connect(out).await;
937
938        deployment.start().await.unwrap();
939
940        external_in_1.send(123).await.unwrap();
941        external_in_2.send(456).await.unwrap();
942
943        assert_eq!(
944            external_out.take(2).collect::<HashSet<_>>().await,
945            vec![(0, 123), (1, 456)].into_iter().collect()
946        );
947    }
948
949    #[tokio::test]
950    async fn second_connection_only_multi_source() {
951        let mut deployment = Deployment::new();
952
953        let flow = FlowBuilder::new();
954        let first_node = flow.process::<()>();
955        let external = flow.external::<()>();
956
957        let (in_port, input, _membership, complete_sink) =
958            first_node.bidi_external_many_bincode(&external);
959        let out = input.entries().send_bincode_external(&external);
960        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
961
962        let nodes = flow
963            .with_process(&first_node, deployment.Localhost())
964            .with_external(&external, deployment.Localhost())
965            .deploy(&mut deployment);
966
967        deployment.deploy().await.unwrap();
968
969        // intentionally skipped to test stream waking logic
970        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
971        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
972        let mut external_out = nodes.connect(out).await;
973
974        deployment.start().await.unwrap();
975
976        external_in_2.send(456).await.unwrap();
977
978        assert_eq!(external_out.next().await.unwrap(), (1, 456));
979    }
980
981    #[tokio::test]
982    async fn multi_external_bytes() {
983        let mut deployment = Deployment::new();
984
985        let flow = FlowBuilder::new();
986        let first_node = flow.process::<()>();
987        let external = flow.external::<()>();
988
989        let (in_port, input, _membership, complete_sink) = first_node
990            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
991        let out = input.entries().send_bincode_external(&external);
992        complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
993
994        let nodes = flow
995            .with_process(&first_node, deployment.Localhost())
996            .with_external(&external, deployment.Localhost())
997            .deploy(&mut deployment);
998
999        deployment.deploy().await.unwrap();
1000
1001        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1002        let mut external_in_2 = nodes.connect(in_port).await.1;
1003        let external_out = nodes.connect(out).await;
1004
1005        deployment.start().await.unwrap();
1006
1007        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1008        external_in_2.send(vec![4, 5].into()).await.unwrap();
1009
1010        assert_eq!(
1011            external_out.take(2).collect::<HashSet<_>>().await,
1012            vec![
1013                (0, (&[1u8, 2, 3] as &[u8]).into()),
1014                (1, (&[4u8, 5] as &[u8]).into())
1015            ]
1016            .into_iter()
1017            .collect()
1018        );
1019    }
1020
1021    #[tokio::test]
1022    async fn single_client_external_bytes() {
1023        let mut deployment = Deployment::new();
1024        let flow = FlowBuilder::new();
1025        let first_node = flow.process::<()>();
1026        let external = flow.external::<()>();
1027        let (port, input, complete_sink) = first_node
1028            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1029        complete_sink.complete(input.map(q!(|data| {
1030            let mut resp: Vec<u8> = data.into();
1031            resp.push(42);
1032            resp.into() // : Bytes
1033        })));
1034
1035        let nodes = flow
1036            .with_process(&first_node, deployment.Localhost())
1037            .with_external(&external, deployment.Localhost())
1038            .deploy(&mut deployment);
1039
1040        deployment.deploy().await.unwrap();
1041        deployment.start().await.unwrap();
1042
1043        let (mut external_out, mut external_in) = nodes.connect(port).await;
1044
1045        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1046        assert_eq!(
1047            external_out.next().await.unwrap().unwrap(),
1048            vec![1, 2, 3, 42]
1049        );
1050    }
1051
1052    #[tokio::test]
1053    async fn echo_external_bytes() {
1054        let mut deployment = Deployment::new();
1055
1056        let flow = FlowBuilder::new();
1057        let first_node = flow.process::<()>();
1058        let external = flow.external::<()>();
1059
1060        let (port, input, _membership, complete_sink) = first_node
1061            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1062        complete_sink
1063            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1064
1065        let nodes = flow
1066            .with_process(&first_node, deployment.Localhost())
1067            .with_external(&external, deployment.Localhost())
1068            .deploy(&mut deployment);
1069
1070        deployment.deploy().await.unwrap();
1071
1072        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1073        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1074
1075        deployment.start().await.unwrap();
1076
1077        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1078        external_in_2.send(vec![4, 5].into()).await.unwrap();
1079
1080        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1081        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1082    }
1083
1084    #[tokio::test]
1085    async fn echo_external_bincode() {
1086        let mut deployment = Deployment::new();
1087
1088        let flow = FlowBuilder::new();
1089        let first_node = flow.process::<()>();
1090        let external = flow.external::<()>();
1091
1092        let (port, input, _membership, complete_sink) =
1093            first_node.bidi_external_many_bincode(&external);
1094        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1095
1096        let nodes = flow
1097            .with_process(&first_node, deployment.Localhost())
1098            .with_external(&external, deployment.Localhost())
1099            .deploy(&mut deployment);
1100
1101        deployment.deploy().await.unwrap();
1102
1103        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1104        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1105
1106        deployment.start().await.unwrap();
1107
1108        external_in_1.send("hi".to_string()).await.unwrap();
1109        external_in_2.send("hello".to_string()).await.unwrap();
1110
1111        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1112        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1113    }
1114}