1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use bytes::{Bytes, BytesMut};
6use futures::stream::Stream as FuturesStream;
7use proc_macro2::Span;
8use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};
10use stageleft::{QuotedWithContext, q, quote_type};
11use syn::parse_quote;
12use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
13
14use super::builder::FlowState;
15use crate::backtrace::get_backtrace;
16use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
17use crate::ir::{DebugInstantiate, HydroIrMetadata, HydroLeaf, HydroNode, HydroSource};
18use crate::keyed_stream::KeyedStream;
19use crate::location::cluster::ClusterIds;
20use crate::location::external_process::{
21 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many,
22};
23use crate::staging_util::get_this_crate;
24use crate::stream::ExactlyOnce;
25use crate::unsafety::NonDet;
26use crate::{NoOrder, Singleton, Stream, TotalOrder, Unbounded, nondet};
27
28pub mod external_process;
29pub use external_process::External;
30
31pub mod process;
32pub use process::Process;
33
34pub mod cluster;
35pub use cluster::Cluster;
36
37pub mod member_id;
38pub use member_id::MemberId;
39
40pub mod tick;
41pub use tick::{Atomic, NoTick, Tick};
42
/// Identifies a location in the dataflow.
///
/// `Process` and `Cluster` are root locations (see [`LocationId::is_root`]); `Tick` wraps
/// another location, which may itself be a tick.
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum LocationId {
    /// A root location of the process kind, carrying its raw id.
    Process(usize),
    /// A root location of the cluster kind, carrying its raw id.
    Cluster(usize),
    /// A tick nested inside the boxed parent location.
    /// NOTE(review): the `usize` is presumably the tick's clock id — confirm.
    Tick(usize, Box<LocationId>),
}
49
/// A change in membership reported for a keyed member or connection.
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// The member became part of the group.
    Joined,
    /// The member is no longer part of the group.
    Left,
}
55
56impl LocationId {
57 pub fn root(&self) -> &LocationId {
58 match self {
59 LocationId::Process(_) => self,
60 LocationId::Cluster(_) => self,
61 LocationId::Tick(_, id) => id.root(),
62 }
63 }
64
65 pub fn is_root(&self) -> bool {
66 match self {
67 LocationId::Process(_) | LocationId::Cluster(_) => true,
68 LocationId::Tick(_, _) => false,
69 }
70 }
71
72 pub fn raw_id(&self) -> usize {
73 match self {
74 LocationId::Process(id) => *id,
75 LocationId::Cluster(id) => *id,
76 LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
77 }
78 }
79
80 pub fn swap_root(&mut self, new_root: LocationId) {
81 match self {
82 LocationId::Tick(_, id) => {
83 id.swap_root(new_root);
84 }
85 _ => {
86 assert!(new_root.is_root());
87 *self = new_root;
88 }
89 }
90 }
91}
92
93pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
94 assert_eq!(l1.id(), l2.id(), "locations do not match");
95}
96
/// Hint passed along as the `port_hint` of an external input.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No specific preference is expressed.
    Auto,
    /// NOTE(review): presumably requests TCP, optionally on the given fixed port — confirm.
    TcpPort(Option<u16>),
}
102
103pub trait Location<'a>: Clone {
    /// The location type returned by [`Location::root`].
    type Root: Location<'a>;

    /// Returns the root of this location.
    fn root(&self) -> Self::Root;

    /// Returns the [`LocationId`] identifying this location.
    fn id(&self) -> LocationId;

    /// Returns the [`FlowState`] this location is attached to; the provided methods of this
    /// trait use it to allocate clock, node and cycle ids.
    fn flow_state(&self) -> &FlowState;

    /// Reports whether this location type is top-level.
    /// NOTE(review): the exact meaning of "top-level" is decided by implementors — confirm.
    fn is_top_level() -> bool;
113
114 fn tick(&self) -> Tick<Self>
115 where
116 Self: NoTick,
117 {
118 let next_id = self.flow_state().borrow_mut().next_clock_id;
119 self.flow_state().borrow_mut().next_clock_id += 1;
120 Tick {
121 id: next_id,
122 l: self.clone(),
123 }
124 }
125
126 fn next_node_id(&self) -> usize {
127 let next_id = self.flow_state().borrow_mut().next_node_id;
128 self.flow_state().borrow_mut().next_node_id += 1;
129 next_id
130 }
131
    /// Creates a fresh [`HydroIrMetadata`] for a node at this location whose output type is `T`.
    ///
    /// Only the location, backtrace and output type are populated; every other field starts
    /// out as `None`.
    ///
    /// NOTE(review): `#[inline(never)]` together with `get_backtrace(2)` presumably keeps the
    /// number of skipped stack frames stable — confirm before changing either.
    #[inline(never)]
    fn new_node_metadata<T>(&self) -> HydroIrMetadata {
        HydroIrMetadata {
            location_kind: self.id(),
            backtrace: get_backtrace(2),
            output_type: Some(quote_type::<T>().into()),
            cardinality: None,
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
            tag: None,
        }
    }
