1use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46use crate::location::dynamic::LocationId;
47use crate::location::external_process::{
48 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
49};
50use crate::nondet::NonDet;
51#[cfg(feature = "sim")]
52use crate::sim::SimSender;
53use crate::staging_util::get_this_crate;
54
55pub mod dynamic;
56
57pub mod external_process;
58pub use external_process::External;
59
60pub mod process;
61pub use process::Process;
62
63pub mod cluster;
64pub use cluster::Cluster;
65
66pub mod member_id;
67pub use member_id::{MemberId, TaglessMemberId};
68
69pub mod tick;
70pub use tick::{Atomic, NoTick, Tick};
71
72#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
75pub enum MembershipEvent {
76 Joined,
78 Left,
80}
81
/// A hint, passed along with an externally facing port, describing how that
/// port should be exposed on the network.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum NetworkHint {
    /// Express no preference. This is what the bincode-based binding helpers
    /// and `source_external_bytes` in this module pass.
    Auto,
    /// Ask for a TCP port.
    ///
    /// NOTE(review): `Some(port)` presumably requests that exact port and
    /// `None` lets one be picked automatically — confirm against the
    /// deployment backend.
    TcpPort(Option<u16>),
}
97
98pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
99 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
100}
101
// `new_key_type!` (from `slotmap`) generates the key newtype; the
// `stageleft::export` attribute is applied to it under the name `LocationKey`.
#[stageleft::export(LocationKey)]
new_key_type! {
    /// Slotmap key identifying a location.
    ///
    /// It is displayed as `loc` followed by the `Debug` form of the underlying
    /// key data, and can be parsed back from `loc<idx>v<version>`.
    pub struct LocationKey;
}
107
108impl std::fmt::Display for LocationKey {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 write!(f, "loc{:?}", self.data()) }
112}
113
114impl std::str::FromStr for LocationKey {
117 type Err = Option<ParseIntError>;
118
119 fn from_str(s: &str) -> Result<Self, Self::Err> {
120 let nvn = s.strip_prefix("loc").ok_or(None)?;
121 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
122 let idx: u64 = idx.parse()?;
123 let ver: u64 = ver.parse()?;
124 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
125 }
126}
127
128impl LocationKey {
129 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
135 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); #[cfg(test)]
139 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); }
141
142impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
144 type O = LocationKey;
145
146 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
147 where
148 Self: Sized,
149 {
150 let root = get_this_crate();
151 let n = Key::data(&self).as_ffi();
152 (
153 QuoteTokens {
154 prelude: None,
155 expr: Some(quote! {
156 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
157 }),
158 },
159 (),
160 )
161 }
162}
163
164#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
166pub enum LocationType {
167 Process,
169 Cluster,
171 External,
173}
174
175#[expect(
189 private_bounds,
190 reason = "only internal Hydro code can define location types"
191)]
192pub trait Location<'a>: dynamic::DynLocation {
    /// The location type returned by [`Location::root`]; it must itself
    /// implement [`Location`].
    type Root: Location<'a>;

    /// Returns the [`Self::Root`] of this location.
    ///
    /// NOTE(review): presumably top-level locations return themselves and
    /// nested locations return the top-level location that contains them —
    /// confirm against the implementors.
    fn root(&self) -> Self::Root;
