hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21use crate::staging_util::get_this_crate;
22
23// same as the one in `hydro_std`, but internal use only
24fn track_membership<'a, C, L: Location<'a> + NoTick>(
25 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
26) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
27 membership.fold(
28 q!(|| false),
29 q!(|present, event| {
30 match event {
31 MembershipEvent::Joined => *present = true,
32 MembershipEvent::Left => *present = false,
33 }
34 }),
35 )
36}
37
38fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
39 let root = get_this_crate();
40
41 if is_demux {
42 parse_quote! {
43 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
44 |(id, data)| {
45 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
46 }
47 )
48 }
49 } else {
50 parse_quote! {
51 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
52 |data| {
53 #root::runtime_support::bincode::serialize(&data).unwrap().into()
54 }
55 )
56 }
57 }
58}
59
60pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
61 serialize_bincode_with_type(is_demux, "e_type::<T>())
62}
63
64fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
65 let root = get_this_crate();
66
67 if let Some(c_type) = tagged {
68 parse_quote! {
69 |res| {
70 let (id, b) = res.unwrap();
71 (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
72 }
73 }
74 } else {
75 parse_quote! {
76 |res| {
77 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
78 }
79 }
80 }
81}
82
83pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
84 deserialize_bincode_with_type(tagged, "e_type::<T>())
85}
86
87impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
88 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
89 /// using [`bincode`] to serialize/deserialize messages.
90 ///
91 /// The returned stream captures the elements received at the destination, where values will
92 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
93 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
94 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
95 /// dropped no further messages will be sent.
96 ///
97 /// # Example
98 /// ```rust
99 /// # use hydro_lang::prelude::*;
100 /// # use futures::StreamExt;
101 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
102 /// let p1 = flow.process::<()>();
103 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
104 /// let p2 = flow.process::<()>();
105 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
106 /// // 1, 2, 3
107 /// # on_p2.send_bincode(&p_out)
108 /// # }, |mut stream| async move {
109 /// # for w in 1..=3 {
110 /// # assert_eq!(stream.next().await, Some(w));
111 /// # }
112 /// # }));
113 /// ```
114 pub fn send_bincode<L2>(
115 self,
116 other: &Process<'a, L2>,
117 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
118 where
119 T: Serialize + DeserializeOwned,
120 {
121 let serialize_pipeline = Some(serialize_bincode::<T>(false));
122
123 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
124
125 Stream::new(
126 other.clone(),
127 HydroNode::Network {
128 serialize_fn: serialize_pipeline.map(|e| e.into()),
129 instantiate_fn: DebugInstantiate::Building,
130 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
131 input: Box::new(self.ir_node.into_inner()),
132 metadata: other.new_node_metadata(
133 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
134 ),
135 },
136 )
137 }
138
139 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
140 /// using [`bincode`] to serialize/deserialize messages.
141 ///
142 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
143 /// membership information. This is a common pattern in distributed systems for broadcasting data to
144 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
145 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
146 /// each element to all cluster members.
147 ///
148 /// # Non-Determinism
149 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
150 /// to the current cluster members _at that point in time_. Depending on when we are notified of
151 /// membership changes, we will broadcast each element to different members.
152 ///
153 /// # Example
154 /// ```rust
155 /// # use hydro_lang::prelude::*;
156 /// # use futures::StreamExt;
157 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
158 /// let p1 = flow.process::<()>();
159 /// let workers: Cluster<()> = flow.cluster::<()>();
160 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
161 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
162 /// # on_worker.send_bincode(&p2).entries()
163 /// // if there are 4 members in the cluster, each receives one element
164 /// // - MemberId::<()>(0): [123]
165 /// // - MemberId::<()>(1): [123]
166 /// // - MemberId::<()>(2): [123]
167 /// // - MemberId::<()>(3): [123]
168 /// # }, |mut stream| async move {
169 /// # let mut results = Vec::new();
170 /// # for w in 0..4 {
171 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
172 /// # }
173 /// # results.sort();
174 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
175 /// # }));
176 /// ```
177 pub fn broadcast_bincode<L2: 'a>(
178 self,
179 other: &Cluster<'a, L2>,
180 nondet_membership: NonDet,
181 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
182 where
183 T: Clone + Serialize + DeserializeOwned,
184 {
185 let ids = track_membership(self.location.source_cluster_members(other));
186 let join_tick = self.location.tick();
187 let current_members = ids
188 .snapshot(&join_tick, nondet_membership)
189 .filter(q!(|b| *b));
190
191 self.batch(&join_tick, nondet_membership)
192 .repeat_with_keys(current_members)
193 .all_ticks()
194 .demux_bincode(other)
195 }
196
197 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
198 /// serialization. The external process can receive these elements by establishing a TCP
199 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
200 ///
201 /// # Example
202 /// ```rust
203 /// # use hydro_lang::prelude::*;
204 /// # use futures::StreamExt;
205 /// # tokio_test::block_on(async move {
206 /// let flow = FlowBuilder::new();
207 /// let process = flow.process::<()>();
208 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
209 /// let external = flow.external::<()>();
210 /// let external_handle = numbers.send_bincode_external(&external);
211 ///
212 /// let mut deployment = hydro_deploy::Deployment::new();
213 /// let nodes = flow
214 /// .with_process(&process, deployment.Localhost())
215 /// .with_external(&external, deployment.Localhost())
216 /// .deploy(&mut deployment);
217 ///
218 /// deployment.deploy().await.unwrap();
219 /// // establish the TCP connection
220 /// let mut external_recv_stream = nodes.connect(external_handle).await;
221 /// deployment.start().await.unwrap();
222 ///
223 /// for w in 1..=3 {
224 /// assert_eq!(external_recv_stream.next().await, Some(w));
225 /// }
226 /// # });
227 /// ```
228 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
229 where
230 T: Serialize + DeserializeOwned,
231 {
232 let serialize_pipeline = Some(serialize_bincode::<T>(false));
233
234 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
235
236 let external_key = flow_state_borrow.next_external_out;
237 flow_state_borrow.next_external_out += 1;
238
239 flow_state_borrow.push_root(HydroRoot::SendExternal {
240 to_external_id: other.id,
241 to_key: external_key,
242 to_many: false,
243 unpaired: true,
244 serialize_fn: serialize_pipeline.map(|e| e.into()),
245 instantiate_fn: DebugInstantiate::Building,
246 input: Box::new(self.ir_node.into_inner()),
247 op_metadata: HydroIrOpMetadata::new(),
248 });
249
250 ExternalBincodeStream {
251 process_id: other.id,
252 port_id: external_key,
253 _phantom: PhantomData,
254 }
255 }
256}
257
258impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
259 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
260{
261 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
262 /// using [`bincode`] to serialize/deserialize messages.
263 ///
264 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
265 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
266 /// this API allows precise targeting of specific cluster members rather than broadcasting to
267 /// all members.
268 ///
269 /// # Example
270 /// ```rust
271 /// # use hydro_lang::prelude::*;
272 /// # use futures::StreamExt;
273 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
274 /// let p1 = flow.process::<()>();
275 /// let workers: Cluster<()> = flow.cluster::<()>();
276 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
277 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
278 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
279 /// .demux_bincode(&workers);
280 /// # on_worker.send_bincode(&p2).entries()
281 /// // if there are 4 members in the cluster, each receives one element
282 /// // - MemberId::<()>(0): [0]
283 /// // - MemberId::<()>(1): [1]
284 /// // - MemberId::<()>(2): [2]
285 /// // - MemberId::<()>(3): [3]
286 /// # }, |mut stream| async move {
287 /// # let mut results = Vec::new();
288 /// # for w in 0..4 {
289 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
290 /// # }
291 /// # results.sort();
292 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
293 /// # }));
294 /// ```
295 pub fn demux_bincode(
296 self,
297 other: &Cluster<'a, L2>,
298 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
299 where
300 T: Serialize + DeserializeOwned,
301 {
302 self.into_keyed().demux_bincode(other)
303 }
304}
305
306impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
307 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
308 /// [`bincode`] to serialize/deserialize messages.
309 ///
310 /// This provides load balancing by evenly distributing work across cluster members. The
311 /// distribution is deterministic based on element order - the first element goes to member 0,
312 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
313 ///
314 /// # Non-Determinism
315 /// The set of cluster members may asynchronously change over time. Each element is distributed
316 /// based on the current cluster membership _at that point in time_. Depending on when cluster
317 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
318 /// membership is stable, the order of members in the round-robin pattern may change across runs.
319 ///
320 /// # Ordering Requirements
321 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
322 /// order of messages and retries affects the round-robin pattern.
323 ///
324 /// # Example
325 /// ```rust
326 /// # use hydro_lang::prelude::*;
327 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
328 /// # use futures::StreamExt;
329 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
330 /// let p1 = flow.process::<()>();
331 /// let workers: Cluster<()> = flow.cluster::<()>();
332 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
333 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
334 /// on_worker.send_bincode(&p2)
335 /// # .first().values() // we use first to assert that each member gets one element
336 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
337 /// // - MemberId::<()>(?): [1]
338 /// // - MemberId::<()>(?): [2]
339 /// // - MemberId::<()>(?): [3]
340 /// // - MemberId::<()>(?): [4]
341 /// # }, |mut stream| async move {
342 /// # let mut results = Vec::new();
343 /// # for w in 0..4 {
344 /// # results.push(stream.next().await.unwrap());
345 /// # }
346 /// # results.sort();
347 /// # assert_eq!(results, vec![1, 2, 3, 4]);
348 /// # }));
349 /// ```
350 pub fn round_robin_bincode<L2: 'a>(
351 self,
352 other: &Cluster<'a, L2>,
353 nondet_membership: NonDet,
354 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
355 where
356 T: Serialize + DeserializeOwned,
357 {
358 let ids = track_membership(self.location.source_cluster_members(other));
359 let join_tick = self.location.tick();
360 let current_members = ids
361 .snapshot(&join_tick, nondet_membership)
362 .filter(q!(|b| *b))
363 .keys()
364 .assume_ordering(nondet_membership)
365 .collect_vec();
366
367 self.enumerate()
368 .batch(&join_tick, nondet_membership)
369 .cross_singleton(current_members)
370 .map(q!(|(data, members)| (
371 members[data.0 % members.len()],
372 data.1
373 )))
374 .all_ticks()
375 .demux_bincode(other)
376 }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
380 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
381 /// using [`bincode`] to serialize/deserialize messages.
382 ///
383 /// Each cluster member sends its local stream elements, and they are collected at the destination
384 /// as a [`KeyedStream`] where keys identify the source cluster member.
385 ///
386 /// # Example
387 /// ```rust
388 /// # use hydro_lang::prelude::*;
389 /// # use futures::StreamExt;
390 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
391 /// let workers: Cluster<()> = flow.cluster::<()>();
392 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
393 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
394 /// # all_received.entries()
395 /// # }, |mut stream| async move {
396 /// // if there are 4 members in the cluster, we should receive 4 elements
397 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
398 /// # let mut results = Vec::new();
399 /// # for w in 0..4 {
400 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
401 /// # }
402 /// # results.sort();
403 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
404 /// # }));
405 /// ```
406 ///
407 /// If you don't need to know the source for each element, you can use `.values()`
408 /// to get just the data:
409 /// ```rust
410 /// # use hydro_lang::prelude::*;
411 /// # use hydro_lang::live_collections::stream::NoOrder;
412 /// # use futures::StreamExt;
413 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
414 /// # let workers: Cluster<()> = flow.cluster::<()>();
415 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
416 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
417 /// # values
418 /// # }, |mut stream| async move {
419 /// # let mut results = Vec::new();
420 /// # for w in 0..4 {
421 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
422 /// # }
423 /// # results.sort();
424 /// // if there are 4 members in the cluster, we should receive 4 elements
425 /// // 1, 1, 1, 1
426 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
427 /// # }));
428 /// ```
429 pub fn send_bincode<L2>(
430 self,
431 other: &Process<'a, L2>,
432 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
433 where
434 T: Serialize + DeserializeOwned,
435 {
436 let serialize_pipeline = Some(serialize_bincode::<T>(false));
437
438 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
439
440 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
441 other.clone(),
442 HydroNode::Network {
443 serialize_fn: serialize_pipeline.map(|e| e.into()),
444 instantiate_fn: DebugInstantiate::Building,
445 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
446 input: Box::new(self.ir_node.into_inner()),
447 metadata: other.new_node_metadata(Stream::<
448 (MemberId<L>, T),
449 Process<'a, L2>,
450 Unbounded,
451 O,
452 R,
453 >::collection_kind()),
454 },
455 );
456
457 raw_stream.into_keyed()
458 }
459
460 /// Broadcasts elements of this stream at each source member to all members of a destination
461 /// cluster, using [`bincode`] to serialize/deserialize messages.
462 ///
463 /// Each source member sends each of its stream elements to **every** member of the cluster
464 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
465 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
466 /// **only data elements** and sends each element to all cluster members.
467 ///
468 /// # Non-Determinism
469 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
470 /// to the current cluster members known _at that point in time_ at the source member. Depending
471 /// on when each source member is notified of membership changes, it will broadcast each element
472 /// to different members.
473 ///
474 /// # Example
475 /// ```rust
476 /// # use hydro_lang::prelude::*;
477 /// # use hydro_lang::location::MemberId;
478 /// # use futures::StreamExt;
479 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
480 /// # type Source = ();
481 /// # type Destination = ();
482 /// let source: Cluster<Source> = flow.cluster::<Source>();
483 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
484 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
485 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
486 /// # on_destination.entries().send_bincode(&p2).entries()
487 /// // if there are 4 members in the desination, each receives one element from each source member
488 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
489 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
490 /// // - ...
491 /// # }, |mut stream| async move {
492 /// # let mut results = Vec::new();
493 /// # for w in 0..16 {
494 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
495 /// # }
496 /// # results.sort();
497 /// # assert_eq!(results, vec![
498 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
499 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
500 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
501 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
502 /// # ]);
503 /// # }));
504 /// ```
505 pub fn broadcast_bincode<L2: 'a>(
506 self,
507 other: &Cluster<'a, L2>,
508 nondet_membership: NonDet,
509 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
510 where
511 T: Clone + Serialize + DeserializeOwned,
512 {
513 let ids = track_membership(self.location.source_cluster_members(other));
514 let join_tick = self.location.tick();
515 let current_members = ids
516 .snapshot(&join_tick, nondet_membership)
517 .filter(q!(|b| *b));
518
519 self.batch(&join_tick, nondet_membership)
520 .repeat_with_keys(current_members)
521 .all_ticks()
522 .demux_bincode(other)
523 }
524}
525
526impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
527 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
528{
529 /// Sends elements of this stream at each source member to specific members of a destination
530 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
531 ///
532 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
533 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
534 /// this API allows precise targeting of specific cluster members rather than broadcasting to
535 /// all members.
536 ///
537 /// Each cluster member sends its local stream elements, and they are collected at each
538 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
539 ///
540 /// # Example
541 /// ```rust
542 /// # use hydro_lang::prelude::*;
543 /// # use futures::StreamExt;
544 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
545 /// # type Source = ();
546 /// # type Destination = ();
547 /// let source: Cluster<Source> = flow.cluster::<Source>();
548 /// let to_send: Stream<_, Cluster<_>, _> = source
549 /// .source_iter(q!(vec![0, 1, 2, 3]))
550 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
551 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
552 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
553 /// # all_received.entries().send_bincode(&p2).entries()
554 /// # }, |mut stream| async move {
555 /// // if there are 4 members in the destination cluster, each receives one message from each source member
556 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
557 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
558 /// // - ...
559 /// # let mut results = Vec::new();
560 /// # for w in 0..16 {
561 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
562 /// # }
563 /// # results.sort();
564 /// # assert_eq!(results, vec![
565 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
566 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
567 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
568 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
569 /// # ]);
570 /// # }));
571 /// ```
572 pub fn demux_bincode(
573 self,
574 other: &Cluster<'a, L2>,
575 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
576 where
577 T: Serialize + DeserializeOwned,
578 {
579 self.into_keyed().demux_bincode(other)
580 }
581}
582
#[cfg(test)]
mod tests {
    use stageleft::q;

    use crate::location::{Location, MemberId};
    use crate::nondet::nondet;
    use crate::prelude::FlowBuilder;

    /// Process → process: three unit values are forwarded to a second process, counted per
    /// batch there, and the per-batch counts must add up to three in every explored execution.
    #[test]
    fn sim_send_bincode_o2o() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (port, input) = node.source_external_bincode(&external);

        let out_port = input
            .send_bincode(&node2)
            .batch(&node2.tick(), nondet!(/** test */))
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let instances = flow.sim().exhaustive(async |mut compiled| {
            let in_send = compiled.connect(&port);
            let out_recv = compiled.connect(&out_port);
            compiled.launch();

            in_send.send(());
            in_send.send(());
            in_send.send(());

            let received = out_recv.collect::<Vec<_>>().await;
            // `assert_eq!` reports the actual total when the check fails.
            assert_eq!(received.into_iter().sum::<usize>(), 3);
        });

        assert_eq!(instances, 4); // 2^{3 - 1}
    }

    /// Cluster → process: each of four members sends `1`, and the process must see exactly one
    /// `(member, 1)` entry per member, in any order.
    #[test]
    fn sim_send_bincode_m2o() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = cluster.source_iter(q!(vec![1]));

        let out_port = input
            .send_bincode(&node)
            .entries()
            .batch(&node.tick(), nondet!(/** test */))
            .all_ticks()
            .send_bincode_external(&external);

        let instances =
            flow.sim()
                .with_cluster_size(&cluster, 4)
                .exhaustive(async |mut compiled| {
                    let out_recv = compiled.connect(&out_port);
                    compiled.launch();

                    out_recv
                        .assert_yields_only_unordered(vec![
                            (MemberId::from_raw(0), 1),
                            (MemberId::from_raw(1), 1),
                            (MemberId::from_raw(2), 1),
                            (MemberId::from_raw(3), 1),
                        ])
                        .await
                });

        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
    }

    /// Two independent clusters (sizes 3 and 4) send to the same process over separate
    /// channels; each external port must receive exactly its own cluster's entries.
    #[test]
    fn sim_send_bincode_multiple_m2o() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_port_1 = cluster1
            .source_iter(q!(vec![1]))
            .send_bincode(&node)
            .entries()
            .send_bincode_external(&external);

        let out_port_2 = cluster2
            .source_iter(q!(vec![2]))
            .send_bincode(&node)
            .entries()
            .send_bincode_external(&external);

        let instances = flow
            .sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async |mut compiled| {
                let out_recv_1 = compiled.connect(&out_port_1);
                let out_recv_2 = compiled.connect(&out_port_2);
                compiled.launch();

                out_recv_1
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw(0), 1),
                        (MemberId::from_raw(1), 1),
                        (MemberId::from_raw(2), 1),
                    ])
                    .await;

                out_recv_2
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw(0), 2),
                        (MemberId::from_raw(1), 2),
                        (MemberId::from_raw(2), 2),
                        (MemberId::from_raw(3), 2),
                    ])
                    .await;
            });

        assert_eq!(instances, 1);
    }

    /// Process → cluster demux: two values are addressed to members 0 and 1, incremented
    /// there, and sent back; only those two members may report a result.
    #[test]
    fn sim_send_bincode_o2m() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![
            (MemberId::from_raw(0), 123),
            (MemberId::from_raw(1), 456),
        ]));

        let out_port = input
            .demux_bincode(&cluster)
            .map(q!(|x| x + 1))
            .send_bincode(&node)
            .entries()
            .send_bincode_external(&external);

        flow.sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw(0), 124),
                        (MemberId::from_raw(1), 457),
                    ])
                    .await
            });
    }

    /// Process → cluster broadcast with two members: whenever a member reports the first
    /// element it must also report the second, and across all explored executions each member
    /// must do so at least once.
    #[test]
    fn sim_broadcast_bincode_o2m() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![123, 456]));

        let out_port = input
            .broadcast_bincode(&cluster, nondet!(/** test */))
            .map(q!(|x| x + 1))
            .send_bincode(&node)
            .entries()
            .send_bincode_external(&external);

        let mut member_0_got_both = false;
        let mut member_1_got_both = false;

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                let all_out = out_recv.collect_sorted::<Vec<_>>().await;

                // a member that saw the first element (123 + 1) must also have seen the
                // second one (456 + 1)
                if all_out.contains(&(MemberId::from_raw(0), 124)) {
                    assert!(all_out.contains(&(MemberId::from_raw(0), 457)));
                    member_0_got_both = true;
                }

                if all_out.contains(&(MemberId::from_raw(1), 124)) {
                    assert!(all_out.contains(&(MemberId::from_raw(1), 457)));
                    member_1_got_both = true;
                }
            });

        // in at least one execution each, the cluster member received both messages
        assert!(member_0_got_both && member_1_got_both);
    }

    /// Cluster → cluster demux: members 0 and 1 each receive one value, increment it, and
    /// re-send it to members 0 and 1; the process must see every (destination, (source, value))
    /// combination exactly once.
    #[test]
    fn sim_send_bincode_m2m() {
        let flow = FlowBuilder::new();
        let external = flow.external::<()>();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![
            (MemberId::from_raw(0), 123),
            (MemberId::from_raw(1), 456),
        ]));

        let out_port = input
            .demux_bincode(&cluster)
            .map(q!(|x| x + 1))
            .flat_map_ordered(q!(|x| vec![
                (MemberId::from_raw(0), x),
                (MemberId::from_raw(1), x),
            ]))
            .demux_bincode(&cluster)
            .entries()
            .send_bincode(&node)
            .entries()
            .send_bincode_external(&external);

        flow.sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw(0), (MemberId::from_raw(0), 124)),
                        (MemberId::from_raw(0), (MemberId::from_raw(1), 457)),
                        (MemberId::from_raw(1), (MemberId::from_raw(0), 124)),
                        (MemberId::from_raw(1), (MemberId::from_raw(1), 457)),
                    ])
                    .await
            });
    }
}
826}