145
146 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
147 where
148 Self: Sized + NoTick,
149 {
150 Stream::new(
151 self.clone(),
152 HydroNode::Persist {
153 inner: Box::new(HydroNode::Source {
154 source: HydroSource::Spin(),
155 metadata: self.new_node_metadata::<()>(),
156 }),
157 metadata: self.new_node_metadata::<()>(),
158 },
159 )
160 }
161
162 fn source_stream<T, E>(
163 &self,
164 e: impl QuotedWithContext<'a, E, Self>,
165 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
166 where
167 E: FuturesStream<Item = T> + Unpin,
168 Self: Sized + NoTick,
169 {
170 let e = e.splice_untyped_ctx(self);
171
172 Stream::new(
173 self.clone(),
174 HydroNode::Persist {
175 inner: Box::new(HydroNode::Source {
176 source: HydroSource::Stream(e.into()),
177 metadata: self.new_node_metadata::<T>(),
178 }),
179 metadata: self.new_node_metadata::<T>(),
180 },
181 )
182 }
183
184 fn source_iter<T, E>(
185 &self,
186 e: impl QuotedWithContext<'a, E, Self>,
187 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
188 where
189 E: IntoIterator<Item = T>,
190 Self: Sized + NoTick,
191 {
192 let e = e.splice_untyped_ctx(self);
195
196 Stream::new(
197 self.clone(),
198 HydroNode::Persist {
199 inner: Box::new(HydroNode::Source {
200 source: HydroSource::Iter(e.into()),
201 metadata: self.new_node_metadata::<T>(),
202 }),
203 metadata: self.new_node_metadata::<T>(),
204 },
205 )
206 }
207
    /// Returns a keyed stream of membership events for the members of `cluster`.
    ///
    /// Every id produced by the cluster's `ClusterIds` is emitted once, paired with
    /// [`MembershipEvent::Joined`]; this method never emits `Left`.
    fn source_cluster_members<C: 'a>(
        &self,
        cluster: &Cluster<'a, C>,
    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
    where
        Self: Sized + NoTick,
    {
        // Handle that is captured by the quoted expression below.
        let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
            id: cluster.id,
            _phantom: PhantomData,
        };

        self.source_iter(q!(underlying_memberids))
            .map(q!(|id| (*id, MembershipEvent::Joined)))
            .into_keyed()
    }
224
225 fn source_external_bytes<L>(
226 &self,
227 from: &External<L>,
228 ) -> (
229 ExternalBytesPort,
230 Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
231 )
232 where
233 Self: Sized + NoTick,
234 {
235 let next_external_port_id = {
236 let mut flow_state = from.flow_state.borrow_mut();
237 let id = flow_state.next_external_out;
238 flow_state.next_external_out += 1;
239 id
240 };
241
242 (
243 ExternalBytesPort {
244 process_id: from.id,
245 port_id: next_external_port_id,
246 _phantom: Default::default(),
247 },
248 Stream::new(
249 self.clone(),
250 HydroNode::Persist {
251 inner: Box::new(HydroNode::ExternalInput {
252 from_external_id: from.id,
253 from_key: next_external_port_id,
254 from_many: false,
255 codec_type: quote_type::<LengthDelimitedCodec>().into(),
256 port_hint: NetworkHint::Auto,
257 instantiate_fn: DebugInstantiate::Building,
258 deserialize_fn: None,
259 metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
260 }),
261 metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
262 },
263 ),
264 )
265 }
266
267 fn source_external_bincode<L, T>(
268 &self,
269 from: &External<L>,
270 ) -> (
271 ExternalBincodeSink<T>,
272 Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>,
273 )
274 where
275 Self: Sized + NoTick,
276 T: Serialize + DeserializeOwned,
277 {
278 let next_external_port_id = {
279 let mut flow_state = from.flow_state.borrow_mut();
280 let id = flow_state.next_external_out;
281 flow_state.next_external_out += 1;
282 id
283 };
284
285 (
286 ExternalBincodeSink {
287 process_id: from.id,
288 port_id: next_external_port_id,
289 _phantom: PhantomData,
290 },
291 Stream::new(
292 self.clone(),
293 HydroNode::Persist {
294 inner: Box::new(HydroNode::ExternalInput {
295 from_external_id: from.id,
296 from_key: next_external_port_id,
297 from_many: false,
298 codec_type: quote_type::<LengthDelimitedCodec>().into(),
299 port_hint: NetworkHint::Auto,
300 instantiate_fn: DebugInstantiate::Building,
301 deserialize_fn: Some(
302 crate::stream::networking::deserialize_bincode::<T>(None).into(),
303 ),
304 metadata: self.new_node_metadata::<T>(),
305 }),
306 metadata: self.new_node_metadata::<T>(),
307 },
308 ),
309 )
310 }
311
312 #[expect(clippy::type_complexity, reason = "stream markers")]
313 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
314 &self,
315 from: &External<L>,
316 port_hint: NetworkHint,
317 ) -> (
318 ExternalBytesPort<Many>,
319 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
320 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
321 ForwardRef<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
322 )
323 where
324 Self: Sized + NoTick,
325 {
326 let next_external_port_id = {
327 let mut flow_state = from.flow_state.borrow_mut();
328 let id = flow_state.next_external_out;
329 flow_state.next_external_out += 1;
330 id
331 };
332
333 let (fwd_ref, to_sink) =
334 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
335 let mut flow_state_borrow = self.flow_state().borrow_mut();
336
337 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
338
339 leaves.push(HydroLeaf::SendExternal {
340 to_external_id: from.id,
341 to_key: next_external_port_id,
342 to_many: true,
343 serialize_fn: None,
344 instantiate_fn: DebugInstantiate::Building,
345 input: Box::new(HydroNode::Unpersist {
346 inner: Box::new(to_sink.entries().ir_node.into_inner()),
347 metadata: self.new_node_metadata::<(u64, T)>(),
348 }),
349 });
350
351 let raw_stream: Stream<
352 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
353 Self,
354 Unbounded,
355 NoOrder,
356 ExactlyOnce,
357 > = Stream::new(
358 self.clone(),
359 HydroNode::Persist {
360 inner: Box::new(HydroNode::ExternalInput {
361 from_external_id: from.id,
362 from_key: next_external_port_id,
363 from_many: true,
364 codec_type: quote_type::<Codec>().into(),
365 port_hint,
366 instantiate_fn: DebugInstantiate::Building,
367 deserialize_fn: None,
368 metadata: self
369 .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
370 }),
371 metadata: self
372 .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
373 },
374 );
375
376 let membership_stream_ident = syn::Ident::new(
377 &format!(
378 "__hydro_deploy_many_{}_{}_membership",
379 from.id, next_external_port_id
380 ),
381 Span::call_site(),
382 );
383 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
384 let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, TotalOrder, ExactlyOnce> =
385 Stream::new(
386 self.clone(),
387 HydroNode::Persist {
388 inner: Box::new(HydroNode::Source {
389 source: HydroSource::Stream(membership_stream_expr.into()),
390 metadata: self.new_node_metadata::<(u64, bool)>(),
391 }),
392 metadata: self.new_node_metadata::<(u64, bool)>(),
393 },
394 );
395
396 (
397 ExternalBytesPort {
398 process_id: from.id,
399 port_id: next_external_port_id,
400 _phantom: PhantomData,
401 },
402 raw_stream
403 .flatten_ordered() .into_keyed()
405 .assume_ordering::<TotalOrder>(
406 nondet!()
407 ),
408 raw_membership_stream
409 .into_keyed()
410 .assume_ordering::<TotalOrder>(
411 nondet!(),
412 )
413 .map(q!(|join| {
414 if join {
415 MembershipEvent::Joined
416 } else {
417 MembershipEvent::Left
418 }
419 })),
420 fwd_ref,
421 )
422 }
423
    /// Opens a bidirectional, many-connection bincode channel with the external process
    /// `from`, receiving `InT` values and sending `OutT` values.
    ///
    /// Returns, in order:
    /// 1. the port handle the external side connects to,
    /// 2. the deserialized incoming values, keyed by a `u64`,
    /// 3. the membership events (`Joined` / `Left`) per key,
    /// 4. a [`ForwardRef`] to be completed with the keyed stream of outgoing values.
    ///
    /// NOTE(review): the `u64` key presumably identifies an individual connection — confirm.
    ///
    /// # Panics
    ///
    /// Panics if the flow has already been finalized. The generated serialize and
    /// deserialize closures `unwrap` their results.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
        &self,
        from: &External<L>,
    ) -> (
        ExternalBincodeBidi<InT, OutT, Many>,
        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardRef<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // Allocate a fresh port key from the external's flow state.
        let next_external_port_id = {
            let mut flow_state = from.flow_state.borrow_mut();
            let id = flow_state.next_external_out;
            flow_state.next_external_out += 1;
            id
        };

        // Path of this crate, interpolated into the generated closures below.
        let root = get_this_crate();

        // The outgoing side is a forward reference: the caller supplies the stream later.
        let (fwd_ref, to_sink) =
            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");

        // Generated closure: keeps the key and bincode-serializes the payload.
        let out_t_type = quote_type::<OutT>();
        let ser_fn: syn::Expr = syn::parse_quote! {
            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
            )
        };

        leaves.push(HydroLeaf::SendExternal {
            to_external_id: from.id,
            to_key: next_external_port_id,
            to_many: true,
            serialize_fn: Some(ser_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(HydroNode::Unpersist {
                inner: Box::new(to_sink.entries().ir_node.into_inner()),
                metadata: self.new_node_metadata::<(u64, Bytes)>(),
            }),
        });

        let in_t_type = quote_type::<InT>();

        // Generated closure: unwraps the received frame and bincode-deserializes the payload.
        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let (id, b) = res.unwrap();
                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
            }
        };

        let raw_stream: Stream<(u64, InT), Self, Unbounded, NoOrder, ExactlyOnce> = Stream::new(
            self.clone(),
            HydroNode::Persist {
                inner: Box::new(HydroNode::ExternalInput {
                    from_external_id: from.id,
                    from_key: next_external_port_id,
                    from_many: true,
                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
                    port_hint: NetworkHint::Auto,
                    instantiate_fn: DebugInstantiate::Building,
                    deserialize_fn: Some(deser_fn.into()),
                    metadata: self.new_node_metadata::<(u64, InT)>(),
                }),
                metadata: self.new_node_metadata::<(u64, InT)>(),
            },
        );

        // The membership events are read from a stream identified purely by this name.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.id, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, NoOrder, ExactlyOnce> =
            Stream::new(
                self.clone(),
                HydroNode::Persist {
                    inner: Box::new(HydroNode::Source {
                        source: HydroSource::Stream(membership_stream_expr.into()),
                        metadata: self.new_node_metadata::<(u64, bool)>(),
                    }),
                    metadata: self.new_node_metadata::<(u64, bool)>(),
                },
            );

        (
            ExternalBincodeBidi {
                process_id: from.id,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream.into_keyed().assume_ordering::<TotalOrder>(
                nondet!(/** assumes values of a single key arrive in a stable order — TODO confirm against the transport */),
            ),
            raw_membership_stream
                .into_keyed()
                .assume_ordering::<TotalOrder>(
                    nondet!(/** assumes membership events of a single key arrive in a stable order — TODO confirm */),
                )
                .map(q!(|join| {
                    if join {
                        MembershipEvent::Joined
                    } else {
                        MembershipEvent::Left
                    }
                })),
            fwd_ref,
        )
    }
