hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21#[cfg(feature = "sim")]
22use crate::sim::SimReceiver;
23use crate::staging_util::get_this_crate;
24
25// same as the one in `hydro_std`, but internal use only
26fn track_membership<'a, C, L: Location<'a> + NoTick>(
27 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
28) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
29 membership.fold(
30 q!(|| false),
31 q!(|present, event| {
32 match event {
33 MembershipEvent::Joined => *present = true,
34 MembershipEvent::Left => *present = false,
35 }
36 }),
37 )
38}
39
40fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
41 let root = get_this_crate();
42
43 if is_demux {
44 parse_quote! {
45 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
46 |(id, data)| {
47 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
48 }
49 )
50 }
51 } else {
52 parse_quote! {
53 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
54 |data| {
55 #root::runtime_support::bincode::serialize(&data).unwrap().into()
56 }
57 )
58 }
59 }
60}
61
62pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
63 serialize_bincode_with_type(is_demux, "e_type::<T>())
64}
65
66fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
67 let root = get_this_crate();
68
69 if let Some(c_type) = tagged {
70 parse_quote! {
71 |res| {
72 let (id, b) = res.unwrap();
73 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
74 }
75 }
76 } else {
77 parse_quote! {
78 |res| {
79 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
80 }
81 }
82 }
83}
84
85pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
86 deserialize_bincode_with_type(tagged, "e_type::<T>())
87}
88
89impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
90 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
91 /// using [`bincode`] to serialize/deserialize messages.
92 ///
93 /// The returned stream captures the elements received at the destination, where values will
94 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
95 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
96 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
97 /// dropped no further messages will be sent.
98 ///
99 /// # Example
100 /// ```rust
101 /// # #[cfg(feature = "deploy")] {
102 /// # use hydro_lang::prelude::*;
103 /// # use futures::StreamExt;
104 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
105 /// let p1 = flow.process::<()>();
106 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
107 /// let p2 = flow.process::<()>();
108 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
109 /// // 1, 2, 3
110 /// # on_p2.send_bincode(&p_out)
111 /// # }, |mut stream| async move {
112 /// # for w in 1..=3 {
113 /// # assert_eq!(stream.next().await, Some(w));
114 /// # }
115 /// # }));
116 /// # }
117 /// ```
118 pub fn send_bincode<L2>(
119 self,
120 other: &Process<'a, L2>,
121 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
122 where
123 T: Serialize + DeserializeOwned,
124 {
125 let serialize_pipeline = Some(serialize_bincode::<T>(false));
126
127 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
128
129 Stream::new(
130 other.clone(),
131 HydroNode::Network {
132 serialize_fn: serialize_pipeline.map(|e| e.into()),
133 instantiate_fn: DebugInstantiate::Building,
134 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
135 input: Box::new(self.ir_node.into_inner()),
136 metadata: other.new_node_metadata(
137 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
138 ),
139 },
140 )
141 }
142
143 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
144 /// using [`bincode`] to serialize/deserialize messages.
145 ///
146 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
147 /// membership information. This is a common pattern in distributed systems for broadcasting data to
148 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
149 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
150 /// each element to all cluster members.
151 ///
152 /// # Non-Determinism
153 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
154 /// to the current cluster members _at that point in time_. Depending on when we are notified of
155 /// membership changes, we will broadcast each element to different members.
156 ///
157 /// # Example
158 /// ```rust
159 /// # #[cfg(feature = "deploy")] {
160 /// # use hydro_lang::prelude::*;
161 /// # use futures::StreamExt;
162 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
163 /// let p1 = flow.process::<()>();
164 /// let workers: Cluster<()> = flow.cluster::<()>();
165 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
166 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
167 /// # on_worker.send_bincode(&p2).entries()
168 /// // if there are 4 members in the cluster, each receives one element
169 /// // - MemberId::<()>(0): [123]
170 /// // - MemberId::<()>(1): [123]
171 /// // - MemberId::<()>(2): [123]
172 /// // - MemberId::<()>(3): [123]
173 /// # }, |mut stream| async move {
174 /// # let mut results = Vec::new();
175 /// # for w in 0..4 {
176 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
177 /// # }
178 /// # results.sort();
179 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
180 /// # }));
181 /// # }
182 /// ```
183 pub fn broadcast_bincode<L2: 'a>(
184 self,
185 other: &Cluster<'a, L2>,
186 nondet_membership: NonDet,
187 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
188 where
189 T: Clone + Serialize + DeserializeOwned,
190 {
191 let ids = track_membership(self.location.source_cluster_members(other));
192 let join_tick = self.location.tick();
193 let current_members = ids
194 .snapshot(&join_tick, nondet_membership)
195 .filter(q!(|b| *b));
196
197 self.batch(&join_tick, nondet_membership)
198 .repeat_with_keys(current_members)
199 .all_ticks()
200 .demux_bincode(other)
201 }
202
203 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
204 /// serialization. The external process can receive these elements by establishing a TCP
205 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
206 ///
207 /// # Example
208 /// ```rust
209 /// # #[cfg(feature = "deploy")] {
210 /// # use hydro_lang::prelude::*;
211 /// # use futures::StreamExt;
212 /// # tokio_test::block_on(async move {
213 /// let flow = FlowBuilder::new();
214 /// let process = flow.process::<()>();
215 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
216 /// let external = flow.external::<()>();
217 /// let external_handle = numbers.send_bincode_external(&external);
218 ///
219 /// let mut deployment = hydro_deploy::Deployment::new();
220 /// let nodes = flow
221 /// .with_process(&process, deployment.Localhost())
222 /// .with_external(&external, deployment.Localhost())
223 /// .deploy(&mut deployment);
224 ///
225 /// deployment.deploy().await.unwrap();
226 /// // establish the TCP connection
227 /// let mut external_recv_stream = nodes.connect(external_handle).await;
228 /// deployment.start().await.unwrap();
229 ///
230 /// for w in 1..=3 {
231 /// assert_eq!(external_recv_stream.next().await, Some(w));
232 /// }
233 /// # });
234 /// # }
235 /// ```
236 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
237 where
238 T: Serialize + DeserializeOwned,
239 {
240 let serialize_pipeline = Some(serialize_bincode::<T>(false));
241
242 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
243
244 let external_key = flow_state_borrow.next_external_out;
245 flow_state_borrow.next_external_out += 1;
246
247 flow_state_borrow.push_root(HydroRoot::SendExternal {
248 to_external_id: other.id,
249 to_key: external_key,
250 to_many: false,
251 unpaired: true,
252 serialize_fn: serialize_pipeline.map(|e| e.into()),
253 instantiate_fn: DebugInstantiate::Building,
254 input: Box::new(self.ir_node.into_inner()),
255 op_metadata: HydroIrOpMetadata::new(),
256 });
257
258 ExternalBincodeStream {
259 process_id: other.id,
260 port_id: external_key,
261 _phantom: PhantomData,
262 }
263 }
264
265 #[cfg(feature = "sim")]
266 /// Sets up a simulation output port for this stream, allowing test code to receive elements
267 /// sent to this stream during simulation.
268 pub fn sim_output(self) -> SimReceiver<T, O, R>
269 where
270 T: Serialize + DeserializeOwned,
271 {
272 let external_location: External<'a, ()> = External {
273 id: 0,
274 flow_state: self.location.flow_state().clone(),
275 _phantom: PhantomData,
276 };
277
278 let external = self.send_bincode_external(&external_location);
279
280 SimReceiver(external.port_id, PhantomData)
281 }
282}
283
284impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
285 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
286{
287 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
288 /// using [`bincode`] to serialize/deserialize messages.
289 ///
290 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
291 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
292 /// this API allows precise targeting of specific cluster members rather than broadcasting to
293 /// all members.
294 ///
295 /// # Example
296 /// ```rust
297 /// # #[cfg(feature = "deploy")] {
298 /// # use hydro_lang::prelude::*;
299 /// # use futures::StreamExt;
300 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
301 /// let p1 = flow.process::<()>();
302 /// let workers: Cluster<()> = flow.cluster::<()>();
303 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
304 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
305 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
306 /// .demux_bincode(&workers);
307 /// # on_worker.send_bincode(&p2).entries()
308 /// // if there are 4 members in the cluster, each receives one element
309 /// // - MemberId::<()>(0): [0]
310 /// // - MemberId::<()>(1): [1]
311 /// // - MemberId::<()>(2): [2]
312 /// // - MemberId::<()>(3): [3]
313 /// # }, |mut stream| async move {
314 /// # let mut results = Vec::new();
315 /// # for w in 0..4 {
316 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
317 /// # }
318 /// # results.sort();
319 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
320 /// # }));
321 /// # }
322 /// ```
323 pub fn demux_bincode(
324 self,
325 other: &Cluster<'a, L2>,
326 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
327 where
328 T: Serialize + DeserializeOwned,
329 {
330 self.into_keyed().demux_bincode(other)
331 }
332}
333
334impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
335 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
336 /// [`bincode`] to serialize/deserialize messages.
337 ///
338 /// This provides load balancing by evenly distributing work across cluster members. The
339 /// distribution is deterministic based on element order - the first element goes to member 0,
340 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
341 ///
342 /// # Non-Determinism
343 /// The set of cluster members may asynchronously change over time. Each element is distributed
344 /// based on the current cluster membership _at that point in time_. Depending on when cluster
345 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
346 /// membership is stable, the order of members in the round-robin pattern may change across runs.
347 ///
348 /// # Ordering Requirements
349 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
350 /// order of messages and retries affects the round-robin pattern.
351 ///
352 /// # Example
353 /// ```rust
354 /// # #[cfg(feature = "deploy")] {
355 /// # use hydro_lang::prelude::*;
356 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
357 /// # use futures::StreamExt;
358 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
359 /// let p1 = flow.process::<()>();
360 /// let workers: Cluster<()> = flow.cluster::<()>();
361 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
362 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
363 /// on_worker.send_bincode(&p2)
364 /// # .first().values() // we use first to assert that each member gets one element
365 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
366 /// // - MemberId::<()>(?): [1]
367 /// // - MemberId::<()>(?): [2]
368 /// // - MemberId::<()>(?): [3]
369 /// // - MemberId::<()>(?): [4]
370 /// # }, |mut stream| async move {
371 /// # let mut results = Vec::new();
372 /// # for w in 0..4 {
373 /// # results.push(stream.next().await.unwrap());
374 /// # }
375 /// # results.sort();
376 /// # assert_eq!(results, vec![1, 2, 3, 4]);
377 /// # }));
378 /// # }
379 /// ```
380 pub fn round_robin_bincode<L2: 'a>(
381 self,
382 other: &Cluster<'a, L2>,
383 nondet_membership: NonDet,
384 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
385 where
386 T: Serialize + DeserializeOwned,
387 {
388 let ids = track_membership(self.location.source_cluster_members(other));
389 let join_tick = self.location.tick();
390 let current_members = ids
391 .snapshot(&join_tick, nondet_membership)
392 .filter(q!(|b| *b))
393 .keys()
394 .assume_ordering(nondet_membership)
395 .collect_vec();
396
397 self.enumerate()
398 .batch(&join_tick, nondet_membership)
399 .cross_singleton(current_members)
400 .map(q!(|(data, members)| (
401 members[data.0 % members.len()].clone(),
402 data.1
403 )))
404 .all_ticks()
405 .demux_bincode(other)
406 }
407}
408
409impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
410 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
411 /// [`bincode`] to serialize/deserialize messages.
412 ///
413 /// This provides load balancing by evenly distributing work across cluster members. The
414 /// distribution is deterministic based on element order - the first element goes to member 0,
415 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
416 ///
417 /// # Non-Determinism
418 /// The set of cluster members may asynchronously change over time. Each element is distributed
419 /// based on the current cluster membership _at that point in time_. Depending on when cluster
420 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
421 /// membership is stable, the order of members in the round-robin pattern may change across runs.
422 ///
423 /// # Ordering Requirements
424 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
425 /// order of messages and retries affects the round-robin pattern.
426 ///
427 /// # Example
428 /// ```rust
429 /// # #[cfg(feature = "deploy")] {
430 /// # use hydro_lang::prelude::*;
431 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
432 /// # use hydro_lang::location::MemberId;
433 /// # use futures::StreamExt;
434 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
435 /// let p1 = flow.process::<()>();
436 /// let workers1: Cluster<()> = flow.cluster::<()>();
437 /// let workers2: Cluster<()> = flow.cluster::<()>();
438 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
439 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
440 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
441 /// on_worker2.send_bincode(&p2)
442 /// # .entries()
443 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
444 /// # }, |mut stream| async move {
445 /// # let mut results = Vec::new();
446 /// # let mut locations = std::collections::HashSet::new();
447 /// # for w in 0..=16 {
448 /// # let (location, v) = stream.next().await.unwrap();
449 /// # locations.insert(location);
450 /// # results.push(v);
451 /// # }
452 /// # results.sort();
453 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
454 /// # assert_eq!(locations.len(), 16);
455 /// # }));
456 /// # }
457 /// ```
458 pub fn round_robin_bincode<L2: 'a>(
459 self,
460 other: &Cluster<'a, L2>,
461 nondet_membership: NonDet,
462 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
463 where
464 T: Serialize + DeserializeOwned,
465 {
466 let ids = track_membership(self.location.source_cluster_members(other));
467 let join_tick = self.location.tick();
468 let current_members = ids
469 .snapshot(&join_tick, nondet_membership)
470 .filter(q!(|b| *b))
471 .keys()
472 .assume_ordering(nondet_membership)
473 .collect_vec();
474
475 self.enumerate()
476 .batch(&join_tick, nondet_membership)
477 .cross_singleton(current_members)
478 .map(q!(|(data, members)| (
479 members[data.0 % members.len()].clone(),
480 data.1
481 )))
482 .all_ticks()
483 .demux_bincode(other)
484 }
485}
486
487impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
488 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
489 /// using [`bincode`] to serialize/deserialize messages.
490 ///
491 /// Each cluster member sends its local stream elements, and they are collected at the destination
492 /// as a [`KeyedStream`] where keys identify the source cluster member.
493 ///
494 /// # Example
495 /// ```rust
496 /// # #[cfg(feature = "deploy")] {
497 /// # use hydro_lang::prelude::*;
498 /// # use futures::StreamExt;
499 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
500 /// let workers: Cluster<()> = flow.cluster::<()>();
501 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
502 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
503 /// # all_received.entries()
504 /// # }, |mut stream| async move {
505 /// // if there are 4 members in the cluster, we should receive 4 elements
506 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
507 /// # let mut results = Vec::new();
508 /// # for w in 0..4 {
509 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
510 /// # }
511 /// # results.sort();
512 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
513 /// # }));
514 /// # }
515 /// ```
516 ///
517 /// If you don't need to know the source for each element, you can use `.values()`
518 /// to get just the data:
519 /// ```rust
520 /// # #[cfg(feature = "deploy")] {
521 /// # use hydro_lang::prelude::*;
522 /// # use hydro_lang::live_collections::stream::NoOrder;
523 /// # use futures::StreamExt;
524 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
525 /// # let workers: Cluster<()> = flow.cluster::<()>();
526 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
527 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
528 /// # values
529 /// # }, |mut stream| async move {
530 /// # let mut results = Vec::new();
531 /// # for w in 0..4 {
532 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
533 /// # }
534 /// # results.sort();
535 /// // if there are 4 members in the cluster, we should receive 4 elements
536 /// // 1, 1, 1, 1
537 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
538 /// # }));
539 /// # }
540 /// ```
541 pub fn send_bincode<L2>(
542 self,
543 other: &Process<'a, L2>,
544 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
545 where
546 T: Serialize + DeserializeOwned,
547 {
548 let serialize_pipeline = Some(serialize_bincode::<T>(false));
549
550 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
551
552 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
553 other.clone(),
554 HydroNode::Network {
555 serialize_fn: serialize_pipeline.map(|e| e.into()),
556 instantiate_fn: DebugInstantiate::Building,
557 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
558 input: Box::new(self.ir_node.into_inner()),
559 metadata: other.new_node_metadata(Stream::<
560 (MemberId<L>, T),
561 Process<'a, L2>,
562 Unbounded,
563 O,
564 R,
565 >::collection_kind()),
566 },
567 );
568
569 raw_stream.into_keyed()
570 }
571
572 /// Broadcasts elements of this stream at each source member to all members of a destination
573 /// cluster, using [`bincode`] to serialize/deserialize messages.
574 ///
575 /// Each source member sends each of its stream elements to **every** member of the cluster
576 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
577 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
578 /// **only data elements** and sends each element to all cluster members.
579 ///
580 /// # Non-Determinism
581 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
582 /// to the current cluster members known _at that point in time_ at the source member. Depending
583 /// on when each source member is notified of membership changes, it will broadcast each element
584 /// to different members.
585 ///
586 /// # Example
587 /// ```rust
588 /// # #[cfg(feature = "deploy")] {
589 /// # use hydro_lang::prelude::*;
590 /// # use hydro_lang::location::MemberId;
591 /// # use futures::StreamExt;
592 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
593 /// # type Source = ();
594 /// # type Destination = ();
595 /// let source: Cluster<Source> = flow.cluster::<Source>();
596 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
597 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
598 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
599 /// # on_destination.entries().send_bincode(&p2).entries()
600 /// // if there are 4 members in the desination, each receives one element from each source member
601 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
602 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
603 /// // - ...
604 /// # }, |mut stream| async move {
605 /// # let mut results = Vec::new();
606 /// # for w in 0..16 {
607 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
608 /// # }
609 /// # results.sort();
610 /// # assert_eq!(results, vec![
611 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
612 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
613 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
614 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
615 /// # ]);
616 /// # }));
617 /// # }
618 /// ```
619 pub fn broadcast_bincode<L2: 'a>(
620 self,
621 other: &Cluster<'a, L2>,
622 nondet_membership: NonDet,
623 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
624 where
625 T: Clone + Serialize + DeserializeOwned,
626 {
627 let ids = track_membership(self.location.source_cluster_members(other));
628 let join_tick = self.location.tick();
629 let current_members = ids
630 .snapshot(&join_tick, nondet_membership)
631 .filter(q!(|b| *b));
632
633 self.batch(&join_tick, nondet_membership)
634 .repeat_with_keys(current_members)
635 .all_ticks()
636 .demux_bincode(other)
637 }
638}
639
640impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
641 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
642{
643 /// Sends elements of this stream at each source member to specific members of a destination
644 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
645 ///
646 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
647 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
648 /// this API allows precise targeting of specific cluster members rather than broadcasting to
649 /// all members.
650 ///
651 /// Each cluster member sends its local stream elements, and they are collected at each
652 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
653 ///
654 /// # Example
655 /// ```rust
656 /// # #[cfg(feature = "deploy")] {
657 /// # use hydro_lang::prelude::*;
658 /// # use futures::StreamExt;
659 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
660 /// # type Source = ();
661 /// # type Destination = ();
662 /// let source: Cluster<Source> = flow.cluster::<Source>();
663 /// let to_send: Stream<_, Cluster<_>, _> = source
664 /// .source_iter(q!(vec![0, 1, 2, 3]))
665 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
666 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
667 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
668 /// # all_received.entries().send_bincode(&p2).entries()
669 /// # }, |mut stream| async move {
670 /// // if there are 4 members in the destination cluster, each receives one message from each source member
671 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
672 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
673 /// // - ...
674 /// # let mut results = Vec::new();
675 /// # for w in 0..16 {
676 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
677 /// # }
678 /// # results.sort();
679 /// # assert_eq!(results, vec![
680 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
681 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
682 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
683 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
684 /// # ]);
685 /// # }));
686 /// # }
687 /// ```
688 pub fn demux_bincode(
689 self,
690 other: &Cluster<'a, L2>,
691 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
692 where
693 T: Serialize + DeserializeOwned,
694 {
695 self.into_keyed().demux_bincode(other)
696 }
697}
698
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId};
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Process -> process: all three inputs arrive, however they are split across batches.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_o2o() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let out_recv = input
            .send_bincode(&node2)
            .batch(&node2.tick(), nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            let received = out_recv.collect::<Vec<_>>().await;
            assert_eq!(received.into_iter().sum::<usize>(), 3);
        });

        assert_eq!(instances, 4); // 2^{3 - 1}
    }

    /// Cluster -> process: every member's element arrives tagged with its sender's ID.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_m2o() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = cluster.source_iter(q!(vec![1]));

        let out_recv = input
            .send_bincode(&node)
            .entries()
            .batch(&node.tick(), nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instances = flow
            .sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), 1),
                        (MemberId::from_raw_id(1), 1),
                        (MemberId::from_raw_id(2), 1),
                        (MemberId::from_raw_id(3), 1),
                    ])
                    .await
            });

        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
    }

    /// Two independent clusters sending to the same process do not interfere with each other.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_multiple_m2o() {
        let flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv_1 = cluster1
            .source_iter(q!(vec![1]))
            .send_bincode(&node)
            .entries()
            .sim_output();

        let out_recv_2 = cluster2
            .source_iter(q!(vec![2]))
            .send_bincode(&node)
            .entries()
            .sim_output();

        let instances = flow
            .sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv_1
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), 1),
                        (MemberId::from_raw_id(1), 1),
                        (MemberId::from_raw_id(2), 1),
                    ])
                    .await;

                out_recv_2
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), 2),
                        (MemberId::from_raw_id(1), 2),
                        (MemberId::from_raw_id(2), 2),
                        (MemberId::from_raw_id(3), 2),
                    ])
                    .await;
            });

        assert_eq!(instances, 1);
    }

    /// Process -> cluster demux: each element reaches only the member it names.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_o2m() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![
            (MemberId::from_raw_id(0), 123),
            (MemberId::from_raw_id(1), 456),
        ]));

        let out_recv = input
            .demux_bincode(&cluster)
            .map(q!(|x| x + 1))
            .send_bincode(&node)
            .entries()
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), 124),
                        (MemberId::from_raw_id(1), 457),
                    ])
                    .await
            });
    }

    /// Process -> cluster broadcast under non-deterministic membership notification.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_broadcast_bincode_o2m() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![123, 456]));

        let out_recv = input
            .broadcast_bincode(&cluster, nondet!(/** test */))
            .map(q!(|x| x + 1))
            .send_bincode(&node)
            .entries()
            .sim_output();

        let mut c_1_produced = false;
        let mut c_2_produced = false;

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                let all_out = out_recv.collect_sorted::<Vec<_>>().await;

                // If a member received the first broadcast element (123 -> 124), it must also
                // have received the second one (456 -> 457).
                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
                    c_1_produced = true;
                }

                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
                    c_2_produced = true;
                }
            });

        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
    }

    /// Cluster -> cluster demux: every source member's messages reach the named destinations,
    /// tagged with the source member's ID.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_m2m() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![
            (MemberId::from_raw_id(0), 123),
            (MemberId::from_raw_id(1), 456),
        ]));

        let out_recv = input
            .demux_bincode(&cluster)
            .map(q!(|x| x + 1))
            .flat_map_ordered(q!(|x| vec![
                (MemberId::from_raw_id(0), x),
                (MemberId::from_raw_id(1), x),
            ]))
            .demux_bincode(&cluster)
            .entries()
            .send_bincode(&node)
            .entries()
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
                    ])
                    .await
            });
    }
}