1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::dynamic::LocationId;
40use crate::location::external_process::{
41 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
42};
43use crate::nondet::NonDet;
44#[cfg(feature = "sim")]
45use crate::sim::SimSender;
46use crate::staging_util::get_this_crate;
47
48pub mod dynamic;
49
50#[expect(missing_docs, reason = "TODO")]
51pub mod external_process;
52pub use external_process::External;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod process;
56pub use process::Process;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod cluster;
60pub use cluster::Cluster;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod member_id;
64pub use member_id::{MemberId, TaglessMemberId};
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72 Joined,
73 Left,
74}
75
/// A hint describing how the network port for an external connection should be set up.
///
/// Passed as `port_hint` to [`Location::bind_single_client`] and
/// [`Location::bidi_external_many_bytes`], which store it on the `ExternalInput`
/// IR node they create. The bincode variants of those methods always use
/// [`NetworkHint::Auto`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No specific request.
    ///
    /// NOTE(review): presumably lets the deployment backend pick the transport and
    /// port; the consumer of this hint is not visible in this file.
    Auto,
    /// Request a TCP port.
    ///
    /// NOTE(review): the payload is presumably the exact port number to use, with
    /// `None` meaning "any available port" — confirm against the deploy backend.
    TcpPort(Option<u16>),
}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89 private_bounds,
90 reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
    /// The location type returned by [`Location::root`].
    ///
    /// NOTE(review): presumably the outermost (non-tick) location this one is
    /// nested in; the implementations are not visible in this file.
    type Root: Location<'a>;

    /// Returns the [`Location::Root`] for this location.
    fn root(&self) -> Self::Root;