541
542 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
543 where
544 T: Clone,
545 Self: Sized + NoTick,
546 {
547 let e_arr = q!([e]);
551 let e = e_arr.splice_untyped_ctx(self);
552
553 Singleton::new(
557 self.clone(),
558 HydroNode::Persist {
559 inner: Box::new(HydroNode::Persist {
560 inner: Box::new(HydroNode::Source {
561 source: HydroSource::Iter(e.into()),
562 metadata: self.new_node_metadata::<T>(),
563 }),
564 metadata: self.new_node_metadata::<T>(),
565 }),
566 metadata: self.new_node_metadata::<T>(),
567 },
568 )
569 }
570
    /// Builds a stream of [`tokio::time::Instant`]s produced by a `tokio::time::interval`
    /// with the given period.
    ///
    /// `_nondet` is not read by the body; it only requires the caller to supply a
    /// [`NonDet`] value.
    fn source_interval(
        &self,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval(interval)
        )))
    }
592
    /// Builds a stream of [`tokio::time::Instant`]s produced by a `tokio::time::interval_at`
    /// whose start is `delay` after the moment the quoted expression is evaluated, with the
    /// given period afterwards.
    ///
    /// `_nondet` is not read by the body; it only requires the caller to supply a
    /// [`NonDet`] value.
    fn source_interval_delayed(
        &self,
        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
        )))
    }
