hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33 membership.fold(
34 q!(|| false),
35 q!(|present, event| {
36 match event {
37 MembershipEvent::Joined => *present = true,
38 MembershipEvent::Left => *present = false,
39 }
40 }),
41 )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45 let root = get_this_crate();
46
47 if is_demux {
48 parse_quote! {
49 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50 |(id, data)| {
51 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52 }
53 )
54 }
55 } else {
56 parse_quote! {
57 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58 |data| {
59 #root::runtime_support::bincode::serialize(&data).unwrap().into()
60 }
61 )
62 }
63 }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67 serialize_bincode_with_type(is_demux, "e_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71 let root = get_this_crate();
72 if let Some(c_type) = tagged {
73 parse_quote! {
74 |res| {
75 let (id, b) = res.unwrap();
76 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77 }
78 }
79 } else {
80 parse_quote! {
81 |res| {
82 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83 }
84 }
85 }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89 deserialize_bincode_with_type(tagged, "e_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95 /// using [`bincode`] to serialize/deserialize messages.
96 ///
97 /// The returned stream captures the elements received at the destination, where values will
98 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
/// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
101 /// dropped no further messages will be sent.
102 ///
103 /// # Example
104 /// ```rust
105 /// # #[cfg(feature = "deploy")] {
106 /// # use hydro_lang::prelude::*;
107 /// # use futures::StreamExt;
108 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109 /// let p1 = flow.process::<()>();
110 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111 /// let p2 = flow.process::<()>();
112 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113 /// // 1, 2, 3
114 /// # on_p2.send_bincode(&p_out)
115 /// # }, |mut stream| async move {
116 /// # for w in 1..=3 {
117 /// # assert_eq!(stream.next().await, Some(w));
118 /// # }
119 /// # }));
120 /// # }
121 /// ```
122 pub fn send_bincode<L2>(
123 self,
124 other: &Process<'a, L2>,
125 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126 where
127 T: Serialize + DeserializeOwned,
128 {
129 self.send(other, TCP.fail_stop().bincode())
130 }
131
132 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133 /// using the configuration in `via` to set up the message transport.
134 ///
135 /// The returned stream captures the elements received at the destination, where values will
136 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
/// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
139 /// dropped no further messages will be sent.
140 ///
141 /// # Example
142 /// ```rust
143 /// # #[cfg(feature = "deploy")] {
144 /// # use hydro_lang::prelude::*;
145 /// # use futures::StreamExt;
146 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147 /// let p1 = flow.process::<()>();
148 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149 /// let p2 = flow.process::<()>();
150 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151 /// // 1, 2, 3
152 /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153 /// # }, |mut stream| async move {
154 /// # for w in 1..=3 {
155 /// # assert_eq!(stream.next().await, Some(w));
156 /// # }
157 /// # }));
158 /// # }
159 /// ```
160 pub fn send<L2, N: NetworkFor<T>>(
161 self,
162 to: &Process<'a, L2>,
163 via: N,
164 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
165 where
166 T: Serialize + DeserializeOwned,
167 {
168 let serialize_pipeline = Some(N::serialize_thunk(false));
169 let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171 let name = via.name();
172 if to.multiversioned() && name.is_none() {
173 panic!(
174 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
175 );
176 }
177
178 Stream::new(
179 to.clone(),
180 HydroNode::Network {
181 name: name.map(ToOwned::to_owned),
182 networking_info: N::networking_info(),
183 serialize_fn: serialize_pipeline.map(|e| e.into()),
184 instantiate_fn: DebugInstantiate::Building,
185 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
186 input: Box::new(self.ir_node.into_inner()),
187 metadata: to.new_node_metadata(
188 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
189 ),
190 },
191 )
192 }
193
194 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
195 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
196 /// using [`bincode`] to serialize/deserialize messages.
197 ///
198 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
199 /// membership information. This is a common pattern in distributed systems for broadcasting data to
200 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
201 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
202 /// each element to all cluster members.
203 ///
204 /// # Non-Determinism
205 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
206 /// to the current cluster members _at that point in time_. Depending on when we are notified of
207 /// membership changes, we will broadcast each element to different members.
208 ///
209 /// # Example
210 /// ```rust
211 /// # #[cfg(feature = "deploy")] {
212 /// # use hydro_lang::prelude::*;
213 /// # use futures::StreamExt;
214 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
215 /// let p1 = flow.process::<()>();
216 /// let workers: Cluster<()> = flow.cluster::<()>();
217 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
218 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
219 /// # on_worker.send_bincode(&p2).entries()
220 /// // if there are 4 members in the cluster, each receives one element
221 /// // - MemberId::<()>(0): [123]
222 /// // - MemberId::<()>(1): [123]
223 /// // - MemberId::<()>(2): [123]
224 /// // - MemberId::<()>(3): [123]
225 /// # }, |mut stream| async move {
226 /// # let mut results = Vec::new();
227 /// # for w in 0..4 {
228 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
229 /// # }
230 /// # results.sort();
231 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
232 /// # }));
233 /// # }
234 /// ```
235 pub fn broadcast_bincode<L2: 'a>(
236 self,
237 other: &Cluster<'a, L2>,
238 nondet_membership: NonDet,
239 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
240 where
241 T: Clone + Serialize + DeserializeOwned,
242 {
243 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
244 }
245
246 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
247 /// using the configuration in `via` to set up the message transport.
248 ///
249 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
250 /// membership information. This is a common pattern in distributed systems for broadcasting data to
251 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
252 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
253 /// each element to all cluster members.
254 ///
255 /// # Non-Determinism
256 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
257 /// to the current cluster members _at that point in time_. Depending on when we are notified of
258 /// membership changes, we will broadcast each element to different members.
259 ///
260 /// # Example
261 /// ```rust
262 /// # #[cfg(feature = "deploy")] {
263 /// # use hydro_lang::prelude::*;
264 /// # use futures::StreamExt;
265 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
266 /// let p1 = flow.process::<()>();
267 /// let workers: Cluster<()> = flow.cluster::<()>();
268 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
269 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
270 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
271 /// // if there are 4 members in the cluster, each receives one element
272 /// // - MemberId::<()>(0): [123]
273 /// // - MemberId::<()>(1): [123]
274 /// // - MemberId::<()>(2): [123]
275 /// // - MemberId::<()>(3): [123]
276 /// # }, |mut stream| async move {
277 /// # let mut results = Vec::new();
278 /// # for w in 0..4 {
279 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
280 /// # }
281 /// # results.sort();
282 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
283 /// # }));
284 /// # }
285 /// ```
286 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
287 self,
288 to: &Cluster<'a, L2>,
289 via: N,
290 nondet_membership: NonDet,
291 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
292 where
293 T: Clone + Serialize + DeserializeOwned,
294 {
295 let ids = track_membership(self.location.source_cluster_members(to));
296 sliced! {
297 let members_snapshot = use(ids, nondet_membership);
298 let elements = use(self, nondet_membership);
299
300 let current_members = members_snapshot.filter(q!(|b| *b));
301 elements.repeat_with_keys(current_members)
302 }
303 .demux(to, via)
304 }
305
306 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
307 /// serialization. The external process can receive these elements by establishing a TCP
308 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
309 ///
310 /// # Example
311 /// ```rust
312 /// # #[cfg(feature = "deploy")] {
313 /// # use hydro_lang::prelude::*;
314 /// # use futures::StreamExt;
315 /// # tokio_test::block_on(async move {
316 /// let mut flow = FlowBuilder::new();
317 /// let process = flow.process::<()>();
318 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
319 /// let external = flow.external::<()>();
320 /// let external_handle = numbers.send_bincode_external(&external);
321 ///
322 /// let mut deployment = hydro_deploy::Deployment::new();
323 /// let nodes = flow
324 /// .with_process(&process, deployment.Localhost())
325 /// .with_external(&external, deployment.Localhost())
326 /// .deploy(&mut deployment);
327 ///
328 /// deployment.deploy().await.unwrap();
329 /// // establish the TCP connection
330 /// let mut external_recv_stream = nodes.connect(external_handle).await;
331 /// deployment.start().await.unwrap();
332 ///
333 /// for w in 1..=3 {
334 /// assert_eq!(external_recv_stream.next().await, Some(w));
335 /// }
336 /// # });
337 /// # }
338 /// ```
339 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
340 where
341 T: Serialize + DeserializeOwned,
342 {
343 let serialize_pipeline = Some(serialize_bincode::<T>(false));
344
345 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
346
347 let external_port_id = flow_state_borrow.next_external_port();
348
349 flow_state_borrow.push_root(HydroRoot::SendExternal {
350 to_external_key: other.key,
351 to_port_id: external_port_id,
352 to_many: false,
353 unpaired: true,
354 serialize_fn: serialize_pipeline.map(|e| e.into()),
355 instantiate_fn: DebugInstantiate::Building,
356 input: Box::new(self.ir_node.into_inner()),
357 op_metadata: HydroIrOpMetadata::new(),
358 });
359
360 ExternalBincodeStream {
361 process_key: other.key,
362 port_id: external_port_id,
363 _phantom: PhantomData,
364 }
365 }
366
367 #[cfg(feature = "sim")]
368 /// Sets up a simulation output port for this stream, allowing test code to receive elements
369 /// sent to this stream during simulation.
370 pub fn sim_output(self) -> SimReceiver<T, O, R>
371 where
372 T: Serialize + DeserializeOwned,
373 {
374 let external_location: External<'a, ()> = External {
375 key: LocationKey::FIRST,
376 flow_state: self.location.flow_state().clone(),
377 _phantom: PhantomData,
378 };
379
380 let external = self.send_bincode_external(&external_location);
381
382 SimReceiver(external.port_id, PhantomData)
383 }
384}
385
386impl<'a, T, L: Location<'a> + NoTick, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
387 /// Creates an external output for embedded deployment mode.
388 ///
389 /// The `name` parameter specifies the name of the field in the generated
390 /// `EmbeddedOutputs` struct that will receive elements from this stream.
391 /// The generated function will accept an `EmbeddedOutputs` struct with an
392 /// `impl FnMut(T)` field with this name.
393 pub fn embedded_output(self, name: impl Into<String>) {
394 let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
395
396 self.location
397 .flow_state()
398 .borrow_mut()
399 .push_root(HydroRoot::EmbeddedOutput {
400 ident,
401 input: Box::new(self.ir_node.into_inner()),
402 op_metadata: HydroIrOpMetadata::new(),
403 });
404 }
405}
406
407impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
408 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
409{
410 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
411 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
412 /// using [`bincode`] to serialize/deserialize messages.
413 ///
414 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
415 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
416 /// this API allows precise targeting of specific cluster members rather than broadcasting to
417 /// all members.
418 ///
419 /// # Example
420 /// ```rust
421 /// # #[cfg(feature = "deploy")] {
422 /// # use hydro_lang::prelude::*;
423 /// # use futures::StreamExt;
424 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
425 /// let p1 = flow.process::<()>();
426 /// let workers: Cluster<()> = flow.cluster::<()>();
427 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
428 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
429 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
430 /// .demux_bincode(&workers);
431 /// # on_worker.send_bincode(&p2).entries()
432 /// // if there are 4 members in the cluster, each receives one element
433 /// // - MemberId::<()>(0): [0]
434 /// // - MemberId::<()>(1): [1]
435 /// // - MemberId::<()>(2): [2]
436 /// // - MemberId::<()>(3): [3]
437 /// # }, |mut stream| async move {
438 /// # let mut results = Vec::new();
439 /// # for w in 0..4 {
440 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
441 /// # }
442 /// # results.sort();
443 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
444 /// # }));
445 /// # }
446 /// ```
447 pub fn demux_bincode(
448 self,
449 other: &Cluster<'a, L2>,
450 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
451 where
452 T: Serialize + DeserializeOwned,
453 {
454 self.demux(other, TCP.fail_stop().bincode())
455 }
456
457 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
458 /// using the configuration in `via` to set up the message transport.
459 ///
460 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
461 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
462 /// this API allows precise targeting of specific cluster members rather than broadcasting to
463 /// all members.
464 ///
465 /// # Example
466 /// ```rust
467 /// # #[cfg(feature = "deploy")] {
468 /// # use hydro_lang::prelude::*;
469 /// # use futures::StreamExt;
470 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
471 /// let p1 = flow.process::<()>();
472 /// let workers: Cluster<()> = flow.cluster::<()>();
473 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
474 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
475 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
476 /// .demux(&workers, TCP.fail_stop().bincode());
477 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
478 /// // if there are 4 members in the cluster, each receives one element
479 /// // - MemberId::<()>(0): [0]
480 /// // - MemberId::<()>(1): [1]
481 /// // - MemberId::<()>(2): [2]
482 /// // - MemberId::<()>(3): [3]
483 /// # }, |mut stream| async move {
484 /// # let mut results = Vec::new();
485 /// # for w in 0..4 {
486 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
487 /// # }
488 /// # results.sort();
489 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
490 /// # }));
491 /// # }
492 /// ```
493 pub fn demux<N: NetworkFor<T>>(
494 self,
495 to: &Cluster<'a, L2>,
496 via: N,
497 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
498 where
499 T: Serialize + DeserializeOwned,
500 {
501 self.into_keyed().demux(to, via)
502 }
503}
504
505impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
506 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
507 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
508 /// [`bincode`] to serialize/deserialize messages.
509 ///
510 /// This provides load balancing by evenly distributing work across cluster members. The
511 /// distribution is deterministic based on element order - the first element goes to member 0,
512 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
513 ///
514 /// # Non-Determinism
515 /// The set of cluster members may asynchronously change over time. Each element is distributed
516 /// based on the current cluster membership _at that point in time_. Depending on when cluster
517 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
518 /// membership is stable, the order of members in the round-robin pattern may change across runs.
519 ///
520 /// # Ordering Requirements
521 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
522 /// order of messages and retries affects the round-robin pattern.
523 ///
524 /// # Example
525 /// ```rust
526 /// # #[cfg(feature = "deploy")] {
527 /// # use hydro_lang::prelude::*;
528 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
529 /// # use futures::StreamExt;
530 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
531 /// let p1 = flow.process::<()>();
532 /// let workers: Cluster<()> = flow.cluster::<()>();
533 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
534 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
535 /// on_worker.send_bincode(&p2)
536 /// # .first().values() // we use first to assert that each member gets one element
537 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
538 /// // - MemberId::<()>(?): [1]
539 /// // - MemberId::<()>(?): [2]
540 /// // - MemberId::<()>(?): [3]
541 /// // - MemberId::<()>(?): [4]
542 /// # }, |mut stream| async move {
543 /// # let mut results = Vec::new();
544 /// # for w in 0..4 {
545 /// # results.push(stream.next().await.unwrap());
546 /// # }
547 /// # results.sort();
548 /// # assert_eq!(results, vec![1, 2, 3, 4]);
549 /// # }));
550 /// # }
551 /// ```
552 pub fn round_robin_bincode<L2: 'a>(
553 self,
554 other: &Cluster<'a, L2>,
555 nondet_membership: NonDet,
556 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
557 where
558 T: Serialize + DeserializeOwned,
559 {
560 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
561 }
562
563 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
564 /// the configuration in `via` to set up the message transport.
565 ///
566 /// This provides load balancing by evenly distributing work across cluster members. The
567 /// distribution is deterministic based on element order - the first element goes to member 0,
568 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
569 ///
570 /// # Non-Determinism
571 /// The set of cluster members may asynchronously change over time. Each element is distributed
572 /// based on the current cluster membership _at that point in time_. Depending on when cluster
573 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
574 /// membership is stable, the order of members in the round-robin pattern may change across runs.
575 ///
576 /// # Ordering Requirements
577 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
578 /// order of messages and retries affects the round-robin pattern.
579 ///
580 /// # Example
581 /// ```rust
582 /// # #[cfg(feature = "deploy")] {
583 /// # use hydro_lang::prelude::*;
584 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
585 /// # use futures::StreamExt;
586 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
587 /// let p1 = flow.process::<()>();
588 /// let workers: Cluster<()> = flow.cluster::<()>();
589 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
590 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
591 /// on_worker.send(&p2, TCP.fail_stop().bincode())
592 /// # .first().values() // we use first to assert that each member gets one element
593 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
594 /// // - MemberId::<()>(?): [1]
595 /// // - MemberId::<()>(?): [2]
596 /// // - MemberId::<()>(?): [3]
597 /// // - MemberId::<()>(?): [4]
598 /// # }, |mut stream| async move {
599 /// # let mut results = Vec::new();
600 /// # for w in 0..4 {
601 /// # results.push(stream.next().await.unwrap());
602 /// # }
603 /// # results.sort();
604 /// # assert_eq!(results, vec![1, 2, 3, 4]);
605 /// # }));
606 /// # }
607 /// ```
608 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
609 self,
610 to: &Cluster<'a, L2>,
611 via: N,
612 nondet_membership: NonDet,
613 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
614 where
615 T: Serialize + DeserializeOwned,
616 {
617 let ids = track_membership(self.location.source_cluster_members(to));
618 sliced! {
619 let members_snapshot = use(ids, nondet_membership);
620 let elements = use(self.enumerate(), nondet_membership);
621
622 let current_members = members_snapshot
623 .filter(q!(|b| *b))
624 .keys()
625 .assume_ordering::<TotalOrder>(nondet_membership)
626 .collect_vec();
627
628 elements
629 .cross_singleton(current_members)
630 .map(q!(|(data, members)| (
631 members[data.0 % members.len()].clone(),
632 data.1
633 )))
634 }
635 .demux(to, via)
636 }
637}
638
639impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
640 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
641 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
642 /// [`bincode`] to serialize/deserialize messages.
643 ///
644 /// This provides load balancing by evenly distributing work across cluster members. The
645 /// distribution is deterministic based on element order - the first element goes to member 0,
646 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
647 ///
648 /// # Non-Determinism
649 /// The set of cluster members may asynchronously change over time. Each element is distributed
650 /// based on the current cluster membership _at that point in time_. Depending on when cluster
651 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
652 /// membership is stable, the order of members in the round-robin pattern may change across runs.
653 ///
654 /// # Ordering Requirements
655 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
656 /// order of messages and retries affects the round-robin pattern.
657 ///
658 /// # Example
659 /// ```rust
660 /// # #[cfg(feature = "deploy")] {
661 /// # use hydro_lang::prelude::*;
662 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
663 /// # use hydro_lang::location::MemberId;
664 /// # use futures::StreamExt;
665 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
666 /// let p1 = flow.process::<()>();
667 /// let workers1: Cluster<()> = flow.cluster::<()>();
668 /// let workers2: Cluster<()> = flow.cluster::<()>();
669 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
670 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
671 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
672 /// on_worker2.send_bincode(&p2)
673 /// # .entries()
674 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
675 /// # }, |mut stream| async move {
676 /// # let mut results = Vec::new();
677 /// # let mut locations = std::collections::HashSet::new();
678 /// # for w in 0..=16 {
679 /// # let (location, v) = stream.next().await.unwrap();
680 /// # locations.insert(location);
681 /// # results.push(v);
682 /// # }
683 /// # results.sort();
684 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
685 /// # assert_eq!(locations.len(), 16);
686 /// # }));
687 /// # }
688 /// ```
689 pub fn round_robin_bincode<L2: 'a>(
690 self,
691 other: &Cluster<'a, L2>,
692 nondet_membership: NonDet,
693 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
694 where
695 T: Serialize + DeserializeOwned,
696 {
697 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
698 }
699
700 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
701 /// the configuration in `via` to set up the message transport.
702 ///
703 /// This provides load balancing by evenly distributing work across cluster members. The
704 /// distribution is deterministic based on element order - the first element goes to member 0,
705 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
706 ///
707 /// # Non-Determinism
708 /// The set of cluster members may asynchronously change over time. Each element is distributed
709 /// based on the current cluster membership _at that point in time_. Depending on when cluster
710 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
711 /// membership is stable, the order of members in the round-robin pattern may change across runs.
712 ///
713 /// # Ordering Requirements
714 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
715 /// order of messages and retries affects the round-robin pattern.
716 ///
717 /// # Example
718 /// ```rust
719 /// # #[cfg(feature = "deploy")] {
720 /// # use hydro_lang::prelude::*;
721 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
722 /// # use hydro_lang::location::MemberId;
723 /// # use futures::StreamExt;
724 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
725 /// let p1 = flow.process::<()>();
726 /// let workers1: Cluster<()> = flow.cluster::<()>();
727 /// let workers2: Cluster<()> = flow.cluster::<()>();
728 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
729 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
730 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
731 /// on_worker2.send(&p2, TCP.fail_stop().bincode())
732 /// # .entries()
733 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
734 /// # }, |mut stream| async move {
735 /// # let mut results = Vec::new();
736 /// # let mut locations = std::collections::HashSet::new();
737 /// # for w in 0..=16 {
738 /// # let (location, v) = stream.next().await.unwrap();
739 /// # locations.insert(location);
740 /// # results.push(v);
741 /// # }
742 /// # results.sort();
743 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
744 /// # assert_eq!(locations.len(), 16);
745 /// # }));
746 /// # }
747 /// ```
748 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
749 self,
750 to: &Cluster<'a, L2>,
751 via: N,
752 nondet_membership: NonDet,
753 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
754 where
755 T: Serialize + DeserializeOwned,
756 {
757 let ids = track_membership(self.location.source_cluster_members(to));
758 sliced! {
759 let members_snapshot = use(ids, nondet_membership);
760 let elements = use(self.enumerate(), nondet_membership);
761
762 let current_members = members_snapshot
763 .filter(q!(|b| *b))
764 .keys()
765 .assume_ordering::<TotalOrder>(nondet_membership)
766 .collect_vec();
767
768 elements
769 .cross_singleton(current_members)
770 .map(q!(|(data, members)| (
771 members[data.0 % members.len()].clone(),
772 data.1
773 )))
774 }
775 .demux(to, via)
776 }
777}
778
779impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
780 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
781 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
782 /// using [`bincode`] to serialize/deserialize messages.
783 ///
784 /// Each cluster member sends its local stream elements, and they are collected at the destination
785 /// as a [`KeyedStream`] where keys identify the source cluster member.
786 ///
787 /// # Example
788 /// ```rust
789 /// # #[cfg(feature = "deploy")] {
790 /// # use hydro_lang::prelude::*;
791 /// # use futures::StreamExt;
792 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
793 /// let workers: Cluster<()> = flow.cluster::<()>();
794 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
795 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
796 /// # all_received.entries()
797 /// # }, |mut stream| async move {
798 /// // if there are 4 members in the cluster, we should receive 4 elements
799 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
800 /// # let mut results = Vec::new();
801 /// # for w in 0..4 {
802 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
803 /// # }
804 /// # results.sort();
805 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
806 /// # }));
807 /// # }
808 /// ```
809 ///
810 /// If you don't need to know the source for each element, you can use `.values()`
811 /// to get just the data:
812 /// ```rust
813 /// # #[cfg(feature = "deploy")] {
814 /// # use hydro_lang::prelude::*;
815 /// # use hydro_lang::live_collections::stream::NoOrder;
816 /// # use futures::StreamExt;
817 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
818 /// # let workers: Cluster<()> = flow.cluster::<()>();
819 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
820 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
821 /// # values
822 /// # }, |mut stream| async move {
823 /// # let mut results = Vec::new();
824 /// # for w in 0..4 {
825 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
826 /// # }
827 /// # results.sort();
828 /// // if there are 4 members in the cluster, we should receive 4 elements
829 /// // 1, 1, 1, 1
830 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
831 /// # }));
832 /// # }
833 /// ```
834 pub fn send_bincode<L2>(
835 self,
836 other: &Process<'a, L2>,
837 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
838 where
839 T: Serialize + DeserializeOwned,
840 {
841 self.send(other, TCP.fail_stop().bincode())
842 }
843
844 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
845 /// using the configuration in `via` to set up the message transport.
846 ///
847 /// Each cluster member sends its local stream elements, and they are collected at the destination
848 /// as a [`KeyedStream`] where keys identify the source cluster member.
849 ///
850 /// # Example
851 /// ```rust
852 /// # #[cfg(feature = "deploy")] {
853 /// # use hydro_lang::prelude::*;
854 /// # use futures::StreamExt;
855 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
856 /// let workers: Cluster<()> = flow.cluster::<()>();
857 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
858 /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
859 /// # all_received.entries()
860 /// # }, |mut stream| async move {
861 /// // if there are 4 members in the cluster, we should receive 4 elements
862 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
863 /// # let mut results = Vec::new();
864 /// # for w in 0..4 {
865 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
866 /// # }
867 /// # results.sort();
868 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
869 /// # }));
870 /// # }
871 /// ```
872 ///
873 /// If you don't need to know the source for each element, you can use `.values()`
874 /// to get just the data:
875 /// ```rust
876 /// # #[cfg(feature = "deploy")] {
877 /// # use hydro_lang::prelude::*;
878 /// # use hydro_lang::live_collections::stream::NoOrder;
879 /// # use futures::StreamExt;
880 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
881 /// # let workers: Cluster<()> = flow.cluster::<()>();
882 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
883 /// let values: Stream<i32, _, _, NoOrder> =
884 /// numbers.send(&process, TCP.fail_stop().bincode()).values();
885 /// # values
886 /// # }, |mut stream| async move {
887 /// # let mut results = Vec::new();
888 /// # for w in 0..4 {
889 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
890 /// # }
891 /// # results.sort();
892 /// // if there are 4 members in the cluster, we should receive 4 elements
893 /// // 1, 1, 1, 1
894 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
895 /// # }));
896 /// # }
897 /// ```
898 pub fn send<L2, N: NetworkFor<T>>(
899 self,
900 to: &Process<'a, L2>,
901 via: N,
902 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
903 where
904 T: Serialize + DeserializeOwned,
905 {
906 let serialize_pipeline = Some(N::serialize_thunk(false));
907
908 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
909
910 let name = via.name();
911 if to.multiversioned() && name.is_none() {
912 panic!(
913 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
914 );
915 }
916
917 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
918 to.clone(),
919 HydroNode::Network {
920 name: name.map(ToOwned::to_owned),
921 networking_info: N::networking_info(),
922 serialize_fn: serialize_pipeline.map(|e| e.into()),
923 instantiate_fn: DebugInstantiate::Building,
924 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
925 input: Box::new(self.ir_node.into_inner()),
926 metadata: to.new_node_metadata(Stream::<
927 (MemberId<L>, T),
928 Process<'a, L2>,
929 Unbounded,
930 O,
931 R,
932 >::collection_kind()),
933 },
934 );
935
936 raw_stream.into_keyed()
937 }
938
939 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
940 /// Broadcasts elements of this stream at each source member to all members of a destination
941 /// cluster, using [`bincode`] to serialize/deserialize messages.
942 ///
943 /// Each source member sends each of its stream elements to **every** member of the cluster
944 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
945 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
946 /// **only data elements** and sends each element to all cluster members.
947 ///
948 /// # Non-Determinism
949 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
950 /// to the current cluster members known _at that point in time_ at the source member. Depending
951 /// on when each source member is notified of membership changes, it will broadcast each element
952 /// to different members.
953 ///
954 /// # Example
955 /// ```rust
956 /// # #[cfg(feature = "deploy")] {
957 /// # use hydro_lang::prelude::*;
958 /// # use hydro_lang::location::MemberId;
959 /// # use futures::StreamExt;
960 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
961 /// # type Source = ();
962 /// # type Destination = ();
963 /// let source: Cluster<Source> = flow.cluster::<Source>();
964 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
965 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
966 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
967 /// # on_destination.entries().send_bincode(&p2).entries()
968 /// // if there are 4 members in the destination, each receives one element from each source member
969 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
970 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
971 /// // - ...
972 /// # }, |mut stream| async move {
973 /// # let mut results = Vec::new();
974 /// # for w in 0..16 {
975 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
976 /// # }
977 /// # results.sort();
978 /// # assert_eq!(results, vec![
979 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
980 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
981 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
982 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
983 /// # ]);
984 /// # }));
985 /// # }
986 /// ```
987 pub fn broadcast_bincode<L2: 'a>(
988 self,
989 other: &Cluster<'a, L2>,
990 nondet_membership: NonDet,
991 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
992 where
993 T: Clone + Serialize + DeserializeOwned,
994 {
995 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
996 }
997
998 /// Broadcasts elements of this stream at each source member to all members of a destination
999 /// cluster, using the configuration in `via` to set up the message transport.
1000 ///
1001 /// Each source member sends each of its stream elements to **every** member of the cluster
1002 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1003 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1004 /// **only data elements** and sends each element to all cluster members.
1005 ///
1006 /// # Non-Determinism
1007 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1008 /// to the current cluster members known _at that point in time_ at the source member. Depending
1009 /// on when each source member is notified of membership changes, it will broadcast each element
1010 /// to different members.
1011 ///
1012 /// # Example
1013 /// ```rust
1014 /// # #[cfg(feature = "deploy")] {
1015 /// # use hydro_lang::prelude::*;
1016 /// # use hydro_lang::location::MemberId;
1017 /// # use futures::StreamExt;
1018 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1019 /// # type Source = ();
1020 /// # type Destination = ();
1021 /// let source: Cluster<Source> = flow.cluster::<Source>();
1022 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1023 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1024 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1025 /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1026 /// // if there are 4 members in the destination, each receives one element from each source member
1027 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1028 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1029 /// // - ...
1030 /// # }, |mut stream| async move {
1031 /// # let mut results = Vec::new();
1032 /// # for w in 0..16 {
1033 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1034 /// # }
1035 /// # results.sort();
1036 /// # assert_eq!(results, vec![
1037 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1038 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1039 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1040 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1041 /// # ]);
1042 /// # }));
1043 /// # }
1044 /// ```
1045 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1046 self,
1047 to: &Cluster<'a, L2>,
1048 via: N,
1049 nondet_membership: NonDet,
1050 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1051 where
1052 T: Clone + Serialize + DeserializeOwned,
1053 {
1054 let ids = track_membership(self.location.source_cluster_members(to));
1055 sliced! {
1056 let members_snapshot = use(ids, nondet_membership);
1057 let elements = use(self, nondet_membership);
1058
1059 let current_members = members_snapshot.filter(q!(|b| *b));
1060 elements.repeat_with_keys(current_members)
1061 }
1062 .demux(to, via)
1063 }
1064}
1065
1066impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1067 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1068{
1069 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1070 /// Sends elements of this stream at each source member to specific members of a destination
1071 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1072 ///
1073 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1074 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1075 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1076 /// all members.
1077 ///
1078 /// Each cluster member sends its local stream elements, and they are collected at each
1079 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1080 ///
1081 /// # Example
1082 /// ```rust
1083 /// # #[cfg(feature = "deploy")] {
1084 /// # use hydro_lang::prelude::*;
1085 /// # use futures::StreamExt;
1086 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1087 /// # type Source = ();
1088 /// # type Destination = ();
1089 /// let source: Cluster<Source> = flow.cluster::<Source>();
1090 /// let to_send: Stream<_, Cluster<_>, _> = source
1091 /// .source_iter(q!(vec![0, 1, 2, 3]))
1092 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1093 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1094 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1095 /// # all_received.entries().send_bincode(&p2).entries()
1096 /// # }, |mut stream| async move {
1097 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1098 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1099 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1100 /// // - ...
1101 /// # let mut results = Vec::new();
1102 /// # for w in 0..16 {
1103 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1104 /// # }
1105 /// # results.sort();
1106 /// # assert_eq!(results, vec![
1107 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1108 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1109 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1110 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1111 /// # ]);
1112 /// # }));
1113 /// # }
1114 /// ```
1115 pub fn demux_bincode(
1116 self,
1117 other: &Cluster<'a, L2>,
1118 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1119 where
1120 T: Serialize + DeserializeOwned,
1121 {
1122 self.demux(other, TCP.fail_stop().bincode())
1123 }
1124
1125 /// Sends elements of this stream at each source member to specific members of a destination
1126 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1127 /// message transport.
1128 ///
1129 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1130 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1131 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1132 /// all members.
1133 ///
1134 /// Each cluster member sends its local stream elements, and they are collected at each
1135 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1136 ///
1137 /// # Example
1138 /// ```rust
1139 /// # #[cfg(feature = "deploy")] {
1140 /// # use hydro_lang::prelude::*;
1141 /// # use futures::StreamExt;
1142 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1143 /// # type Source = ();
1144 /// # type Destination = ();
1145 /// let source: Cluster<Source> = flow.cluster::<Source>();
1146 /// let to_send: Stream<_, Cluster<_>, _> = source
1147 /// .source_iter(q!(vec![0, 1, 2, 3]))
1148 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1149 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1150 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1151 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1152 /// # }, |mut stream| async move {
1153 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1154 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1155 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1156 /// // - ...
1157 /// # let mut results = Vec::new();
1158 /// # for w in 0..16 {
1159 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1160 /// # }
1161 /// # results.sort();
1162 /// # assert_eq!(results, vec![
1163 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1164 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1165 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1166 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1167 /// # ]);
1168 /// # }));
1169 /// # }
1170 /// ```
1171 pub fn demux<N: NetworkFor<T>>(
1172 self,
1173 to: &Cluster<'a, L2>,
1174 via: N,
1175 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1176 where
1177 T: Serialize + DeserializeOwned,
1178 {
1179 self.into_keyed().demux(to, via)
1180 }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185 #[cfg(feature = "sim")]
1186 use stageleft::q;
1187
1188 #[cfg(feature = "sim")]
1189 use crate::location::{Location, MemberId};
1190 #[cfg(feature = "sim")]
1191 use crate::networking::TCP;
1192 #[cfg(feature = "sim")]
1193 use crate::nondet::nondet;
1194 #[cfg(feature = "sim")]
1195 use crate::prelude::FlowBuilder;
1196
1197 #[cfg(feature = "sim")]
1198 #[test]
1199 fn sim_send_bincode_o2o() {
1200 use crate::networking::TCP;
1201
1202 let mut flow = FlowBuilder::new();
1203 let node = flow.process::<()>();
1204 let node2 = flow.process::<()>();
1205
1206 let (in_send, input) = node.sim_input();
1207
1208 let out_recv = input
1209 .send(&node2, TCP.fail_stop().bincode())
1210 .batch(&node2.tick(), nondet!(/** test */))
1211 .count()
1212 .all_ticks()
1213 .sim_output();
1214
1215 let instances = flow.sim().exhaustive(async || {
1216 in_send.send(());
1217 in_send.send(());
1218 in_send.send(());
1219
1220 let received = out_recv.collect::<Vec<_>>().await;
1221 assert!(received.into_iter().sum::<usize>() == 3);
1222 });
1223
1224 assert_eq!(instances, 4); // 2^{3 - 1}
1225 }
1226
1227 #[cfg(feature = "sim")]
1228 #[test]
1229 fn sim_send_bincode_m2o() {
1230 let mut flow = FlowBuilder::new();
1231 let cluster = flow.cluster::<()>();
1232 let node = flow.process::<()>();
1233
1234 let input = cluster.source_iter(q!(vec![1]));
1235
1236 let out_recv = input
1237 .send(&node, TCP.fail_stop().bincode())
1238 .entries()
1239 .batch(&node.tick(), nondet!(/** test */))
1240 .all_ticks()
1241 .sim_output();
1242
1243 let instances = flow
1244 .sim()
1245 .with_cluster_size(&cluster, 4)
1246 .exhaustive(async || {
1247 out_recv
1248 .assert_yields_only_unordered(vec![
1249 (MemberId::from_raw_id(0), 1),
1250 (MemberId::from_raw_id(1), 1),
1251 (MemberId::from_raw_id(2), 1),
1252 (MemberId::from_raw_id(3), 1),
1253 ])
1254 .await
1255 });
1256
1257 assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1258 }
1259
1260 #[cfg(feature = "sim")]
1261 #[test]
1262 fn sim_send_bincode_multiple_m2o() {
1263 let mut flow = FlowBuilder::new();
1264 let cluster1 = flow.cluster::<()>();
1265 let cluster2 = flow.cluster::<()>();
1266 let node = flow.process::<()>();
1267
1268 let out_recv_1 = cluster1
1269 .source_iter(q!(vec![1]))
1270 .send(&node, TCP.fail_stop().bincode())
1271 .entries()
1272 .sim_output();
1273
1274 let out_recv_2 = cluster2
1275 .source_iter(q!(vec![2]))
1276 .send(&node, TCP.fail_stop().bincode())
1277 .entries()
1278 .sim_output();
1279
1280 let instances = flow
1281 .sim()
1282 .with_cluster_size(&cluster1, 3)
1283 .with_cluster_size(&cluster2, 4)
1284 .exhaustive(async || {
1285 out_recv_1
1286 .assert_yields_only_unordered(vec![
1287 (MemberId::from_raw_id(0), 1),
1288 (MemberId::from_raw_id(1), 1),
1289 (MemberId::from_raw_id(2), 1),
1290 ])
1291 .await;
1292
1293 out_recv_2
1294 .assert_yields_only_unordered(vec![
1295 (MemberId::from_raw_id(0), 2),
1296 (MemberId::from_raw_id(1), 2),
1297 (MemberId::from_raw_id(2), 2),
1298 (MemberId::from_raw_id(3), 2),
1299 ])
1300 .await;
1301 });
1302
1303 assert_eq!(instances, 1);
1304 }
1305
1306 #[cfg(feature = "sim")]
1307 #[test]
1308 fn sim_send_bincode_o2m() {
1309 let mut flow = FlowBuilder::new();
1310 let cluster = flow.cluster::<()>();
1311 let node = flow.process::<()>();
1312
1313 let input = node.source_iter(q!(vec![
1314 (MemberId::from_raw_id(0), 123),
1315 (MemberId::from_raw_id(1), 456),
1316 ]));
1317
1318 let out_recv = input
1319 .demux(&cluster, TCP.fail_stop().bincode())
1320 .map(q!(|x| x + 1))
1321 .send(&node, TCP.fail_stop().bincode())
1322 .entries()
1323 .sim_output();
1324
1325 flow.sim()
1326 .with_cluster_size(&cluster, 4)
1327 .exhaustive(async || {
1328 out_recv
1329 .assert_yields_only_unordered(vec![
1330 (MemberId::from_raw_id(0), 124),
1331 (MemberId::from_raw_id(1), 457),
1332 ])
1333 .await
1334 });
1335 }
1336
1337 #[cfg(feature = "sim")]
1338 #[test]
1339 fn sim_broadcast_bincode_o2m() {
1340 let mut flow = FlowBuilder::new();
1341 let cluster = flow.cluster::<()>();
1342 let node = flow.process::<()>();
1343
1344 let input = node.source_iter(q!(vec![123, 456]));
1345
1346 let out_recv = input
1347 .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1348 .map(q!(|x| x + 1))
1349 .send(&node, TCP.fail_stop().bincode())
1350 .entries()
1351 .sim_output();
1352
1353 let mut c_1_produced = false;
1354 let mut c_2_produced = false;
1355
1356 flow.sim()
1357 .with_cluster_size(&cluster, 2)
1358 .exhaustive(async || {
1359 let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1360
1361 // check that order is preserved
1362 if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1363 assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1364 c_1_produced = true;
1365 }
1366
1367 if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1368 assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1369 c_2_produced = true;
1370 }
1371 });
1372
1373 assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1374 }
1375
1376 #[cfg(feature = "sim")]
1377 #[test]
1378 fn sim_send_bincode_m2m() {
1379 let mut flow = FlowBuilder::new();
1380 let cluster = flow.cluster::<()>();
1381 let node = flow.process::<()>();
1382
1383 let input = node.source_iter(q!(vec![
1384 (MemberId::from_raw_id(0), 123),
1385 (MemberId::from_raw_id(1), 456),
1386 ]));
1387
1388 let out_recv = input
1389 .demux(&cluster, TCP.fail_stop().bincode())
1390 .map(q!(|x| x + 1))
1391 .flat_map_ordered(q!(|x| vec![
1392 (MemberId::from_raw_id(0), x),
1393 (MemberId::from_raw_id(1), x),
1394 ]))
1395 .demux(&cluster, TCP.fail_stop().bincode())
1396 .entries()
1397 .send(&node, TCP.fail_stop().bincode())
1398 .entries()
1399 .sim_output();
1400
1401 flow.sim()
1402 .with_cluster_size(&cluster, 4)
1403 .exhaustive(async || {
1404 out_recv
1405 .assert_yields_only_unordered(vec![
1406 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1407 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1408 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1409 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1410 ])
1411 .await
1412 });
1413 }
1414}