96
97 fn try_tick(&self) -> Option<Tick<Self>> {
98 if Self::is_top_level() {
99 let next_id = self.flow_state().borrow_mut().next_clock_id;
100 self.flow_state().borrow_mut().next_clock_id += 1;
101 Some(Tick {
102 id: next_id,
103 l: self.clone(),
104 })
105 } else {
106 None
107 }
108 }
109
110 fn id(&self) -> LocationId {
111 dynamic::DynLocation::id(self)
112 }
113
114 fn tick(&self) -> Tick<Self>
115 where
116 Self: NoTick,
117 {
118 let next_id = self.flow_state().borrow_mut().next_clock_id;
119 self.flow_state().borrow_mut().next_clock_id += 1;
120 Tick {
121 id: next_id,
122 l: self.clone(),
123 }
124 }
125
126 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127 where
128 Self: Sized + NoTick,
129 {
130 Stream::new(
131 self.clone(),
132 HydroNode::Source {
133 source: HydroSource::Spin(),
134 metadata: self.new_node_metadata(Stream::<
135 (),
136 Self,
137 Unbounded,
138 TotalOrder,
139 ExactlyOnce,
140 >::collection_kind()),
141 },
142 )
143 }
144
145 fn source_stream<T, E>(
146 &self,
147 e: impl QuotedWithContext<'a, E, Self>,
148 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149 where
150 E: FuturesStream<Item = T> + Unpin,
151 Self: Sized + NoTick,
152 {
153 let e = e.splice_untyped_ctx(self);
154
155 Stream::new(
156 self.clone(),
157 HydroNode::Source {
158 source: HydroSource::Stream(e.into()),
159 metadata: self.new_node_metadata(Stream::<
160 T,
161 Self,
162 Unbounded,
163 TotalOrder,
164 ExactlyOnce,
165 >::collection_kind()),
166 },
167 )
168 }
169
170 fn source_iter<T, E>(
171 &self,
172 e: impl QuotedWithContext<'a, E, Self>,
173 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
174 where
175 E: IntoIterator<Item = T>,
176 Self: Sized + NoTick,
177 {
178 let e = e.splice_typed_ctx(self);
181
182 Stream::new(
183 self.clone(),
184 HydroNode::Source {
185 source: HydroSource::Iter(e.into()),
186 metadata: self.new_node_metadata(Stream::<
187 T,
188 Self,
189 Unbounded,
190 TotalOrder,
191 ExactlyOnce,
192 >::collection_kind()),
193 },
194 )
195 }
196
197 fn source_cluster_members<C: 'a>(
198 &self,
199 cluster: &Cluster<'a, C>,
200 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
201 where
202 Self: Sized + NoTick,
203 {
204 Stream::new(
205 self.clone(),
206 HydroNode::Source {
207 source: HydroSource::ClusterMembers(cluster.id()),
208 metadata: self.new_node_metadata(Stream::<
209 (TaglessMemberId, MembershipEvent),
210 Self,
211 Unbounded,
212 TotalOrder,
213 ExactlyOnce,
214 >::collection_kind()),
215 },
216 )
217 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
218 .into_keyed()
219 }
220
221 fn source_external_bytes<L>(
222 &self,
223 from: &External<L>,
224 ) -> (
225 ExternalBytesPort,
226 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
227 )
228 where
229 Self: Sized + NoTick,
230 {
231 let (port, stream, sink) =
232 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
233
234 sink.complete(self.source_iter(q!([])));
235
236 (port, stream)
237 }
238
239 #[expect(clippy::type_complexity, reason = "stream markers")]
240 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
241 &self,
242 from: &External<L>,
243 ) -> (
244 ExternalBincodeSink<T, NotMany, O, R>,
245 Stream<T, Self, Unbounded, O, R>,
246 )
247 where
248 Self: Sized + NoTick,
249 T: Serialize + DeserializeOwned,
250 {
251 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
252 sink.complete(self.source_iter(q!([])));
253
254 (
255 ExternalBincodeSink {
256 process_id: from.id,
257 port_id: port.port_id,
258 _phantom: PhantomData,
259 },
260 stream.weaken_ordering().weaken_retries(),
261 )
262 }
263
264 #[cfg(feature = "sim")]
265 #[expect(clippy::type_complexity, reason = "stream markers")]
266 fn sim_input<T, O: Ordering, R: Retries>(
269 &self,
270 ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
271 where
272 Self: Sized + NoTick,
273 T: Serialize + DeserializeOwned,
274 {
275 let external_location: External<'a, ()> = External {
276 id: 0,
277 flow_state: self.flow_state().clone(),
278 _phantom: PhantomData,
279 };
280
281 let (external, stream) = self.source_external_bincode(&external_location);
282
283 (SimSender(external.port_id, PhantomData), stream)
284 }
285
286 #[expect(clippy::type_complexity, reason = "stream markers")]
331 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
332 &self,
333 from: &External<L>,
334 port_hint: NetworkHint,
335 ) -> (
336 ExternalBytesPort<NotMany>,
337 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
338 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
339 )
340 where
341 Self: Sized + NoTick,
342 {
343 let next_external_port_id = {
344 let mut flow_state = from.flow_state.borrow_mut();
345 let id = flow_state.next_external_out;
346 flow_state.next_external_out += 1;
347 id
348 };
349
350 let (fwd_ref, to_sink) =
351 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
352 let mut flow_state_borrow = self.flow_state().borrow_mut();
353
354 flow_state_borrow.push_root(HydroRoot::SendExternal {
355 to_external_id: from.id,
356 to_key: next_external_port_id,
357 to_many: false,
358 unpaired: false,
359 serialize_fn: None,
360 instantiate_fn: DebugInstantiate::Building,
361 input: Box::new(to_sink.ir_node.into_inner()),
362 op_metadata: HydroIrOpMetadata::new(),
363 });
364
365 let raw_stream: Stream<
366 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
367 Self,
368 Unbounded,
369 TotalOrder,
370 ExactlyOnce,
371 > = Stream::new(
372 self.clone(),
373 HydroNode::ExternalInput {
374 from_external_id: from.id,
375 from_key: next_external_port_id,
376 from_many: false,
377 codec_type: quote_type::<Codec>().into(),
378 port_hint,
379 instantiate_fn: DebugInstantiate::Building,
380 deserialize_fn: None,
381 metadata: self.new_node_metadata(Stream::<
382 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
383 Self,
384 Unbounded,
385 TotalOrder,
386 ExactlyOnce,
387 >::collection_kind()),
388 },
389 );
390
391 (
392 ExternalBytesPort {
393 process_id: from.id,
394 port_id: next_external_port_id,
395 _phantom: PhantomData,
396 },
397 raw_stream.flatten_ordered(),
398 fwd_ref,
399 )
400 }
401
402 #[expect(clippy::type_complexity, reason = "stream markers")]
403 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
404 &self,
405 from: &External<L>,
406 ) -> (
407 ExternalBincodeBidi<InT, OutT, NotMany>,
408 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
409 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
410 )
411 where
412 Self: Sized + NoTick,
413 {
414 let next_external_port_id = {
415 let mut flow_state = from.flow_state.borrow_mut();
416 let id = flow_state.next_external_out;
417 flow_state.next_external_out += 1;
418 id
419 };
420
421 let (fwd_ref, to_sink) =
422 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
423 let mut flow_state_borrow = self.flow_state().borrow_mut();
424
425 let root = get_this_crate();
426
427 let out_t_type = quote_type::<OutT>();
428 let ser_fn: syn::Expr = syn::parse_quote! {
429 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
430 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
431 )
432 };
433
434 flow_state_borrow.push_root(HydroRoot::SendExternal {
435 to_external_id: from.id,
436 to_key: next_external_port_id,
437 to_many: false,
438 unpaired: false,
439 serialize_fn: Some(ser_fn.into()),
440 instantiate_fn: DebugInstantiate::Building,
441 input: Box::new(to_sink.ir_node.into_inner()),
442 op_metadata: HydroIrOpMetadata::new(),
443 });
444
445 let in_t_type = quote_type::<InT>();
446
447 let deser_fn: syn::Expr = syn::parse_quote! {
448 |res| {
449 let b = res.unwrap();
450 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
451 }
452 };
453
454 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
455 self.clone(),
456 HydroNode::ExternalInput {
457 from_external_id: from.id,
458 from_key: next_external_port_id,
459 from_many: false,
460 codec_type: quote_type::<LengthDelimitedCodec>().into(),
461 port_hint: NetworkHint::Auto,
462 instantiate_fn: DebugInstantiate::Building,
463 deserialize_fn: Some(deser_fn.into()),
464 metadata: self.new_node_metadata(Stream::<
465 InT,
466 Self,
467 Unbounded,
468 TotalOrder,
469 ExactlyOnce,
470 >::collection_kind()),
471 },
472 );
473
474 (
475 ExternalBincodeBidi {
476 process_id: from.id,
477 port_id: next_external_port_id,
478 _phantom: PhantomData,
479 },
480 raw_stream,
481 fwd_ref,
482 )
483 }
484
485 #[expect(clippy::type_complexity, reason = "stream markers")]
486 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
487 &self,
488 from: &External<L>,
489 port_hint: NetworkHint,
490 ) -> (
491 ExternalBytesPort<Many>,
492 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
493 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
494 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
495 )
496 where
497 Self: Sized + NoTick,
498 {
499 let next_external_port_id = {
500 let mut flow_state = from.flow_state.borrow_mut();
501 let id = flow_state.next_external_out;
502 flow_state.next_external_out += 1;
503 id
504 };
505
506 let (fwd_ref, to_sink) =
507 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
508 let mut flow_state_borrow = self.flow_state().borrow_mut();
509
510 flow_state_borrow.push_root(HydroRoot::SendExternal {
511 to_external_id: from.id,
512 to_key: next_external_port_id,
513 to_many: true,
514 unpaired: false,
515 serialize_fn: None,
516 instantiate_fn: DebugInstantiate::Building,
517 input: Box::new(to_sink.entries().ir_node.into_inner()),
518 op_metadata: HydroIrOpMetadata::new(),
519 });
520
521 let raw_stream: Stream<
522 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
523 Self,
524 Unbounded,
525 TotalOrder,
526 ExactlyOnce,
527 > = Stream::new(
528 self.clone(),
529 HydroNode::ExternalInput {
530 from_external_id: from.id,
531 from_key: next_external_port_id,
532 from_many: true,
533 codec_type: quote_type::<Codec>().into(),
534 port_hint,
535 instantiate_fn: DebugInstantiate::Building,
536 deserialize_fn: None,
537 metadata: self.new_node_metadata(Stream::<
538 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
539 Self,
540 Unbounded,
541 TotalOrder,
542 ExactlyOnce,
543 >::collection_kind()),
544 },
545 );
546
547 let membership_stream_ident = syn::Ident::new(
548 &format!(
549 "__hydro_deploy_many_{}_{}_membership",
550 from.id, next_external_port_id
551 ),
552 Span::call_site(),
553 );
554 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
555 let raw_membership_stream: KeyedStream<
556 u64,
557 bool,
558 Self,
559 Unbounded,
560 TotalOrder,
561 ExactlyOnce,
562 > = KeyedStream::new(
563 self.clone(),
564 HydroNode::Source {
565 source: HydroSource::Stream(membership_stream_expr.into()),
566 metadata: self.new_node_metadata(KeyedStream::<
567 u64,
568 bool,
569 Self,
570 Unbounded,
571 TotalOrder,
572 ExactlyOnce,
573 >::collection_kind()),
574 },
575 );
576
577 (
578 ExternalBytesPort {
579 process_id: from.id,
580 port_id: next_external_port_id,
581 _phantom: PhantomData,
582 },
583 raw_stream
584 .flatten_ordered() .into_keyed(),
586 raw_membership_stream.map(q!(|join| {
587 if join {
588 MembershipEvent::Joined
589 } else {
590 MembershipEvent::Left
591 }
592 })),
593 fwd_ref,
594 )
595 }
596
597 #[expect(clippy::type_complexity, reason = "stream markers")]
598 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
599 &self,
600 from: &External<L>,
601 ) -> (
602 ExternalBincodeBidi<InT, OutT, Many>,
603 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
604 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
605 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
606 )
607 where
608 Self: Sized + NoTick,
609 {
610 let next_external_port_id = {
611 let mut flow_state = from.flow_state.borrow_mut();
612 let id = flow_state.next_external_out;
613 flow_state.next_external_out += 1;
614 id
615 };
616
617 let root = get_this_crate();
618
619 let (fwd_ref, to_sink) =
620 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
621 let mut flow_state_borrow = self.flow_state().borrow_mut();
622
623 let out_t_type = quote_type::<OutT>();
624 let ser_fn: syn::Expr = syn::parse_quote! {
625 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
626 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
627 )
628 };
629
630 flow_state_borrow.push_root(HydroRoot::SendExternal {
631 to_external_id: from.id,
632 to_key: next_external_port_id,
633 to_many: true,
634 unpaired: false,
635 serialize_fn: Some(ser_fn.into()),
636 instantiate_fn: DebugInstantiate::Building,
637 input: Box::new(to_sink.entries().ir_node.into_inner()),
638 op_metadata: HydroIrOpMetadata::new(),
639 });
640
641 let in_t_type = quote_type::<InT>();
642
643 let deser_fn: syn::Expr = syn::parse_quote! {
644 |res| {
645 let (id, b) = res.unwrap();
646 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
647 }
648 };
649
650 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
651 KeyedStream::new(
652 self.clone(),
653 HydroNode::ExternalInput {
654 from_external_id: from.id,
655 from_key: next_external_port_id,
656 from_many: true,
657 codec_type: quote_type::<LengthDelimitedCodec>().into(),
658 port_hint: NetworkHint::Auto,
659 instantiate_fn: DebugInstantiate::Building,
660 deserialize_fn: Some(deser_fn.into()),
661 metadata: self.new_node_metadata(KeyedStream::<
662 u64,
663 InT,
664 Self,
665 Unbounded,
666 TotalOrder,
667 ExactlyOnce,
668 >::collection_kind()),
669 },
670 );
671
672 let membership_stream_ident = syn::Ident::new(
673 &format!(
674 "__hydro_deploy_many_{}_{}_membership",
675 from.id, next_external_port_id
676 ),
677 Span::call_site(),
678 );
679 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
680 let raw_membership_stream: KeyedStream<
681 u64,
682 bool,
683 Self,
684 Unbounded,
685 TotalOrder,
686 ExactlyOnce,
687 > = KeyedStream::new(
688 self.clone(),
689 HydroNode::Source {
690 source: HydroSource::Stream(membership_stream_expr.into()),
691 metadata: self.new_node_metadata(KeyedStream::<
692 u64,
693 bool,
694 Self,
695 Unbounded,
696 TotalOrder,
697 ExactlyOnce,
698 >::collection_kind()),
699 },
700 );
701
702 (
703 ExternalBincodeBidi {
704 process_id: from.id,
705 port_id: next_external_port_id,
706 _phantom: PhantomData,
707 },
708 raw_stream,
709 raw_membership_stream.map(q!(|join| {
710 if join {
711 MembershipEvent::Joined
712 } else {
713 MembershipEvent::Left
714 }
715 })),
716 fwd_ref,
717 )
718 }
719
720 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
738 where
739 T: Clone,
740 Self: Sized,
741 {
742 let e = e.splice_untyped_ctx(self);
746
747 Singleton::new(
748 self.clone(),
749 HydroNode::SingletonSource {
750 value: e.into(),
751 metadata: self
752 .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
753 },
754 )
755 }
756
    /// Creates a stream of `tokio::time::Instant`s emitted by a Tokio interval
    /// timer with period `interval`.
    ///
    /// The stream is built with [`Location::source_stream`] from
    /// `tokio::time::interval(interval)` wrapped in
    /// `tokio_stream::wrappers::IntervalStream`.
    ///
    /// `_nondet` is not read by the body. NOTE(review): presumably it exists so
    /// that callers explicitly acknowledge the timing non-determinism — confirm.
    fn source_interval(
        &self,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        // `interval` is captured by name inside the quoted expression.
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval(interval)
        )))
    }