616
617 fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
618 where
619 S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
620 Self: NoTick,
621 {
622 let next_id = self.flow_state().borrow_mut().next_cycle_id();
623 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
624
625 (
626 ForwardRef {
627 completed: false,
628 ident: ident.clone(),
629 expected_location: self.id(),
630 _phantom: PhantomData,
631 },
632 S::create_source(ident, self.clone()),
633 )
634 }
635}
636
637#[cfg(test)]
638mod tests {
639 use std::collections::HashSet;
640
641 use futures::{SinkExt, StreamExt};
642 use hydro_deploy::Deployment;
643 use stageleft::q;
644 use tokio_util::codec::LengthDelimitedCodec;
645
646 use crate::{FlowBuilder, Location, NetworkHint};
647
    /// Sends one byte frame from the external into a process and expects the same bytes
    /// back over a bincode output.
    #[tokio::test]
    async fn external_bytes() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        // Echo every received frame back to the external.
        let (in_port, input) = first_node.source_external_bytes(&external);
        let out = input
            .map(q!(|r| r.unwrap()))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Both ends are connected before the deployment is started.
        let mut external_in = nodes.connect_sink_bytes(in_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in.send(vec![1, 2, 3].into()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
    }
677
    /// Connects two bincode clients to one many-port and expects each value to come back
    /// tagged with its connection's key (0 and 1).
    #[tokio::test]
    async fn multi_external_source() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back on the bidirectional port itself.
        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
        let external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in_1.send(123).await.unwrap();
        external_in_2.send(456).await.unwrap();

        // Compared as a set, so the relative order of the two results does not matter.
        assert_eq!(
            external_out.take(2).collect::<HashSet<_>>().await,
            vec![(0, 123), (1, 456)].into_iter().collect()
        );
    }
712
    /// Opens two connections but only sends on the second, expecting the value to be
    /// tagged with key 1.
    #[tokio::test]
    async fn second_connection_only_multi_source() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back on the bidirectional port itself.
        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The first connection is established (and kept alive) but never used.
        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in_2.send(456).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), (1, 456));
    }
