1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::BytesMut;
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{
37 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
38};
39use crate::location::cluster::ClusterIds;
40use crate::location::dynamic::LocationId;
41use crate::location::external_process::{
42 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
43};
44use crate::nondet::NonDet;
45use crate::staging_util::get_this_crate;
46
47pub mod dynamic;
48
49#[expect(missing_docs, reason = "TODO")]
50pub mod external_process;
51pub use external_process::External;
52
53#[expect(missing_docs, reason = "TODO")]
54pub mod process;
55pub use process::Process;
56
57#[expect(missing_docs, reason = "TODO")]
58pub mod cluster;
59pub use cluster::Cluster;
60
61#[expect(missing_docs, reason = "TODO")]
62pub mod member_id;
63pub use member_id::MemberId;
64
65#[expect(missing_docs, reason = "TODO")]
66pub mod tick;
67pub use tick::{Atomic, NoTick, Tick};
68
69#[expect(missing_docs, reason = "TODO")]
70#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
71pub enum MembershipEvent {
72 Joined,
73 Left,
74}
75
/// A hint describing how the network port of an external connection should be set up.
///
/// The hint is stored verbatim in `HydroNode::ExternalInput::port_hint`; how it is honored
/// is decided by the deployment backend, not by this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No preference; the deployment backend picks the transport and port.
    Auto,
    /// Request a TCP port.
    ///
    /// NOTE(review): presumably `Some(port)` pins that exact port while `None` lets the
    /// backend choose any free TCP port — confirm against the deploy implementation.
    TcpPort(Option<u16>),
}
82
83pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
84 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
85}
86
87#[expect(missing_docs, reason = "TODO")]
88#[expect(
89 private_bounds,
90 reason = "only internal Hydro code can define location types"
91)]
92pub trait Location<'a>: dynamic::DynLocation {
93 type Root: Location<'a>;
94
95 fn root(&self) -> Self::Root;
96
97 fn try_tick(&self) -> Option<Tick<Self>> {
98 if Self::is_top_level() {
99 let next_id = self.flow_state().borrow_mut().next_clock_id;
100 self.flow_state().borrow_mut().next_clock_id += 1;
101 Some(Tick {
102 id: next_id,
103 l: self.clone(),
104 })
105 } else {
106 None
107 }
108 }
109
110 fn id(&self) -> LocationId {
111 dynamic::DynLocation::id(self)
112 }
113
114 fn tick(&self) -> Tick<Self>
115 where
116 Self: NoTick,
117 {
118 let next_id = self.flow_state().borrow_mut().next_clock_id;
119 self.flow_state().borrow_mut().next_clock_id += 1;
120 Tick {
121 id: next_id,
122 l: self.clone(),
123 }
124 }
125
126 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
127 where
128 Self: Sized + NoTick,
129 {
130 Stream::new(
131 self.clone(),
132 HydroNode::Source {
133 source: HydroSource::Spin(),
134 metadata: self.new_node_metadata(Stream::<
135 (),
136 Self,
137 Unbounded,
138 TotalOrder,
139 ExactlyOnce,
140 >::collection_kind()),
141 },
142 )
143 }
144
145 fn source_stream<T, E>(
146 &self,
147 e: impl QuotedWithContext<'a, E, Self>,
148 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
149 where
150 E: FuturesStream<Item = T> + Unpin,
151 Self: Sized + NoTick,
152 {
153 let e = e.splice_untyped_ctx(self);
154
155 Stream::new(
156 self.clone(),
157 HydroNode::Source {
158 source: HydroSource::Stream(e.into()),
159 metadata: self.new_node_metadata(Stream::<
160 T,
161 Self,
162 Unbounded,
163 TotalOrder,
164 ExactlyOnce,
165 >::collection_kind()),
166 },
167 )
168 }
169
170 fn source_iter<T, E>(
171 &self,
172 e: impl QuotedWithContext<'a, E, Self>,
173 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
174 where
175 E: IntoIterator<Item = T>,
176 Self: Sized + NoTick,
177 {
178 let e = e.splice_untyped_ctx(self);
181
182 Stream::new(
183 self.clone(),
184 HydroNode::Source {
185 source: HydroSource::Iter(e.into()),
186 metadata: self.new_node_metadata(Stream::<
187 T,
188 Self,
189 Unbounded,
190 TotalOrder,
191 ExactlyOnce,
192 >::collection_kind()),
193 },
194 )
195 }
196
197 fn source_cluster_members<C: 'a>(
198 &self,
199 cluster: &Cluster<'a, C>,
200 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
201 where
202 Self: Sized + NoTick,
203 {
204 let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
205 id: cluster.id,
206 _phantom: PhantomData,
207 };
208
209 self.source_iter(q!(underlying_memberids))
210 .map(q!(|id| (*id, MembershipEvent::Joined)))
211 .into_keyed()
212 }
213
214 fn source_external_bytes<L>(
215 &self,
216 from: &External<L>,
217 ) -> (
218 ExternalBytesPort,
219 Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
220 )
221 where
222 Self: Sized + NoTick,
223 {
224 let next_external_port_id = {
225 let mut flow_state = from.flow_state.borrow_mut();
226 let id = flow_state.next_external_out;
227 flow_state.next_external_out += 1;
228 id
229 };
230
231 (
232 ExternalBytesPort {
233 process_id: from.id,
234 port_id: next_external_port_id,
235 _phantom: Default::default(),
236 },
237 Stream::new(
238 self.clone(),
239 HydroNode::ExternalInput {
240 from_external_id: from.id,
241 from_key: next_external_port_id,
242 from_many: false,
243 codec_type: quote_type::<LengthDelimitedCodec>().into(),
244 port_hint: NetworkHint::Auto,
245 instantiate_fn: DebugInstantiate::Building,
246 deserialize_fn: None,
247 metadata: self.new_node_metadata(Stream::<
248 std::io::Result<BytesMut>,
249 Self,
250 Unbounded,
251 TotalOrder,
252 ExactlyOnce,
253 >::collection_kind()),
254 },
255 ),
256 )
257 }
258
259 #[expect(clippy::type_complexity, reason = "stream markers")]
260 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
261 &self,
262 from: &External<L>,
263 ) -> (
264 ExternalBincodeSink<T, NotMany, O, R>,
265 Stream<T, Self, Unbounded, O, R>,
266 )
267 where
268 Self: Sized + NoTick,
269 T: Serialize + DeserializeOwned,
270 {
271 let next_external_port_id = {
272 let mut flow_state = from.flow_state.borrow_mut();
273 let id = flow_state.next_external_out;
274 flow_state.next_external_out += 1;
275 id
276 };
277
278 (
279 ExternalBincodeSink {
280 process_id: from.id,
281 port_id: next_external_port_id,
282 _phantom: PhantomData,
283 },
284 Stream::new(
285 self.clone(),
286 HydroNode::ExternalInput {
287 from_external_id: from.id,
288 from_key: next_external_port_id,
289 from_many: false,
290 codec_type: quote_type::<LengthDelimitedCodec>().into(),
291 port_hint: NetworkHint::Auto,
292 instantiate_fn: DebugInstantiate::Building,
293 deserialize_fn: Some(
294 crate::live_collections::stream::networking::deserialize_bincode::<T>(None)
295 .into(),
296 ),
297 metadata: self
298 .new_node_metadata(Stream::<T, Self, Unbounded, O, R>::collection_kind()),
299 },
300 ),
301 )
302 }
303
304 #[expect(clippy::type_complexity, reason = "stream markers")]
305 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
306 &self,
307 from: &External<L>,
308 port_hint: NetworkHint,
309 ) -> (
310 ExternalBytesPort<Many>,
311 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
312 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
313 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
314 )
315 where
316 Self: Sized + NoTick,
317 {
318 let next_external_port_id = {
319 let mut flow_state = from.flow_state.borrow_mut();
320 let id = flow_state.next_external_out;
321 flow_state.next_external_out += 1;
322 id
323 };
324
325 let (fwd_ref, to_sink) =
326 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
327 let mut flow_state_borrow = self.flow_state().borrow_mut();
328
329 flow_state_borrow.push_root(HydroRoot::SendExternal {
330 to_external_id: from.id,
331 to_key: next_external_port_id,
332 to_many: true,
333 serialize_fn: None,
334 instantiate_fn: DebugInstantiate::Building,
335 input: Box::new(to_sink.entries().ir_node.into_inner()),
336 op_metadata: HydroIrOpMetadata::new(),
337 });
338
339 let raw_stream: Stream<
340 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
341 Self,
342 Unbounded,
343 TotalOrder,
344 ExactlyOnce,
345 > = Stream::new(
346 self.clone(),
347 HydroNode::ExternalInput {
348 from_external_id: from.id,
349 from_key: next_external_port_id,
350 from_many: true,
351 codec_type: quote_type::<Codec>().into(),
352 port_hint,
353 instantiate_fn: DebugInstantiate::Building,
354 deserialize_fn: None,
355 metadata: self.new_node_metadata(Stream::<
356 std::io::Result<(u64, <Codec as Decoder>::Item)>,
357 Self,
358 Unbounded,
359 TotalOrder,
360 ExactlyOnce,
361 >::collection_kind()),
362 },
363 );
364
365 let membership_stream_ident = syn::Ident::new(
366 &format!(
367 "__hydro_deploy_many_{}_{}_membership",
368 from.id, next_external_port_id
369 ),
370 Span::call_site(),
371 );
372 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
373 let raw_membership_stream: KeyedStream<
374 u64,
375 bool,
376 Self,
377 Unbounded,
378 TotalOrder,
379 ExactlyOnce,
380 > = KeyedStream::new(
381 self.clone(),
382 HydroNode::Source {
383 source: HydroSource::Stream(membership_stream_expr.into()),
384 metadata: self.new_node_metadata(KeyedStream::<
385 u64,
386 bool,
387 Self,
388 Unbounded,
389 TotalOrder,
390 ExactlyOnce,
391 >::collection_kind()),
392 },
393 );
394
395 (
396 ExternalBytesPort {
397 process_id: from.id,
398 port_id: next_external_port_id,
399 _phantom: PhantomData,
400 },
401 raw_stream
402 .flatten_ordered() .into_keyed(),
404 raw_membership_stream.map(q!(|join| {
405 if join {
406 MembershipEvent::Joined
407 } else {
408 MembershipEvent::Left
409 }
410 })),
411 fwd_ref,
412 )
413 }
414
415 #[expect(clippy::type_complexity, reason = "stream markers")]
416 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
417 &self,
418 from: &External<L>,
419 ) -> (
420 ExternalBincodeBidi<InT, OutT, Many>,
421 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
422 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
423 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
424 )
425 where
426 Self: Sized + NoTick,
427 {
428 let next_external_port_id = {
429 let mut flow_state = from.flow_state.borrow_mut();
430 let id = flow_state.next_external_out;
431 flow_state.next_external_out += 1;
432 id
433 };
434
435 let root = get_this_crate();
436
437 let (fwd_ref, to_sink) =
438 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
439 let mut flow_state_borrow = self.flow_state().borrow_mut();
440
441 let out_t_type = quote_type::<OutT>();
442 let ser_fn: syn::Expr = syn::parse_quote! {
443 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
444 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
445 )
446 };
447
448 flow_state_borrow.push_root(HydroRoot::SendExternal {
449 to_external_id: from.id,
450 to_key: next_external_port_id,
451 to_many: true,
452 serialize_fn: Some(ser_fn.into()),
453 instantiate_fn: DebugInstantiate::Building,
454 input: Box::new(to_sink.entries().ir_node.into_inner()),
455 op_metadata: HydroIrOpMetadata::new(),
456 });
457
458 let in_t_type = quote_type::<InT>();
459
460 let deser_fn: syn::Expr = syn::parse_quote! {
461 |res| {
462 let (id, b) = res.unwrap();
463 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
464 }
465 };
466
467 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
468 KeyedStream::new(
469 self.clone(),
470 HydroNode::ExternalInput {
471 from_external_id: from.id,
472 from_key: next_external_port_id,
473 from_many: true,
474 codec_type: quote_type::<LengthDelimitedCodec>().into(),
475 port_hint: NetworkHint::Auto,
476 instantiate_fn: DebugInstantiate::Building,
477 deserialize_fn: Some(deser_fn.into()),
478 metadata: self.new_node_metadata(KeyedStream::<
479 u64,
480 InT,
481 Self,
482 Unbounded,
483 TotalOrder,
484 ExactlyOnce,
485 >::collection_kind()),
486 },
487 );
488
489 let membership_stream_ident = syn::Ident::new(
490 &format!(
491 "__hydro_deploy_many_{}_{}_membership",
492 from.id, next_external_port_id
493 ),
494 Span::call_site(),
495 );
496 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
497 let raw_membership_stream: KeyedStream<
498 u64,
499 bool,
500 Self,
501 Unbounded,
502 TotalOrder,
503 ExactlyOnce,
504 > = KeyedStream::new(
505 self.clone(),
506 HydroNode::Source {
507 source: HydroSource::Stream(membership_stream_expr.into()),
508 metadata: self.new_node_metadata(KeyedStream::<
509 u64,
510 bool,
511 Self,
512 Unbounded,
513 TotalOrder,
514 ExactlyOnce,
515 >::collection_kind()),
516 },
517 );
518
519 (
520 ExternalBincodeBidi {
521 process_id: from.id,
522 port_id: next_external_port_id,
523 _phantom: PhantomData,
524 },
525 raw_stream,
526 raw_membership_stream.map(q!(|join| {
527 if join {
528 MembershipEvent::Joined
529 } else {
530 MembershipEvent::Left
531 }
532 })),
533 fwd_ref,
534 )
535 }
536
537 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
553 where
554 T: Clone,
555 Self: Sized,
556 {
557 let e_arr = q!([e]);
561 let e = e_arr.splice_untyped_ctx(self);
562
563 if Self::is_top_level() {
564 Singleton::new(
565 self.clone(),
566 HydroNode::Persist {
567 inner: Box::new(HydroNode::Source {
568 source: HydroSource::Iter(e.into()),
569 metadata: self.new_node_metadata(Stream::<
570 T,
571 Self,
572 Unbounded,
573 TotalOrder,
574 ExactlyOnce,
575 >::collection_kind(
576 )),
577 }),
578 metadata: self
579 .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
580 },
581 )
582 } else {
583 Singleton::new(
584 self.clone(),
585 HydroNode::Source {
586 source: HydroSource::Iter(e.into()),
587 metadata: self
588 .new_node_metadata(Singleton::<T, Self, Unbounded>::collection_kind()),
589 },
590 )
591 }
592 }
593
594 fn source_interval(
604 &self,
605 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
606 _nondet: NonDet,
607 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
608 where
609 Self: Sized + NoTick,
610 {
611 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
612 tokio::time::interval(interval)
613 )))
614 }
615
616 fn source_interval_delayed(
627 &self,
628 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
629 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
630 _nondet: NonDet,
631 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
632 where
633 Self: Sized + NoTick,
634 {
635 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
636 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
637 )))
638 }
639
640 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
641 where
642 S: CycleCollection<'a, ForwardRef, Location = Self>,
643 Self: NoTick,
644 {
645 let next_id = self.flow_state().borrow_mut().next_cycle_id();
646 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
647
648 (
649 ForwardHandle {
650 completed: false,
651 ident: ident.clone(),
652 expected_location: Location::id(self),
653 _phantom: PhantomData,
654 },
655 S::create_source(ident, self.clone()),
656 )
657 }
658}
659
660#[cfg(test)]
661mod tests {
662 use std::collections::HashSet;
663
664 use futures::{SinkExt, StreamExt};
665 use hydro_deploy::Deployment;
666 use stageleft::q;
667 use tokio_util::codec::LengthDelimitedCodec;
668
669 use crate::compile::builder::FlowBuilder;
670 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
671 use crate::location::{Location, NetworkHint};
672 use crate::nondet::nondet;
673
674 #[tokio::test]
675 async fn top_level_singleton_replay_cardinality() {
676 let mut deployment = Deployment::new();
677
678 let flow = FlowBuilder::new();
679 let node = flow.process::<()>();
680 let external = flow.external::<()>();
681
682 let (in_port, input) =
683 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
684 let singleton = node.singleton(q!(123));
685 let tick = node.tick();
686 let out = input
687 .batch(&tick, nondet!())
688 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
689 .cross_singleton(
690 singleton
691 .snapshot(&tick, nondet!())
692 .into_stream()
693 .count(),
694 )
695 .all_ticks()
696 .send_bincode_external(&external);
697
698 let nodes = flow
699 .with_process(&node, deployment.Localhost())
700 .with_external(&external, deployment.Localhost())
701 .deploy(&mut deployment);
702
703 deployment.deploy().await.unwrap();
704
705 let mut external_in = nodes.connect(in_port).await;
706 let mut external_out = nodes.connect(out).await;
707
708 deployment.start().await.unwrap();
709
710 external_in.send(1).await.unwrap();
711 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
712
713 external_in.send(2).await.unwrap();
714 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
715 }
716
717 #[tokio::test]
718 async fn tick_singleton_replay_cardinality() {
719 let mut deployment = Deployment::new();
720
721 let flow = FlowBuilder::new();
722 let node = flow.process::<()>();
723 let external = flow.external::<()>();
724
725 let (in_port, input) =
726 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
727 let tick = node.tick();
728 let singleton = tick.singleton(q!(123));
729 let out = input
730 .batch(&tick, nondet!())
731 .cross_singleton(singleton.clone())
732 .cross_singleton(singleton.into_stream().count())
733 .all_ticks()
734 .send_bincode_external(&external);
735
736 let nodes = flow
737 .with_process(&node, deployment.Localhost())
738 .with_external(&external, deployment.Localhost())
739 .deploy(&mut deployment);
740
741 deployment.deploy().await.unwrap();
742
743 let mut external_in = nodes.connect(in_port).await;
744 let mut external_out = nodes.connect(out).await;
745
746 deployment.start().await.unwrap();
747
748 external_in.send(1).await.unwrap();
749 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
750
751 external_in.send(2).await.unwrap();
752 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
753 }
754
755 #[tokio::test]
756 async fn external_bytes() {
757 let mut deployment = Deployment::new();
758
759 let flow = FlowBuilder::new();
760 let first_node = flow.process::<()>();
761 let external = flow.external::<()>();
762
763 let (in_port, input) = first_node.source_external_bytes(&external);
764 let out = input
765 .map(q!(|r| r.unwrap()))
766 .send_bincode_external(&external);
767
768 let nodes = flow
769 .with_process(&first_node, deployment.Localhost())
770 .with_external(&external, deployment.Localhost())
771 .deploy(&mut deployment);
772
773 deployment.deploy().await.unwrap();
774
775 let mut external_in = nodes.connect(in_port).await.1;
776 let mut external_out = nodes.connect(out).await;
777
778 deployment.start().await.unwrap();
779
780 external_in.send(vec![1, 2, 3].into()).await.unwrap();
781
782 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
783 }
784
785 #[tokio::test]
786 async fn multi_external_source() {
787 let mut deployment = Deployment::new();
788
789 let flow = FlowBuilder::new();
790 let first_node = flow.process::<()>();
791 let external = flow.external::<()>();
792
793 let (in_port, input, _membership, complete_sink) =
794 first_node.bidi_external_many_bincode(&external);
795 let out = input.entries().send_bincode_external(&external);
796 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
797
798 let nodes = flow
799 .with_process(&first_node, deployment.Localhost())
800 .with_external(&external, deployment.Localhost())
801 .deploy(&mut deployment);
802
803 deployment.deploy().await.unwrap();
804
805 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
806 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
807 let external_out = nodes.connect(out).await;
808
809 deployment.start().await.unwrap();
810
811 external_in_1.send(123).await.unwrap();
812 external_in_2.send(456).await.unwrap();
813
814 assert_eq!(
815 external_out.take(2).collect::<HashSet<_>>().await,
816 vec![(0, 123), (1, 456)].into_iter().collect()
817 );
818 }
819
820 #[tokio::test]
821 async fn second_connection_only_multi_source() {
822 let mut deployment = Deployment::new();
823
824 let flow = FlowBuilder::new();
825 let first_node = flow.process::<()>();
826 let external = flow.external::<()>();
827
828 let (in_port, input, _membership, complete_sink) =
829 first_node.bidi_external_many_bincode(&external);
830 let out = input.entries().send_bincode_external(&external);
831 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
832
833 let nodes = flow
834 .with_process(&first_node, deployment.Localhost())
835 .with_external(&external, deployment.Localhost())
836 .deploy(&mut deployment);
837
838 deployment.deploy().await.unwrap();
839
840 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
842 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
843 let mut external_out = nodes.connect(out).await;
844
845 deployment.start().await.unwrap();
846
847 external_in_2.send(456).await.unwrap();
848
849 assert_eq!(external_out.next().await.unwrap(), (1, 456));
850 }
851
852 #[tokio::test]
853 async fn multi_external_bytes() {
854 let mut deployment = Deployment::new();
855
856 let flow = FlowBuilder::new();
857 let first_node = flow.process::<()>();
858 let external = flow.external::<()>();
859
860 let (in_port, input, _membership, complete_sink) = first_node
861 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
862 let out = input.entries().send_bincode_external(&external);
863 complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
864
865 let nodes = flow
866 .with_process(&first_node, deployment.Localhost())
867 .with_external(&external, deployment.Localhost())
868 .deploy(&mut deployment);
869
870 deployment.deploy().await.unwrap();
871
872 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
873 let mut external_in_2 = nodes.connect(in_port).await.1;
874 let external_out = nodes.connect(out).await;
875
876 deployment.start().await.unwrap();
877
878 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
879 external_in_2.send(vec![4, 5].into()).await.unwrap();
880
881 assert_eq!(
882 external_out.take(2).collect::<HashSet<_>>().await,
883 vec![
884 (0, (&[1u8, 2, 3] as &[u8]).into()),
885 (1, (&[4u8, 5] as &[u8]).into())
886 ]
887 .into_iter()
888 .collect()
889 );
890 }
891
892 #[tokio::test]
893 async fn echo_external_bytes() {
894 let mut deployment = Deployment::new();
895
896 let flow = FlowBuilder::new();
897 let first_node = flow.process::<()>();
898 let external = flow.external::<()>();
899
900 let (port, input, _membership, complete_sink) = first_node
901 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
902 complete_sink
903 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
904
905 let nodes = flow
906 .with_process(&first_node, deployment.Localhost())
907 .with_external(&external, deployment.Localhost())
908 .deploy(&mut deployment);
909
910 deployment.deploy().await.unwrap();
911
912 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
913 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
914
915 deployment.start().await.unwrap();
916
917 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
918 external_in_2.send(vec![4, 5].into()).await.unwrap();
919
920 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
921 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
922 }
923
924 #[tokio::test]
925 async fn echo_external_bincode() {
926 let mut deployment = Deployment::new();
927
928 let flow = FlowBuilder::new();
929 let first_node = flow.process::<()>();
930 let external = flow.external::<()>();
931
932 let (port, input, _membership, complete_sink) =
933 first_node.bidi_external_many_bincode(&external);
934 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
935
936 let nodes = flow
937 .with_process(&first_node, deployment.Localhost())
938 .with_external(&external, deployment.Localhost())
939 .deploy(&mut deployment);
940
941 deployment.deploy().await.unwrap();
942
943 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
944 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
945
946 deployment.start().await.unwrap();
947
948 external_in_1.send("hi".to_string()).await.unwrap();
949 external_in_2.send("hello".to_string()).await.unwrap();
950
951 assert_eq!(external_out_1.next().await.unwrap(), "HI");
952 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
953 }
954}