778
    /// Creates a stream of `tokio::time::Instant`s emitted by a Tokio interval
    /// timer with period `interval` whose start is offset by `delay`.
    ///
    /// The stream is built with [`Location::source_stream`] from
    /// `tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)`
    /// wrapped in `tokio_stream::wrappers::IntervalStream`. The start instant is
    /// computed when the quoted expression is evaluated.
    ///
    /// `_nondet` is not read by the body. NOTE(review): presumably it exists so
    /// that callers explicitly acknowledge the timing non-determinism — confirm.
    fn source_interval_delayed(
        &self,
        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        // `delay` and `interval` are captured by name inside the quoted expression.
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
        )))
    }
802
803 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
804 where
805 S: CycleCollection<'a, ForwardRef, Location = Self>,
806 Self: NoTick,
807 {
808 let next_id = self.flow_state().borrow_mut().next_cycle_id();
809 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
810
811 (
812 ForwardHandle {
813 completed: false,
814 ident: ident.clone(),
815 expected_location: Location::id(self),
816 _phantom: PhantomData,
817 },
818 S::create_source(ident, self.clone()),
819 )
820 }
821}
822
823#[cfg(feature = "deploy")]
824#[cfg(test)]
825mod tests {
826 use std::collections::HashSet;
827
828 use futures::{SinkExt, StreamExt};
829 use hydro_deploy::Deployment;
830 use stageleft::q;
831 use tokio_util::codec::LengthDelimitedCodec;
832
833 use crate::compile::builder::FlowBuilder;
834 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
835 use crate::location::{Location, NetworkHint};
836 use crate::nondet::nondet;
837
838 #[tokio::test]
839 async fn top_level_singleton_replay_cardinality() {
840 let mut deployment = Deployment::new();
841
842 let flow = FlowBuilder::new();
843 let node = flow.process::<()>();
844 let external = flow.external::<()>();
845
846 let (in_port, input) =
847 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
848 let singleton = node.singleton(q!(123));
849 let tick = node.tick();
850 let out = input
851 .batch(&tick, nondet!())
852 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
853 .cross_singleton(
854 singleton
855 .snapshot(&tick, nondet!())
856 .into_stream()
857 .count(),
858 )
859 .all_ticks()
860 .send_bincode_external(&external);
861
862 let nodes = flow
863 .with_process(&node, deployment.Localhost())
864 .with_external(&external, deployment.Localhost())
865 .deploy(&mut deployment);
866
867 deployment.deploy().await.unwrap();
868
869 let mut external_in = nodes.connect(in_port).await;
870 let mut external_out = nodes.connect(out).await;
871
872 deployment.start().await.unwrap();
873
874 external_in.send(1).await.unwrap();
875 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
876
877 external_in.send(2).await.unwrap();
878 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
879 }
880
881 #[tokio::test]
882 async fn tick_singleton_replay_cardinality() {
883 let mut deployment = Deployment::new();
884
885 let flow = FlowBuilder::new();
886 let node = flow.process::<()>();
887 let external = flow.external::<()>();
888
889 let (in_port, input) =
890 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
891 let tick = node.tick();
892 let singleton = tick.singleton(q!(123));
893 let out = input
894 .batch(&tick, nondet!())
895 .cross_singleton(singleton.clone())
896 .cross_singleton(singleton.into_stream().count())
897 .all_ticks()
898 .send_bincode_external(&external);
899
900 let nodes = flow
901 .with_process(&node, deployment.Localhost())
902 .with_external(&external, deployment.Localhost())
903 .deploy(&mut deployment);
904
905 deployment.deploy().await.unwrap();
906
907 let mut external_in = nodes.connect(in_port).await;
908 let mut external_out = nodes.connect(out).await;
909
910 deployment.start().await.unwrap();
911
912 external_in.send(1).await.unwrap();
913 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
914
915 external_in.send(2).await.unwrap();
916 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
917 }
918
919 #[tokio::test]
920 async fn external_bytes() {
921 let mut deployment = Deployment::new();
922
923 let flow = FlowBuilder::new();
924 let first_node = flow.process::<()>();
925 let external = flow.external::<()>();
926
927 let (in_port, input) = first_node.source_external_bytes(&external);
928 let out = input.send_bincode_external(&external);
929
930 let nodes = flow
931 .with_process(&first_node, deployment.Localhost())
932 .with_external(&external, deployment.Localhost())
933 .deploy(&mut deployment);
934
935 deployment.deploy().await.unwrap();
936
937 let mut external_in = nodes.connect(in_port).await.1;
938 let mut external_out = nodes.connect(out).await;
939
940 deployment.start().await.unwrap();
941
942 external_in.send(vec![1, 2, 3].into()).await.unwrap();
943
944 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
945 }
946
947 #[tokio::test]
948 async fn multi_external_source() {
949 let mut deployment = Deployment::new();
950
951 let flow = FlowBuilder::new();
952 let first_node = flow.process::<()>();
953 let external = flow.external::<()>();
954
955 let (in_port, input, _membership, complete_sink) =
956 first_node.bidi_external_many_bincode(&external);
957 let out = input.entries().send_bincode_external(&external);
958 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
959
960 let nodes = flow
961 .with_process(&first_node, deployment.Localhost())
962 .with_external(&external, deployment.Localhost())
963 .deploy(&mut deployment);
964
965 deployment.deploy().await.unwrap();
966
967 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
968 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
969 let external_out = nodes.connect(out).await;
970
971 deployment.start().await.unwrap();
972
973 external_in_1.send(123).await.unwrap();
974 external_in_2.send(456).await.unwrap();
975
976 assert_eq!(
977 external_out.take(2).collect::<HashSet<_>>().await,
978 vec![(0, 123), (1, 456)].into_iter().collect()
979 );
980 }
981
982 #[tokio::test]
983 async fn second_connection_only_multi_source() {
984 let mut deployment = Deployment::new();
985
986 let flow = FlowBuilder::new();
987 let first_node = flow.process::<()>();
988 let external = flow.external::<()>();
989
990 let (in_port, input, _membership, complete_sink) =
991 first_node.bidi_external_many_bincode(&external);
992 let out = input.entries().send_bincode_external(&external);
993 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
994
995 let nodes = flow
996 .with_process(&first_node, deployment.Localhost())
997 .with_external(&external, deployment.Localhost())
998 .deploy(&mut deployment);
999
1000 deployment.deploy().await.unwrap();
1001
1002 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1004 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1005 let mut external_out = nodes.connect(out).await;
1006
1007 deployment.start().await.unwrap();
1008
1009 external_in_2.send(456).await.unwrap();
1010
1011 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1012 }
1013
1014 #[tokio::test]
1015 async fn multi_external_bytes() {
1016 let mut deployment = Deployment::new();
1017
1018 let flow = FlowBuilder::new();
1019 let first_node = flow.process::<()>();
1020 let external = flow.external::<()>();
1021
1022 let (in_port, input, _membership, complete_sink) = first_node
1023 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1024 let out = input.entries().send_bincode_external(&external);
1025 complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
1026
1027 let nodes = flow
1028 .with_process(&first_node, deployment.Localhost())
1029 .with_external(&external, deployment.Localhost())
1030 .deploy(&mut deployment);
1031
1032 deployment.deploy().await.unwrap();
1033
1034 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1035 let mut external_in_2 = nodes.connect(in_port).await.1;
1036 let external_out = nodes.connect(out).await;
1037
1038 deployment.start().await.unwrap();
1039
1040 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1041 external_in_2.send(vec![4, 5].into()).await.unwrap();
1042
1043 assert_eq!(
1044 external_out.take(2).collect::<HashSet<_>>().await,
1045 vec![
1046 (0, (&[1u8, 2, 3] as &[u8]).into()),
1047 (1, (&[4u8, 5] as &[u8]).into())
1048 ]
1049 .into_iter()
1050 .collect()
1051 );
1052 }
1053
1054 #[tokio::test]
1055 async fn single_client_external_bytes() {
1056 let mut deployment = Deployment::new();
1057 let flow = FlowBuilder::new();
1058 let first_node = flow.process::<()>();
1059 let external = flow.external::<()>();
1060 let (port, input, complete_sink) = first_node
1061 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1062 complete_sink.complete(input.map(q!(|data| {
1063 let mut resp: Vec<u8> = data.into();
1064 resp.push(42);
1065 resp.into() })));
1067
1068 let nodes = flow
1069 .with_process(&first_node, deployment.Localhost())
1070 .with_external(&external, deployment.Localhost())
1071 .deploy(&mut deployment);
1072
1073 deployment.deploy().await.unwrap();
1074 deployment.start().await.unwrap();
1075
1076 let (mut external_out, mut external_in) = nodes.connect(port).await;
1077
1078 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1079 assert_eq!(
1080 external_out.next().await.unwrap().unwrap(),
1081 vec![1, 2, 3, 42]
1082 );
1083 }
1084
1085 #[tokio::test]
1086 async fn echo_external_bytes() {
1087 let mut deployment = Deployment::new();
1088
1089 let flow = FlowBuilder::new();
1090 let first_node = flow.process::<()>();
1091 let external = flow.external::<()>();
1092
1093 let (port, input, _membership, complete_sink) = first_node
1094 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1095 complete_sink
1096 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1097
1098 let nodes = flow
1099 .with_process(&first_node, deployment.Localhost())
1100 .with_external(&external, deployment.Localhost())
1101 .deploy(&mut deployment);
1102
1103 deployment.deploy().await.unwrap();
1104
1105 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1106 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1107
1108 deployment.start().await.unwrap();
1109
1110 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1111 external_in_2.send(vec![4, 5].into()).await.unwrap();
1112
1113 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1114 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1115 }
1116
1117 #[tokio::test]
1118 async fn echo_external_bincode() {
1119 let mut deployment = Deployment::new();
1120
1121 let flow = FlowBuilder::new();
1122 let first_node = flow.process::<()>();
1123 let external = flow.external::<()>();
1124
1125 let (port, input, _membership, complete_sink) =
1126 first_node.bidi_external_many_bincode(&external);
1127 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1128
1129 let nodes = flow
1130 .with_process(&first_node, deployment.Localhost())
1131 .with_external(&external, deployment.Localhost())
1132 .deploy(&mut deployment);
1133
1134 deployment.deploy().await.unwrap();
1135
1136 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1137 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1138
1139 deployment.start().await.unwrap();
1140
1141 external_in_1.send("hi".to_string()).await.unwrap();
1142 external_in_2.send("hello".to_string()).await.unwrap();
1143
1144 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1145 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1146 }
1147}