744
    /// Connects two byte clients to one many-port using `LengthDelimitedCodec` and expects
    /// each frame to come back tagged with its connection's key (0 and 1).
    #[tokio::test]
    async fn multi_external_bytes() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) = first_node
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back on the bidirectional port itself.
        complete_sink.complete(first_node.source_iter(q!([])).into_keyed());

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in_1 = nodes.connect_sink_bytes(in_port.clone()).await;
        let mut external_in_2 = nodes.connect_sink_bytes(in_port).await;
        let external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
        external_in_2.send(vec![4, 5].into()).await.unwrap();

        // Compared as a set, so the relative order of the two results does not matter.
        assert_eq!(
            external_out.take(2).collect::<HashSet<_>>().await,
            vec![
                (0, (&[1u8, 2, 3] as &[u8]).into()),
                (1, (&[4u8, 5] as &[u8]).into())
            ]
            .into_iter()
            .collect()
        );
    }
784
    /// Echoes every byte incremented by one back to the connection it came from and checks
    /// that each of two clients receives only its own reply.
    #[tokio::test]
    async fn echo_external_bytes() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (port, input, _membership, complete_sink) = first_node
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        // The reply stream keeps the key of the incoming frame.
        complete_sink
            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut external_out_1, mut external_in_1) = nodes.connect_bytes(port.clone()).await;
        let (mut external_out_2, mut external_in_2) = nodes.connect_bytes(port).await;

        deployment.start().await.unwrap();

        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
        external_in_2.send(vec![4, 5].into()).await.unwrap();

        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
    }
816
    /// Echoes every received string upper-cased back to the connection it came from and
    /// checks that each of two clients receives only its own reply.
    #[tokio::test]
    async fn echo_external_bincode() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        // The reply stream keeps the key of the incoming value.
        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;

        deployment.start().await.unwrap();

        external_in_1.send("hi".to_string()).await.unwrap();
        external_in_2.send("hello".to_string()).await.unwrap();

        assert_eq!(external_out_1.next().await.unwrap(), "HI");
        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
    }
847}