204
205 fn try_tick(&self) -> Option<Tick<Self>> {
212 if Self::is_top_level() {
213 let id = self.flow_state().borrow_mut().next_clock_id();
214 Some(Tick {
215 id,
216 l: self.clone(),
217 })
218 } else {
219 None
220 }
221 }
222
223 fn id(&self) -> LocationId {
225 dynamic::DynLocation::id(self)
226 }
227
228 fn tick(&self) -> Tick<Self>
254 where
255 Self: NoTick,
256 {
257 let id = self.flow_state().borrow_mut().next_clock_id();
258 Tick {
259 id,
260 l: self.clone(),
261 }
262 }
263
264 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
289 where
290 Self: Sized + NoTick,
291 {
292 Stream::new(
293 self.clone(),
294 HydroNode::Source {
295 source: HydroSource::Spin(),
296 metadata: self.new_node_metadata(Stream::<
297 (),
298 Self,
299 Unbounded,
300 TotalOrder,
301 ExactlyOnce,
302 >::collection_kind()),
303 },
304 )
305 }
306
307 fn source_stream<T, E>(
328 &self,
329 e: impl QuotedWithContext<'a, E, Self>,
330 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
331 where
332 E: FuturesStream<Item = T> + Unpin,
333 Self: Sized + NoTick,
334 {
335 let e = e.splice_untyped_ctx(self);
336
337 Stream::new(
338 self.clone(),
339 HydroNode::Source {
340 source: HydroSource::Stream(e.into()),
341 metadata: self.new_node_metadata(Stream::<
342 T,
343 Self,
344 Unbounded,
345 TotalOrder,
346 ExactlyOnce,
347 >::collection_kind()),
348 },
349 )
350 }
351
352 fn source_iter<T, E>(
374 &self,
375 e: impl QuotedWithContext<'a, E, Self>,
376 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
377 where
378 E: IntoIterator<Item = T>,
379 Self: Sized,
380 {
381 let e = e.splice_typed_ctx(self);
382
383 Stream::new(
384 self.clone(),
385 HydroNode::Source {
386 source: HydroSource::Iter(e.into()),
387 metadata: self.new_node_metadata(
388 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
389 ),
390 },
391 )
392 }
393
394 fn source_cluster_members<C: 'a>(
428 &self,
429 cluster: &Cluster<'a, C>,
430 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
431 where
432 Self: Sized + NoTick,
433 {
434 Stream::new(
435 self.clone(),
436 HydroNode::Source {
437 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
438 metadata: self.new_node_metadata(Stream::<
439 (TaglessMemberId, MembershipEvent),
440 Self,
441 Unbounded,
442 TotalOrder,
443 ExactlyOnce,
444 >::collection_kind()),
445 },
446 )
447 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
448 .into_keyed()
449 }
450
451 fn source_external_bytes<L>(
459 &self,
460 from: &External<L>,
461 ) -> (
462 ExternalBytesPort,
463 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
464 )
465 where
466 Self: Sized + NoTick,
467 {
468 let (port, stream, sink) =
469 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
470
471 sink.complete(self.source_iter(q!([])));
472
473 (port, stream)
474 }
475
476 #[expect(clippy::type_complexity, reason = "stream markers")]
483 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
484 &self,
485 from: &External<L>,
486 ) -> (
487 ExternalBincodeSink<T, NotMany, O, R>,
488 Stream<T, Self, Unbounded, O, R>,
489 )
490 where
491 Self: Sized + NoTick,
492 T: Serialize + DeserializeOwned,
493 {
494 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
495 sink.complete(self.source_iter(q!([])));
496
497 (
498 ExternalBincodeSink {
499 process_key: from.key,
500 port_id: port.port_id,
501 _phantom: PhantomData,
502 },
503 stream.weaken_ordering().weaken_retries(),
504 )
505 }
506
507 #[cfg(feature = "sim")]
512 #[expect(clippy::type_complexity, reason = "stream markers")]
513 fn sim_input<T, O: Ordering, R: Retries>(
514 &self,
515 ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
516 where
517 Self: Sized + NoTick,
518 T: Serialize + DeserializeOwned,
519 {
520 let external_location: External<'a, ()> = External {
521 key: LocationKey::FIRST,
522 flow_state: self.flow_state().clone(),
523 _phantom: PhantomData,
524 };
525
526 let (external, stream) = self.source_external_bincode(&external_location);
527
528 (SimSender(external.port_id, PhantomData), stream)
529 }
530
531 fn embedded_input<T>(
537 &self,
538 name: impl Into<String>,
539 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
540 where
541 Self: Sized + NoTick,
542 {
543 let ident = syn::Ident::new(&name.into(), Span::call_site());
544
545 Stream::new(
546 self.clone(),
547 HydroNode::Source {
548 source: HydroSource::Embedded(ident),
549 metadata: self.new_node_metadata(Stream::<
550 T,
551 Self,
552 Unbounded,
553 TotalOrder,
554 ExactlyOnce,
555 >::collection_kind()),
556 },
557 )
558 }
559
560 fn embedded_singleton_input<T>(&self, name: impl Into<String>) -> Singleton<T, Self, Bounded>
566 where
567 Self: Sized + NoTick,
568 {
569 let ident = syn::Ident::new(&name.into(), Span::call_site());
570
571 Singleton::new(
572 self.clone(),
573 HydroNode::Source {
574 source: HydroSource::EmbeddedSingleton(ident),
575 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
576 },
577 )
578 }
579
580 #[expect(clippy::type_complexity, reason = "stream markers")]
625 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
626 &self,
627 from: &External<L>,
628 port_hint: NetworkHint,
629 ) -> (
630 ExternalBytesPort<NotMany>,
631 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
632 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
633 )
634 where
635 Self: Sized + NoTick,
636 {
637 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
638
639 let (fwd_ref, to_sink) =
640 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
641 let mut flow_state_borrow = self.flow_state().borrow_mut();
642
643 flow_state_borrow.push_root(HydroRoot::SendExternal {
644 to_external_key: from.key,
645 to_port_id: next_external_port_id,
646 to_many: false,
647 unpaired: false,
648 serialize_fn: None,
649 instantiate_fn: DebugInstantiate::Building,
650 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
651 op_metadata: HydroIrOpMetadata::new(),
652 });
653
654 let raw_stream: Stream<
655 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
656 Self,
657 Unbounded,
658 TotalOrder,
659 ExactlyOnce,
660 > = Stream::new(
661 self.clone(),
662 HydroNode::ExternalInput {
663 from_external_key: from.key,
664 from_port_id: next_external_port_id,
665 from_many: false,
666 codec_type: quote_type::<Codec>().into(),
667 port_hint,
668 instantiate_fn: DebugInstantiate::Building,
669 deserialize_fn: None,
670 metadata: self.new_node_metadata(Stream::<
671 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
672 Self,
673 Unbounded,
674 TotalOrder,
675 ExactlyOnce,
676 >::collection_kind()),
677 },
678 );
679
680 (
681 ExternalBytesPort {
682 process_key: from.key,
683 port_id: next_external_port_id,
684 _phantom: PhantomData,
685 },
686 raw_stream.flatten_ordered(),
687 fwd_ref,
688 )
689 }
690
691 #[expect(clippy::type_complexity, reason = "stream markers")]
701 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
702 &self,
703 from: &External<L>,
704 ) -> (
705 ExternalBincodeBidi<InT, OutT, NotMany>,
706 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
707 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
708 )
709 where
710 Self: Sized + NoTick,
711 {
712 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
713
714 let (fwd_ref, to_sink) =
715 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
716 let mut flow_state_borrow = self.flow_state().borrow_mut();
717
718 let root = get_this_crate();
719
720 let out_t_type = quote_type::<OutT>();
721 let ser_fn: syn::Expr = syn::parse_quote! {
722 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
723 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
724 )
725 };
726
727 flow_state_borrow.push_root(HydroRoot::SendExternal {
728 to_external_key: from.key,
729 to_port_id: next_external_port_id,
730 to_many: false,
731 unpaired: false,
732 serialize_fn: Some(ser_fn.into()),
733 instantiate_fn: DebugInstantiate::Building,
734 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
735 op_metadata: HydroIrOpMetadata::new(),
736 });
737
738 let in_t_type = quote_type::<InT>();
739
740 let deser_fn: syn::Expr = syn::parse_quote! {
741 |res| {
742 let b = res.unwrap();
743 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
744 }
745 };
746
747 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
748 self.clone(),
749 HydroNode::ExternalInput {
750 from_external_key: from.key,
751 from_port_id: next_external_port_id,
752 from_many: false,
753 codec_type: quote_type::<LengthDelimitedCodec>().into(),
754 port_hint: NetworkHint::Auto,
755 instantiate_fn: DebugInstantiate::Building,
756 deserialize_fn: Some(deser_fn.into()),
757 metadata: self.new_node_metadata(Stream::<
758 InT,
759 Self,
760 Unbounded,
761 TotalOrder,
762 ExactlyOnce,
763 >::collection_kind()),
764 },
765 );
766
767 (
768 ExternalBincodeBidi {
769 process_key: from.key,
770 port_id: next_external_port_id,
771 _phantom: PhantomData,
772 },
773 raw_stream,
774 fwd_ref,
775 )
776 }
777
778 #[expect(clippy::type_complexity, reason = "stream markers")]
790 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
791 &self,
792 from: &External<L>,
793 port_hint: NetworkHint,
794 ) -> (
795 ExternalBytesPort<Many>,
796 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
797 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
798 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
799 )
800 where
801 Self: Sized + NoTick,
802 {
803 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
804
805 let (fwd_ref, to_sink) =
806 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
807 let mut flow_state_borrow = self.flow_state().borrow_mut();
808
809 flow_state_borrow.push_root(HydroRoot::SendExternal {
810 to_external_key: from.key,
811 to_port_id: next_external_port_id,
812 to_many: true,
813 unpaired: false,
814 serialize_fn: None,
815 instantiate_fn: DebugInstantiate::Building,
816 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
817 op_metadata: HydroIrOpMetadata::new(),
818 });
819
820 let raw_stream: Stream<
821 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
822 Self,
823 Unbounded,
824 TotalOrder,
825 ExactlyOnce,
826 > = Stream::new(
827 self.clone(),
828 HydroNode::ExternalInput {
829 from_external_key: from.key,
830 from_port_id: next_external_port_id,
831 from_many: true,
832 codec_type: quote_type::<Codec>().into(),
833 port_hint,
834 instantiate_fn: DebugInstantiate::Building,
835 deserialize_fn: None,
836 metadata: self.new_node_metadata(Stream::<
837 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
838 Self,
839 Unbounded,
840 TotalOrder,
841 ExactlyOnce,
842 >::collection_kind()),
843 },
844 );
845
846 let membership_stream_ident = syn::Ident::new(
847 &format!(
848 "__hydro_deploy_many_{}_{}_membership",
849 from.key, next_external_port_id
850 ),
851 Span::call_site(),
852 );
853 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
854 let raw_membership_stream: KeyedStream<
855 u64,
856 bool,
857 Self,
858 Unbounded,
859 TotalOrder,
860 ExactlyOnce,
861 > = KeyedStream::new(
862 self.clone(),
863 HydroNode::Source {
864 source: HydroSource::Stream(membership_stream_expr.into()),
865 metadata: self.new_node_metadata(KeyedStream::<
866 u64,
867 bool,
868 Self,
869 Unbounded,
870 TotalOrder,
871 ExactlyOnce,
872 >::collection_kind()),
873 },
874 );
875
876 (
877 ExternalBytesPort {
878 process_key: from.key,
879 port_id: next_external_port_id,
880 _phantom: PhantomData,
881 },
882 raw_stream
883 .flatten_ordered() .into_keyed(),
885 raw_membership_stream.map(q!(|join| {
886 if join {
887 MembershipEvent::Joined
888 } else {
889 MembershipEvent::Left
890 }
891 })),
892 fwd_ref,
893 )
894 }
895
896 #[expect(clippy::type_complexity, reason = "stream markers")]
912 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
913 &self,
914 from: &External<L>,
915 ) -> (
916 ExternalBincodeBidi<InT, OutT, Many>,
917 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
918 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
919 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
920 )
921 where
922 Self: Sized + NoTick,
923 {
924 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
925
926 let (fwd_ref, to_sink) =
927 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
928 let mut flow_state_borrow = self.flow_state().borrow_mut();
929
930 let root = get_this_crate();
931
932 let out_t_type = quote_type::<OutT>();
933 let ser_fn: syn::Expr = syn::parse_quote! {
934 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
935 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
936 )
937 };
938
939 flow_state_borrow.push_root(HydroRoot::SendExternal {
940 to_external_key: from.key,
941 to_port_id: next_external_port_id,
942 to_many: true,
943 unpaired: false,
944 serialize_fn: Some(ser_fn.into()),
945 instantiate_fn: DebugInstantiate::Building,
946 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
947 op_metadata: HydroIrOpMetadata::new(),
948 });
949
950 let in_t_type = quote_type::<InT>();
951
952 let deser_fn: syn::Expr = syn::parse_quote! {
953 |res| {
954 let (id, b) = res.unwrap();
955 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
956 }
957 };
958
959 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
960 KeyedStream::new(
961 self.clone(),
962 HydroNode::ExternalInput {
963 from_external_key: from.key,
964 from_port_id: next_external_port_id,
965 from_many: true,
966 codec_type: quote_type::<LengthDelimitedCodec>().into(),
967 port_hint: NetworkHint::Auto,
968 instantiate_fn: DebugInstantiate::Building,
969 deserialize_fn: Some(deser_fn.into()),
970 metadata: self.new_node_metadata(KeyedStream::<
971 u64,
972 InT,
973 Self,
974 Unbounded,
975 TotalOrder,
976 ExactlyOnce,
977 >::collection_kind()),
978 },
979 );
980
981 let membership_stream_ident = syn::Ident::new(
982 &format!(
983 "__hydro_deploy_many_{}_{}_membership",
984 from.key, next_external_port_id
985 ),
986 Span::call_site(),
987 );
988 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
989 let raw_membership_stream: KeyedStream<
990 u64,
991 bool,
992 Self,
993 Unbounded,
994 TotalOrder,
995 ExactlyOnce,
996 > = KeyedStream::new(
997 self.clone(),
998 HydroNode::Source {
999 source: HydroSource::Stream(membership_stream_expr.into()),
1000 metadata: self.new_node_metadata(KeyedStream::<
1001 u64,
1002 bool,
1003 Self,
1004 Unbounded,
1005 TotalOrder,
1006 ExactlyOnce,
1007 >::collection_kind()),
1008 },
1009 );
1010
1011 (
1012 ExternalBincodeBidi {
1013 process_key: from.key,
1014 port_id: next_external_port_id,
1015 _phantom: PhantomData,
1016 },
1017 raw_stream,
1018 raw_membership_stream.map(q!(|join| {
1019 if join {
1020 MembershipEvent::Joined
1021 } else {
1022 MembershipEvent::Left
1023 }
1024 })),
1025 fwd_ref,
1026 )
1027 }
1028
1029 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1049 where
1050 Self: Sized + NoTick,
1051 {
1052 let e = e.splice_untyped_ctx(self);
1053
1054 Singleton::new(
1055 self.clone(),
1056 HydroNode::SingletonSource {
1057 value: e.into(),
1058 first_tick_only: false,
1059 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1060 },
1061 )
1062 }
1063
1064 fn singleton_future<F>(
1087 &self,
1088 e: impl QuotedWithContext<'a, F, Self>,
1089 ) -> Singleton<F::Output, Self, Bounded>
1090 where
1091 F: Future,
1092 Self: Sized + NoTick,
1093 {
1094 self.singleton(e).resolve_future_blocking()
1095 }
1096
    /// Creates an unbounded stream of `tokio::time::Instant`s produced by a
    /// `tokio::time::interval` with period `interval`, wrapped in an
    /// `IntervalStream`.
    ///
    /// `_nondet` is not read by the body. NOTE(review): presumably it is a
    /// marker by which the caller acknowledges that timer output is
    /// non-deterministic — confirm against `NonDet`'s documentation.
    fn source_interval(
        &self,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        // `interval` is captured by the staged expression below.
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval(interval)
        )))
    }
