hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::DynLocation;
19use crate::location::external_process::ExternalBincodeStream;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::networking::{NetworkFor, TCP};
22use crate::nondet::NonDet;
23#[cfg(feature = "sim")]
24use crate::sim::SimReceiver;
25use crate::staging_util::get_this_crate;
26
27// same as the one in `hydro_std`, but internal use only
28fn track_membership<'a, C, L: Location<'a> + NoTick>(
29 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
30) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
31 membership.fold(
32 q!(|| false),
33 q!(|present, event| {
34 match event {
35 MembershipEvent::Joined => *present = true,
36 MembershipEvent::Left => *present = false,
37 }
38 }),
39 )
40}
41
42fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
43 let root = get_this_crate();
44
45 if is_demux {
46 parse_quote! {
47 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
48 |(id, data)| {
49 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
50 }
51 )
52 }
53 } else {
54 parse_quote! {
55 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
56 |data| {
57 #root::runtime_support::bincode::serialize(&data).unwrap().into()
58 }
59 )
60 }
61 }
62}
63
64pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
65 serialize_bincode_with_type(is_demux, "e_type::<T>())
66}
67
68fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
69 let root = get_this_crate();
70 if let Some(c_type) = tagged {
71 parse_quote! {
72 |res| {
73 let (id, b) = res.unwrap();
74 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
75 }
76 }
77 } else {
78 parse_quote! {
79 |res| {
80 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
81 }
82 }
83 }
84}
85
86pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
87 deserialize_bincode_with_type(tagged, "e_type::<T>())
88}
89
90impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
91 #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
92 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
93 /// using [`bincode`] to serialize/deserialize messages.
94 ///
95 /// The returned stream captures the elements received at the destination, where values will
96 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
97 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
98 /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
99 /// dropped no further messages will be sent.
100 ///
101 /// # Example
102 /// ```rust
103 /// # #[cfg(feature = "deploy")] {
104 /// # use hydro_lang::prelude::*;
105 /// # use futures::StreamExt;
106 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
107 /// let p1 = flow.process::<()>();
108 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
109 /// let p2 = flow.process::<()>();
110 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
111 /// // 1, 2, 3
112 /// # on_p2.send_bincode(&p_out)
113 /// # }, |mut stream| async move {
114 /// # for w in 1..=3 {
115 /// # assert_eq!(stream.next().await, Some(w));
116 /// # }
117 /// # }));
118 /// # }
119 /// ```
120 pub fn send_bincode<L2>(
121 self,
122 other: &Process<'a, L2>,
123 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
124 where
125 T: Serialize + DeserializeOwned,
126 {
127 self.send(other, TCP.bincode())
128 }
129
130 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
131 /// using the configuration in `via` to set up the message transport.
132 ///
133 /// The returned stream captures the elements received at the destination, where values will
134 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
135 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
136 /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
137 /// dropped no further messages will be sent.
138 ///
139 /// # Example
140 /// ```rust
141 /// # #[cfg(feature = "deploy")] {
142 /// # use hydro_lang::prelude::*;
143 /// # use futures::StreamExt;
144 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
145 /// let p1 = flow.process::<()>();
146 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
147 /// let p2 = flow.process::<()>();
148 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
149 /// // 1, 2, 3
150 /// # on_p2.send(&p_out, TCP.bincode())
151 /// # }, |mut stream| async move {
152 /// # for w in 1..=3 {
153 /// # assert_eq!(stream.next().await, Some(w));
154 /// # }
155 /// # }));
156 /// # }
157 /// ```
158 pub fn send<L2, N: NetworkFor<T>>(
159 self,
160 to: &Process<'a, L2>,
161 via: N,
162 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
163 where
164 T: Serialize + DeserializeOwned,
165 {
166 let _ = via;
167 let serialize_pipeline = Some(N::serialize_thunk(false));
168 let deserialize_pipeline = Some(N::deserialize_thunk(None));
169
170 Stream::new(
171 to.clone(),
172 HydroNode::Network {
173 serialize_fn: serialize_pipeline.map(|e| e.into()),
174 instantiate_fn: DebugInstantiate::Building,
175 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
176 input: Box::new(self.ir_node.into_inner()),
177 metadata: to.new_node_metadata(
178 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
179 ),
180 },
181 )
182 }
183
184 #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
185 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
186 /// using [`bincode`] to serialize/deserialize messages.
187 ///
188 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
189 /// membership information. This is a common pattern in distributed systems for broadcasting data to
190 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
191 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
192 /// each element to all cluster members.
193 ///
194 /// # Non-Determinism
195 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
196 /// to the current cluster members _at that point in time_. Depending on when we are notified of
197 /// membership changes, we will broadcast each element to different members.
198 ///
199 /// # Example
200 /// ```rust
201 /// # #[cfg(feature = "deploy")] {
202 /// # use hydro_lang::prelude::*;
203 /// # use futures::StreamExt;
204 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
205 /// let p1 = flow.process::<()>();
206 /// let workers: Cluster<()> = flow.cluster::<()>();
207 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
208 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
209 /// # on_worker.send_bincode(&p2).entries()
210 /// // if there are 4 members in the cluster, each receives one element
211 /// // - MemberId::<()>(0): [123]
212 /// // - MemberId::<()>(1): [123]
213 /// // - MemberId::<()>(2): [123]
214 /// // - MemberId::<()>(3): [123]
215 /// # }, |mut stream| async move {
216 /// # let mut results = Vec::new();
217 /// # for w in 0..4 {
218 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
219 /// # }
220 /// # results.sort();
221 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
222 /// # }));
223 /// # }
224 /// ```
225 pub fn broadcast_bincode<L2: 'a>(
226 self,
227 other: &Cluster<'a, L2>,
228 nondet_membership: NonDet,
229 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
230 where
231 T: Clone + Serialize + DeserializeOwned,
232 {
233 self.broadcast(other, TCP.bincode(), nondet_membership)
234 }
235
236 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
237 /// using the configuration in `via` to set up the message transport.
238 ///
239 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
240 /// membership information. This is a common pattern in distributed systems for broadcasting data to
241 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
242 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
243 /// each element to all cluster members.
244 ///
245 /// # Non-Determinism
246 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
247 /// to the current cluster members _at that point in time_. Depending on when we are notified of
248 /// membership changes, we will broadcast each element to different members.
249 ///
250 /// # Example
251 /// ```rust
252 /// # #[cfg(feature = "deploy")] {
253 /// # use hydro_lang::prelude::*;
254 /// # use futures::StreamExt;
255 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
256 /// let p1 = flow.process::<()>();
257 /// let workers: Cluster<()> = flow.cluster::<()>();
258 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
259 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
260 /// # on_worker.send(&p2, TCP.bincode()).entries()
261 /// // if there are 4 members in the cluster, each receives one element
262 /// // - MemberId::<()>(0): [123]
263 /// // - MemberId::<()>(1): [123]
264 /// // - MemberId::<()>(2): [123]
265 /// // - MemberId::<()>(3): [123]
266 /// # }, |mut stream| async move {
267 /// # let mut results = Vec::new();
268 /// # for w in 0..4 {
269 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
270 /// # }
271 /// # results.sort();
272 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
273 /// # }));
274 /// # }
275 /// ```
276 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
277 self,
278 to: &Cluster<'a, L2>,
279 via: N,
280 nondet_membership: NonDet,
281 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
282 where
283 T: Clone + Serialize + DeserializeOwned,
284 {
285 let ids = track_membership(self.location.source_cluster_members(to));
286 sliced! {
287 let members_snapshot = use(ids, nondet_membership);
288 let elements = use(self, nondet_membership);
289
290 let current_members = members_snapshot.filter(q!(|b| *b));
291 elements.repeat_with_keys(current_members)
292 }
293 .demux(to, via)
294 }
295
296 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
297 /// serialization. The external process can receive these elements by establishing a TCP
298 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
299 ///
300 /// # Example
301 /// ```rust
302 /// # #[cfg(feature = "deploy")] {
303 /// # use hydro_lang::prelude::*;
304 /// # use futures::StreamExt;
305 /// # tokio_test::block_on(async move {
306 /// let flow = FlowBuilder::new();
307 /// let process = flow.process::<()>();
308 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
309 /// let external = flow.external::<()>();
310 /// let external_handle = numbers.send_bincode_external(&external);
311 ///
312 /// let mut deployment = hydro_deploy::Deployment::new();
313 /// let nodes = flow
314 /// .with_process(&process, deployment.Localhost())
315 /// .with_external(&external, deployment.Localhost())
316 /// .deploy(&mut deployment);
317 ///
318 /// deployment.deploy().await.unwrap();
319 /// // establish the TCP connection
320 /// let mut external_recv_stream = nodes.connect(external_handle).await;
321 /// deployment.start().await.unwrap();
322 ///
323 /// for w in 1..=3 {
324 /// assert_eq!(external_recv_stream.next().await, Some(w));
325 /// }
326 /// # });
327 /// # }
328 /// ```
329 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
330 where
331 T: Serialize + DeserializeOwned,
332 {
333 let serialize_pipeline = Some(serialize_bincode::<T>(false));
334
335 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
336
337 let external_port_id = flow_state_borrow.next_external_port.get_and_increment();
338
339 flow_state_borrow.push_root(HydroRoot::SendExternal {
340 to_external_id: other.id,
341 to_port_id: external_port_id,
342 to_many: false,
343 unpaired: true,
344 serialize_fn: serialize_pipeline.map(|e| e.into()),
345 instantiate_fn: DebugInstantiate::Building,
346 input: Box::new(self.ir_node.into_inner()),
347 op_metadata: HydroIrOpMetadata::new(),
348 });
349
350 ExternalBincodeStream {
351 process_id: other.id,
352 port_id: external_port_id,
353 _phantom: PhantomData,
354 }
355 }
356
357 #[cfg(feature = "sim")]
358 /// Sets up a simulation output port for this stream, allowing test code to receive elements
359 /// sent to this stream during simulation.
360 pub fn sim_output(self) -> SimReceiver<T, O, R>
361 where
362 T: Serialize + DeserializeOwned,
363 {
364 let external_location: External<'a, ()> = External {
365 id: 0,
366 flow_state: self.location.flow_state().clone(),
367 _phantom: PhantomData,
368 };
369
370 let external = self.send_bincode_external(&external_location);
371
372 SimReceiver(external.port_id, PhantomData)
373 }
374}
375
376impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
377 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
378{
379 #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
380 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
381 /// using [`bincode`] to serialize/deserialize messages.
382 ///
383 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
384 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
385 /// this API allows precise targeting of specific cluster members rather than broadcasting to
386 /// all members.
387 ///
388 /// # Example
389 /// ```rust
390 /// # #[cfg(feature = "deploy")] {
391 /// # use hydro_lang::prelude::*;
392 /// # use futures::StreamExt;
393 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
394 /// let p1 = flow.process::<()>();
395 /// let workers: Cluster<()> = flow.cluster::<()>();
396 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
397 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
398 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
399 /// .demux_bincode(&workers);
400 /// # on_worker.send_bincode(&p2).entries()
401 /// // if there are 4 members in the cluster, each receives one element
402 /// // - MemberId::<()>(0): [0]
403 /// // - MemberId::<()>(1): [1]
404 /// // - MemberId::<()>(2): [2]
405 /// // - MemberId::<()>(3): [3]
406 /// # }, |mut stream| async move {
407 /// # let mut results = Vec::new();
408 /// # for w in 0..4 {
409 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
410 /// # }
411 /// # results.sort();
412 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
413 /// # }));
414 /// # }
415 /// ```
416 pub fn demux_bincode(
417 self,
418 other: &Cluster<'a, L2>,
419 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
420 where
421 T: Serialize + DeserializeOwned,
422 {
423 self.demux(other, TCP.bincode())
424 }
425
426 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
427 /// using the configuration in `via` to set up the message transport.
428 ///
429 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
430 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
431 /// this API allows precise targeting of specific cluster members rather than broadcasting to
432 /// all members.
433 ///
434 /// # Example
435 /// ```rust
436 /// # #[cfg(feature = "deploy")] {
437 /// # use hydro_lang::prelude::*;
438 /// # use futures::StreamExt;
439 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
440 /// let p1 = flow.process::<()>();
441 /// let workers: Cluster<()> = flow.cluster::<()>();
442 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
443 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
444 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
445 /// .demux(&workers, TCP.bincode());
446 /// # on_worker.send(&p2, TCP.bincode()).entries()
447 /// // if there are 4 members in the cluster, each receives one element
448 /// // - MemberId::<()>(0): [0]
449 /// // - MemberId::<()>(1): [1]
450 /// // - MemberId::<()>(2): [2]
451 /// // - MemberId::<()>(3): [3]
452 /// # }, |mut stream| async move {
453 /// # let mut results = Vec::new();
454 /// # for w in 0..4 {
455 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
456 /// # }
457 /// # results.sort();
458 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
459 /// # }));
460 /// # }
461 /// ```
462 pub fn demux<N: NetworkFor<T>>(
463 self,
464 to: &Cluster<'a, L2>,
465 via: N,
466 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
467 where
468 T: Serialize + DeserializeOwned,
469 {
470 self.into_keyed().demux(to, via)
471 }
472}
473
474impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
475 #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
476 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
477 /// [`bincode`] to serialize/deserialize messages.
478 ///
479 /// This provides load balancing by evenly distributing work across cluster members. The
480 /// distribution is deterministic based on element order - the first element goes to member 0,
481 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
482 ///
483 /// # Non-Determinism
484 /// The set of cluster members may asynchronously change over time. Each element is distributed
485 /// based on the current cluster membership _at that point in time_. Depending on when cluster
486 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
487 /// membership is stable, the order of members in the round-robin pattern may change across runs.
488 ///
489 /// # Ordering Requirements
490 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
491 /// order of messages and retries affects the round-robin pattern.
492 ///
493 /// # Example
494 /// ```rust
495 /// # #[cfg(feature = "deploy")] {
496 /// # use hydro_lang::prelude::*;
497 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
498 /// # use futures::StreamExt;
499 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
500 /// let p1 = flow.process::<()>();
501 /// let workers: Cluster<()> = flow.cluster::<()>();
502 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
503 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
504 /// on_worker.send_bincode(&p2)
505 /// # .first().values() // we use first to assert that each member gets one element
506 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
507 /// // - MemberId::<()>(?): [1]
508 /// // - MemberId::<()>(?): [2]
509 /// // - MemberId::<()>(?): [3]
510 /// // - MemberId::<()>(?): [4]
511 /// # }, |mut stream| async move {
512 /// # let mut results = Vec::new();
513 /// # for w in 0..4 {
514 /// # results.push(stream.next().await.unwrap());
515 /// # }
516 /// # results.sort();
517 /// # assert_eq!(results, vec![1, 2, 3, 4]);
518 /// # }));
519 /// # }
520 /// ```
521 pub fn round_robin_bincode<L2: 'a>(
522 self,
523 other: &Cluster<'a, L2>,
524 nondet_membership: NonDet,
525 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
526 where
527 T: Serialize + DeserializeOwned,
528 {
529 self.round_robin(other, TCP.bincode(), nondet_membership)
530 }
531
532 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
533 /// the configuration in `via` to set up the message transport.
534 ///
535 /// This provides load balancing by evenly distributing work across cluster members. The
536 /// distribution is deterministic based on element order - the first element goes to member 0,
537 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
538 ///
539 /// # Non-Determinism
540 /// The set of cluster members may asynchronously change over time. Each element is distributed
541 /// based on the current cluster membership _at that point in time_. Depending on when cluster
542 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
543 /// membership is stable, the order of members in the round-robin pattern may change across runs.
544 ///
545 /// # Ordering Requirements
546 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
547 /// order of messages and retries affects the round-robin pattern.
548 ///
549 /// # Example
550 /// ```rust
551 /// # #[cfg(feature = "deploy")] {
552 /// # use hydro_lang::prelude::*;
553 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
554 /// # use futures::StreamExt;
555 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
556 /// let p1 = flow.process::<()>();
557 /// let workers: Cluster<()> = flow.cluster::<()>();
558 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
559 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
560 /// on_worker.send(&p2, TCP.bincode())
561 /// # .first().values() // we use first to assert that each member gets one element
562 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
563 /// // - MemberId::<()>(?): [1]
564 /// // - MemberId::<()>(?): [2]
565 /// // - MemberId::<()>(?): [3]
566 /// // - MemberId::<()>(?): [4]
567 /// # }, |mut stream| async move {
568 /// # let mut results = Vec::new();
569 /// # for w in 0..4 {
570 /// # results.push(stream.next().await.unwrap());
571 /// # }
572 /// # results.sort();
573 /// # assert_eq!(results, vec![1, 2, 3, 4]);
574 /// # }));
575 /// # }
576 /// ```
577 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
578 self,
579 to: &Cluster<'a, L2>,
580 via: N,
581 nondet_membership: NonDet,
582 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
583 where
584 T: Serialize + DeserializeOwned,
585 {
586 let ids = track_membership(self.location.source_cluster_members(to));
587 sliced! {
588 let members_snapshot = use(ids, nondet_membership);
589 let elements = use(self.enumerate(), nondet_membership);
590
591 let current_members = members_snapshot
592 .filter(q!(|b| *b))
593 .keys()
594 .assume_ordering(nondet_membership)
595 .collect_vec();
596
597 elements
598 .cross_singleton(current_members)
599 .map(q!(|(data, members)| (
600 members[data.0 % members.len()].clone(),
601 data.1
602 )))
603 }
604 .demux(to, via)
605 }
606}
607
608impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
609 #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
610 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
611 /// [`bincode`] to serialize/deserialize messages.
612 ///
613 /// This provides load balancing by evenly distributing work across cluster members. The
614 /// distribution is deterministic based on element order - the first element goes to member 0,
615 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
616 ///
617 /// # Non-Determinism
618 /// The set of cluster members may asynchronously change over time. Each element is distributed
619 /// based on the current cluster membership _at that point in time_. Depending on when cluster
620 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
621 /// membership is stable, the order of members in the round-robin pattern may change across runs.
622 ///
623 /// # Ordering Requirements
624 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
625 /// order of messages and retries affects the round-robin pattern.
626 ///
627 /// # Example
628 /// ```rust
629 /// # #[cfg(feature = "deploy")] {
630 /// # use hydro_lang::prelude::*;
631 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
632 /// # use hydro_lang::location::MemberId;
633 /// # use futures::StreamExt;
634 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
635 /// let p1 = flow.process::<()>();
636 /// let workers1: Cluster<()> = flow.cluster::<()>();
637 /// let workers2: Cluster<()> = flow.cluster::<()>();
638 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
639 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
640 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
641 /// on_worker2.send_bincode(&p2)
642 /// # .entries()
643 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
644 /// # }, |mut stream| async move {
645 /// # let mut results = Vec::new();
646 /// # let mut locations = std::collections::HashSet::new();
647 /// # for w in 0..=16 {
648 /// # let (location, v) = stream.next().await.unwrap();
649 /// # locations.insert(location);
650 /// # results.push(v);
651 /// # }
652 /// # results.sort();
653 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
654 /// # assert_eq!(locations.len(), 16);
655 /// # }));
656 /// # }
657 /// ```
658 pub fn round_robin_bincode<L2: 'a>(
659 self,
660 other: &Cluster<'a, L2>,
661 nondet_membership: NonDet,
662 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
663 where
664 T: Serialize + DeserializeOwned,
665 {
666 self.round_robin(other, TCP.bincode(), nondet_membership)
667 }
668
669 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
670 /// the configuration in `via` to set up the message transport.
671 ///
672 /// This provides load balancing by evenly distributing work across cluster members. The
673 /// distribution is deterministic based on element order - the first element goes to member 0,
674 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
675 ///
676 /// # Non-Determinism
677 /// The set of cluster members may asynchronously change over time. Each element is distributed
678 /// based on the current cluster membership _at that point in time_. Depending on when cluster
679 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
680 /// membership is stable, the order of members in the round-robin pattern may change across runs.
681 ///
682 /// # Ordering Requirements
683 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
684 /// order of messages and retries affects the round-robin pattern.
685 ///
686 /// # Example
687 /// ```rust
688 /// # #[cfg(feature = "deploy")] {
689 /// # use hydro_lang::prelude::*;
690 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
691 /// # use hydro_lang::location::MemberId;
692 /// # use futures::StreamExt;
693 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
694 /// let p1 = flow.process::<()>();
695 /// let workers1: Cluster<()> = flow.cluster::<()>();
696 /// let workers2: Cluster<()> = flow.cluster::<()>();
697 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
698 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.bincode(), nondet!(/** assuming stable membership */));
699 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
700 /// on_worker2.send(&p2, TCP.bincode())
701 /// # .entries()
702 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
703 /// # }, |mut stream| async move {
704 /// # let mut results = Vec::new();
705 /// # let mut locations = std::collections::HashSet::new();
706 /// # for w in 0..=16 {
707 /// # let (location, v) = stream.next().await.unwrap();
708 /// # locations.insert(location);
709 /// # results.push(v);
710 /// # }
711 /// # results.sort();
712 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
713 /// # assert_eq!(locations.len(), 16);
714 /// # }));
715 /// # }
716 /// ```
717 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
718 self,
719 to: &Cluster<'a, L2>,
720 via: N,
721 nondet_membership: NonDet,
722 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
723 where
724 T: Serialize + DeserializeOwned,
725 {
726 let ids = track_membership(self.location.source_cluster_members(to));
727 sliced! {
728 let members_snapshot = use(ids, nondet_membership);
729 let elements = use(self.enumerate(), nondet_membership);
730
731 let current_members = members_snapshot
732 .filter(q!(|b| *b))
733 .keys()
734 .assume_ordering(nondet_membership)
735 .collect_vec();
736
737 elements
738 .cross_singleton(current_members)
739 .map(q!(|(data, members)| (
740 members[data.0 % members.len()].clone(),
741 data.1
742 )))
743 }
744 .demux(to, via)
745 }
746}
747
748impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
749 #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
750 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
751 /// using [`bincode`] to serialize/deserialize messages.
752 ///
753 /// Each cluster member sends its local stream elements, and they are collected at the destination
754 /// as a [`KeyedStream`] where keys identify the source cluster member.
755 ///
756 /// # Example
757 /// ```rust
758 /// # #[cfg(feature = "deploy")] {
759 /// # use hydro_lang::prelude::*;
760 /// # use futures::StreamExt;
761 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
762 /// let workers: Cluster<()> = flow.cluster::<()>();
763 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
764 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
765 /// # all_received.entries()
766 /// # }, |mut stream| async move {
767 /// // if there are 4 members in the cluster, we should receive 4 elements
768 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
769 /// # let mut results = Vec::new();
770 /// # for w in 0..4 {
771 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
772 /// # }
773 /// # results.sort();
774 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
775 /// # }));
776 /// # }
777 /// ```
778 ///
779 /// If you don't need to know the source for each element, you can use `.values()`
780 /// to get just the data:
781 /// ```rust
782 /// # #[cfg(feature = "deploy")] {
783 /// # use hydro_lang::prelude::*;
784 /// # use hydro_lang::live_collections::stream::NoOrder;
785 /// # use futures::StreamExt;
786 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
787 /// # let workers: Cluster<()> = flow.cluster::<()>();
788 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
789 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
790 /// # values
791 /// # }, |mut stream| async move {
792 /// # let mut results = Vec::new();
793 /// # for w in 0..4 {
794 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
795 /// # }
796 /// # results.sort();
797 /// // if there are 4 members in the cluster, we should receive 4 elements
798 /// // 1, 1, 1, 1
799 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
800 /// # }));
801 /// # }
802 /// ```
803 pub fn send_bincode<L2>(
804 self,
805 other: &Process<'a, L2>,
806 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
807 where
808 T: Serialize + DeserializeOwned,
809 {
810 self.send(other, TCP.bincode())
811 }
812
813 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
814 /// using the configuration in `via` to set up the message transport.
815 ///
816 /// Each cluster member sends its local stream elements, and they are collected at the destination
817 /// as a [`KeyedStream`] where keys identify the source cluster member.
818 ///
819 /// # Example
820 /// ```rust
821 /// # #[cfg(feature = "deploy")] {
822 /// # use hydro_lang::prelude::*;
823 /// # use futures::StreamExt;
824 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
825 /// let workers: Cluster<()> = flow.cluster::<()>();
826 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
827 /// let all_received = numbers.send(&process, TCP.bincode()); // KeyedStream<MemberId<()>, i32, ...>
828 /// # all_received.entries()
829 /// # }, |mut stream| async move {
830 /// // if there are 4 members in the cluster, we should receive 4 elements
831 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
832 /// # let mut results = Vec::new();
833 /// # for w in 0..4 {
834 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
835 /// # }
836 /// # results.sort();
837 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
838 /// # }));
839 /// # }
840 /// ```
841 ///
842 /// If you don't need to know the source for each element, you can use `.values()`
843 /// to get just the data:
844 /// ```rust
845 /// # #[cfg(feature = "deploy")] {
846 /// # use hydro_lang::prelude::*;
847 /// # use hydro_lang::live_collections::stream::NoOrder;
848 /// # use futures::StreamExt;
849 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
850 /// # let workers: Cluster<()> = flow.cluster::<()>();
851 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
852 /// let values: Stream<i32, _, _, NoOrder> = numbers.send(&process, TCP.bincode()).values();
853 /// # values
854 /// # }, |mut stream| async move {
855 /// # let mut results = Vec::new();
856 /// # for w in 0..4 {
857 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
858 /// # }
859 /// # results.sort();
860 /// // if there are 4 members in the cluster, we should receive 4 elements
861 /// // 1, 1, 1, 1
862 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
863 /// # }));
864 /// # }
865 /// ```
866 pub fn send<L2, N: NetworkFor<T>>(
867 self,
868 to: &Process<'a, L2>,
869 via: N,
870 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
871 where
872 T: Serialize + DeserializeOwned,
873 {
874 let _ = via;
875 let serialize_pipeline = Some(N::serialize_thunk(false));
876
877 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
878
879 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
880 to.clone(),
881 HydroNode::Network {
882 serialize_fn: serialize_pipeline.map(|e| e.into()),
883 instantiate_fn: DebugInstantiate::Building,
884 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
885 input: Box::new(self.ir_node.into_inner()),
886 metadata: to.new_node_metadata(Stream::<
887 (MemberId<L>, T),
888 Process<'a, L2>,
889 Unbounded,
890 O,
891 R,
892 >::collection_kind()),
893 },
894 );
895
896 raw_stream.into_keyed()
897 }
898
899 #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
900 /// Broadcasts elements of this stream at each source member to all members of a destination
901 /// cluster, using [`bincode`] to serialize/deserialize messages.
902 ///
903 /// Each source member sends each of its stream elements to **every** member of the cluster
904 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
905 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
906 /// **only data elements** and sends each element to all cluster members.
907 ///
908 /// # Non-Determinism
909 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
910 /// to the current cluster members known _at that point in time_ at the source member. Depending
911 /// on when each source member is notified of membership changes, it will broadcast each element
912 /// to different members.
913 ///
914 /// # Example
915 /// ```rust
916 /// # #[cfg(feature = "deploy")] {
917 /// # use hydro_lang::prelude::*;
918 /// # use hydro_lang::location::MemberId;
919 /// # use futures::StreamExt;
920 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
921 /// # type Source = ();
922 /// # type Destination = ();
923 /// let source: Cluster<Source> = flow.cluster::<Source>();
924 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
925 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
926 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
927 /// # on_destination.entries().send_bincode(&p2).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
929 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
930 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
931 /// // - ...
932 /// # }, |mut stream| async move {
933 /// # let mut results = Vec::new();
934 /// # for w in 0..16 {
935 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
936 /// # }
937 /// # results.sort();
938 /// # assert_eq!(results, vec![
939 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
940 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
941 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
942 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
943 /// # ]);
944 /// # }));
945 /// # }
946 /// ```
947 pub fn broadcast_bincode<L2: 'a>(
948 self,
949 other: &Cluster<'a, L2>,
950 nondet_membership: NonDet,
951 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
952 where
953 T: Clone + Serialize + DeserializeOwned,
954 {
955 self.broadcast(other, TCP.bincode(), nondet_membership)
956 }
957
958 /// Broadcasts elements of this stream at each source member to all members of a destination
959 /// cluster, using the configuration in `via` to set up the message transport.
960 ///
961 /// Each source member sends each of its stream elements to **every** member of the cluster
962 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
963 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
964 /// **only data elements** and sends each element to all cluster members.
965 ///
966 /// # Non-Determinism
967 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
968 /// to the current cluster members known _at that point in time_ at the source member. Depending
969 /// on when each source member is notified of membership changes, it will broadcast each element
970 /// to different members.
971 ///
972 /// # Example
973 /// ```rust
974 /// # #[cfg(feature = "deploy")] {
975 /// # use hydro_lang::prelude::*;
976 /// # use hydro_lang::location::MemberId;
977 /// # use futures::StreamExt;
978 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
979 /// # type Source = ();
980 /// # type Destination = ();
981 /// let source: Cluster<Source> = flow.cluster::<Source>();
982 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
983 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
984 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.bincode(), nondet!(/** assuming stable membership */));
985 /// # on_destination.entries().send(&p2, TCP.bincode()).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
987 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
988 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
989 /// // - ...
990 /// # }, |mut stream| async move {
991 /// # let mut results = Vec::new();
992 /// # for w in 0..16 {
993 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
994 /// # }
995 /// # results.sort();
996 /// # assert_eq!(results, vec![
997 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
998 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
999 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1000 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1001 /// # ]);
1002 /// # }));
1003 /// # }
1004 /// ```
1005 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1006 self,
1007 to: &Cluster<'a, L2>,
1008 via: N,
1009 nondet_membership: NonDet,
1010 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1011 where
1012 T: Clone + Serialize + DeserializeOwned,
1013 {
1014 let ids = track_membership(self.location.source_cluster_members(to));
1015 sliced! {
1016 let members_snapshot = use(ids, nondet_membership);
1017 let elements = use(self, nondet_membership);
1018
1019 let current_members = members_snapshot.filter(q!(|b| *b));
1020 elements.repeat_with_keys(current_members)
1021 }
1022 .demux(to, via)
1023 }
1024}
1025
1026impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1027 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1028{
1029 #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
1030 /// Sends elements of this stream at each source member to specific members of a destination
1031 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1032 ///
1033 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1034 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1035 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1036 /// all members.
1037 ///
1038 /// Each cluster member sends its local stream elements, and they are collected at each
1039 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1040 ///
1041 /// # Example
1042 /// ```rust
1043 /// # #[cfg(feature = "deploy")] {
1044 /// # use hydro_lang::prelude::*;
1045 /// # use futures::StreamExt;
1046 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1047 /// # type Source = ();
1048 /// # type Destination = ();
1049 /// let source: Cluster<Source> = flow.cluster::<Source>();
1050 /// let to_send: Stream<_, Cluster<_>, _> = source
1051 /// .source_iter(q!(vec![0, 1, 2, 3]))
1052 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1053 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1054 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1055 /// # all_received.entries().send_bincode(&p2).entries()
1056 /// # }, |mut stream| async move {
1057 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1058 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1059 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1060 /// // - ...
1061 /// # let mut results = Vec::new();
1062 /// # for w in 0..16 {
1063 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1064 /// # }
1065 /// # results.sort();
1066 /// # assert_eq!(results, vec![
1067 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1068 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1069 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1070 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1071 /// # ]);
1072 /// # }));
1073 /// # }
1074 /// ```
1075 pub fn demux_bincode(
1076 self,
1077 other: &Cluster<'a, L2>,
1078 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1079 where
1080 T: Serialize + DeserializeOwned,
1081 {
1082 self.demux(other, TCP.bincode())
1083 }
1084
1085 /// Sends elements of this stream at each source member to specific members of a destination
1086 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1087 /// message transport.
1088 ///
1089 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1090 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1091 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1092 /// all members.
1093 ///
1094 /// Each cluster member sends its local stream elements, and they are collected at each
1095 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1096 ///
1097 /// # Example
1098 /// ```rust
1099 /// # #[cfg(feature = "deploy")] {
1100 /// # use hydro_lang::prelude::*;
1101 /// # use futures::StreamExt;
1102 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1103 /// # type Source = ();
1104 /// # type Destination = ();
1105 /// let source: Cluster<Source> = flow.cluster::<Source>();
1106 /// let to_send: Stream<_, Cluster<_>, _> = source
1107 /// .source_iter(q!(vec![0, 1, 2, 3]))
1108 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1109 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1110 /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1111 /// # all_received.entries().send(&p2, TCP.bincode()).entries()
1112 /// # }, |mut stream| async move {
1113 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1114 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1115 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1116 /// // - ...
1117 /// # let mut results = Vec::new();
1118 /// # for w in 0..16 {
1119 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1120 /// # }
1121 /// # results.sort();
1122 /// # assert_eq!(results, vec![
1123 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1124 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1125 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1126 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1127 /// # ]);
1128 /// # }));
1129 /// # }
1130 /// ```
1131 pub fn demux<N: NetworkFor<T>>(
1132 self,
1133 to: &Cluster<'a, L2>,
1134 via: N,
1135 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1136 where
1137 T: Serialize + DeserializeOwned,
1138 {
1139 self.into_keyed().demux(to, via)
1140 }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145 #[cfg(feature = "sim")]
1146 use stageleft::q;
1147
1148 #[cfg(feature = "sim")]
1149 use crate::location::{Location, MemberId};
1150 #[cfg(feature = "sim")]
1151 use crate::networking::TCP;
1152 #[cfg(feature = "sim")]
1153 use crate::nondet::nondet;
1154 #[cfg(feature = "sim")]
1155 use crate::prelude::FlowBuilder;
1156
1157 #[cfg(feature = "sim")]
1158 #[test]
1159 fn sim_send_bincode_o2o() {
1160 use crate::networking::TCP;
1161
1162 let flow = FlowBuilder::new();
1163 let node = flow.process::<()>();
1164 let node2 = flow.process::<()>();
1165
1166 let (in_send, input) = node.sim_input();
1167
1168 let out_recv = input
1169 .send(&node2, TCP.bincode())
1170 .batch(&node2.tick(), nondet!(/** test */))
1171 .count()
1172 .all_ticks()
1173 .sim_output();
1174
1175 let instances = flow.sim().exhaustive(async || {
1176 in_send.send(());
1177 in_send.send(());
1178 in_send.send(());
1179
1180 let received = out_recv.collect::<Vec<_>>().await;
1181 assert!(received.into_iter().sum::<usize>() == 3);
1182 });
1183
1184 assert_eq!(instances, 4); // 2^{3 - 1}
1185 }
1186
1187 #[cfg(feature = "sim")]
1188 #[test]
1189 fn sim_send_bincode_m2o() {
1190 let flow = FlowBuilder::new();
1191 let cluster = flow.cluster::<()>();
1192 let node = flow.process::<()>();
1193
1194 let input = cluster.source_iter(q!(vec![1]));
1195
1196 let out_recv = input
1197 .send(&node, TCP.bincode())
1198 .entries()
1199 .batch(&node.tick(), nondet!(/** test */))
1200 .all_ticks()
1201 .sim_output();
1202
1203 let instances = flow
1204 .sim()
1205 .with_cluster_size(&cluster, 4)
1206 .exhaustive(async || {
1207 out_recv
1208 .assert_yields_only_unordered(vec![
1209 (MemberId::from_raw_id(0), 1),
1210 (MemberId::from_raw_id(1), 1),
1211 (MemberId::from_raw_id(2), 1),
1212 (MemberId::from_raw_id(3), 1),
1213 ])
1214 .await
1215 });
1216
1217 assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1218 }
1219
1220 #[cfg(feature = "sim")]
1221 #[test]
1222 fn sim_send_bincode_multiple_m2o() {
1223 let flow = FlowBuilder::new();
1224 let cluster1 = flow.cluster::<()>();
1225 let cluster2 = flow.cluster::<()>();
1226 let node = flow.process::<()>();
1227
1228 let out_recv_1 = cluster1
1229 .source_iter(q!(vec![1]))
1230 .send(&node, TCP.bincode())
1231 .entries()
1232 .sim_output();
1233
1234 let out_recv_2 = cluster2
1235 .source_iter(q!(vec![2]))
1236 .send(&node, TCP.bincode())
1237 .entries()
1238 .sim_output();
1239
1240 let instances = flow
1241 .sim()
1242 .with_cluster_size(&cluster1, 3)
1243 .with_cluster_size(&cluster2, 4)
1244 .exhaustive(async || {
1245 out_recv_1
1246 .assert_yields_only_unordered(vec![
1247 (MemberId::from_raw_id(0), 1),
1248 (MemberId::from_raw_id(1), 1),
1249 (MemberId::from_raw_id(2), 1),
1250 ])
1251 .await;
1252
1253 out_recv_2
1254 .assert_yields_only_unordered(vec![
1255 (MemberId::from_raw_id(0), 2),
1256 (MemberId::from_raw_id(1), 2),
1257 (MemberId::from_raw_id(2), 2),
1258 (MemberId::from_raw_id(3), 2),
1259 ])
1260 .await;
1261 });
1262
1263 assert_eq!(instances, 1);
1264 }
1265
1266 #[cfg(feature = "sim")]
1267 #[test]
1268 fn sim_send_bincode_o2m() {
1269 let flow = FlowBuilder::new();
1270 let cluster = flow.cluster::<()>();
1271 let node = flow.process::<()>();
1272
1273 let input = node.source_iter(q!(vec![
1274 (MemberId::from_raw_id(0), 123),
1275 (MemberId::from_raw_id(1), 456),
1276 ]));
1277
1278 let out_recv = input
1279 .demux(&cluster, TCP.bincode())
1280 .map(q!(|x| x + 1))
1281 .send(&node, TCP.bincode())
1282 .entries()
1283 .sim_output();
1284
1285 flow.sim()
1286 .with_cluster_size(&cluster, 4)
1287 .exhaustive(async || {
1288 out_recv
1289 .assert_yields_only_unordered(vec![
1290 (MemberId::from_raw_id(0), 124),
1291 (MemberId::from_raw_id(1), 457),
1292 ])
1293 .await
1294 });
1295 }
1296
1297 #[cfg(feature = "sim")]
1298 #[test]
1299 fn sim_broadcast_bincode_o2m() {
1300 let flow = FlowBuilder::new();
1301 let cluster = flow.cluster::<()>();
1302 let node = flow.process::<()>();
1303
1304 let input = node.source_iter(q!(vec![123, 456]));
1305
1306 let out_recv = input
1307 .broadcast(&cluster, TCP.bincode(), nondet!(/** test */))
1308 .map(q!(|x| x + 1))
1309 .send(&node, TCP.bincode())
1310 .entries()
1311 .sim_output();
1312
1313 let mut c_1_produced = false;
1314 let mut c_2_produced = false;
1315
1316 flow.sim()
1317 .with_cluster_size(&cluster, 2)
1318 .exhaustive(async || {
1319 let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1320
1321 // check that order is preserved
1322 if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1323 assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1324 c_1_produced = true;
1325 }
1326
1327 if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1328 assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1329 c_2_produced = true;
1330 }
1331 });
1332
1333 assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1334 }
1335
1336 #[cfg(feature = "sim")]
1337 #[test]
1338 fn sim_send_bincode_m2m() {
1339 let flow = FlowBuilder::new();
1340 let cluster = flow.cluster::<()>();
1341 let node = flow.process::<()>();
1342
1343 let input = node.source_iter(q!(vec![
1344 (MemberId::from_raw_id(0), 123),
1345 (MemberId::from_raw_id(1), 456),
1346 ]));
1347
1348 let out_recv = input
1349 .demux(&cluster, TCP.bincode())
1350 .map(q!(|x| x + 1))
1351 .flat_map_ordered(q!(|x| vec![
1352 (MemberId::from_raw_id(0), x),
1353 (MemberId::from_raw_id(1), x),
1354 ]))
1355 .demux(&cluster, TCP.bincode())
1356 .entries()
1357 .send(&node, TCP.bincode())
1358 .entries()
1359 .sim_output();
1360
1361 flow.sim()
1362 .with_cluster_size(&cluster, 4)
1363 .exhaustive(async || {
1364 out_recv
1365 .assert_yields_only_unordered(vec![
1366 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1367 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1368 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1369 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1370 ])
1371 .await
1372 });
1373 }
1374}