1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::{Bytes, BytesMut};
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
37use crate::location::cluster::ClusterIds;
38use crate::location::dynamic::LocationId;
39use crate::location::external_process::{
40 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many,
41};
42use crate::nondet::{NonDet, nondet};
43use crate::staging_util::get_this_crate;
44
45pub mod dynamic;
46
47#[expect(missing_docs, reason = "TODO")]
48pub mod external_process;
49pub use external_process::External;
50
51#[expect(missing_docs, reason = "TODO")]
52pub mod process;
53pub use process::Process;
54
55#[expect(missing_docs, reason = "TODO")]
56pub mod cluster;
57pub use cluster::Cluster;
58
59#[expect(missing_docs, reason = "TODO")]
60pub mod member_id;
61pub use member_id::MemberId;
62
63#[expect(missing_docs, reason = "TODO")]
64pub mod tick;
65pub use tick::{Atomic, NoTick, Tick};
66
/// A change in membership for a keyed participant.
///
/// In this module, `Joined` is emitted for every id returned by
/// `Location::source_cluster_members`, and the `bidi_external_many_*` methods map a raw
/// `true` flag to `Joined` and `false` to `Left`.
#[expect(missing_docs, reason = "TODO")]
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    Joined,
    Left,
}
73
/// Hint stored in the `port_hint` field of `HydroNode::ExternalInput`.
///
/// Every method in this module passes `NetworkHint::Auto`, except
/// `Location::bidi_external_many_bytes`, which forwards the caller's choice.
// NOTE(review): `TcpPort(Option<u16>)` presumably requests a TCP port, optionally a specific
// one; confirm against the deployment backend that consumes this hint.
#[expect(missing_docs, reason = "TODO")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    Auto,
    TcpPort(Option<u16>),
}
80
81pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
82 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
83}
84
85#[expect(missing_docs, reason = "TODO")]
86#[expect(
87 private_bounds,
88 reason = "only internal Hydro code can define location types"
89)]
90pub trait Location<'a>: dynamic::DynLocation {
/// The location type returned by [`Location::root`].
type Root: Location<'a>;

