1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::{Bounded, Unbounded};
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::dynamic::LocationId;
40use crate::location::external_process::{
41 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
42};
43use crate::nondet::NonDet;
44#[cfg(feature = "sim")]
45use crate::sim::SimSender;
46use crate::staging_util::get_this_crate;
47
48pub mod dynamic;
49
50#[expect(missing_docs, reason = "TODO")]
51pub mod external_process;
52pub use external_process::External;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod process;
56pub use process::Process;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod cluster;
60pub use cluster::Cluster;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod member_id;
64pub use member_id::{MemberId, TaglessMemberId};
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72 Joined,
73 Left,
74}
75
/// A hint attached (as `port_hint`) to the external-input nodes created by
/// [`Location::bind_single_client`] and [`Location::bidi_external_many_bytes`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No specific request; this is what the bincode helpers and
    /// [`Location::source_external_bytes`] in this module pass.
    Auto,
    /// A TCP port, carrying an optional `u16` port number.
    TcpPort(Option<u16>),
}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89 private_bounds,
90 reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
/// The type returned by [`Location::root`]; it must itself implement [`Location`].
type Root: Location<'a>;

/// Returns the [`Location::Root`] associated with this location.
// NOTE(review): presumably the outermost location that `self` is nested in (e.g. the
// location a tick was created on) — confirm against the implementors, which are not
// visible in this file.
fn root(&self) -> Self::Root;
96
97 fn try_tick(&self) -> Option<Tick<Self>> {
98 if Self::is_top_level() {
99 let next_id = self.flow_state().borrow_mut().next_clock_id;
100 self.flow_state().borrow_mut().next_clock_id += 1;
101 Some(Tick {
102 id: next_id,
103 l: self.clone(),
104 })
105 } else {
106 None
107 }
108 }
109
110 fn id(&self) -> LocationId {
111 dynamic::DynLocation::id(self)
112 }
113
114 fn tick(&self) -> Tick<Self>
115 where
116 Self: NoTick,
117 {
118 let next_id = self.flow_state().borrow_mut().next_clock_id;
119 self.flow_state().borrow_mut().next_clock_id += 1;
120 Tick {
121 id: next_id,
122 l: self.clone(),
123 }
124 }
125
126 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127 where
128 Self: Sized + NoTick,
129 {
130 Stream::new(
131 self.clone(),
132 HydroNode::Source {
133 source: HydroSource::Spin(),
134 metadata: self.new_node_metadata(Stream::<
135 (),
136 Self,
137 Unbounded,
138 TotalOrder,
139 ExactlyOnce,
140 >::collection_kind()),
141 },
142 )
143 }
144
145 fn source_stream<T, E>(
146 &self,
147 e: impl QuotedWithContext<'a, E, Self>,
148 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149 where
150 E: FuturesStream<Item = T> + Unpin,
151 Self: Sized + NoTick,
152 {
153 let e = e.splice_untyped_ctx(self);
154
155 Stream::new(
156 self.clone(),
157 HydroNode::Source {
158 source: HydroSource::Stream(e.into()),
159 metadata: self.new_node_metadata(Stream::<
160 T,
161 Self,
162 Unbounded,
163 TotalOrder,
164 ExactlyOnce,
165 >::collection_kind()),
166 },
167 )
168 }
169
170 fn source_iter<T, E>(
171 &self,
172 e: impl QuotedWithContext<'a, E, Self>,
173 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
174 where
175 E: IntoIterator<Item = T>,
176 Self: Sized + NoTick,
177 {
178 let e = e.splice_typed_ctx(self);
179
180 Stream::new(
181 self.clone(),
182 HydroNode::Source {
183 source: HydroSource::Iter(e.into()),
184 metadata: self.new_node_metadata(
185 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
186 ),
187 },
188 )
189 }
190
191 fn source_cluster_members<C: 'a>(
192 &self,
193 cluster: &Cluster<'a, C>,
194 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
195 where
196 Self: Sized + NoTick,
197 {
198 Stream::new(
199 self.clone(),
200 HydroNode::Source {
201 source: HydroSource::ClusterMembers(cluster.id()),
202 metadata: self.new_node_metadata(Stream::<
203 (TaglessMemberId, MembershipEvent),
204 Self,
205 Unbounded,
206 TotalOrder,
207 ExactlyOnce,
208 >::collection_kind()),
209 },
210 )
211 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
212 .into_keyed()
213 }
214
215 fn source_external_bytes<L>(
216 &self,
217 from: &External<L>,
218 ) -> (
219 ExternalBytesPort,
220 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
221 )
222 where
223 Self: Sized + NoTick,
224 {
225 let (port, stream, sink) =
226 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
227
228 sink.complete(self.source_iter(q!([])));
229
230 (port, stream)
231 }
232
233 #[expect(clippy::type_complexity, reason = "stream markers")]
234 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
235 &self,
236 from: &External<L>,
237 ) -> (
238 ExternalBincodeSink<T, NotMany, O, R>,
239 Stream<T, Self, Unbounded, O, R>,
240 )
241 where
242 Self: Sized + NoTick,
243 T: Serialize + DeserializeOwned,
244 {
245 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
246 sink.complete(self.source_iter(q!([])));
247
248 (
249 ExternalBincodeSink {
250 process_id: from.id,
251 port_id: port.port_id,
252 _phantom: PhantomData,
253 },
254 stream.weaken_ordering().weaken_retries(),
255 )
256 }
257
/// Creates a simulation input for this location (only with the `sim` feature).
///
/// An [`External`] with id `0` that shares this location's flow state is created and
/// passed to [`Location::source_external_bincode`]. The returned [`SimSender`] wraps the
/// resulting port id; the stream carries the values received on that port.
#[cfg(feature = "sim")]
#[expect(clippy::type_complexity, reason = "stream markers")]
fn sim_input<T, O: Ordering, R: Retries>(
    &self,
) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
where
    Self: Sized + NoTick,
    T: Serialize + DeserializeOwned,
{
    let sim_external: External<'a, ()> = External {
        id: 0,
        flow_state: self.flow_state().clone(),
        _phantom: PhantomData,
    };

    let (sink, stream) = self.source_external_bincode(&sim_external);
    let sender = SimSender(sink.port_id, PhantomData);

    (sender, stream)
}
279
280 #[expect(clippy::type_complexity, reason = "stream markers")]
325 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
326 &self,
327 from: &External<L>,
328 port_hint: NetworkHint,
329 ) -> (
330 ExternalBytesPort<NotMany>,
331 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
332 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
333 )
334 where
335 Self: Sized + NoTick,
336 {
337 let next_external_port_id = from
338 .flow_state
339 .borrow_mut()
340 .next_external_port
341 .get_and_increment();
342
343 let (fwd_ref, to_sink) =
344 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
345 let mut flow_state_borrow = self.flow_state().borrow_mut();
346
347 flow_state_borrow.push_root(HydroRoot::SendExternal {
348 to_external_id: from.id,
349 to_port_id: next_external_port_id,
350 to_many: false,
351 unpaired: false,
352 serialize_fn: None,
353 instantiate_fn: DebugInstantiate::Building,
354 input: Box::new(to_sink.ir_node.into_inner()),
355 op_metadata: HydroIrOpMetadata::new(),
356 });
357
358 let raw_stream: Stream<
359 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
360 Self,
361 Unbounded,
362 TotalOrder,
363 ExactlyOnce,
364 > = Stream::new(
365 self.clone(),
366 HydroNode::ExternalInput {
367 from_external_id: from.id,
368 from_port_id: next_external_port_id,
369 from_many: false,
370 codec_type: quote_type::<Codec>().into(),
371 port_hint,
372 instantiate_fn: DebugInstantiate::Building,
373 deserialize_fn: None,
374 metadata: self.new_node_metadata(Stream::<
375 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
376 Self,
377 Unbounded,
378 TotalOrder,
379 ExactlyOnce,
380 >::collection_kind()),
381 },
382 );
383
384 (
385 ExternalBytesPort {
386 process_id: from.id,
387 port_id: next_external_port_id,
388 _phantom: PhantomData,
389 },
390 raw_stream.flatten_ordered(),
391 fwd_ref,
392 )
393 }
394
395 #[expect(clippy::type_complexity, reason = "stream markers")]
396 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
397 &self,
398 from: &External<L>,
399 ) -> (
400 ExternalBincodeBidi<InT, OutT, NotMany>,
401 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
402 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
403 )
404 where
405 Self: Sized + NoTick,
406 {
407 let next_external_port_id = from
408 .flow_state
409 .borrow_mut()
410 .next_external_port
411 .get_and_increment();
412
413 let (fwd_ref, to_sink) =
414 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
415 let mut flow_state_borrow = self.flow_state().borrow_mut();
416
417 let root = get_this_crate();
418
419 let out_t_type = quote_type::<OutT>();
420 let ser_fn: syn::Expr = syn::parse_quote! {
421 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
422 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
423 )
424 };
425
426 flow_state_borrow.push_root(HydroRoot::SendExternal {
427 to_external_id: from.id,
428 to_port_id: next_external_port_id,
429 to_many: false,
430 unpaired: false,
431 serialize_fn: Some(ser_fn.into()),
432 instantiate_fn: DebugInstantiate::Building,
433 input: Box::new(to_sink.ir_node.into_inner()),
434 op_metadata: HydroIrOpMetadata::new(),
435 });
436
437 let in_t_type = quote_type::<InT>();
438
439 let deser_fn: syn::Expr = syn::parse_quote! {
440 |res| {
441 let b = res.unwrap();
442 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
443 }
444 };
445
446 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
447 self.clone(),
448 HydroNode::ExternalInput {
449 from_external_id: from.id,
450 from_port_id: next_external_port_id,
451 from_many: false,
452 codec_type: quote_type::<LengthDelimitedCodec>().into(),
453 port_hint: NetworkHint::Auto,
454 instantiate_fn: DebugInstantiate::Building,
455 deserialize_fn: Some(deser_fn.into()),
456 metadata: self.new_node_metadata(Stream::<
457 InT,
458 Self,
459 Unbounded,
460 TotalOrder,
461 ExactlyOnce,
462 >::collection_kind()),
463 },
464 );
465
466 (
467 ExternalBincodeBidi {
468 process_id: from.id,
469 port_id: next_external_port_id,
470 _phantom: PhantomData,
471 },
472 raw_stream,
473 fwd_ref,
474 )
475 }
476
477 #[expect(clippy::type_complexity, reason = "stream markers")]
478 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
479 &self,
480 from: &External<L>,
481 port_hint: NetworkHint,
482 ) -> (
483 ExternalBytesPort<Many>,
484 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
485 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
486 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
487 )
488 where
489 Self: Sized + NoTick,
490 {
491 let next_external_port_id = from
492 .flow_state
493 .borrow_mut()
494 .next_external_port
495 .get_and_increment();
496
497 let (fwd_ref, to_sink) =
498 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
499 let mut flow_state_borrow = self.flow_state().borrow_mut();
500
501 flow_state_borrow.push_root(HydroRoot::SendExternal {
502 to_external_id: from.id,
503 to_port_id: next_external_port_id,
504 to_many: true,
505 unpaired: false,
506 serialize_fn: None,
507 instantiate_fn: DebugInstantiate::Building,
508 input: Box::new(to_sink.entries().ir_node.into_inner()),
509 op_metadata: HydroIrOpMetadata::new(),
510 });
511
512 let raw_stream: Stream<
513 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
514 Self,
515 Unbounded,
516 TotalOrder,
517 ExactlyOnce,
518 > = Stream::new(
519 self.clone(),
520 HydroNode::ExternalInput {
521 from_external_id: from.id,
522 from_port_id: next_external_port_id,
523 from_many: true,
524 codec_type: quote_type::<Codec>().into(),
525 port_hint,
526 instantiate_fn: DebugInstantiate::Building,
527 deserialize_fn: None,
528 metadata: self.new_node_metadata(Stream::<
529 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
530 Self,
531 Unbounded,
532 TotalOrder,
533 ExactlyOnce,
534 >::collection_kind()),
535 },
536 );
537
538 let membership_stream_ident = syn::Ident::new(
539 &format!(
540 "__hydro_deploy_many_{}_{}_membership",
541 from.id, next_external_port_id
542 ),
543 Span::call_site(),
544 );
545 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
546 let raw_membership_stream: KeyedStream<
547 u64,
548 bool,
549 Self,
550 Unbounded,
551 TotalOrder,
552 ExactlyOnce,
553 > = KeyedStream::new(
554 self.clone(),
555 HydroNode::Source {
556 source: HydroSource::Stream(membership_stream_expr.into()),
557 metadata: self.new_node_metadata(KeyedStream::<
558 u64,
559 bool,
560 Self,
561 Unbounded,
562 TotalOrder,
563 ExactlyOnce,
564 >::collection_kind()),
565 },
566 );
567
568 (
569 ExternalBytesPort {
570 process_id: from.id,
571 port_id: next_external_port_id,
572 _phantom: PhantomData,
573 },
574 raw_stream
575 .flatten_ordered() .into_keyed(),
577 raw_membership_stream.map(q!(|join| {
578 if join {
579 MembershipEvent::Joined
580 } else {
581 MembershipEvent::Left
582 }
583 })),
584 fwd_ref,
585 )
586 }
587
588 #[expect(clippy::type_complexity, reason = "stream markers")]
589 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
590 &self,
591 from: &External<L>,
592 ) -> (
593 ExternalBincodeBidi<InT, OutT, Many>,
594 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
595 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
596 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
597 )
598 where
599 Self: Sized + NoTick,
600 {
601 let next_external_port_id = from
602 .flow_state
603 .borrow_mut()
604 .next_external_port
605 .get_and_increment();
606
607 let (fwd_ref, to_sink) =
608 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
609 let mut flow_state_borrow = self.flow_state().borrow_mut();
610
611 let root = get_this_crate();
612
613 let out_t_type = quote_type::<OutT>();
614 let ser_fn: syn::Expr = syn::parse_quote! {
615 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
616 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
617 )
618 };
619
620 flow_state_borrow.push_root(HydroRoot::SendExternal {
621 to_external_id: from.id,
622 to_port_id: next_external_port_id,
623 to_many: true,
624 unpaired: false,
625 serialize_fn: Some(ser_fn.into()),
626 instantiate_fn: DebugInstantiate::Building,
627 input: Box::new(to_sink.entries().ir_node.into_inner()),
628 op_metadata: HydroIrOpMetadata::new(),
629 });
630
631 let in_t_type = quote_type::<InT>();
632
633 let deser_fn: syn::Expr = syn::parse_quote! {
634 |res| {
635 let (id, b) = res.unwrap();
636 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
637 }
638 };
639
640 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
641 KeyedStream::new(
642 self.clone(),
643 HydroNode::ExternalInput {
644 from_external_id: from.id,
645 from_port_id: next_external_port_id,
646 from_many: true,
647 codec_type: quote_type::<LengthDelimitedCodec>().into(),
648 port_hint: NetworkHint::Auto,
649 instantiate_fn: DebugInstantiate::Building,
650 deserialize_fn: Some(deser_fn.into()),
651 metadata: self.new_node_metadata(KeyedStream::<
652 u64,
653 InT,
654 Self,
655 Unbounded,
656 TotalOrder,
657 ExactlyOnce,
658 >::collection_kind()),
659 },
660 );
661
662 let membership_stream_ident = syn::Ident::new(
663 &format!(
664 "__hydro_deploy_many_{}_{}_membership",
665 from.id, next_external_port_id
666 ),
667 Span::call_site(),
668 );
669 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
670 let raw_membership_stream: KeyedStream<
671 u64,
672 bool,
673 Self,
674 Unbounded,
675 TotalOrder,
676 ExactlyOnce,
677 > = KeyedStream::new(
678 self.clone(),
679 HydroNode::Source {
680 source: HydroSource::Stream(membership_stream_expr.into()),
681 metadata: self.new_node_metadata(KeyedStream::<
682 u64,
683 bool,
684 Self,
685 Unbounded,
686 TotalOrder,
687 ExactlyOnce,
688 >::collection_kind()),
689 },
690 );
691
692 (
693 ExternalBincodeBidi {
694 process_id: from.id,
695 port_id: next_external_port_id,
696 _phantom: PhantomData,
697 },
698 raw_stream,
699 raw_membership_stream.map(q!(|join| {
700 if join {
701 MembershipEvent::Joined
702 } else {
703 MembershipEvent::Left
704 }
705 })),
706 fwd_ref,
707 )
708 }
709
710 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
728 where
729 T: Clone,
730 Self: Sized,
731 {
732 let e = e.splice_untyped_ctx(self);
733
734 Singleton::new(
735 self.clone(),
736 HydroNode::SingletonSource {
737 value: e.into(),
738 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
739 },
740 )
741 }
742
743 fn source_interval(
753 &self,
754 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
755 _nondet: NonDet,
756 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
757 where
758 Self: Sized + NoTick,
759 {
760 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
761 tokio::time::interval(interval)
762 )))
763 }
764
765 fn source_interval_delayed(
776 &self,
777 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
778 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
779 _nondet: NonDet,
780 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
781 where
782 Self: Sized + NoTick,
783 {
784 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
785 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
786 )))
787 }
788
789 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
790 where
791 S: CycleCollection<'a, ForwardRef, Location = Self>,
792 {
793 let next_id = self.flow_state().borrow_mut().next_cycle_id();
794 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
795
796 (
797 ForwardHandle {
798 completed: false,
799 ident: ident.clone(),
800 expected_location: Location::id(self),
801 _phantom: PhantomData,
802 },
803 S::create_source(ident, self.clone()),
804 )
805 }
806}
807
808#[cfg(feature = "deploy")]
809#[cfg(test)]
810mod tests {
811 use std::collections::HashSet;
812
813 use futures::{SinkExt, StreamExt};
814 use hydro_deploy::Deployment;
815 use stageleft::q;
816 use tokio_util::codec::LengthDelimitedCodec;
817
818 use crate::compile::builder::FlowBuilder;
819 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
820 use crate::location::{Location, NetworkHint};
821 use crate::nondet::nondet;
822
823 #[tokio::test]
824 async fn top_level_singleton_replay_cardinality() {
825 let mut deployment = Deployment::new();
826
827 let flow = FlowBuilder::new();
828 let node = flow.process::<()>();
829 let external = flow.external::<()>();
830
831 let (in_port, input) =
832 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
833 let singleton = node.singleton(q!(123));
834 let tick = node.tick();
835 let out = input
836 .batch(&tick, nondet!())
837 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
838 .cross_singleton(
839 singleton
840 .snapshot(&tick, nondet!())
841 .into_stream()
842 .count(),
843 )
844 .all_ticks()
845 .send_bincode_external(&external);
846
847 let nodes = flow
848 .with_process(&node, deployment.Localhost())
849 .with_external(&external, deployment.Localhost())
850 .deploy(&mut deployment);
851
852 deployment.deploy().await.unwrap();
853
854 let mut external_in = nodes.connect(in_port).await;
855 let mut external_out = nodes.connect(out).await;
856
857 deployment.start().await.unwrap();
858
859 external_in.send(1).await.unwrap();
860 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
861
862 external_in.send(2).await.unwrap();
863 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
864 }
865
866 #[tokio::test]
867 async fn tick_singleton_replay_cardinality() {
868 let mut deployment = Deployment::new();
869
870 let flow = FlowBuilder::new();
871 let node = flow.process::<()>();
872 let external = flow.external::<()>();
873
874 let (in_port, input) =
875 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
876 let tick = node.tick();
877 let singleton = tick.singleton(q!(123));
878 let out = input
879 .batch(&tick, nondet!())
880 .cross_singleton(singleton.clone())
881 .cross_singleton(singleton.into_stream().count())
882 .all_ticks()
883 .send_bincode_external(&external);
884
885 let nodes = flow
886 .with_process(&node, deployment.Localhost())
887 .with_external(&external, deployment.Localhost())
888 .deploy(&mut deployment);
889
890 deployment.deploy().await.unwrap();
891
892 let mut external_in = nodes.connect(in_port).await;
893 let mut external_out = nodes.connect(out).await;
894
895 deployment.start().await.unwrap();
896
897 external_in.send(1).await.unwrap();
898 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
899
900 external_in.send(2).await.unwrap();
901 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
902 }
903
904 #[tokio::test]
905 async fn external_bytes() {
906 let mut deployment = Deployment::new();
907
908 let flow = FlowBuilder::new();
909 let first_node = flow.process::<()>();
910 let external = flow.external::<()>();
911
912 let (in_port, input) = first_node.source_external_bytes(&external);
913 let out = input.send_bincode_external(&external);
914
915 let nodes = flow
916 .with_process(&first_node, deployment.Localhost())
917 .with_external(&external, deployment.Localhost())
918 .deploy(&mut deployment);
919
920 deployment.deploy().await.unwrap();
921
922 let mut external_in = nodes.connect(in_port).await.1;
923 let mut external_out = nodes.connect(out).await;
924
925 deployment.start().await.unwrap();
926
927 external_in.send(vec![1, 2, 3].into()).await.unwrap();
928
929 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
930 }
931
932 #[tokio::test]
933 async fn multi_external_source() {
934 let mut deployment = Deployment::new();
935
936 let flow = FlowBuilder::new();
937 let first_node = flow.process::<()>();
938 let external = flow.external::<()>();
939
940 let (in_port, input, _membership, complete_sink) =
941 first_node.bidi_external_many_bincode(&external);
942 let out = input.entries().send_bincode_external(&external);
943 complete_sink.complete(
944 first_node
945 .source_iter::<(u64, ()), _>(q!([]))
946 .into_keyed()
947 .weakest_ordering(),
948 );
949
950 let nodes = flow
951 .with_process(&first_node, deployment.Localhost())
952 .with_external(&external, deployment.Localhost())
953 .deploy(&mut deployment);
954
955 deployment.deploy().await.unwrap();
956
957 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
958 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
959 let external_out = nodes.connect(out).await;
960
961 deployment.start().await.unwrap();
962
963 external_in_1.send(123).await.unwrap();
964 external_in_2.send(456).await.unwrap();
965
966 assert_eq!(
967 external_out.take(2).collect::<HashSet<_>>().await,
968 vec![(0, 123), (1, 456)].into_iter().collect()
969 );
970 }
971
972 #[tokio::test]
973 async fn second_connection_only_multi_source() {
974 let mut deployment = Deployment::new();
975
976 let flow = FlowBuilder::new();
977 let first_node = flow.process::<()>();
978 let external = flow.external::<()>();
979
980 let (in_port, input, _membership, complete_sink) =
981 first_node.bidi_external_many_bincode(&external);
982 let out = input.entries().send_bincode_external(&external);
983 complete_sink.complete(
984 first_node
985 .source_iter::<(u64, ()), _>(q!([]))
986 .into_keyed()
987 .weakest_ordering(),
988 );
989
990 let nodes = flow
991 .with_process(&first_node, deployment.Localhost())
992 .with_external(&external, deployment.Localhost())
993 .deploy(&mut deployment);
994
995 deployment.deploy().await.unwrap();
996
997 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
999 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1000 let mut external_out = nodes.connect(out).await;
1001
1002 deployment.start().await.unwrap();
1003
1004 external_in_2.send(456).await.unwrap();
1005
1006 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1007 }
1008
1009 #[tokio::test]
1010 async fn multi_external_bytes() {
1011 let mut deployment = Deployment::new();
1012
1013 let flow = FlowBuilder::new();
1014 let first_node = flow.process::<()>();
1015 let external = flow.external::<()>();
1016
1017 let (in_port, input, _membership, complete_sink) = first_node
1018 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1019 let out = input.entries().send_bincode_external(&external);
1020 complete_sink.complete(
1021 first_node
1022 .source_iter(q!([]))
1023 .into_keyed()
1024 .weakest_ordering(),
1025 );
1026
1027 let nodes = flow
1028 .with_process(&first_node, deployment.Localhost())
1029 .with_external(&external, deployment.Localhost())
1030 .deploy(&mut deployment);
1031
1032 deployment.deploy().await.unwrap();
1033
1034 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1035 let mut external_in_2 = nodes.connect(in_port).await.1;
1036 let external_out = nodes.connect(out).await;
1037
1038 deployment.start().await.unwrap();
1039
1040 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1041 external_in_2.send(vec![4, 5].into()).await.unwrap();
1042
1043 assert_eq!(
1044 external_out.take(2).collect::<HashSet<_>>().await,
1045 vec![
1046 (0, (&[1u8, 2, 3] as &[u8]).into()),
1047 (1, (&[4u8, 5] as &[u8]).into())
1048 ]
1049 .into_iter()
1050 .collect()
1051 );
1052 }
1053
1054 #[tokio::test]
1055 async fn single_client_external_bytes() {
1056 let mut deployment = Deployment::new();
1057 let flow = FlowBuilder::new();
1058 let first_node = flow.process::<()>();
1059 let external = flow.external::<()>();
1060 let (port, input, complete_sink) = first_node
1061 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1062 complete_sink.complete(input.map(q!(|data| {
1063 let mut resp: Vec<u8> = data.into();
1064 resp.push(42);
1065 resp.into() })));
1067
1068 let nodes = flow
1069 .with_process(&first_node, deployment.Localhost())
1070 .with_external(&external, deployment.Localhost())
1071 .deploy(&mut deployment);
1072
1073 deployment.deploy().await.unwrap();
1074 deployment.start().await.unwrap();
1075
1076 let (mut external_out, mut external_in) = nodes.connect(port).await;
1077
1078 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1079 assert_eq!(
1080 external_out.next().await.unwrap().unwrap(),
1081 vec![1, 2, 3, 42]
1082 );
1083 }
1084
1085 #[tokio::test]
1086 async fn echo_external_bytes() {
1087 let mut deployment = Deployment::new();
1088
1089 let flow = FlowBuilder::new();
1090 let first_node = flow.process::<()>();
1091 let external = flow.external::<()>();
1092
1093 let (port, input, _membership, complete_sink) = first_node
1094 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1095 complete_sink
1096 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1097
1098 let nodes = flow
1099 .with_process(&first_node, deployment.Localhost())
1100 .with_external(&external, deployment.Localhost())
1101 .deploy(&mut deployment);
1102
1103 deployment.deploy().await.unwrap();
1104
1105 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1106 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1107
1108 deployment.start().await.unwrap();
1109
1110 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1111 external_in_2.send(vec![4, 5].into()).await.unwrap();
1112
1113 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1114 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1115 }
1116
1117 #[tokio::test]
1118 async fn echo_external_bincode() {
1119 let mut deployment = Deployment::new();
1120
1121 let flow = FlowBuilder::new();
1122 let first_node = flow.process::<()>();
1123 let external = flow.external::<()>();
1124
1125 let (port, input, _membership, complete_sink) =
1126 first_node.bidi_external_many_bincode(&external);
1127 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1128
1129 let nodes = flow
1130 .with_process(&first_node, deployment.Localhost())
1131 .with_external(&external, deployment.Localhost())
1132 .deploy(&mut deployment);
1133
1134 deployment.deploy().await.unwrap();
1135
1136 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1137 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1138
1139 deployment.start().await.unwrap();
1140
1141 external_in_1.send("hi".to_string()).await.unwrap();
1142 external_in_2.send("hello".to_string()).await.unwrap();
1143
1144 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1145 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1146 }
1147}