1118
    /// Creates an unbounded stream of `tokio::time::Instant`s produced by a
    /// `tokio::time::interval_at` that starts at "now plus `delay`" and then
    /// repeats with period `interval`, wrapped in an `IntervalStream`.
    ///
    /// "Now" is `tokio::time::Instant::now()` evaluated inside the staged
    /// expression, not when this method is called.
    ///
    /// `_nondet` is not read by the body. NOTE(review): presumably it is a
    /// marker by which the caller acknowledges that timer output is
    /// non-deterministic — confirm against `NonDet`'s documentation.
    fn source_interval_delayed(
        &self,
        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        // `delay` and `interval` are captured by the staged expression below.
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
        )))
    }
1142
1143 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1183 where
1184 S: CycleCollection<'a, ForwardRef, Location = Self>,
1185 {
1186 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1187 (
1188 ForwardHandle::new(cycle_id, Location::id(self)),
1189 S::create_source(cycle_id, self.clone()),
1190 )
1191 }
1192}
1193
1194#[cfg(feature = "deploy")]
1195#[cfg(test)]
1196mod tests {
1197 use std::collections::HashSet;
1198
1199 use futures::{SinkExt, StreamExt};
1200 use hydro_deploy::Deployment;
1201 use stageleft::q;
1202 use tokio_util::codec::LengthDelimitedCodec;
1203
1204 use crate::compile::builder::FlowBuilder;
1205 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1206 use crate::location::{Location, NetworkHint};
1207 use crate::nondet::nondet;
1208
1209 #[tokio::test]
1210 async fn top_level_singleton_replay_cardinality() {
1211 let mut deployment = Deployment::new();
1212
1213 let mut flow = FlowBuilder::new();
1214 let node = flow.process::<()>();
1215 let external = flow.external::<()>();
1216
1217 let (in_port, input) =
1218 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1219 let singleton = node.singleton(q!(123));
1220 let tick = node.tick();
1221 let out = input
1222 .batch(&tick, nondet!())
1223 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1224 .cross_singleton(
1225 singleton
1226 .snapshot(&tick, nondet!())
1227 .into_stream()
1228 .count(),
1229 )
1230 .all_ticks()
1231 .send_bincode_external(&external);
1232
1233 let nodes = flow
1234 .with_process(&node, deployment.Localhost())
1235 .with_external(&external, deployment.Localhost())
1236 .deploy(&mut deployment);
1237
1238 deployment.deploy().await.unwrap();
1239
1240 let mut external_in = nodes.connect(in_port).await;
1241 let mut external_out = nodes.connect(out).await;
1242
1243 deployment.start().await.unwrap();
1244
1245 external_in.send(1).await.unwrap();
1246 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1247
1248 external_in.send(2).await.unwrap();
1249 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1250 }
1251
1252 #[tokio::test]
1253 async fn tick_singleton_replay_cardinality() {
1254 let mut deployment = Deployment::new();
1255
1256 let mut flow = FlowBuilder::new();
1257 let node = flow.process::<()>();
1258 let external = flow.external::<()>();
1259
1260 let (in_port, input) =
1261 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1262 let tick = node.tick();
1263 let singleton = tick.singleton(q!(123));
1264 let out = input
1265 .batch(&tick, nondet!())
1266 .cross_singleton(singleton.clone())
1267 .cross_singleton(singleton.into_stream().count())
1268 .all_ticks()
1269 .send_bincode_external(&external);
1270
1271 let nodes = flow
1272 .with_process(&node, deployment.Localhost())
1273 .with_external(&external, deployment.Localhost())
1274 .deploy(&mut deployment);
1275
1276 deployment.deploy().await.unwrap();
1277
1278 let mut external_in = nodes.connect(in_port).await;
1279 let mut external_out = nodes.connect(out).await;
1280
1281 deployment.start().await.unwrap();
1282
1283 external_in.send(1).await.unwrap();
1284 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1285
1286 external_in.send(2).await.unwrap();
1287 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1288 }
1289
1290 #[tokio::test]
1291 async fn external_bytes() {
1292 let mut deployment = Deployment::new();
1293
1294 let mut flow = FlowBuilder::new();
1295 let first_node = flow.process::<()>();
1296 let external = flow.external::<()>();
1297
1298 let (in_port, input) = first_node.source_external_bytes(&external);
1299 let out = input.send_bincode_external(&external);
1300
1301 let nodes = flow
1302 .with_process(&first_node, deployment.Localhost())
1303 .with_external(&external, deployment.Localhost())
1304 .deploy(&mut deployment);
1305
1306 deployment.deploy().await.unwrap();
1307
1308 let mut external_in = nodes.connect(in_port).await.1;
1309 let mut external_out = nodes.connect(out).await;
1310
1311 deployment.start().await.unwrap();
1312
1313 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1314
1315 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1316 }
1317
1318 #[tokio::test]
1319 async fn multi_external_source() {
1320 let mut deployment = Deployment::new();
1321
1322 let mut flow = FlowBuilder::new();
1323 let first_node = flow.process::<()>();
1324 let external = flow.external::<()>();
1325
1326 let (in_port, input, _membership, complete_sink) =
1327 first_node.bidi_external_many_bincode(&external);
1328 let out = input.entries().send_bincode_external(&external);
1329 complete_sink.complete(
1330 first_node
1331 .source_iter::<(u64, ()), _>(q!([]))
1332 .into_keyed()
1333 .weaken_ordering(),
1334 );
1335
1336 let nodes = flow
1337 .with_process(&first_node, deployment.Localhost())
1338 .with_external(&external, deployment.Localhost())
1339 .deploy(&mut deployment);
1340
1341 deployment.deploy().await.unwrap();
1342
1343 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1344 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1345 let external_out = nodes.connect(out).await;
1346
1347 deployment.start().await.unwrap();
1348
1349 external_in_1.send(123).await.unwrap();
1350 external_in_2.send(456).await.unwrap();
1351
1352 assert_eq!(
1353 external_out.take(2).collect::<HashSet<_>>().await,
1354 vec![(0, 123), (1, 456)].into_iter().collect()
1355 );
1356 }
1357
1358 #[tokio::test]
1359 async fn second_connection_only_multi_source() {
1360 let mut deployment = Deployment::new();
1361
1362 let mut flow = FlowBuilder::new();
1363 let first_node = flow.process::<()>();
1364 let external = flow.external::<()>();
1365
1366 let (in_port, input, _membership, complete_sink) =
1367 first_node.bidi_external_many_bincode(&external);
1368 let out = input.entries().send_bincode_external(&external);
1369 complete_sink.complete(
1370 first_node
1371 .source_iter::<(u64, ()), _>(q!([]))
1372 .into_keyed()
1373 .weaken_ordering(),
1374 );
1375
1376 let nodes = flow
1377 .with_process(&first_node, deployment.Localhost())
1378 .with_external(&external, deployment.Localhost())
1379 .deploy(&mut deployment);
1380
1381 deployment.deploy().await.unwrap();
1382
1383 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1385 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1386 let mut external_out = nodes.connect(out).await;
1387
1388 deployment.start().await.unwrap();
1389
1390 external_in_2.send(456).await.unwrap();
1391
1392 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1393 }
1394
1395 #[tokio::test]
1396 async fn multi_external_bytes() {
1397 let mut deployment = Deployment::new();
1398
1399 let mut flow = FlowBuilder::new();
1400 let first_node = flow.process::<()>();
1401 let external = flow.external::<()>();
1402
1403 let (in_port, input, _membership, complete_sink) = first_node
1404 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1405 let out = input.entries().send_bincode_external(&external);
1406 complete_sink.complete(
1407 first_node
1408 .source_iter(q!([]))
1409 .into_keyed()
1410 .weaken_ordering(),
1411 );
1412
1413 let nodes = flow
1414 .with_process(&first_node, deployment.Localhost())
1415 .with_external(&external, deployment.Localhost())
1416 .deploy(&mut deployment);
1417
1418 deployment.deploy().await.unwrap();
1419
1420 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1421 let mut external_in_2 = nodes.connect(in_port).await.1;
1422 let external_out = nodes.connect(out).await;
1423
1424 deployment.start().await.unwrap();
1425
1426 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1427 external_in_2.send(vec![4, 5].into()).await.unwrap();
1428
1429 assert_eq!(
1430 external_out.take(2).collect::<HashSet<_>>().await,
1431 vec![
1432 (0, (&[1u8, 2, 3] as &[u8]).into()),
1433 (1, (&[4u8, 5] as &[u8]).into())
1434 ]
1435 .into_iter()
1436 .collect()
1437 );
1438 }
1439
1440 #[tokio::test]
1441 async fn single_client_external_bytes() {
1442 let mut deployment = Deployment::new();
1443 let mut flow = FlowBuilder::new();
1444 let first_node = flow.process::<()>();
1445 let external = flow.external::<()>();
1446 let (port, input, complete_sink) = first_node
1447 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1448 complete_sink.complete(input.map(q!(|data| {
1449 let mut resp: Vec<u8> = data.into();
1450 resp.push(42);
1451 resp.into() })));
1453
1454 let nodes = flow
1455 .with_process(&first_node, deployment.Localhost())
1456 .with_external(&external, deployment.Localhost())
1457 .deploy(&mut deployment);
1458
1459 deployment.deploy().await.unwrap();
1460 deployment.start().await.unwrap();
1461
1462 let (mut external_out, mut external_in) = nodes.connect(port).await;
1463
1464 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1465 assert_eq!(
1466 external_out.next().await.unwrap().unwrap(),
1467 vec![1, 2, 3, 42]
1468 );
1469 }
1470
1471 #[tokio::test]
1472 async fn echo_external_bytes() {
1473 let mut deployment = Deployment::new();
1474
1475 let mut flow = FlowBuilder::new();
1476 let first_node = flow.process::<()>();
1477 let external = flow.external::<()>();
1478
1479 let (port, input, _membership, complete_sink) = first_node
1480 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1481 complete_sink
1482 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1483
1484 let nodes = flow
1485 .with_process(&first_node, deployment.Localhost())
1486 .with_external(&external, deployment.Localhost())
1487 .deploy(&mut deployment);
1488
1489 deployment.deploy().await.unwrap();
1490
1491 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1492 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1493
1494 deployment.start().await.unwrap();
1495
1496 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1497 external_in_2.send(vec![4, 5].into()).await.unwrap();
1498
1499 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1500 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1501 }
1502
1503 #[tokio::test]
1504 async fn echo_external_bincode() {
1505 let mut deployment = Deployment::new();
1506
1507 let mut flow = FlowBuilder::new();
1508 let first_node = flow.process::<()>();
1509 let external = flow.external::<()>();
1510
1511 let (port, input, _membership, complete_sink) =
1512 first_node.bidi_external_many_bincode(&external);
1513 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1514
1515 let nodes = flow
1516 .with_process(&first_node, deployment.Localhost())
1517 .with_external(&external, deployment.Localhost())
1518 .deploy(&mut deployment);
1519
1520 deployment.deploy().await.unwrap();
1521
1522 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1523 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1524
1525 deployment.start().await.unwrap();
1526
1527 external_in_1.send("hi".to_owned()).await.unwrap();
1528 external_in_2.send("hello".to_owned()).await.unwrap();
1529
1530 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1531 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1532 }
1533
1534 #[tokio::test]
1535 async fn closure_location_name() {
1536 let mut deployment = Deployment::new();
1537 let mut flow = FlowBuilder::new();
1538
1539 enum ClosureProcess {}
1540
1541 let node = flow.process::<ClosureProcess>();
1542 let external = flow.external::<()>();
1543
1544 let (in_port, input) =
1545 node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1546 let out = input.send_bincode_external(&external);
1547
1548 let nodes = flow
1549 .with_process(&node, deployment.Localhost())
1550 .with_external(&external, deployment.Localhost())
1551 .deploy(&mut deployment);
1552
1553 deployment.deploy().await.unwrap();
1554
1555 let mut external_in = nodes.connect(in_port).await;
1556 let mut external_out = nodes.connect(out).await;
1557
1558 deployment.start().await.unwrap();
1559
1560 external_in.send(42).await.unwrap();
1561 assert_eq!(external_out.next().await.unwrap(), 42);
1562 }
1563}