/// Returns the [`Location::Root`] associated with this location.
// NOTE(review): presumably the outermost (non-tick) location this one is nested in; confirm
// against the implementors, which are not visible from this file.
fn root(&self) -> Self::Root;
94
95 fn id(&self) -> LocationId {
96 dynamic::DynLocation::id(self)
97 }
98
99 fn tick(&self) -> Tick<Self>
100 where
101 Self: NoTick,
102 {
103 let next_id = self.flow_state().borrow_mut().next_clock_id;
104 self.flow_state().borrow_mut().next_clock_id += 1;
105 Tick {
106 id: next_id,
107 l: self.clone(),
108 }
109 }
110
111 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
112 where
113 Self: Sized + NoTick,
114 {
115 Stream::new(
116 self.clone(),
117 HydroNode::Persist {
118 inner: Box::new(HydroNode::Source {
119 source: HydroSource::Spin(),
120 metadata: self.new_node_metadata::<()>(),
121 }),
122 metadata: self.new_node_metadata::<()>(),
123 },
124 )
125 }
126
127 fn source_stream<T, E>(
128 &self,
129 e: impl QuotedWithContext<'a, E, Self>,
130 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
131 where
132 E: FuturesStream<Item = T> + Unpin,
133 Self: Sized + NoTick,
134 {
135 let e = e.splice_untyped_ctx(self);
136
137 Stream::new(
138 self.clone(),
139 HydroNode::Persist {
140 inner: Box::new(HydroNode::Source {
141 source: HydroSource::Stream(e.into()),
142 metadata: self.new_node_metadata::<T>(),
143 }),
144 metadata: self.new_node_metadata::<T>(),
145 },
146 )
147 }
148
149 fn source_iter<T, E>(
150 &self,
151 e: impl QuotedWithContext<'a, E, Self>,
152 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
153 where
154 E: IntoIterator<Item = T>,
155 Self: Sized + NoTick,
156 {
157 let e = e.splice_untyped_ctx(self);
160
161 Stream::new(
162 self.clone(),
163 HydroNode::Persist {
164 inner: Box::new(HydroNode::Source {
165 source: HydroSource::Iter(e.into()),
166 metadata: self.new_node_metadata::<T>(),
167 }),
168 metadata: self.new_node_metadata::<T>(),
169 },
170 )
171 }
172
173 fn source_cluster_members<C: 'a>(
174 &self,
175 cluster: &Cluster<'a, C>,
176 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
177 where
178 Self: Sized + NoTick,
179 {
180 let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
181 id: cluster.id,
182 _phantom: PhantomData,
183 };
184
185 self.source_iter(q!(underlying_memberids))
186 .map(q!(|id| (*id, MembershipEvent::Joined)))
187 .into_keyed()
188 }
189
190 fn source_external_bytes<L>(
191 &self,
192 from: &External<L>,
193 ) -> (
194 ExternalBytesPort,
195 Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
196 )
197 where
198 Self: Sized + NoTick,
199 {
200 let next_external_port_id = {
201 let mut flow_state = from.flow_state.borrow_mut();
202 let id = flow_state.next_external_out;
203 flow_state.next_external_out += 1;
204 id
205 };
206
207 (
208 ExternalBytesPort {
209 process_id: from.id,
210 port_id: next_external_port_id,
211 _phantom: Default::default(),
212 },
213 Stream::new(
214 self.clone(),
215 HydroNode::Persist {
216 inner: Box::new(HydroNode::ExternalInput {
217 from_external_id: from.id,
218 from_key: next_external_port_id,
219 from_many: false,
220 codec_type: quote_type::<LengthDelimitedCodec>().into(),
221 port_hint: NetworkHint::Auto,
222 instantiate_fn: DebugInstantiate::Building,
223 deserialize_fn: None,
224 metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
225 }),
226 metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
227 },
228 ),
229 )
230 }
231
232 fn source_external_bincode<L, T>(
233 &self,
234 from: &External<L>,
235 ) -> (
236 ExternalBincodeSink<T>,
237 Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>,
238 )
239 where
240 Self: Sized + NoTick,
241 T: Serialize + DeserializeOwned,
242 {
243 let next_external_port_id = {
244 let mut flow_state = from.flow_state.borrow_mut();
245 let id = flow_state.next_external_out;
246 flow_state.next_external_out += 1;
247 id
248 };
249
250 (
251 ExternalBincodeSink {
252 process_id: from.id,
253 port_id: next_external_port_id,
254 _phantom: PhantomData,
255 },
256 Stream::new(
257 self.clone(),
258 HydroNode::Persist {
259 inner: Box::new(HydroNode::ExternalInput {
260 from_external_id: from.id,
261 from_key: next_external_port_id,
262 from_many: false,
263 codec_type: quote_type::<LengthDelimitedCodec>().into(),
264 port_hint: NetworkHint::Auto,
265 instantiate_fn: DebugInstantiate::Building,
266 deserialize_fn: Some(
267 crate::live_collections::stream::networking::deserialize_bincode::<T>(
268 None,
269 )
270 .into(),
271 ),
272 metadata: self.new_node_metadata::<T>(),
273 }),
274 metadata: self.new_node_metadata::<T>(),
275 },
276 ),
277 )
278 }
279
280 #[expect(clippy::type_complexity, reason = "stream markers")]
281 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
282 &self,
283 from: &External<L>,
284 port_hint: NetworkHint,
285 ) -> (
286 ExternalBytesPort<Many>,
287 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
288 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
289 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
290 )
291 where
292 Self: Sized + NoTick,
293 {
294 let next_external_port_id = {
295 let mut flow_state = from.flow_state.borrow_mut();
296 let id = flow_state.next_external_out;
297 flow_state.next_external_out += 1;
298 id
299 };
300
301 let (fwd_ref, to_sink) =
302 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
303 let mut flow_state_borrow = self.flow_state().borrow_mut();
304
305 flow_state_borrow.push_root(HydroRoot::SendExternal {
306 to_external_id: from.id,
307 to_key: next_external_port_id,
308 to_many: true,
309 serialize_fn: None,
310 instantiate_fn: DebugInstantiate::Building,
311 input: Box::new(HydroNode::Unpersist {
312 inner: Box::new(to_sink.entries().ir_node.into_inner()),
313 metadata: self.new_node_metadata::<(u64, T)>(),
314 }),
315 op_metadata: HydroIrOpMetadata::new(),
316 });
317
318 let raw_stream: Stream<
319 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
320 Self,
321 Unbounded,
322 NoOrder,
323 ExactlyOnce,
324 > = Stream::new(
325 self.clone(),
326 HydroNode::Persist {
327 inner: Box::new(HydroNode::ExternalInput {
328 from_external_id: from.id,
329 from_key: next_external_port_id,
330 from_many: true,
331 codec_type: quote_type::<Codec>().into(),
332 port_hint,
333 instantiate_fn: DebugInstantiate::Building,
334 deserialize_fn: None,
335 metadata: self
336 .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
337 }),
338 metadata: self
339 .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
340 },
341 );
342
343 let membership_stream_ident = syn::Ident::new(
344 &format!(
345 "__hydro_deploy_many_{}_{}_membership",
346 from.id, next_external_port_id
347 ),
348 Span::call_site(),
349 );
350 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
351 let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, TotalOrder, ExactlyOnce> =
352 Stream::new(
353 self.clone(),
354 HydroNode::Persist {
355 inner: Box::new(HydroNode::Source {
356 source: HydroSource::Stream(membership_stream_expr.into()),
357 metadata: self.new_node_metadata::<(u64, bool)>(),
358 }),
359 metadata: self.new_node_metadata::<(u64, bool)>(),
360 },
361 );
362
363 (
364 ExternalBytesPort {
365 process_id: from.id,
366 port_id: next_external_port_id,
367 _phantom: PhantomData,
368 },
369 raw_stream
370 .flatten_ordered() .into_keyed()
372 .assume_ordering::<TotalOrder>(
373 nondet!()
374 ),
375 raw_membership_stream
376 .into_keyed()
377 .assume_ordering::<TotalOrder>(
378 nondet!(),
379 )
380 .map(q!(|join| {
381 if join {
382 MembershipEvent::Joined
383 } else {
384 MembershipEvent::Left
385 }
386 })),
387 fwd_ref,
388 )
389 }
390
391 #[expect(clippy::type_complexity, reason = "stream markers")]
392 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
393 &self,
394 from: &External<L>,
395 ) -> (
396 ExternalBincodeBidi<InT, OutT, Many>,
397 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
398 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
399 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
400 )
401 where
402 Self: Sized + NoTick,
403 {
404 let next_external_port_id = {
405 let mut flow_state = from.flow_state.borrow_mut();
406 let id = flow_state.next_external_out;
407 flow_state.next_external_out += 1;
408 id
409 };
410
411 let root = get_this_crate();
412
413 let (fwd_ref, to_sink) =
414 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
415 let mut flow_state_borrow = self.flow_state().borrow_mut();
416
417 let out_t_type = quote_type::<OutT>();
418 let ser_fn: syn::Expr = syn::parse_quote! {
419 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
420 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
421 )
422 };
423
424 flow_state_borrow.push_root(HydroRoot::SendExternal {
425 to_external_id: from.id,
426 to_key: next_external_port_id,
427 to_many: true,
428 serialize_fn: Some(ser_fn.into()),
429 instantiate_fn: DebugInstantiate::Building,
430 input: Box::new(HydroNode::Unpersist {
431 inner: Box::new(to_sink.entries().ir_node.into_inner()),
432 metadata: self.new_node_metadata::<(u64, Bytes)>(),
433 }),
434 op_metadata: HydroIrOpMetadata::new(),
435 });
436
437 let in_t_type = quote_type::<InT>();
438
439 let deser_fn: syn::Expr = syn::parse_quote! {
440 |res| {
441 let (id, b) = res.unwrap();
442 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
443 }
444 };
445
446 let raw_stream: Stream<(u64, InT), Self, Unbounded, NoOrder, ExactlyOnce> = Stream::new(
447 self.clone(),
448 HydroNode::Persist {
449 inner: Box::new(HydroNode::ExternalInput {
450 from_external_id: from.id,
451 from_key: next_external_port_id,
452 from_many: true,
453 codec_type: quote_type::<LengthDelimitedCodec>().into(),
454 port_hint: NetworkHint::Auto,
455 instantiate_fn: DebugInstantiate::Building,
456 deserialize_fn: Some(deser_fn.into()),
457 metadata: self.new_node_metadata::<(u64, InT)>(),
458 }),
459 metadata: self.new_node_metadata::<(u64, InT)>(),
460 },
461 );
462
463 let membership_stream_ident = syn::Ident::new(
464 &format!(
465 "__hydro_deploy_many_{}_{}_membership",
466 from.id, next_external_port_id
467 ),
468 Span::call_site(),
469 );
470 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
471 let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, NoOrder, ExactlyOnce> =
472 Stream::new(
473 self.clone(),
474 HydroNode::Persist {
475 inner: Box::new(HydroNode::Source {
476 source: HydroSource::Stream(membership_stream_expr.into()),
477 metadata: self.new_node_metadata::<(u64, bool)>(),
478 }),
479 metadata: self.new_node_metadata::<(u64, bool)>(),
480 },
481 );
482
483 (
484 ExternalBincodeBidi {
485 process_id: from.id,
486 port_id: next_external_port_id,
487 _phantom: PhantomData,
488 },
489 raw_stream.into_keyed().assume_ordering::<TotalOrder>(
490 nondet!(),
491 ),
492 raw_membership_stream
493 .into_keyed()
494 .assume_ordering::<TotalOrder>(
495 nondet!(),
496 )
497 .map(q!(|join| {
498 if join {
499 MembershipEvent::Joined
500 } else {
501 MembershipEvent::Left
502 }
503 })),
504 fwd_ref,
505 )
506 }
507
508 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
509 where
510 T: Clone,
511 Self: Sized + NoTick,
512 {
513 let e_arr = q!([e]);
517 let e = e_arr.splice_untyped_ctx(self);
518
519 Singleton::new(
523 self.clone(),
524 HydroNode::Persist {
525 inner: Box::new(HydroNode::Persist {
526 inner: Box::new(HydroNode::Source {
527 source: HydroSource::Iter(e.into()),
528 metadata: self.new_node_metadata::<T>(),
529 }),
530 metadata: self.new_node_metadata::<T>(),
531 }),
532 metadata: self.new_node_metadata::<T>(),
533 },
534 )
535 }
536
537 fn source_interval(
547 &self,
548 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
549 _nondet: NonDet,
550 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
551 where
552 Self: Sized + NoTick,
553 {
554 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
555 tokio::time::interval(interval)
556 )))
557 }
558
559 fn source_interval_delayed(
570 &self,
571 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
572 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
573 _nondet: NonDet,
574 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
575 where
576 Self: Sized + NoTick,
577 {
578 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
579 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
580 )))
581 }
582
583 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
584 where
585 S: CycleCollection<'a, ForwardRef, Location = Self>,
586 Self: NoTick,
587 {
588 let next_id = self.flow_state().borrow_mut().next_cycle_id();
589 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
590
591 (
592 ForwardHandle {
593 completed: false,
594 ident: ident.clone(),
595 expected_location: Location::id(self),
596 _phantom: PhantomData,
597 },
598 S::create_source(ident, self.clone()),
599 )
600 }
601}
602
603#[cfg(test)]
604mod tests {
605 use std::collections::HashSet;
606
607 use futures::{SinkExt, StreamExt};
608 use hydro_deploy::Deployment;
609 use stageleft::q;
610 use tokio_util::codec::LengthDelimitedCodec;
611
612 use crate::compile::builder::FlowBuilder;
613 use crate::location::{Location, NetworkHint};
614
615 #[tokio::test]
616 async fn external_bytes() {
617 let mut deployment = Deployment::new();
618
619 let flow = FlowBuilder::new();
620 let first_node = flow.process::<()>();
621 let external = flow.external::<()>();
622
623 let (in_port, input) = first_node.source_external_bytes(&external);
624 let out = input
625 .map(q!(|r| r.unwrap()))
626 .send_bincode_external(&external);
627
628 let nodes = flow
629 .with_process(&first_node, deployment.Localhost())
630 .with_external(&external, deployment.Localhost())
631 .deploy(&mut deployment);
632
633 deployment.deploy().await.unwrap();
634
635 let mut external_in = nodes.connect_sink_bytes(in_port).await;
636 let mut external_out = nodes.connect_source_bincode(out).await;
637
638 deployment.start().await.unwrap();
639
640 external_in.send(vec![1, 2, 3].into()).await.unwrap();
641
642 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
643 }
644
645 #[tokio::test]
646 async fn multi_external_source() {
647 let mut deployment = Deployment::new();
648
649 let flow = FlowBuilder::new();
650 let first_node = flow.process::<()>();
651 let external = flow.external::<()>();
652
653 let (in_port, input, _membership, complete_sink) =
654 first_node.bidi_external_many_bincode(&external);
655 let out = input.entries().send_bincode_external(&external);
656 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
657
658 let nodes = flow
659 .with_process(&first_node, deployment.Localhost())
660 .with_external(&external, deployment.Localhost())
661 .deploy(&mut deployment);
662
663 deployment.deploy().await.unwrap();
664
665 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
666 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
667 let external_out = nodes.connect_source_bincode(out).await;
668
669 deployment.start().await.unwrap();
670
671 external_in_1.send(123).await.unwrap();
672 external_in_2.send(456).await.unwrap();
673
674 assert_eq!(
675 external_out.take(2).collect::<HashSet<_>>().await,
676 vec![(0, 123), (1, 456)].into_iter().collect()
677 );
678 }
679
680 #[tokio::test]
681 async fn second_connection_only_multi_source() {
682 let mut deployment = Deployment::new();
683
684 let flow = FlowBuilder::new();
685 let first_node = flow.process::<()>();
686 let external = flow.external::<()>();
687
688 let (in_port, input, _membership, complete_sink) =
689 first_node.bidi_external_many_bincode(&external);
690 let out = input.entries().send_bincode_external(&external);
691 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
692
693 let nodes = flow
694 .with_process(&first_node, deployment.Localhost())
695 .with_external(&external, deployment.Localhost())
696 .deploy(&mut deployment);
697
698 deployment.deploy().await.unwrap();
699
700 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
702 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
703 let mut external_out = nodes.connect_source_bincode(out).await;
704
705 deployment.start().await.unwrap();
706
707 external_in_2.send(456).await.unwrap();
708
709 assert_eq!(external_out.next().await.unwrap(), (1, 456));
710 }
711
712 #[tokio::test]
713 async fn multi_external_bytes() {
714 let mut deployment = Deployment::new();
715
716 let flow = FlowBuilder::new();
717 let first_node = flow.process::<()>();
718 let external = flow.external::<()>();
719
720 let (in_port, input, _membership, complete_sink) = first_node
721 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
722 let out = input.entries().send_bincode_external(&external);
723 complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
724
725 let nodes = flow
726 .with_process(&first_node, deployment.Localhost())
727 .with_external(&external, deployment.Localhost())
728 .deploy(&mut deployment);
729
730 deployment.deploy().await.unwrap();
731
732 let mut external_in_1 = nodes.connect_sink_bytes(in_port.clone()).await;
733 let mut external_in_2 = nodes.connect_sink_bytes(in_port).await;
734 let external_out = nodes.connect_source_bincode(out).await;
735
736 deployment.start().await.unwrap();
737
738 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
739 external_in_2.send(vec![4, 5].into()).await.unwrap();
740
741 assert_eq!(
742 external_out.take(2).collect::<HashSet<_>>().await,
743 vec![
744 (0, (&[1u8, 2, 3] as &[u8]).into()),
745 (1, (&[4u8, 5] as &[u8]).into())
746 ]
747 .into_iter()
748 .collect()
749 );
750 }
751
752 #[tokio::test]
753 async fn echo_external_bytes() {
754 let mut deployment = Deployment::new();
755
756 let flow = FlowBuilder::new();
757 let first_node = flow.process::<()>();
758 let external = flow.external::<()>();
759
760 let (port, input, _membership, complete_sink) = first_node
761 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
762 complete_sink
763 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
764
765 let nodes = flow
766 .with_process(&first_node, deployment.Localhost())
767 .with_external(&external, deployment.Localhost())
768 .deploy(&mut deployment);
769
770 deployment.deploy().await.unwrap();
771
772 let (mut external_out_1, mut external_in_1) = nodes.connect_bytes(port.clone()).await;
773 let (mut external_out_2, mut external_in_2) = nodes.connect_bytes(port).await;
774
775 deployment.start().await.unwrap();
776
777 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
778 external_in_2.send(vec![4, 5].into()).await.unwrap();
779
780 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
781 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
782 }
783
784 #[tokio::test]
785 async fn echo_external_bincode() {
786 let mut deployment = Deployment::new();
787
788 let flow = FlowBuilder::new();
789 let first_node = flow.process::<()>();
790 let external = flow.external::<()>();
791
792 let (port, input, _membership, complete_sink) =
793 first_node.bidi_external_many_bincode(&external);
794 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
795
796 let nodes = flow
797 .with_process(&first_node, deployment.Localhost())
798 .with_external(&external, deployment.Localhost())
799 .deploy(&mut deployment);
800
801 deployment.deploy().await.unwrap();
802
803 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
804 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
805
806 deployment.start().await.unwrap();
807
808 external_in_1.send("hi".to_string()).await.unwrap();
809 external_in_2.send("hello".to_string()).await.unwrap();
810
811 assert_eq!(external_out_1.next().await.unwrap(), "HI");
812 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
813 }
814}