1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{
34 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
35};
36use crate::forward_handle::ForwardRef;
37#[cfg(stageleft_runtime)]
38use crate::forward_handle::{CycleCollection, ForwardHandle};
39use crate::live_collections::boundedness::{Bounded, Unbounded};
40use crate::live_collections::keyed_stream::KeyedStream;
41use crate::live_collections::singleton::Singleton;
42use crate::live_collections::stream::{
43 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
44};
45use crate::location::dynamic::LocationId;
46use crate::location::external_process::{
47 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
48};
49use crate::nondet::NonDet;
50#[cfg(feature = "sim")]
51use crate::sim::SimSender;
52use crate::staging_util::get_this_crate;
53
54pub mod dynamic;
55
56pub mod external_process;
57pub use external_process::External;
58
59pub mod process;
60pub use process::Process;
61
62pub mod cluster;
63pub use cluster::Cluster;
64
65pub mod member_id;
66pub use member_id::{MemberId, TaglessMemberId};
67
68pub mod tick;
69pub use tick::{Atomic, NoTick, Tick};
70
71#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
74pub enum MembershipEvent {
75 Joined,
77 Left,
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
87pub enum NetworkHint {
88 Auto,
90 TcpPort(Option<u16>),
95}
96
97pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
98 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
99}
100
101#[stageleft::export(LocationKey)]
102new_key_type! {
103 pub struct LocationKey;
105}
106
107impl std::fmt::Display for LocationKey {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(f, "loc{:?}", self.data()) }
111}
112
113impl std::str::FromStr for LocationKey {
116 type Err = Option<ParseIntError>;
117
118 fn from_str(s: &str) -> Result<Self, Self::Err> {
119 let nvn = s.strip_prefix("loc").ok_or(None)?;
120 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
121 let idx: u64 = idx.parse()?;
122 let ver: u64 = ver.parse()?;
123 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
124 }
125}
126
127impl LocationKey {
128 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
134 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); #[cfg(test)]
138 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); }
140
141impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
143 type O = LocationKey;
144
145 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
146 where
147 Self: Sized,
148 {
149 let root = get_this_crate();
150 let n = Key::data(&self).as_ffi();
151 (
152 QuoteTokens {
153 prelude: None,
154 expr: Some(quote! {
155 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
156 }),
157 },
158 (),
159 )
160 }
161}
162
163#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
165pub enum LocationType {
166 Process,
168 Cluster,
170 External,
172}
173
174#[expect(
188 private_bounds,
189 reason = "only internal Hydro code can define location types"
190)]
191pub trait Location<'a>: dynamic::DynLocation {
192 type Root: Location<'a>;
197
198 fn root(&self) -> Self::Root;
203
204 fn try_tick(&self) -> Option<Tick<Self>> {
211 if Self::is_top_level() {
212 let id = self.flow_state().borrow_mut().next_clock_id();
213 Some(Tick {
214 id,
215 l: self.clone(),
216 })
217 } else {
218 None
219 }
220 }
221
222 fn id(&self) -> LocationId {
224 dynamic::DynLocation::id(self)
225 }
226
227 fn tick(&self) -> Tick<Self>
253 where
254 Self: NoTick,
255 {
256 let id = self.flow_state().borrow_mut().next_clock_id();
257 Tick {
258 id,
259 l: self.clone(),
260 }
261 }
262
263 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
288 where
289 Self: Sized + NoTick,
290 {
291 Stream::new(
292 self.clone(),
293 HydroNode::Source {
294 source: HydroSource::Spin(),
295 metadata: self.new_node_metadata(Stream::<
296 (),
297 Self,
298 Unbounded,
299 TotalOrder,
300 ExactlyOnce,
301 >::collection_kind()),
302 },
303 )
304 }
305
306 fn source_stream<T, E>(
327 &self,
328 e: impl QuotedWithContext<'a, E, Self>,
329 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
330 where
331 E: FuturesStream<Item = T> + Unpin,
332 Self: Sized + NoTick,
333 {
334 let e = e.splice_untyped_ctx(self);
335
336 Stream::new(
337 self.clone(),
338 HydroNode::Source {
339 source: HydroSource::Stream(e.into()),
340 metadata: self.new_node_metadata(Stream::<
341 T,
342 Self,
343 Unbounded,
344 TotalOrder,
345 ExactlyOnce,
346 >::collection_kind()),
347 },
348 )
349 }
350
351 fn source_iter<T, E>(
373 &self,
374 e: impl QuotedWithContext<'a, E, Self>,
375 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
376 where
377 E: IntoIterator<Item = T>,
378 Self: Sized + NoTick,
379 {
380 let e = e.splice_typed_ctx(self);
381
382 Stream::new(
383 self.clone(),
384 HydroNode::Source {
385 source: HydroSource::Iter(e.into()),
386 metadata: self.new_node_metadata(
387 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
388 ),
389 },
390 )
391 }
392
393 fn source_cluster_members<C: 'a>(
427 &self,
428 cluster: &Cluster<'a, C>,
429 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
430 where
431 Self: Sized + NoTick,
432 {
433 Stream::new(
434 self.clone(),
435 HydroNode::Source {
436 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
437 metadata: self.new_node_metadata(Stream::<
438 (TaglessMemberId, MembershipEvent),
439 Self,
440 Unbounded,
441 TotalOrder,
442 ExactlyOnce,
443 >::collection_kind()),
444 },
445 )
446 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
447 .into_keyed()
448 }
449
450 fn source_external_bytes<L>(
458 &self,
459 from: &External<L>,
460 ) -> (
461 ExternalBytesPort,
462 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
463 )
464 where
465 Self: Sized + NoTick,
466 {
467 let (port, stream, sink) =
468 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
469
470 sink.complete(self.source_iter(q!([])));
471
472 (port, stream)
473 }
474
475 #[expect(clippy::type_complexity, reason = "stream markers")]
482 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
483 &self,
484 from: &External<L>,
485 ) -> (
486 ExternalBincodeSink<T, NotMany, O, R>,
487 Stream<T, Self, Unbounded, O, R>,
488 )
489 where
490 Self: Sized + NoTick,
491 T: Serialize + DeserializeOwned,
492 {
493 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
494 sink.complete(self.source_iter(q!([])));
495
496 (
497 ExternalBincodeSink {
498 process_key: from.key,
499 port_id: port.port_id,
500 _phantom: PhantomData,
501 },
502 stream.weaken_ordering().weaken_retries(),
503 )
504 }
505
506 #[cfg(feature = "sim")]
511 #[expect(clippy::type_complexity, reason = "stream markers")]
512 fn sim_input<T, O: Ordering, R: Retries>(
513 &self,
514 ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
515 where
516 Self: Sized + NoTick,
517 T: Serialize + DeserializeOwned,
518 {
519 let external_location: External<'a, ()> = External {
520 key: LocationKey::FIRST,
521 flow_state: self.flow_state().clone(),
522 _phantom: PhantomData,
523 };
524
525 let (external, stream) = self.source_external_bincode(&external_location);
526
527 (SimSender(external.port_id, PhantomData), stream)
528 }
529
530 fn embedded_input<T>(
536 &self,
537 name: impl Into<String>,
538 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
539 where
540 Self: Sized + NoTick,
541 {
542 let ident = syn::Ident::new(&name.into(), Span::call_site());
543
544 Stream::new(
545 self.clone(),
546 HydroNode::Source {
547 source: HydroSource::Embedded(ident),
548 metadata: self.new_node_metadata(Stream::<
549 T,
550 Self,
551 Unbounded,
552 TotalOrder,
553 ExactlyOnce,
554 >::collection_kind()),
555 },
556 )
557 }
558
559 #[expect(clippy::type_complexity, reason = "stream markers")]
604 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
605 &self,
606 from: &External<L>,
607 port_hint: NetworkHint,
608 ) -> (
609 ExternalBytesPort<NotMany>,
610 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
611 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
612 )
613 where
614 Self: Sized + NoTick,
615 {
616 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
617
618 let (fwd_ref, to_sink) =
619 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
620 let mut flow_state_borrow = self.flow_state().borrow_mut();
621
622 flow_state_borrow.push_root(HydroRoot::SendExternal {
623 to_external_key: from.key,
624 to_port_id: next_external_port_id,
625 to_many: false,
626 unpaired: false,
627 serialize_fn: None,
628 instantiate_fn: DebugInstantiate::Building,
629 input: Box::new(to_sink.ir_node.into_inner()),
630 op_metadata: HydroIrOpMetadata::new(),
631 });
632
633 let raw_stream: Stream<
634 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
635 Self,
636 Unbounded,
637 TotalOrder,
638 ExactlyOnce,
639 > = Stream::new(
640 self.clone(),
641 HydroNode::ExternalInput {
642 from_external_key: from.key,
643 from_port_id: next_external_port_id,
644 from_many: false,
645 codec_type: quote_type::<Codec>().into(),
646 port_hint,
647 instantiate_fn: DebugInstantiate::Building,
648 deserialize_fn: None,
649 metadata: self.new_node_metadata(Stream::<
650 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
651 Self,
652 Unbounded,
653 TotalOrder,
654 ExactlyOnce,
655 >::collection_kind()),
656 },
657 );
658
659 (
660 ExternalBytesPort {
661 process_key: from.key,
662 port_id: next_external_port_id,
663 _phantom: PhantomData,
664 },
665 raw_stream.flatten_ordered(),
666 fwd_ref,
667 )
668 }
669
670 #[expect(clippy::type_complexity, reason = "stream markers")]
680 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
681 &self,
682 from: &External<L>,
683 ) -> (
684 ExternalBincodeBidi<InT, OutT, NotMany>,
685 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
686 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
687 )
688 where
689 Self: Sized + NoTick,
690 {
691 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
692
693 let (fwd_ref, to_sink) =
694 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
695 let mut flow_state_borrow = self.flow_state().borrow_mut();
696
697 let root = get_this_crate();
698
699 let out_t_type = quote_type::<OutT>();
700 let ser_fn: syn::Expr = syn::parse_quote! {
701 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
702 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
703 )
704 };
705
706 flow_state_borrow.push_root(HydroRoot::SendExternal {
707 to_external_key: from.key,
708 to_port_id: next_external_port_id,
709 to_many: false,
710 unpaired: false,
711 serialize_fn: Some(ser_fn.into()),
712 instantiate_fn: DebugInstantiate::Building,
713 input: Box::new(to_sink.ir_node.into_inner()),
714 op_metadata: HydroIrOpMetadata::new(),
715 });
716
717 let in_t_type = quote_type::<InT>();
718
719 let deser_fn: syn::Expr = syn::parse_quote! {
720 |res| {
721 let b = res.unwrap();
722 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
723 }
724 };
725
726 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
727 self.clone(),
728 HydroNode::ExternalInput {
729 from_external_key: from.key,
730 from_port_id: next_external_port_id,
731 from_many: false,
732 codec_type: quote_type::<LengthDelimitedCodec>().into(),
733 port_hint: NetworkHint::Auto,
734 instantiate_fn: DebugInstantiate::Building,
735 deserialize_fn: Some(deser_fn.into()),
736 metadata: self.new_node_metadata(Stream::<
737 InT,
738 Self,
739 Unbounded,
740 TotalOrder,
741 ExactlyOnce,
742 >::collection_kind()),
743 },
744 );
745
746 (
747 ExternalBincodeBidi {
748 process_key: from.key,
749 port_id: next_external_port_id,
750 _phantom: PhantomData,
751 },
752 raw_stream,
753 fwd_ref,
754 )
755 }
756
757 #[expect(clippy::type_complexity, reason = "stream markers")]
769 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
770 &self,
771 from: &External<L>,
772 port_hint: NetworkHint,
773 ) -> (
774 ExternalBytesPort<Many>,
775 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
776 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
777 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
778 )
779 where
780 Self: Sized + NoTick,
781 {
782 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
783
784 let (fwd_ref, to_sink) =
785 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
786 let mut flow_state_borrow = self.flow_state().borrow_mut();
787
788 flow_state_borrow.push_root(HydroRoot::SendExternal {
789 to_external_key: from.key,
790 to_port_id: next_external_port_id,
791 to_many: true,
792 unpaired: false,
793 serialize_fn: None,
794 instantiate_fn: DebugInstantiate::Building,
795 input: Box::new(to_sink.entries().ir_node.into_inner()),
796 op_metadata: HydroIrOpMetadata::new(),
797 });
798
799 let raw_stream: Stream<
800 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
801 Self,
802 Unbounded,
803 TotalOrder,
804 ExactlyOnce,
805 > = Stream::new(
806 self.clone(),
807 HydroNode::ExternalInput {
808 from_external_key: from.key,
809 from_port_id: next_external_port_id,
810 from_many: true,
811 codec_type: quote_type::<Codec>().into(),
812 port_hint,
813 instantiate_fn: DebugInstantiate::Building,
814 deserialize_fn: None,
815 metadata: self.new_node_metadata(Stream::<
816 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
817 Self,
818 Unbounded,
819 TotalOrder,
820 ExactlyOnce,
821 >::collection_kind()),
822 },
823 );
824
825 let membership_stream_ident = syn::Ident::new(
826 &format!(
827 "__hydro_deploy_many_{}_{}_membership",
828 from.key, next_external_port_id
829 ),
830 Span::call_site(),
831 );
832 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
833 let raw_membership_stream: KeyedStream<
834 u64,
835 bool,
836 Self,
837 Unbounded,
838 TotalOrder,
839 ExactlyOnce,
840 > = KeyedStream::new(
841 self.clone(),
842 HydroNode::Source {
843 source: HydroSource::Stream(membership_stream_expr.into()),
844 metadata: self.new_node_metadata(KeyedStream::<
845 u64,
846 bool,
847 Self,
848 Unbounded,
849 TotalOrder,
850 ExactlyOnce,
851 >::collection_kind()),
852 },
853 );
854
855 (
856 ExternalBytesPort {
857 process_key: from.key,
858 port_id: next_external_port_id,
859 _phantom: PhantomData,
860 },
861 raw_stream
862 .flatten_ordered() .into_keyed(),
864 raw_membership_stream.map(q!(|join| {
865 if join {
866 MembershipEvent::Joined
867 } else {
868 MembershipEvent::Left
869 }
870 })),
871 fwd_ref,
872 )
873 }
874
875 #[expect(clippy::type_complexity, reason = "stream markers")]
891 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
892 &self,
893 from: &External<L>,
894 ) -> (
895 ExternalBincodeBidi<InT, OutT, Many>,
896 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
897 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
898 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
899 )
900 where
901 Self: Sized + NoTick,
902 {
903 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
904
905 let (fwd_ref, to_sink) =
906 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
907 let mut flow_state_borrow = self.flow_state().borrow_mut();
908
909 let root = get_this_crate();
910
911 let out_t_type = quote_type::<OutT>();
912 let ser_fn: syn::Expr = syn::parse_quote! {
913 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
914 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
915 )
916 };
917
918 flow_state_borrow.push_root(HydroRoot::SendExternal {
919 to_external_key: from.key,
920 to_port_id: next_external_port_id,
921 to_many: true,
922 unpaired: false,
923 serialize_fn: Some(ser_fn.into()),
924 instantiate_fn: DebugInstantiate::Building,
925 input: Box::new(to_sink.entries().ir_node.into_inner()),
926 op_metadata: HydroIrOpMetadata::new(),
927 });
928
929 let in_t_type = quote_type::<InT>();
930
931 let deser_fn: syn::Expr = syn::parse_quote! {
932 |res| {
933 let (id, b) = res.unwrap();
934 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
935 }
936 };
937
938 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
939 KeyedStream::new(
940 self.clone(),
941 HydroNode::ExternalInput {
942 from_external_key: from.key,
943 from_port_id: next_external_port_id,
944 from_many: true,
945 codec_type: quote_type::<LengthDelimitedCodec>().into(),
946 port_hint: NetworkHint::Auto,
947 instantiate_fn: DebugInstantiate::Building,
948 deserialize_fn: Some(deser_fn.into()),
949 metadata: self.new_node_metadata(KeyedStream::<
950 u64,
951 InT,
952 Self,
953 Unbounded,
954 TotalOrder,
955 ExactlyOnce,
956 >::collection_kind()),
957 },
958 );
959
960 let membership_stream_ident = syn::Ident::new(
961 &format!(
962 "__hydro_deploy_many_{}_{}_membership",
963 from.key, next_external_port_id
964 ),
965 Span::call_site(),
966 );
967 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
968 let raw_membership_stream: KeyedStream<
969 u64,
970 bool,
971 Self,
972 Unbounded,
973 TotalOrder,
974 ExactlyOnce,
975 > = KeyedStream::new(
976 self.clone(),
977 HydroNode::Source {
978 source: HydroSource::Stream(membership_stream_expr.into()),
979 metadata: self.new_node_metadata(KeyedStream::<
980 u64,
981 bool,
982 Self,
983 Unbounded,
984 TotalOrder,
985 ExactlyOnce,
986 >::collection_kind()),
987 },
988 );
989
990 (
991 ExternalBincodeBidi {
992 process_key: from.key,
993 port_id: next_external_port_id,
994 _phantom: PhantomData,
995 },
996 raw_stream,
997 raw_membership_stream.map(q!(|join| {
998 if join {
999 MembershipEvent::Joined
1000 } else {
1001 MembershipEvent::Left
1002 }
1003 })),
1004 fwd_ref,
1005 )
1006 }
1007
1008 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1026 where
1027 T: Clone,
1028 Self: Sized,
1029 {
1030 let e = e.splice_untyped_ctx(self);
1031
1032 Singleton::new(
1033 self.clone(),
1034 HydroNode::SingletonSource {
1035 value: e.into(),
1036 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1037 },
1038 )
1039 }
1040
1041 fn source_interval(
1051 &self,
1052 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1053 _nondet: NonDet,
1054 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1055 where
1056 Self: Sized + NoTick,
1057 {
1058 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1059 tokio::time::interval(interval)
1060 )))
1061 }
1062
1063 fn source_interval_delayed(
1074 &self,
1075 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1076 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1077 _nondet: NonDet,
1078 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1079 where
1080 Self: Sized + NoTick,
1081 {
1082 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1083 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1084 )))
1085 }
1086
1087 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1121 where
1122 S: CycleCollection<'a, ForwardRef, Location = Self>,
1123 {
1124 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1125 (
1126 ForwardHandle::new(cycle_id, Location::id(self)),
1127 S::create_source(cycle_id, self.clone()),
1128 )
1129 }
1130}
1131
1132#[cfg(feature = "deploy")]
1133#[cfg(test)]
1134mod tests {
1135 use std::collections::HashSet;
1136
1137 use futures::{SinkExt, StreamExt};
1138 use hydro_deploy::Deployment;
1139 use stageleft::q;
1140 use tokio_util::codec::LengthDelimitedCodec;
1141
1142 use crate::compile::builder::FlowBuilder;
1143 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1144 use crate::location::{Location, NetworkHint};
1145 use crate::nondet::nondet;
1146
1147 #[tokio::test]
1148 async fn top_level_singleton_replay_cardinality() {
1149 let mut deployment = Deployment::new();
1150
1151 let mut flow = FlowBuilder::new();
1152 let node = flow.process::<()>();
1153 let external = flow.external::<()>();
1154
1155 let (in_port, input) =
1156 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1157 let singleton = node.singleton(q!(123));
1158 let tick = node.tick();
1159 let out = input
1160 .batch(&tick, nondet!())
1161 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1162 .cross_singleton(
1163 singleton
1164 .snapshot(&tick, nondet!())
1165 .into_stream()
1166 .count(),
1167 )
1168 .all_ticks()
1169 .send_bincode_external(&external);
1170
1171 let nodes = flow
1172 .with_process(&node, deployment.Localhost())
1173 .with_external(&external, deployment.Localhost())
1174 .deploy(&mut deployment);
1175
1176 deployment.deploy().await.unwrap();
1177
1178 let mut external_in = nodes.connect(in_port).await;
1179 let mut external_out = nodes.connect(out).await;
1180
1181 deployment.start().await.unwrap();
1182
1183 external_in.send(1).await.unwrap();
1184 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1185
1186 external_in.send(2).await.unwrap();
1187 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1188 }
1189
1190 #[tokio::test]
1191 async fn tick_singleton_replay_cardinality() {
1192 let mut deployment = Deployment::new();
1193
1194 let mut flow = FlowBuilder::new();
1195 let node = flow.process::<()>();
1196 let external = flow.external::<()>();
1197
1198 let (in_port, input) =
1199 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1200 let tick = node.tick();
1201 let singleton = tick.singleton(q!(123));
1202 let out = input
1203 .batch(&tick, nondet!())
1204 .cross_singleton(singleton.clone())
1205 .cross_singleton(singleton.into_stream().count())
1206 .all_ticks()
1207 .send_bincode_external(&external);
1208
1209 let nodes = flow
1210 .with_process(&node, deployment.Localhost())
1211 .with_external(&external, deployment.Localhost())
1212 .deploy(&mut deployment);
1213
1214 deployment.deploy().await.unwrap();
1215
1216 let mut external_in = nodes.connect(in_port).await;
1217 let mut external_out = nodes.connect(out).await;
1218
1219 deployment.start().await.unwrap();
1220
1221 external_in.send(1).await.unwrap();
1222 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1223
1224 external_in.send(2).await.unwrap();
1225 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1226 }
1227
1228 #[tokio::test]
1229 async fn external_bytes() {
1230 let mut deployment = Deployment::new();
1231
1232 let mut flow = FlowBuilder::new();
1233 let first_node = flow.process::<()>();
1234 let external = flow.external::<()>();
1235
1236 let (in_port, input) = first_node.source_external_bytes(&external);
1237 let out = input.send_bincode_external(&external);
1238
1239 let nodes = flow
1240 .with_process(&first_node, deployment.Localhost())
1241 .with_external(&external, deployment.Localhost())
1242 .deploy(&mut deployment);
1243
1244 deployment.deploy().await.unwrap();
1245
1246 let mut external_in = nodes.connect(in_port).await.1;
1247 let mut external_out = nodes.connect(out).await;
1248
1249 deployment.start().await.unwrap();
1250
1251 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1252
1253 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1254 }
1255
1256 #[tokio::test]
1257 async fn multi_external_source() {
1258 let mut deployment = Deployment::new();
1259
1260 let mut flow = FlowBuilder::new();
1261 let first_node = flow.process::<()>();
1262 let external = flow.external::<()>();
1263
1264 let (in_port, input, _membership, complete_sink) =
1265 first_node.bidi_external_many_bincode(&external);
1266 let out = input.entries().send_bincode_external(&external);
1267 complete_sink.complete(
1268 first_node
1269 .source_iter::<(u64, ()), _>(q!([]))
1270 .into_keyed()
1271 .weaken_ordering(),
1272 );
1273
1274 let nodes = flow
1275 .with_process(&first_node, deployment.Localhost())
1276 .with_external(&external, deployment.Localhost())
1277 .deploy(&mut deployment);
1278
1279 deployment.deploy().await.unwrap();
1280
1281 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1282 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1283 let external_out = nodes.connect(out).await;
1284
1285 deployment.start().await.unwrap();
1286
1287 external_in_1.send(123).await.unwrap();
1288 external_in_2.send(456).await.unwrap();
1289
1290 assert_eq!(
1291 external_out.take(2).collect::<HashSet<_>>().await,
1292 vec![(0, 123), (1, 456)].into_iter().collect()
1293 );
1294 }
1295
1296 #[tokio::test]
1297 async fn second_connection_only_multi_source() {
1298 let mut deployment = Deployment::new();
1299
1300 let mut flow = FlowBuilder::new();
1301 let first_node = flow.process::<()>();
1302 let external = flow.external::<()>();
1303
1304 let (in_port, input, _membership, complete_sink) =
1305 first_node.bidi_external_many_bincode(&external);
1306 let out = input.entries().send_bincode_external(&external);
1307 complete_sink.complete(
1308 first_node
1309 .source_iter::<(u64, ()), _>(q!([]))
1310 .into_keyed()
1311 .weaken_ordering(),
1312 );
1313
1314 let nodes = flow
1315 .with_process(&first_node, deployment.Localhost())
1316 .with_external(&external, deployment.Localhost())
1317 .deploy(&mut deployment);
1318
1319 deployment.deploy().await.unwrap();
1320
1321 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1323 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1324 let mut external_out = nodes.connect(out).await;
1325
1326 deployment.start().await.unwrap();
1327
1328 external_in_2.send(456).await.unwrap();
1329
1330 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1331 }
1332
1333 #[tokio::test]
1334 async fn multi_external_bytes() {
1335 let mut deployment = Deployment::new();
1336
1337 let mut flow = FlowBuilder::new();
1338 let first_node = flow.process::<()>();
1339 let external = flow.external::<()>();
1340
1341 let (in_port, input, _membership, complete_sink) = first_node
1342 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1343 let out = input.entries().send_bincode_external(&external);
1344 complete_sink.complete(
1345 first_node
1346 .source_iter(q!([]))
1347 .into_keyed()
1348 .weaken_ordering(),
1349 );
1350
1351 let nodes = flow
1352 .with_process(&first_node, deployment.Localhost())
1353 .with_external(&external, deployment.Localhost())
1354 .deploy(&mut deployment);
1355
1356 deployment.deploy().await.unwrap();
1357
1358 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1359 let mut external_in_2 = nodes.connect(in_port).await.1;
1360 let external_out = nodes.connect(out).await;
1361
1362 deployment.start().await.unwrap();
1363
1364 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1365 external_in_2.send(vec![4, 5].into()).await.unwrap();
1366
1367 assert_eq!(
1368 external_out.take(2).collect::<HashSet<_>>().await,
1369 vec![
1370 (0, (&[1u8, 2, 3] as &[u8]).into()),
1371 (1, (&[4u8, 5] as &[u8]).into())
1372 ]
1373 .into_iter()
1374 .collect()
1375 );
1376 }
1377
1378 #[tokio::test]
1379 async fn single_client_external_bytes() {
1380 let mut deployment = Deployment::new();
1381 let mut flow = FlowBuilder::new();
1382 let first_node = flow.process::<()>();
1383 let external = flow.external::<()>();
1384 let (port, input, complete_sink) = first_node
1385 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1386 complete_sink.complete(input.map(q!(|data| {
1387 let mut resp: Vec<u8> = data.into();
1388 resp.push(42);
1389 resp.into() })));
1391
1392 let nodes = flow
1393 .with_process(&first_node, deployment.Localhost())
1394 .with_external(&external, deployment.Localhost())
1395 .deploy(&mut deployment);
1396
1397 deployment.deploy().await.unwrap();
1398 deployment.start().await.unwrap();
1399
1400 let (mut external_out, mut external_in) = nodes.connect(port).await;
1401
1402 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1403 assert_eq!(
1404 external_out.next().await.unwrap().unwrap(),
1405 vec![1, 2, 3, 42]
1406 );
1407 }
1408
1409 #[tokio::test]
1410 async fn echo_external_bytes() {
1411 let mut deployment = Deployment::new();
1412
1413 let mut flow = FlowBuilder::new();
1414 let first_node = flow.process::<()>();
1415 let external = flow.external::<()>();
1416
1417 let (port, input, _membership, complete_sink) = first_node
1418 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1419 complete_sink
1420 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1421
1422 let nodes = flow
1423 .with_process(&first_node, deployment.Localhost())
1424 .with_external(&external, deployment.Localhost())
1425 .deploy(&mut deployment);
1426
1427 deployment.deploy().await.unwrap();
1428
1429 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1430 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1431
1432 deployment.start().await.unwrap();
1433
1434 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1435 external_in_2.send(vec![4, 5].into()).await.unwrap();
1436
1437 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1438 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1439 }
1440
1441 #[tokio::test]
1442 async fn echo_external_bincode() {
1443 let mut deployment = Deployment::new();
1444
1445 let mut flow = FlowBuilder::new();
1446 let first_node = flow.process::<()>();
1447 let external = flow.external::<()>();
1448
1449 let (port, input, _membership, complete_sink) =
1450 first_node.bidi_external_many_bincode(&external);
1451 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1452
1453 let nodes = flow
1454 .with_process(&first_node, deployment.Localhost())
1455 .with_external(&external, deployment.Localhost())
1456 .deploy(&mut deployment);
1457
1458 deployment.deploy().await.unwrap();
1459
1460 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1461 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1462
1463 deployment.start().await.unwrap();
1464
1465 external_in_1.send("hi".to_owned()).await.unwrap();
1466 external_in_2.send("hello".to_owned()).await.unwrap();
1467
1468 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1469 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1470 }
1471}