1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::cluster::ClusterIds;
40use crate::location::dynamic::LocationId;
41use crate::location::external_process::{
42 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
43};
44use crate::nondet::NonDet;
45use crate::staging_util::get_this_crate;
46
47pub mod dynamic;
48
49#[expect(missing_docs, reason = "TODO")]
50pub mod external_process;
51pub use external_process::External;
52
53#[expect(missing_docs, reason = "TODO")]
54pub mod process;
55pub use process::Process;
56
57#[expect(missing_docs, reason = "TODO")]
58pub mod cluster;
59pub use cluster::Cluster;
60
61#[expect(missing_docs, reason = "TODO")]
62pub mod member_id;
63pub use member_id::MemberId;
64
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72 Joined,
73 Left,
74}
75
76#[expect(missing_docs, reason = "TODO")]
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
78pub enum NetworkHint {
79 Auto,
80 TcpPort(Option<u16>),
81}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89 private_bounds,
90 reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
93 type Root: Location<'a>;
94
95 fn root(&self) -> Self::Root;
96
97 fn try_tick(&self) -> Option<Tick<Self>> {
98 if Self::is_top_level() {
99 let next_id = self.flow_state().borrow_mut().next_clock_id;
100 self.flow_state().borrow_mut().next_clock_id += 1;
101 Some(Tick {
102 id: next_id,
103 l: self.clone(),
104 })
105 } else {
106 None
107 }
108 }
109
110 fn id(&self) -> LocationId {
111 dynamic::DynLocation::id(self)
112 }
113
114 fn tick(&self) -> Tick<Self>
115 where
116 Self: NoTick,
117 {
118 let next_id = self.flow_state().borrow_mut().next_clock_id;
119 self.flow_state().borrow_mut().next_clock_id += 1;
120 Tick {
121 id: next_id,
122 l: self.clone(),
123 }
124 }
125
126 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127 where
128 Self: Sized + NoTick,
129 {
130 Stream::new(
131 self.clone(),
132 HydroNode::Source {
133 source: HydroSource::Spin(),
134 metadata: self.new_node_metadata(Stream::<
135 (),
136 Self,
137 Unbounded,
138 TotalOrder,
139 ExactlyOnce,
140 >::collection_kind()),
141 },
142 )
143 }
144
145 fn source_stream<T, E>(
146 &self,
147 e: impl QuotedWithContext<'a, E, Self>,
148 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149 where
150 E: FuturesStream<Item = T> + Unpin,
151 Self: Sized + NoTick,
152 {
153 let e = e.splice_untyped_ctx(self);
154
155 Stream::new(
156 self.clone(),
157 HydroNode::Source {
158 source: HydroSource::Stream(e.into()),
159 metadata: self.new_node_metadata(Stream::<
160 T,
161 Self,
162 Unbounded,
163 TotalOrder,
164 ExactlyOnce,
165 >::collection_kind()),
166 },
167 )
168 }
169
170 fn source_iter<T, E>(
171 &self,
172 e: impl QuotedWithContext<'a, E, Self>,
173 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
174 where
175 E: IntoIterator<Item = T>,
176 Self: Sized + NoTick,
177 {
178 let e = e.splice_typed_ctx(self);
181
182 Stream::new(
183 self.clone(),
184 HydroNode::Source {
185 source: HydroSource::Iter(e.into()),
186 metadata: self.new_node_metadata(Stream::<
187 T,
188 Self,
189 Unbounded,
190 TotalOrder,
191 ExactlyOnce,
192 >::collection_kind()),
193 },
194 )
195 }
196
197 fn source_cluster_members<C: 'a>(
198 &self,
199 cluster: &Cluster<'a, C>,
200 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
201 where
202 Self: Sized + NoTick,
203 {
204 let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
205 id: cluster.id,
206 _phantom: PhantomData,
207 };
208
209 self.source_iter(q!(underlying_memberids))
210 .map(q!(|id| (*id, MembershipEvent::Joined)))
211 .into_keyed()
212 }
213
214 fn source_external_bytes<L>(
215 &self,
216 from: &External<L>,
217 ) -> (
218 ExternalBytesPort,
219 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
220 )
221 where
222 Self: Sized + NoTick,
223 {
224 let (port, stream, sink) =
225 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
226
227 sink.complete(self.source_iter(q!([])));
228
229 (port, stream)
230 }
231
232 #[expect(clippy::type_complexity, reason = "stream markers")]
233 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
234 &self,
235 from: &External<L>,
236 ) -> (
237 ExternalBincodeSink<T, NotMany, O, R>,
238 Stream<T, Self, Unbounded, O, R>,
239 )
240 where
241 Self: Sized + NoTick,
242 T: Serialize + DeserializeOwned,
243 {
244 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
245 sink.complete(self.source_iter(q!([])));
246
247 (
248 ExternalBincodeSink {
249 process_id: from.id,
250 port_id: port.port_id,
251 _phantom: PhantomData,
252 },
253 stream.weaken_ordering().weaken_retries(),
254 )
255 }
256
257 #[expect(clippy::type_complexity, reason = "stream markers")]
300 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
301 &self,
302 from: &External<L>,
303 port_hint: NetworkHint,
304 ) -> (
305 ExternalBytesPort<NotMany>,
306 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
307 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
308 )
309 where
310 Self: Sized + NoTick,
311 {
312 let next_external_port_id = {
313 let mut flow_state = from.flow_state.borrow_mut();
314 let id = flow_state.next_external_out;
315 flow_state.next_external_out += 1;
316 id
317 };
318
319 let (fwd_ref, to_sink) =
320 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
321 let mut flow_state_borrow = self.flow_state().borrow_mut();
322
323 flow_state_borrow.push_root(HydroRoot::SendExternal {
324 to_external_id: from.id,
325 to_key: next_external_port_id,
326 to_many: false,
327 unpaired: false,
328 serialize_fn: None,
329 instantiate_fn: DebugInstantiate::Building,
330 input: Box::new(to_sink.ir_node.into_inner()),
331 op_metadata: HydroIrOpMetadata::new(),
332 });
333
334 let raw_stream: Stream<
335 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
336 Self,
337 Unbounded,
338 TotalOrder,
339 ExactlyOnce,
340 > = Stream::new(
341 self.clone(),
342 HydroNode::ExternalInput {
343 from_external_id: from.id,
344 from_key: next_external_port_id,
345 from_many: false,
346 codec_type: quote_type::<Codec>().into(),
347 port_hint,
348 instantiate_fn: DebugInstantiate::Building,
349 deserialize_fn: None,
350 metadata: self.new_node_metadata(Stream::<
351 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
352 Self,
353 Unbounded,
354 TotalOrder,
355 ExactlyOnce,
356 >::collection_kind()),
357 },
358 );
359
360 (
361 ExternalBytesPort {
362 process_id: from.id,
363 port_id: next_external_port_id,
364 _phantom: PhantomData,
365 },
366 raw_stream.flatten_ordered(),
367 fwd_ref,
368 )
369 }
370
371 #[expect(clippy::type_complexity, reason = "stream markers")]
372 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
373 &self,
374 from: &External<L>,
375 ) -> (
376 ExternalBincodeBidi<InT, OutT, NotMany>,
377 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
378 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
379 )
380 where
381 Self: Sized + NoTick,
382 {
383 let next_external_port_id = {
384 let mut flow_state = from.flow_state.borrow_mut();
385 let id = flow_state.next_external_out;
386 flow_state.next_external_out += 1;
387 id
388 };
389
390 let (fwd_ref, to_sink) =
391 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
392 let mut flow_state_borrow = self.flow_state().borrow_mut();
393
394 let root = get_this_crate();
395
396 let out_t_type = quote_type::<OutT>();
397 let ser_fn: syn::Expr = syn::parse_quote! {
398 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
399 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
400 )
401 };
402
403 flow_state_borrow.push_root(HydroRoot::SendExternal {
404 to_external_id: from.id,
405 to_key: next_external_port_id,
406 to_many: false,
407 unpaired: false,
408 serialize_fn: Some(ser_fn.into()),
409 instantiate_fn: DebugInstantiate::Building,
410 input: Box::new(to_sink.ir_node.into_inner()),
411 op_metadata: HydroIrOpMetadata::new(),
412 });
413
414 let in_t_type = quote_type::<InT>();
415
416 let deser_fn: syn::Expr = syn::parse_quote! {
417 |res| {
418 let b = res.unwrap();
419 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
420 }
421 };
422
423 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
424 self.clone(),
425 HydroNode::ExternalInput {
426 from_external_id: from.id,
427 from_key: next_external_port_id,
428 from_many: false,
429 codec_type: quote_type::<LengthDelimitedCodec>().into(),
430 port_hint: NetworkHint::Auto,
431 instantiate_fn: DebugInstantiate::Building,
432 deserialize_fn: Some(deser_fn.into()),
433 metadata: self.new_node_metadata(Stream::<
434 InT,
435 Self,
436 Unbounded,
437 TotalOrder,
438 ExactlyOnce,
439 >::collection_kind()),
440 },
441 );
442
443 (
444 ExternalBincodeBidi {
445 process_id: from.id,
446 port_id: next_external_port_id,
447 _phantom: PhantomData,
448 },
449 raw_stream,
450 fwd_ref,
451 )
452 }
453
454 #[expect(clippy::type_complexity, reason = "stream markers")]
455 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
456 &self,
457 from: &External<L>,
458 port_hint: NetworkHint,
459 ) -> (
460 ExternalBytesPort<Many>,
461 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
462 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
463 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
464 )
465 where
466 Self: Sized + NoTick,
467 {
468 let next_external_port_id = {
469 let mut flow_state = from.flow_state.borrow_mut();
470 let id = flow_state.next_external_out;
471 flow_state.next_external_out += 1;
472 id
473 };
474
475 let (fwd_ref, to_sink) =
476 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
477 let mut flow_state_borrow = self.flow_state().borrow_mut();
478
479 flow_state_borrow.push_root(HydroRoot::SendExternal {
480 to_external_id: from.id,
481 to_key: next_external_port_id,
482 to_many: true,
483 unpaired: false,
484 serialize_fn: None,
485 instantiate_fn: DebugInstantiate::Building,
486 input: Box::new(to_sink.entries().ir_node.into_inner()),
487 op_metadata: HydroIrOpMetadata::new(),
488 });
489
490 let raw_stream: Stream<
491 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
492 Self,
493 Unbounded,
494 TotalOrder,
495 ExactlyOnce,
496 > = Stream::new(
497 self.clone(),
498 HydroNode::ExternalInput {
499 from_external_id: from.id,
500 from_key: next_external_port_id,
501 from_many: true,
502 codec_type: quote_type::<Codec>().into(),
503 port_hint,
504 instantiate_fn: DebugInstantiate::Building,
505 deserialize_fn: None,
506 metadata: self.new_node_metadata(Stream::<
507 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
508 Self,
509 Unbounded,
510 TotalOrder,
511 ExactlyOnce,
512 >::collection_kind()),
513 },
514 );
515
516 let membership_stream_ident = syn::Ident::new(
517 &format!(
518 "__hydro_deploy_many_{}_{}_membership",
519 from.id, next_external_port_id
520 ),
521 Span::call_site(),
522 );
523 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
524 let raw_membership_stream: KeyedStream<
525 u64,
526 bool,
527 Self,
528 Unbounded,
529 TotalOrder,
530 ExactlyOnce,
531 > = KeyedStream::new(
532 self.clone(),
533 HydroNode::Source {
534 source: HydroSource::Stream(membership_stream_expr.into()),
535 metadata: self.new_node_metadata(KeyedStream::<
536 u64,
537 bool,
538 Self,
539 Unbounded,
540 TotalOrder,
541 ExactlyOnce,
542 >::collection_kind()),
543 },
544 );
545
546 (
547 ExternalBytesPort {
548 process_id: from.id,
549 port_id: next_external_port_id,
550 _phantom: PhantomData,
551 },
552 raw_stream
553 .flatten_ordered() .into_keyed(),
555 raw_membership_stream.map(q!(|join| {
556 if join {
557 MembershipEvent::Joined
558 } else {
559 MembershipEvent::Left
560 }
561 })),
562 fwd_ref,
563 )
564 }
565
566 #[expect(clippy::type_complexity, reason = "stream markers")]
567 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
568 &self,
569 from: &External<L>,
570 ) -> (
571 ExternalBincodeBidi<InT, OutT, Many>,
572 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
573 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
574 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
575 )
576 where
577 Self: Sized + NoTick,
578 {
579 let next_external_port_id = {
580 let mut flow_state = from.flow_state.borrow_mut();
581 let id = flow_state.next_external_out;
582 flow_state.next_external_out += 1;
583 id
584 };
585
586 let root = get_this_crate();
587
588 let (fwd_ref, to_sink) =
589 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
590 let mut flow_state_borrow = self.flow_state().borrow_mut();
591
592 let out_t_type = quote_type::<OutT>();
593 let ser_fn: syn::Expr = syn::parse_quote! {
594 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
595 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
596 )
597 };
598
599 flow_state_borrow.push_root(HydroRoot::SendExternal {
600 to_external_id: from.id,
601 to_key: next_external_port_id,
602 to_many: true,
603 unpaired: false,
604 serialize_fn: Some(ser_fn.into()),
605 instantiate_fn: DebugInstantiate::Building,
606 input: Box::new(to_sink.entries().ir_node.into_inner()),
607 op_metadata: HydroIrOpMetadata::new(),
608 });
609
610 let in_t_type = quote_type::<InT>();
611
612 let deser_fn: syn::Expr = syn::parse_quote! {
613 |res| {
614 let (id, b) = res.unwrap();
615 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
616 }
617 };
618
619 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
620 KeyedStream::new(
621 self.clone(),
622 HydroNode::ExternalInput {
623 from_external_id: from.id,
624 from_key: next_external_port_id,
625 from_many: true,
626 codec_type: quote_type::<LengthDelimitedCodec>().into(),
627 port_hint: NetworkHint::Auto,
628 instantiate_fn: DebugInstantiate::Building,
629 deserialize_fn: Some(deser_fn.into()),
630 metadata: self.new_node_metadata(KeyedStream::<
631 u64,
632 InT,
633 Self,
634 Unbounded,
635 TotalOrder,
636 ExactlyOnce,
637 >::collection_kind()),
638 },
639 );
640
641 let membership_stream_ident = syn::Ident::new(
642 &format!(
643 "__hydro_deploy_many_{}_{}_membership",
644 from.id, next_external_port_id
645 ),
646 Span::call_site(),
647 );
648 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
649 let raw_membership_stream: KeyedStream<
650 u64,
651 bool,
652 Self,
653 Unbounded,
654 TotalOrder,
655 ExactlyOnce,
656 > = KeyedStream::new(
657 self.clone(),
658 HydroNode::Source {
659 source: HydroSource::Stream(membership_stream_expr.into()),
660 metadata: self.new_node_metadata(KeyedStream::<
661 u64,
662 bool,
663 Self,
664 Unbounded,
665 TotalOrder,
666 ExactlyOnce,
667 >::collection_kind()),
668 },
669 );
670
671 (
672 ExternalBincodeBidi {
673 process_id: from.id,
674 port_id: next_external_port_id,
675 _phantom: PhantomData,
676 },
677 raw_stream,
678 raw_membership_stream.map(q!(|join| {
679 if join {
680 MembershipEvent::Joined
681 } else {
682 MembershipEvent::Left
683 }
684 })),
685 fwd_ref,
686 )
687 }
688
689 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
705 where
706 T: Clone,
707 Self: Sized,
708 {
709 let e = e.splice_untyped_ctx(self);
713
714 Singleton::new(
715 self.clone(),
716 HydroNode::SingletonSource {
717 value: e.into(),
718 metadata: self
719 .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
720 },
721 )
722 }
723
724 fn source_interval(
734 &self,
735 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
736 _nondet: NonDet,
737 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
738 where
739 Self: Sized + NoTick,
740 {
741 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
742 tokio::time::interval(interval)
743 )))
744 }
745
746 fn source_interval_delayed(
757 &self,
758 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
759 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
760 _nondet: NonDet,
761 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
762 where
763 Self: Sized + NoTick,
764 {
765 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
766 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
767 )))
768 }
769
770 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
771 where
772 S: CycleCollection<'a, ForwardRef, Location = Self>,
773 Self: NoTick,
774 {
775 let next_id = self.flow_state().borrow_mut().next_cycle_id();
776 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
777
778 (
779 ForwardHandle {
780 completed: false,
781 ident: ident.clone(),
782 expected_location: Location::id(self),
783 _phantom: PhantomData,
784 },
785 S::create_source(ident, self.clone()),
786 )
787 }
788}
789
790#[cfg(feature = "deploy")]
791#[cfg(test)]
792mod tests {
793 use std::collections::HashSet;
794
795 use futures::{SinkExt, StreamExt};
796 use hydro_deploy::Deployment;
797 use stageleft::q;
798 use tokio_util::codec::LengthDelimitedCodec;
799
800 use crate::compile::builder::FlowBuilder;
801 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
802 use crate::location::{Location, NetworkHint};
803 use crate::nondet::nondet;
804
805 #[tokio::test]
806 async fn top_level_singleton_replay_cardinality() {
807 let mut deployment = Deployment::new();
808
809 let flow = FlowBuilder::new();
810 let node = flow.process::<()>();
811 let external = flow.external::<()>();
812
813 let (in_port, input) =
814 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
815 let singleton = node.singleton(q!(123));
816 let tick = node.tick();
817 let out = input
818 .batch(&tick, nondet!())
819 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
820 .cross_singleton(
821 singleton
822 .snapshot(&tick, nondet!())
823 .into_stream()
824 .count(),
825 )
826 .all_ticks()
827 .send_bincode_external(&external);
828
829 let nodes = flow
830 .with_process(&node, deployment.Localhost())
831 .with_external(&external, deployment.Localhost())
832 .deploy(&mut deployment);
833
834 deployment.deploy().await.unwrap();
835
836 let mut external_in = nodes.connect(in_port).await;
837 let mut external_out = nodes.connect(out).await;
838
839 deployment.start().await.unwrap();
840
841 external_in.send(1).await.unwrap();
842 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
843
844 external_in.send(2).await.unwrap();
845 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
846 }
847
848 #[tokio::test]
849 async fn tick_singleton_replay_cardinality() {
850 let mut deployment = Deployment::new();
851
852 let flow = FlowBuilder::new();
853 let node = flow.process::<()>();
854 let external = flow.external::<()>();
855
856 let (in_port, input) =
857 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
858 let tick = node.tick();
859 let singleton = tick.singleton(q!(123));
860 let out = input
861 .batch(&tick, nondet!())
862 .cross_singleton(singleton.clone())
863 .cross_singleton(singleton.into_stream().count())
864 .all_ticks()
865 .send_bincode_external(&external);
866
867 let nodes = flow
868 .with_process(&node, deployment.Localhost())
869 .with_external(&external, deployment.Localhost())
870 .deploy(&mut deployment);
871
872 deployment.deploy().await.unwrap();
873
874 let mut external_in = nodes.connect(in_port).await;
875 let mut external_out = nodes.connect(out).await;
876
877 deployment.start().await.unwrap();
878
879 external_in.send(1).await.unwrap();
880 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
881
882 external_in.send(2).await.unwrap();
883 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
884 }
885
886 #[tokio::test]
887 async fn external_bytes() {
888 let mut deployment = Deployment::new();
889
890 let flow = FlowBuilder::new();
891 let first_node = flow.process::<()>();
892 let external = flow.external::<()>();
893
894 let (in_port, input) = first_node.source_external_bytes(&external);
895 let out = input.send_bincode_external(&external);
896
897 let nodes = flow
898 .with_process(&first_node, deployment.Localhost())
899 .with_external(&external, deployment.Localhost())
900 .deploy(&mut deployment);
901
902 deployment.deploy().await.unwrap();
903
904 let mut external_in = nodes.connect(in_port).await.1;
905 let mut external_out = nodes.connect(out).await;
906
907 deployment.start().await.unwrap();
908
909 external_in.send(vec![1, 2, 3].into()).await.unwrap();
910
911 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
912 }
913
914 #[tokio::test]
915 async fn multi_external_source() {
916 let mut deployment = Deployment::new();
917
918 let flow = FlowBuilder::new();
919 let first_node = flow.process::<()>();
920 let external = flow.external::<()>();
921
922 let (in_port, input, _membership, complete_sink) =
923 first_node.bidi_external_many_bincode(&external);
924 let out = input.entries().send_bincode_external(&external);
925 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
926
927 let nodes = flow
928 .with_process(&first_node, deployment.Localhost())
929 .with_external(&external, deployment.Localhost())
930 .deploy(&mut deployment);
931
932 deployment.deploy().await.unwrap();
933
934 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
935 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
936 let external_out = nodes.connect(out).await;
937
938 deployment.start().await.unwrap();
939
940 external_in_1.send(123).await.unwrap();
941 external_in_2.send(456).await.unwrap();
942
943 assert_eq!(
944 external_out.take(2).collect::<HashSet<_>>().await,
945 vec![(0, 123), (1, 456)].into_iter().collect()
946 );
947 }
948
949 #[tokio::test]
950 async fn second_connection_only_multi_source() {
951 let mut deployment = Deployment::new();
952
953 let flow = FlowBuilder::new();
954 let first_node = flow.process::<()>();
955 let external = flow.external::<()>();
956
957 let (in_port, input, _membership, complete_sink) =
958 first_node.bidi_external_many_bincode(&external);
959 let out = input.entries().send_bincode_external(&external);
960 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
961
962 let nodes = flow
963 .with_process(&first_node, deployment.Localhost())
964 .with_external(&external, deployment.Localhost())
965 .deploy(&mut deployment);
966
967 deployment.deploy().await.unwrap();
968
969 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
971 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
972 let mut external_out = nodes.connect(out).await;
973
974 deployment.start().await.unwrap();
975
976 external_in_2.send(456).await.unwrap();
977
978 assert_eq!(external_out.next().await.unwrap(), (1, 456));
979 }
980
981 #[tokio::test]
982 async fn multi_external_bytes() {
983 let mut deployment = Deployment::new();
984
985 let flow = FlowBuilder::new();
986 let first_node = flow.process::<()>();
987 let external = flow.external::<()>();
988
989 let (in_port, input, _membership, complete_sink) = first_node
990 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
991 let out = input.entries().send_bincode_external(&external);
992 complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
993
994 let nodes = flow
995 .with_process(&first_node, deployment.Localhost())
996 .with_external(&external, deployment.Localhost())
997 .deploy(&mut deployment);
998
999 deployment.deploy().await.unwrap();
1000
1001 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1002 let mut external_in_2 = nodes.connect(in_port).await.1;
1003 let external_out = nodes.connect(out).await;
1004
1005 deployment.start().await.unwrap();
1006
1007 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1008 external_in_2.send(vec![4, 5].into()).await.unwrap();
1009
1010 assert_eq!(
1011 external_out.take(2).collect::<HashSet<_>>().await,
1012 vec![
1013 (0, (&[1u8, 2, 3] as &[u8]).into()),
1014 (1, (&[4u8, 5] as &[u8]).into())
1015 ]
1016 .into_iter()
1017 .collect()
1018 );
1019 }
1020
1021 #[tokio::test]
1022 async fn single_client_external_bytes() {
1023 let mut deployment = Deployment::new();
1024 let flow = FlowBuilder::new();
1025 let first_node = flow.process::<()>();
1026 let external = flow.external::<()>();
1027 let (port, input, complete_sink) = first_node
1028 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1029 complete_sink.complete(input.map(q!(|data| {
1030 let mut resp: Vec<u8> = data.into();
1031 resp.push(42);
1032 resp.into() })));
1034
1035 let nodes = flow
1036 .with_process(&first_node, deployment.Localhost())
1037 .with_external(&external, deployment.Localhost())
1038 .deploy(&mut deployment);
1039
1040 deployment.deploy().await.unwrap();
1041 deployment.start().await.unwrap();
1042
1043 let (mut external_out, mut external_in) = nodes.connect(port).await;
1044
1045 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1046 assert_eq!(
1047 external_out.next().await.unwrap().unwrap(),
1048 vec![1, 2, 3, 42]
1049 );
1050 }
1051
1052 #[tokio::test]
1053 async fn echo_external_bytes() {
1054 let mut deployment = Deployment::new();
1055
1056 let flow = FlowBuilder::new();
1057 let first_node = flow.process::<()>();
1058 let external = flow.external::<()>();
1059
1060 let (port, input, _membership, complete_sink) = first_node
1061 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1062 complete_sink
1063 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1064
1065 let nodes = flow
1066 .with_process(&first_node, deployment.Localhost())
1067 .with_external(&external, deployment.Localhost())
1068 .deploy(&mut deployment);
1069
1070 deployment.deploy().await.unwrap();
1071
1072 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1073 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1074
1075 deployment.start().await.unwrap();
1076
1077 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1078 external_in_2.send(vec![4, 5].into()).await.unwrap();
1079
1080 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1081 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1082 }
1083
1084 #[tokio::test]
1085 async fn echo_external_bincode() {
1086 let mut deployment = Deployment::new();
1087
1088 let flow = FlowBuilder::new();
1089 let first_node = flow.process::<()>();
1090 let external = flow.external::<()>();
1091
1092 let (port, input, _membership, complete_sink) =
1093 first_node.bidi_external_many_bincode(&external);
1094 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1095
1096 let nodes = flow
1097 .with_process(&first_node, deployment.Localhost())
1098 .with_external(&external, deployment.Localhost())
1099 .deploy(&mut deployment);
1100
1101 deployment.deploy().await.unwrap();
1102
1103 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1104 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1105
1106 deployment.start().await.unwrap();
1107
1108 external_in_1.send("hi".to_string()).await.unwrap();
1109 external_in_2.send("hello".to_string()).await.unwrap();
1110
1111 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1112 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1113 }
1114}