1use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46#[cfg(stageleft_runtime)]
47use crate::location::dynamic::DynLocation;
48use crate::location::dynamic::{ClusterConsistency, LocationId};
49use crate::location::external_process::{
50 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
51};
52use crate::nondet::NonDet;
53use crate::properties::manual_proof;
54#[cfg(feature = "sim")]
55use crate::sim::SimSender;
56use crate::staging_util::get_this_crate;
57
58pub mod dynamic;
59
60pub mod external_process;
61pub use external_process::External;
62
63pub mod process;
64pub use process::Process;
65
66pub mod cluster;
67pub use cluster::Cluster;
68
69pub mod member_id;
70pub use member_id::{MemberId, TaglessMemberId};
71
72pub mod tick;
73pub use tick::{Atomic, Tick};
74
75#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
78pub enum MembershipEvent {
79 Joined,
81 Left,
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
91pub enum NetworkHint {
92 Auto,
94 TcpPort(Option<u16>),
99}
100
101pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
102 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
103}
104
105#[stageleft::export(LocationKey)]
106new_key_type! {
107 pub struct LocationKey;
109}
110
111impl std::fmt::Display for LocationKey {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "loc{:?}", self.data()) }
115}
116
117impl std::str::FromStr for LocationKey {
120 type Err = Option<ParseIntError>;
121
122 fn from_str(s: &str) -> Result<Self, Self::Err> {
123 let nvn = s.strip_prefix("loc").ok_or(None)?;
124 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
125 let idx: u64 = idx.parse()?;
126 let ver: u64 = ver.parse()?;
127 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
128 }
129}
130
131impl LocationKey {
132 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
138 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); #[cfg(test)]
142 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); }
144
145impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
147 type O = LocationKey;
148
149 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
150 where
151 Self: Sized,
152 {
153 let root = get_this_crate();
154 let n = Key::data(&self).as_ffi();
155 (
156 QuoteTokens {
157 prelude: None,
158 expr: Some(quote! {
159 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
160 }),
161 },
162 (),
163 )
164 }
165}
166
167#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
169pub enum LocationType {
170 Process,
172 Cluster,
174 External,
176}
177
178pub trait TopLevel<'a>: Location<'a> {}
180
181#[expect(
195 private_bounds,
196 reason = "only internal Hydro code can define location types"
197)]
198pub trait Location<'a>: DynLocation {
199 type Root: Location<'a>;
204
205 type DropConsistency: Location<'a, DropConsistency = Self::DropConsistency>;
207
208 fn root(&self) -> Self::Root;
213
214 fn drop_consistency(&self) -> Self::DropConsistency;
216 fn consistency() -> Option<ClusterConsistency>;
218
219 fn with_consistency_of<L2: Location<'a, DropConsistency = Self::DropConsistency>>(&self) -> L2 {
221 L2::from_drop_consistency(self.drop_consistency())
222 }
223
224 #[doc(hidden)]
225 fn from_drop_consistency(l2: Self::DropConsistency) -> Self;
226
227 fn try_tick(&self) -> Option<Tick<Self>> {
234 if Self::is_top_level() {
235 let id = self.flow_state().borrow_mut().next_clock_id();
236 Some(Tick {
237 id,
238 l: self.clone(),
239 })
240 } else {
241 None
242 }
243 }
244
245 fn id(&self) -> LocationId {
247 DynLocation::dyn_id(self)
248 }
249
250 fn tick(&self) -> Tick<Self> {
276 if let LocationId::Tick(_, _) = self.id() {
277 panic!("cannot create nested ticks");
278 }
279
280 let id = self.flow_state().borrow_mut().next_clock_id();
281 Tick {
282 id,
283 l: self.clone(),
284 }
285 }
286
287 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
312 where
313 Self: TopLevel<'a> + Sized,
314 {
315 Stream::new(
316 self.clone(),
317 HydroNode::Source {
318 source: HydroSource::Spin(),
319 metadata: self.new_node_metadata(Stream::<
320 (),
321 Self,
322 Unbounded,
323 TotalOrder,
324 ExactlyOnce,
325 >::collection_kind()),
326 },
327 )
328 }
329
330 fn source_stream<T, E>(
351 &self,
352 e: impl QuotedWithContext<'a, E, Self>,
353 ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
354 where
355 E: FuturesStream<Item = T> + Unpin,
356 Self: TopLevel<'a> + Sized,
357 {
358 let e = e.splice_untyped_ctx(self);
359
360 let target_location = self.drop_consistency();
361 Stream::new(
362 target_location.clone(),
363 HydroNode::Source {
364 source: HydroSource::Stream(e.into()),
365 metadata: target_location.new_node_metadata(Stream::<
366 T,
367 Self::DropConsistency,
368 Unbounded,
369 TotalOrder,
370 ExactlyOnce,
371 >::collection_kind()),
372 },
373 )
374 }
375
376 fn source_iter<T, E>(
398 &self,
399 e: impl QuotedWithContext<'a, E, Self>,
400 ) -> Stream<T, Self::DropConsistency, Bounded, TotalOrder, ExactlyOnce>
401 where
402 E: IntoIterator<Item = T>,
403 Self: Sized,
404 {
405 let e = e.splice_typed_ctx(self);
406
407 let target_location = self.drop_consistency();
408 Stream::new(
409 target_location.clone(),
410 HydroNode::Source {
411 source: HydroSource::Iter(e.into()),
412 metadata: target_location.new_node_metadata(Stream::<
413 T,
414 Self::DropConsistency,
415 Bounded,
416 TotalOrder,
417 ExactlyOnce,
418 >::collection_kind()),
419 },
420 )
421 }
422
423 #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
424 fn source_cluster_members<C: 'a>(
463 &self,
464 cluster: &Cluster<'a, C>,
465 nondet_start: NonDet,
466 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
467 where
468 Self: TopLevel<'a> + Sized,
469 {
470 self.source_cluster_membership_stream(cluster, nondet_start)
471 }
472
473 fn source_cluster_membership_stream<C: 'a>(
512 &self,
513 cluster: &Cluster<'a, C>,
514 _nondet_start: NonDet,
515 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
516 where
517 Self: TopLevel<'a> + Sized,
518 {
519 let target_consistency = self.drop_consistency();
520 Stream::new(
521 target_consistency.clone(),
522 HydroNode::Source {
523 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
524 metadata: target_consistency.new_node_metadata(Stream::<
525 (TaglessMemberId, MembershipEvent),
526 Self,
527 Unbounded,
528 TotalOrder,
529 ExactlyOnce,
530 >::collection_kind(
531 )),
532 },
533 )
534 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
535 .into_keyed()
536 }
537
538 fn source_external_bytes<L>(
546 &self,
547 from: &External<L>,
548 ) -> (
549 ExternalBytesPort,
550 Stream<BytesMut, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
551 )
552 where
553 Self: TopLevel<'a> + Sized,
554 {
555 let (port, stream, sink) =
556 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
557
558 sink.complete(stream.location().source_iter(q!([])));
559
560 (port, stream)
561 }
562
563 #[expect(clippy::type_complexity, reason = "stream markers")]
570 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
571 &self,
572 from: &External<L>,
573 ) -> (
574 ExternalBincodeSink<T, NotMany, O, R>,
575 Stream<T, Self::DropConsistency, Unbounded, O, R>,
576 )
577 where
578 Self: TopLevel<'a> + Sized,
579 T: Serialize + DeserializeOwned,
580 {
581 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
582 sink.complete(stream.location().source_iter(q!([])));
583
584 (
585 ExternalBincodeSink {
586 process_key: from.key,
587 port_id: port.port_id,
588 _phantom: PhantomData,
589 },
590 stream.weaken_ordering().weaken_retries(),
591 )
592 }
593
594 #[cfg(feature = "sim")]
599 #[expect(clippy::type_complexity, reason = "stream markers")]
600 fn sim_input<T, O: Ordering, R: Retries>(
601 &self,
602 ) -> (
603 SimSender<T, O, R>,
604 Stream<T, Self::DropConsistency, Unbounded, O, R>,
605 )
606 where
607 Self: TopLevel<'a> + Sized,
608 T: Serialize + DeserializeOwned,
609 {
610 let external_location: External<'a, ()> = External {
611 key: LocationKey::FIRST,
612 flow_state: self.flow_state().clone(),
613 _phantom: PhantomData,
614 };
615
616 let (external, stream) = self.source_external_bincode(&external_location);
617
618 (SimSender(external.port_id, PhantomData), stream)
619 }
620
621 fn embedded_input<T>(
627 &self,
628 name: impl Into<String>,
629 ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
630 where
631 Self: TopLevel<'a> + Sized,
632 {
633 let ident = syn::Ident::new(&name.into(), Span::call_site());
634
635 let target_location = self.drop_consistency();
636 Stream::new(
637 target_location.clone(),
638 HydroNode::Source {
639 source: HydroSource::Embedded(ident),
640 metadata: target_location.new_node_metadata(Stream::<
641 T,
642 Self,
643 Unbounded,
644 TotalOrder,
645 ExactlyOnce,
646 >::collection_kind()),
647 },
648 )
649 }
650
651 fn embedded_singleton_input<T>(
657 &self,
658 name: impl Into<String>,
659 ) -> Singleton<T, Self::DropConsistency, Bounded>
660 where
661 Self: TopLevel<'a> + Sized,
662 {
663 let ident = syn::Ident::new(&name.into(), Span::call_site());
664
665 let target_location = self.drop_consistency();
666 Singleton::new(
667 target_location.clone(),
668 HydroNode::Source {
669 source: HydroSource::EmbeddedSingleton(ident),
670 metadata: target_location
671 .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
672 },
673 )
674 }
675
676 #[expect(clippy::type_complexity, reason = "stream markers")]
721 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
722 &self,
723 from: &External<L>,
724 port_hint: NetworkHint,
725 ) -> (
726 ExternalBytesPort<NotMany>,
727 Stream<<Codec as Decoder>::Item, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
728 ForwardHandle<'a, Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
729 )
730 where
731 Self: TopLevel<'a> + Sized,
732 {
733 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
734 let target_consistency = self.drop_consistency();
735
736 let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
737 T,
738 Self::DropConsistency,
739 Unbounded,
740 TotalOrder,
741 ExactlyOnce,
742 >>();
743 let mut flow_state_borrow = self.flow_state().borrow_mut();
744
745 flow_state_borrow.push_root(HydroRoot::SendExternal {
746 to_external_key: from.key,
747 to_port_id: next_external_port_id,
748 to_many: false,
749 unpaired: false,
750 serialize_fn: None,
751 instantiate_fn: DebugInstantiate::Building,
752 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
753 op_metadata: HydroIrOpMetadata::new(),
754 });
755
756 let raw_stream: Stream<
757 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
758 Self::DropConsistency,
759 Unbounded,
760 TotalOrder,
761 ExactlyOnce,
762 > = Stream::new(
763 target_consistency.clone(),
764 HydroNode::ExternalInput {
765 from_external_key: from.key,
766 from_port_id: next_external_port_id,
767 from_many: false,
768 codec_type: quote_type::<Codec>().into(),
769 port_hint,
770 instantiate_fn: DebugInstantiate::Building,
771 deserialize_fn: None,
772 metadata: target_consistency.new_node_metadata(Stream::<
773 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
774 Self::DropConsistency,
775 Unbounded,
776 TotalOrder,
777 ExactlyOnce,
778 >::collection_kind(
779 )),
780 },
781 );
782
783 (
784 ExternalBytesPort {
785 process_key: from.key,
786 port_id: next_external_port_id,
787 _phantom: PhantomData,
788 },
789 raw_stream.flatten_ordered(),
790 fwd_ref,
791 )
792 }
793
794 #[expect(clippy::type_complexity, reason = "stream markers")]
804 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
805 &self,
806 from: &External<L>,
807 ) -> (
808 ExternalBincodeBidi<InT, OutT, NotMany>,
809 Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
810 ForwardHandle<'a, Stream<OutT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
811 )
812 where
813 Self: TopLevel<'a> + Sized,
814 {
815 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
816
817 let target_consistency = self.drop_consistency();
818 let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
819 OutT,
820 Self::DropConsistency,
821 Unbounded,
822 TotalOrder,
823 ExactlyOnce,
824 >>();
825 let mut flow_state_borrow = self.flow_state().borrow_mut();
826
827 let root = get_this_crate();
828
829 let out_t_type = quote_type::<OutT>();
830 let ser_fn: syn::Expr = syn::parse_quote! {
831 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
832 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
833 )
834 };
835
836 flow_state_borrow.push_root(HydroRoot::SendExternal {
837 to_external_key: from.key,
838 to_port_id: next_external_port_id,
839 to_many: false,
840 unpaired: false,
841 serialize_fn: Some(ser_fn.into()),
842 instantiate_fn: DebugInstantiate::Building,
843 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
844 op_metadata: HydroIrOpMetadata::new(),
845 });
846
847 let in_t_type = quote_type::<InT>();
848
849 let deser_fn: syn::Expr = syn::parse_quote! {
850 |res| {
851 let b = res.unwrap();
852 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
853 }
854 };
855
856 let raw_stream: Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce> =
857 Stream::new(
858 target_consistency.clone(),
859 HydroNode::ExternalInput {
860 from_external_key: from.key,
861 from_port_id: next_external_port_id,
862 from_many: false,
863 codec_type: quote_type::<LengthDelimitedCodec>().into(),
864 port_hint: NetworkHint::Auto,
865 instantiate_fn: DebugInstantiate::Building,
866 deserialize_fn: Some(deser_fn.into()),
867 metadata: target_consistency.new_node_metadata(Stream::<
868 InT,
869 Self::DropConsistency,
870 Unbounded,
871 TotalOrder,
872 ExactlyOnce,
873 >::collection_kind(
874 )),
875 },
876 );
877
878 (
879 ExternalBincodeBidi {
880 process_key: from.key,
881 port_id: next_external_port_id,
882 _phantom: PhantomData,
883 },
884 raw_stream,
885 fwd_ref,
886 )
887 }
888
889 #[expect(clippy::type_complexity, reason = "stream markers")]
901 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
902 &self,
903 from: &External<L>,
904 port_hint: NetworkHint,
905 ) -> (
906 ExternalBytesPort<Many>,
907 KeyedStream<
908 u64,
909 <Codec as Decoder>::Item,
910 Self::DropConsistency,
911 Unbounded,
912 TotalOrder,
913 ExactlyOnce,
914 >,
915 KeyedStream<
916 u64,
917 MembershipEvent,
918 Self::DropConsistency,
919 Unbounded,
920 TotalOrder,
921 ExactlyOnce,
922 >,
923 ForwardHandle<
924 'a,
925 KeyedStream<u64, T, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
926 >,
927 )
928 where
929 Self: TopLevel<'a> + Sized,
930 {
931 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
932
933 let target_consistency = self.drop_consistency();
934 let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
935 u64,
936 T,
937 Self::DropConsistency,
938 Unbounded,
939 NoOrder,
940 ExactlyOnce,
941 >>();
942 let mut flow_state_borrow = self.flow_state().borrow_mut();
943
944 flow_state_borrow.push_root(HydroRoot::SendExternal {
945 to_external_key: from.key,
946 to_port_id: next_external_port_id,
947 to_many: true,
948 unpaired: false,
949 serialize_fn: None,
950 instantiate_fn: DebugInstantiate::Building,
951 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
952 op_metadata: HydroIrOpMetadata::new(),
953 });
954
955 let raw_stream: Stream<
956 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
957 Self::DropConsistency,
958 Unbounded,
959 TotalOrder,
960 ExactlyOnce,
961 > = Stream::new(
962 target_consistency.clone(),
963 HydroNode::ExternalInput {
964 from_external_key: from.key,
965 from_port_id: next_external_port_id,
966 from_many: true,
967 codec_type: quote_type::<Codec>().into(),
968 port_hint,
969 instantiate_fn: DebugInstantiate::Building,
970 deserialize_fn: None,
971 metadata: target_consistency.new_node_metadata(Stream::<
972 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
973 Self::DropConsistency,
974 Unbounded,
975 TotalOrder,
976 ExactlyOnce,
977 >::collection_kind(
978 )),
979 },
980 );
981
982 let membership_stream_ident = syn::Ident::new(
983 &format!(
984 "__hydro_deploy_many_{}_{}_membership",
985 from.key, next_external_port_id
986 ),
987 Span::call_site(),
988 );
989 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
990 let raw_membership_stream: KeyedStream<
991 u64,
992 bool,
993 Self::DropConsistency,
994 Unbounded,
995 TotalOrder,
996 ExactlyOnce,
997 > = KeyedStream::new(
998 target_consistency.clone(),
999 HydroNode::Source {
1000 source: HydroSource::Stream(membership_stream_expr.into()),
1001 metadata: target_consistency.new_node_metadata(KeyedStream::<
1002 u64,
1003 bool,
1004 Self::DropConsistency,
1005 Unbounded,
1006 TotalOrder,
1007 ExactlyOnce,
1008 >::collection_kind(
1009 )),
1010 },
1011 );
1012
1013 (
1014 ExternalBytesPort {
1015 process_key: from.key,
1016 port_id: next_external_port_id,
1017 _phantom: PhantomData,
1018 },
1019 raw_stream
1020 .flatten_ordered() .into_keyed(),
1022 raw_membership_stream.map(q!(|join| {
1023 if join {
1024 MembershipEvent::Joined
1025 } else {
1026 MembershipEvent::Left
1027 }
1028 })),
1029 fwd_ref,
1030 )
1031 }
1032
1033 #[expect(clippy::type_complexity, reason = "stream markers")]
1049 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
1050 &self,
1051 from: &External<L>,
1052 ) -> (
1053 ExternalBincodeBidi<InT, OutT, Many>,
1054 KeyedStream<u64, InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1055 KeyedStream<
1056 u64,
1057 MembershipEvent,
1058 Self::DropConsistency,
1059 Unbounded,
1060 TotalOrder,
1061 ExactlyOnce,
1062 >,
1063 ForwardHandle<
1064 'a,
1065 KeyedStream<u64, OutT, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
1066 >,
1067 )
1068 where
1069 Self: TopLevel<'a> + Sized,
1070 {
1071 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
1072
1073 let target_consistency = self.drop_consistency();
1074 let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
1075 u64,
1076 OutT,
1077 Self::DropConsistency,
1078 Unbounded,
1079 NoOrder,
1080 ExactlyOnce,
1081 >>();
1082 let mut flow_state_borrow = self.flow_state().borrow_mut();
1083
1084 let root = get_this_crate();
1085
1086 let out_t_type = quote_type::<OutT>();
1087 let ser_fn: syn::Expr = syn::parse_quote! {
1088 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
1089 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
1090 )
1091 };
1092
1093 flow_state_borrow.push_root(HydroRoot::SendExternal {
1094 to_external_key: from.key,
1095 to_port_id: next_external_port_id,
1096 to_many: true,
1097 unpaired: false,
1098 serialize_fn: Some(ser_fn.into()),
1099 instantiate_fn: DebugInstantiate::Building,
1100 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
1101 op_metadata: HydroIrOpMetadata::new(),
1102 });
1103
1104 let in_t_type = quote_type::<InT>();
1105
1106 let deser_fn: syn::Expr = syn::parse_quote! {
1107 |res| {
1108 let (id, b) = res.unwrap();
1109 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
1110 }
1111 };
1112
1113 let raw_stream: KeyedStream<
1114 u64,
1115 InT,
1116 Self::DropConsistency,
1117 Unbounded,
1118 TotalOrder,
1119 ExactlyOnce,
1120 > = KeyedStream::new(
1121 target_consistency.clone(),
1122 HydroNode::ExternalInput {
1123 from_external_key: from.key,
1124 from_port_id: next_external_port_id,
1125 from_many: true,
1126 codec_type: quote_type::<LengthDelimitedCodec>().into(),
1127 port_hint: NetworkHint::Auto,
1128 instantiate_fn: DebugInstantiate::Building,
1129 deserialize_fn: Some(deser_fn.into()),
1130 metadata: target_consistency.new_node_metadata(KeyedStream::<
1131 u64,
1132 InT,
1133 Self::DropConsistency,
1134 Unbounded,
1135 TotalOrder,
1136 ExactlyOnce,
1137 >::collection_kind(
1138 )),
1139 },
1140 );
1141
1142 let membership_stream_ident = syn::Ident::new(
1143 &format!(
1144 "__hydro_deploy_many_{}_{}_membership",
1145 from.key, next_external_port_id
1146 ),
1147 Span::call_site(),
1148 );
1149 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1150 let raw_membership_stream: KeyedStream<
1151 u64,
1152 bool,
1153 Self::DropConsistency,
1154 Unbounded,
1155 TotalOrder,
1156 ExactlyOnce,
1157 > = KeyedStream::new(
1158 target_consistency.clone(),
1159 HydroNode::Source {
1160 source: HydroSource::Stream(membership_stream_expr.into()),
1161 metadata: target_consistency.new_node_metadata(KeyedStream::<
1162 u64,
1163 bool,
1164 Self::DropConsistency,
1165 Unbounded,
1166 TotalOrder,
1167 ExactlyOnce,
1168 >::collection_kind(
1169 )),
1170 },
1171 );
1172
1173 (
1174 ExternalBincodeBidi {
1175 process_key: from.key,
1176 port_id: next_external_port_id,
1177 _phantom: PhantomData,
1178 },
1179 raw_stream,
1180 raw_membership_stream.map(q!(|join| {
1181 if join {
1182 MembershipEvent::Joined
1183 } else {
1184 MembershipEvent::Left
1185 }
1186 })),
1187 fwd_ref,
1188 )
1189 }
1190
1191 #[expect(clippy::type_complexity, reason = "stream markers")]
1244 fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
1245 &self,
1246 sidecar: impl QuotedWithContext<'a, F, Self>,
1247 ) -> (
1248 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
1249 ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1250 )
1251 where
1252 Self: Sized + TopLevel<'a>,
1253 {
1254 let location_key = Location::id(self).key();
1255
1256 let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
1257 let (stream_ident, sink_ident) = sidecar_id.idents();
1258
1259 let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
1260 self.flow_state()
1261 .borrow_mut()
1262 .sidecars
1263 .push(crate::compile::builder::Sidecar::Bidi {
1264 location_key,
1265 sidecar_id,
1266 sidecar_closure: Box::new(sidecar_closure),
1267 });
1268
1269 let source_expr: syn::Expr = parse_quote! {
1271 #stream_ident
1272 };
1273 let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
1274 self.clone(),
1275 HydroNode::Source {
1276 source: HydroSource::Stream(source_expr.into()),
1277 metadata: self.new_node_metadata(Stream::<
1278 InT,
1279 Self,
1280 Unbounded, TotalOrder, ExactlyOnce,
1283 >::collection_kind()),
1284 },
1285 );
1286
1287 let (fwd_ref, to_sink): (
1289 ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1290 Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
1291 ) = self.forward_ref();
1292
1293 let sink_expr: syn::Expr = parse_quote! {
1294 #sink_ident
1295 };
1296
1297 let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
1298 self.flow_state()
1299 .borrow_mut()
1300 .try_push_root(HydroRoot::DestSink {
1301 sink: sink_expr.into(),
1302 input: Box::new(sink_input_ir),
1303 op_metadata: HydroIrOpMetadata::new(),
1304 });
1305
1306 (inbound, fwd_ref)
1307 }
1308
1309 fn singleton<T>(
1329 &self,
1330 e: impl QuotedWithContext<'a, T, Self>,
1331 ) -> Singleton<T, Self::DropConsistency, Bounded>
1332 where
1333 Self: Sized,
1334 {
1335 let e = e.splice_untyped_ctx(self);
1336
1337 let target_location = self.drop_consistency();
1338 Singleton::new(
1339 target_location.clone(),
1340 HydroNode::SingletonSource {
1341 value: e.into(),
1342 first_tick_only: false,
1343 metadata: target_location.new_node_metadata(Singleton::<
1344 T,
1345 Self::DropConsistency,
1346 Bounded,
1347 >::collection_kind()),
1348 },
1349 )
1350 }
1351
1352 fn singleton_future<F>(
1375 &self,
1376 e: impl QuotedWithContext<'a, F, Self>,
1377 ) -> Singleton<F::Output, Self::DropConsistency, Bounded>
1378 where
1379 F: Future,
1380 Self: Sized,
1381 {
1382 self.singleton(e).resolve_future_blocking()
1383 }
1384
1385 fn source_interval(
1394 &self,
1395 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1396 ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1397 where
1398 Self: TopLevel<'a> + Sized,
1399 {
1400 self.source_stream(q!(tokio_stream::StreamExt::map(
1401 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1402 |_| ()
1403 )))
1404 .assert_has_consistency_of_trusted(
1405 manual_proof!(),
1406 )
1407 }
1408
1409 fn source_interval_delayed(
1416 &self,
1417 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1418 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1419 ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1420 where
1421 Self: TopLevel<'a> + Sized,
1422 {
1423 self.source_stream(q!(tokio_stream::StreamExt::map(
1424 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1425 tokio::time::Instant::now() + delay,
1426 interval,
1427 )),
1428 |_| ()
1429 )))
1430 .assert_has_consistency_of_trusted(
1431 manual_proof!(),
1432 )
1433 }
1434
1435 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1475 where
1476 S: CycleCollection<'a, ForwardRef, Location = Self>,
1477 {
1478 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1479 (
1480 ForwardHandle::new(cycle_id, Location::id(self)),
1481 S::create_source(cycle_id, self.clone()),
1482 )
1483 }
1484}
1485
1486#[cfg(feature = "deploy")]
1487#[cfg(test)]
1488mod tests {
1489 use std::collections::HashSet;
1490
1491 use futures::{SinkExt, StreamExt};
1492 use hydro_deploy::Deployment;
1493 use stageleft::q;
1494 use tokio_util::codec::LengthDelimitedCodec;
1495
1496 use crate::compile::builder::FlowBuilder;
1497 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1498 use crate::location::{Location, NetworkHint};
1499 use crate::nondet::nondet;
1500
1501 #[tokio::test]
1502 async fn top_level_singleton_replay_cardinality() {
1503 let mut deployment = Deployment::new();
1504
1505 let mut flow = FlowBuilder::new();
1506 let node = flow.process::<()>();
1507 let external = flow.external::<()>();
1508
1509 let (in_port, input) =
1510 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1511 let singleton = node.singleton(q!(123));
1512 let tick = node.tick();
1513 let out = input
1514 .batch(&tick, nondet!())
1515 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1516 .cross_singleton(
1517 singleton
1518 .snapshot(&tick, nondet!())
1519 .into_stream()
1520 .count(),
1521 )
1522 .all_ticks()
1523 .send_bincode_external(&external);
1524
1525 let nodes = flow
1526 .with_process(&node, deployment.Localhost())
1527 .with_external(&external, deployment.Localhost())
1528 .deploy(&mut deployment);
1529
1530 deployment.deploy().await.unwrap();
1531
1532 let mut external_in = nodes.connect(in_port).await;
1533 let mut external_out = nodes.connect(out).await;
1534
1535 deployment.start().await.unwrap();
1536
1537 external_in.send(1).await.unwrap();
1538 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1539
1540 external_in.send(2).await.unwrap();
1541 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1542 }
1543
1544 #[tokio::test]
1545 async fn tick_singleton_replay_cardinality() {
1546 let mut deployment = Deployment::new();
1547
1548 let mut flow = FlowBuilder::new();
1549 let node = flow.process::<()>();
1550 let external = flow.external::<()>();
1551
1552 let (in_port, input) =
1553 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1554 let tick = node.tick();
1555 let singleton = tick.singleton(q!(123));
1556 let out = input
1557 .batch(&tick, nondet!())
1558 .cross_singleton(singleton.clone())
1559 .cross_singleton(singleton.into_stream().count())
1560 .all_ticks()
1561 .send_bincode_external(&external);
1562
1563 let nodes = flow
1564 .with_process(&node, deployment.Localhost())
1565 .with_external(&external, deployment.Localhost())
1566 .deploy(&mut deployment);
1567
1568 deployment.deploy().await.unwrap();
1569
1570 let mut external_in = nodes.connect(in_port).await;
1571 let mut external_out = nodes.connect(out).await;
1572
1573 deployment.start().await.unwrap();
1574
1575 external_in.send(1).await.unwrap();
1576 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1577
1578 external_in.send(2).await.unwrap();
1579 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1580 }
1581
1582 #[tokio::test]
1583 async fn external_bytes() {
1584 let mut deployment = Deployment::new();
1585
1586 let mut flow = FlowBuilder::new();
1587 let first_node = flow.process::<()>();
1588 let external = flow.external::<()>();
1589
1590 let (in_port, input) = first_node.source_external_bytes(&external);
1591 let out = input.send_bincode_external(&external);
1592
1593 let nodes = flow
1594 .with_process(&first_node, deployment.Localhost())
1595 .with_external(&external, deployment.Localhost())
1596 .deploy(&mut deployment);
1597
1598 deployment.deploy().await.unwrap();
1599
1600 let mut external_in = nodes.connect(in_port).await.1;
1601 let mut external_out = nodes.connect(out).await;
1602
1603 deployment.start().await.unwrap();
1604
1605 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1606
1607 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1608 }
1609
1610 #[tokio::test]
1611 async fn multi_external_source() {
1612 let mut deployment = Deployment::new();
1613
1614 let mut flow = FlowBuilder::new();
1615 let first_node = flow.process::<()>();
1616 let external = flow.external::<()>();
1617
1618 let (in_port, input, _membership, complete_sink) =
1619 first_node.bidi_external_many_bincode(&external);
1620 let out = input.entries().send_bincode_external(&external);
1621 complete_sink.complete(
1622 first_node
1623 .source_iter::<(u64, ()), _>(q!([]))
1624 .into_keyed()
1625 .weaken_ordering(),
1626 );
1627
1628 let nodes = flow
1629 .with_process(&first_node, deployment.Localhost())
1630 .with_external(&external, deployment.Localhost())
1631 .deploy(&mut deployment);
1632
1633 deployment.deploy().await.unwrap();
1634
1635 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1636 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1637 let external_out = nodes.connect(out).await;
1638
1639 deployment.start().await.unwrap();
1640
1641 external_in_1.send(123).await.unwrap();
1642 external_in_2.send(456).await.unwrap();
1643
1644 assert_eq!(
1645 external_out.take(2).collect::<HashSet<_>>().await,
1646 vec![(0, 123), (1, 456)].into_iter().collect()
1647 );
1648 }
1649
1650 #[tokio::test]
1651 async fn second_connection_only_multi_source() {
1652 let mut deployment = Deployment::new();
1653
1654 let mut flow = FlowBuilder::new();
1655 let first_node = flow.process::<()>();
1656 let external = flow.external::<()>();
1657
1658 let (in_port, input, _membership, complete_sink) =
1659 first_node.bidi_external_many_bincode(&external);
1660 let out = input.entries().send_bincode_external(&external);
1661 complete_sink.complete(
1662 first_node
1663 .source_iter::<(u64, ()), _>(q!([]))
1664 .into_keyed()
1665 .weaken_ordering(),
1666 );
1667
1668 let nodes = flow
1669 .with_process(&first_node, deployment.Localhost())
1670 .with_external(&external, deployment.Localhost())
1671 .deploy(&mut deployment);
1672
1673 deployment.deploy().await.unwrap();
1674
1675 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1677 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1678 let mut external_out = nodes.connect(out).await;
1679
1680 deployment.start().await.unwrap();
1681
1682 external_in_2.send(456).await.unwrap();
1683
1684 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1685 }
1686
1687 #[tokio::test]
1688 async fn multi_external_bytes() {
1689 let mut deployment = Deployment::new();
1690
1691 let mut flow = FlowBuilder::new();
1692 let first_node = flow.process::<()>();
1693 let external = flow.external::<()>();
1694
1695 let (in_port, input, _membership, complete_sink) = first_node
1696 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1697 let out = input.entries().send_bincode_external(&external);
1698 complete_sink.complete(
1699 first_node
1700 .source_iter(q!([]))
1701 .into_keyed()
1702 .weaken_ordering(),
1703 );
1704
1705 let nodes = flow
1706 .with_process(&first_node, deployment.Localhost())
1707 .with_external(&external, deployment.Localhost())
1708 .deploy(&mut deployment);
1709
1710 deployment.deploy().await.unwrap();
1711
1712 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1713 let mut external_in_2 = nodes.connect(in_port).await.1;
1714 let external_out = nodes.connect(out).await;
1715
1716 deployment.start().await.unwrap();
1717
1718 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1719 external_in_2.send(vec![4, 5].into()).await.unwrap();
1720
1721 assert_eq!(
1722 external_out.take(2).collect::<HashSet<_>>().await,
1723 vec![
1724 (0, (&[1u8, 2, 3] as &[u8]).into()),
1725 (1, (&[4u8, 5] as &[u8]).into())
1726 ]
1727 .into_iter()
1728 .collect()
1729 );
1730 }
1731
1732 #[tokio::test]
1733 async fn single_client_external_bytes() {
1734 let mut deployment = Deployment::new();
1735 let mut flow = FlowBuilder::new();
1736 let first_node = flow.process::<()>();
1737 let external = flow.external::<()>();
1738 let (port, input, complete_sink) = first_node
1739 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1740 complete_sink.complete(input.map(q!(|data| {
1741 let mut resp: Vec<u8> = data.into();
1742 resp.push(42);
1743 resp.into() })));
1745
1746 let nodes = flow
1747 .with_process(&first_node, deployment.Localhost())
1748 .with_external(&external, deployment.Localhost())
1749 .deploy(&mut deployment);
1750
1751 deployment.deploy().await.unwrap();
1752 deployment.start().await.unwrap();
1753
1754 let (mut external_out, mut external_in) = nodes.connect(port).await;
1755
1756 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1757 assert_eq!(
1758 external_out.next().await.unwrap().unwrap(),
1759 vec![1, 2, 3, 42]
1760 );
1761 }
1762
1763 #[tokio::test]
1764 async fn echo_external_bytes() {
1765 let mut deployment = Deployment::new();
1766
1767 let mut flow = FlowBuilder::new();
1768 let first_node = flow.process::<()>();
1769 let external = flow.external::<()>();
1770
1771 let (port, input, _membership, complete_sink) = first_node
1772 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1773 complete_sink
1774 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1775
1776 let nodes = flow
1777 .with_process(&first_node, deployment.Localhost())
1778 .with_external(&external, deployment.Localhost())
1779 .deploy(&mut deployment);
1780
1781 deployment.deploy().await.unwrap();
1782
1783 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1784 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1785
1786 deployment.start().await.unwrap();
1787
1788 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1789 external_in_2.send(vec![4, 5].into()).await.unwrap();
1790
1791 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1792 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1793 }
1794
1795 #[tokio::test]
1796 async fn echo_external_bincode() {
1797 let mut deployment = Deployment::new();
1798
1799 let mut flow = FlowBuilder::new();
1800 let first_node = flow.process::<()>();
1801 let external = flow.external::<()>();
1802
1803 let (port, input, _membership, complete_sink) =
1804 first_node.bidi_external_many_bincode(&external);
1805 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1806
1807 let nodes = flow
1808 .with_process(&first_node, deployment.Localhost())
1809 .with_external(&external, deployment.Localhost())
1810 .deploy(&mut deployment);
1811
1812 deployment.deploy().await.unwrap();
1813
1814 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1815 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1816
1817 deployment.start().await.unwrap();
1818
1819 external_in_1.send("hi".to_owned()).await.unwrap();
1820 external_in_2.send("hello".to_owned()).await.unwrap();
1821
1822 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1823 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1824 }
1825
1826 #[tokio::test]
1827 async fn closure_location_name() {
1828 let mut deployment = Deployment::new();
1829 let mut flow = FlowBuilder::new();
1830
1831 enum ClosureProcess {}
1832
1833 let node = flow.process::<ClosureProcess>();
1834 let external = flow.external::<()>();
1835
1836 let (in_port, input) =
1837 node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1838 let out = input.send_bincode_external(&external);
1839
1840 let nodes = flow
1841 .with_process(&node, deployment.Localhost())
1842 .with_external(&external, deployment.Localhost())
1843 .deploy(&mut deployment);
1844
1845 deployment.deploy().await.unwrap();
1846
1847 let mut external_in = nodes.connect(in_port).await;
1848 let mut external_out = nodes.connect(out).await;
1849
1850 deployment.start().await.unwrap();
1851
1852 external_in.send(42).await.unwrap();
1853 assert_eq!(external_out.next().await.unwrap(), 42);
1854 }
1855}