hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33 membership.fold(
34 q!(|| false),
35 q!(|present, event| {
36 match event {
37 MembershipEvent::Joined => *present = true,
38 MembershipEvent::Left => *present = false,
39 }
40 }),
41 )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45 let root = get_this_crate();
46
47 if is_demux {
48 parse_quote! {
49 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50 |(id, data)| {
51 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52 }
53 )
54 }
55 } else {
56 parse_quote! {
57 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58 |data| {
59 #root::runtime_support::bincode::serialize(&data).unwrap().into()
60 }
61 )
62 }
63 }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67 serialize_bincode_with_type(is_demux, "e_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71 let root = get_this_crate();
72 if let Some(c_type) = tagged {
73 parse_quote! {
74 |res| {
75 let (id, b) = res.unwrap();
76 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77 }
78 }
79 } else {
80 parse_quote! {
81 |res| {
82 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83 }
84 }
85 }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89 deserialize_bincode_with_type(tagged, "e_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95 /// using [`bincode`] to serialize/deserialize messages.
96 ///
97 /// The returned stream captures the elements received at the destination, where values will
98 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
100 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
101 /// dropped no further messages will be sent.
102 ///
103 /// # Example
104 /// ```rust
105 /// # #[cfg(feature = "deploy")] {
106 /// # use hydro_lang::prelude::*;
107 /// # use futures::StreamExt;
108 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109 /// let p1 = flow.process::<()>();
110 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111 /// let p2 = flow.process::<()>();
112 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113 /// // 1, 2, 3
114 /// # on_p2.send_bincode(&p_out)
115 /// # }, |mut stream| async move {
116 /// # for w in 1..=3 {
117 /// # assert_eq!(stream.next().await, Some(w));
118 /// # }
119 /// # }));
120 /// # }
121 /// ```
122 pub fn send_bincode<L2>(
123 self,
124 other: &Process<'a, L2>,
125 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126 where
127 T: Serialize + DeserializeOwned,
128 {
129 self.send(other, TCP.fail_stop().bincode())
130 }
131
132 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133 /// using the configuration in `via` to set up the message transport.
134 ///
135 /// The returned stream captures the elements received at the destination, where values will
136 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
138 /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
139 /// dropped no further messages will be sent.
140 ///
141 /// # Example
142 /// ```rust
143 /// # #[cfg(feature = "deploy")] {
144 /// # use hydro_lang::prelude::*;
145 /// # use futures::StreamExt;
146 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147 /// let p1 = flow.process::<()>();
148 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149 /// let p2 = flow.process::<()>();
150 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151 /// // 1, 2, 3
152 /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153 /// # }, |mut stream| async move {
154 /// # for w in 1..=3 {
155 /// # assert_eq!(stream.next().await, Some(w));
156 /// # }
157 /// # }));
158 /// # }
159 /// ```
160 pub fn send<L2, N: NetworkFor<T>>(
161 self,
162 to: &Process<'a, L2>,
163 via: N,
164 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
165 where
166 T: Serialize + DeserializeOwned,
167 {
168 let serialize_pipeline = Some(N::serialize_thunk(false));
169 let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171 let name = via.name();
172 if to.multiversioned() && name.is_none() {
173 panic!(
174 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
175 );
176 }
177
178 Stream::new(
179 to.clone(),
180 HydroNode::Network {
181 name: name.map(ToOwned::to_owned),
182 serialize_fn: serialize_pipeline.map(|e| e.into()),
183 instantiate_fn: DebugInstantiate::Building,
184 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
185 input: Box::new(self.ir_node.into_inner()),
186 metadata: to.new_node_metadata(
187 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
188 ),
189 },
190 )
191 }
192
193 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
194 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
195 /// using [`bincode`] to serialize/deserialize messages.
196 ///
197 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
198 /// membership information. This is a common pattern in distributed systems for broadcasting data to
199 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
200 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
201 /// each element to all cluster members.
202 ///
203 /// # Non-Determinism
204 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
205 /// to the current cluster members _at that point in time_. Depending on when we are notified of
206 /// membership changes, we will broadcast each element to different members.
207 ///
208 /// # Example
209 /// ```rust
210 /// # #[cfg(feature = "deploy")] {
211 /// # use hydro_lang::prelude::*;
212 /// # use futures::StreamExt;
213 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
214 /// let p1 = flow.process::<()>();
215 /// let workers: Cluster<()> = flow.cluster::<()>();
216 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
217 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
218 /// # on_worker.send_bincode(&p2).entries()
219 /// // if there are 4 members in the cluster, each receives one element
220 /// // - MemberId::<()>(0): [123]
221 /// // - MemberId::<()>(1): [123]
222 /// // - MemberId::<()>(2): [123]
223 /// // - MemberId::<()>(3): [123]
224 /// # }, |mut stream| async move {
225 /// # let mut results = Vec::new();
226 /// # for w in 0..4 {
227 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
228 /// # }
229 /// # results.sort();
230 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
231 /// # }));
232 /// # }
233 /// ```
234 pub fn broadcast_bincode<L2: 'a>(
235 self,
236 other: &Cluster<'a, L2>,
237 nondet_membership: NonDet,
238 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
239 where
240 T: Clone + Serialize + DeserializeOwned,
241 {
242 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
243 }
244
245 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
246 /// using the configuration in `via` to set up the message transport.
247 ///
248 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
249 /// membership information. This is a common pattern in distributed systems for broadcasting data to
250 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
251 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
252 /// each element to all cluster members.
253 ///
254 /// # Non-Determinism
255 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
256 /// to the current cluster members _at that point in time_. Depending on when we are notified of
257 /// membership changes, we will broadcast each element to different members.
258 ///
259 /// # Example
260 /// ```rust
261 /// # #[cfg(feature = "deploy")] {
262 /// # use hydro_lang::prelude::*;
263 /// # use futures::StreamExt;
264 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
265 /// let p1 = flow.process::<()>();
266 /// let workers: Cluster<()> = flow.cluster::<()>();
267 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
268 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
269 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
270 /// // if there are 4 members in the cluster, each receives one element
271 /// // - MemberId::<()>(0): [123]
272 /// // - MemberId::<()>(1): [123]
273 /// // - MemberId::<()>(2): [123]
274 /// // - MemberId::<()>(3): [123]
275 /// # }, |mut stream| async move {
276 /// # let mut results = Vec::new();
277 /// # for w in 0..4 {
278 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
279 /// # }
280 /// # results.sort();
281 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
282 /// # }));
283 /// # }
284 /// ```
285 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
286 self,
287 to: &Cluster<'a, L2>,
288 via: N,
289 nondet_membership: NonDet,
290 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
291 where
292 T: Clone + Serialize + DeserializeOwned,
293 {
294 let ids = track_membership(self.location.source_cluster_members(to));
295 sliced! {
296 let members_snapshot = use(ids, nondet_membership);
297 let elements = use(self, nondet_membership);
298
299 let current_members = members_snapshot.filter(q!(|b| *b));
300 elements.repeat_with_keys(current_members)
301 }
302 .demux(to, via)
303 }
304
305 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
306 /// serialization. The external process can receive these elements by establishing a TCP
307 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
308 ///
309 /// # Example
310 /// ```rust
311 /// # #[cfg(feature = "deploy")] {
312 /// # use hydro_lang::prelude::*;
313 /// # use futures::StreamExt;
314 /// # tokio_test::block_on(async move {
315 /// let mut flow = FlowBuilder::new();
316 /// let process = flow.process::<()>();
317 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
318 /// let external = flow.external::<()>();
319 /// let external_handle = numbers.send_bincode_external(&external);
320 ///
321 /// let mut deployment = hydro_deploy::Deployment::new();
322 /// let nodes = flow
323 /// .with_process(&process, deployment.Localhost())
324 /// .with_external(&external, deployment.Localhost())
325 /// .deploy(&mut deployment);
326 ///
327 /// deployment.deploy().await.unwrap();
328 /// // establish the TCP connection
329 /// let mut external_recv_stream = nodes.connect(external_handle).await;
330 /// deployment.start().await.unwrap();
331 ///
332 /// for w in 1..=3 {
333 /// assert_eq!(external_recv_stream.next().await, Some(w));
334 /// }
335 /// # });
336 /// # }
337 /// ```
338 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
339 where
340 T: Serialize + DeserializeOwned,
341 {
342 let serialize_pipeline = Some(serialize_bincode::<T>(false));
343
344 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
345
346 let external_port_id = flow_state_borrow.next_external_port.get_and_increment();
347
348 flow_state_borrow.push_root(HydroRoot::SendExternal {
349 to_external_key: other.key,
350 to_port_id: external_port_id,
351 to_many: false,
352 unpaired: true,
353 serialize_fn: serialize_pipeline.map(|e| e.into()),
354 instantiate_fn: DebugInstantiate::Building,
355 input: Box::new(self.ir_node.into_inner()),
356 op_metadata: HydroIrOpMetadata::new(),
357 });
358
359 ExternalBincodeStream {
360 process_key: other.key,
361 port_id: external_port_id,
362 _phantom: PhantomData,
363 }
364 }
365
366 #[cfg(feature = "sim")]
367 /// Sets up a simulation output port for this stream, allowing test code to receive elements
368 /// sent to this stream during simulation.
369 pub fn sim_output(self) -> SimReceiver<T, O, R>
370 where
371 T: Serialize + DeserializeOwned,
372 {
373 let external_location: External<'a, ()> = External {
374 key: LocationKey::FIRST,
375 flow_state: self.location.flow_state().clone(),
376 _phantom: PhantomData,
377 };
378
379 let external = self.send_bincode_external(&external_location);
380
381 SimReceiver(external.port_id, PhantomData)
382 }
383}
384
385impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
386 /// Creates an external output for embedded deployment mode.
387 ///
388 /// The `name` parameter specifies the name of the field in the generated
389 /// `EmbeddedOutputs` struct that will receive elements from this stream.
390 /// The generated function will accept an `EmbeddedOutputs` struct with an
391 /// `impl FnMut(T)` field with this name.
392 pub fn embedded_output(self, name: impl Into<String>) {
393 let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
394
395 self.location
396 .flow_state()
397 .borrow_mut()
398 .push_root(HydroRoot::EmbeddedOutput {
399 ident,
400 input: Box::new(self.ir_node.into_inner()),
401 op_metadata: HydroIrOpMetadata::new(),
402 });
403 }
404}
405
406impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
407 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
408{
409 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
410 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
411 /// using [`bincode`] to serialize/deserialize messages.
412 ///
413 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
414 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
415 /// this API allows precise targeting of specific cluster members rather than broadcasting to
416 /// all members.
417 ///
418 /// # Example
419 /// ```rust
420 /// # #[cfg(feature = "deploy")] {
421 /// # use hydro_lang::prelude::*;
422 /// # use futures::StreamExt;
423 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
424 /// let p1 = flow.process::<()>();
425 /// let workers: Cluster<()> = flow.cluster::<()>();
426 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
427 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
428 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
429 /// .demux_bincode(&workers);
430 /// # on_worker.send_bincode(&p2).entries()
431 /// // if there are 4 members in the cluster, each receives one element
432 /// // - MemberId::<()>(0): [0]
433 /// // - MemberId::<()>(1): [1]
434 /// // - MemberId::<()>(2): [2]
435 /// // - MemberId::<()>(3): [3]
436 /// # }, |mut stream| async move {
437 /// # let mut results = Vec::new();
438 /// # for w in 0..4 {
439 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
440 /// # }
441 /// # results.sort();
442 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
443 /// # }));
444 /// # }
445 /// ```
446 pub fn demux_bincode(
447 self,
448 other: &Cluster<'a, L2>,
449 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
450 where
451 T: Serialize + DeserializeOwned,
452 {
453 self.demux(other, TCP.fail_stop().bincode())
454 }
455
456 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
457 /// using the configuration in `via` to set up the message transport.
458 ///
459 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
460 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
461 /// this API allows precise targeting of specific cluster members rather than broadcasting to
462 /// all members.
463 ///
464 /// # Example
465 /// ```rust
466 /// # #[cfg(feature = "deploy")] {
467 /// # use hydro_lang::prelude::*;
468 /// # use futures::StreamExt;
469 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
470 /// let p1 = flow.process::<()>();
471 /// let workers: Cluster<()> = flow.cluster::<()>();
472 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
473 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
474 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
475 /// .demux(&workers, TCP.fail_stop().bincode());
476 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
477 /// // if there are 4 members in the cluster, each receives one element
478 /// // - MemberId::<()>(0): [0]
479 /// // - MemberId::<()>(1): [1]
480 /// // - MemberId::<()>(2): [2]
481 /// // - MemberId::<()>(3): [3]
482 /// # }, |mut stream| async move {
483 /// # let mut results = Vec::new();
484 /// # for w in 0..4 {
485 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
486 /// # }
487 /// # results.sort();
488 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
489 /// # }));
490 /// # }
491 /// ```
492 pub fn demux<N: NetworkFor<T>>(
493 self,
494 to: &Cluster<'a, L2>,
495 via: N,
496 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
497 where
498 T: Serialize + DeserializeOwned,
499 {
500 self.into_keyed().demux(to, via)
501 }
502}
503
impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
    /// [`bincode`] to serialize/deserialize messages.
    ///
    /// This provides load balancing by evenly distributing work across cluster members. The
    /// distribution is deterministic based on element order - the first element goes to member 0,
    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
    ///
    /// This is a shorthand for [`Stream::round_robin`] with `TCP.fail_stop().bincode()`.
    ///
    /// # Non-Determinism
    /// The set of cluster members may asynchronously change over time. Each element is distributed
    /// based on the current cluster membership _at that point in time_. Depending on when cluster
    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
    /// membership is stable, the order of members in the round-robin pattern may change across runs.
    ///
    /// # Ordering Requirements
    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
    /// order of messages and retries affects the round-robin pattern.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
    /// let p1 = flow.process::<()>();
    /// let workers: Cluster<()> = flow.cluster::<()>();
    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
    /// on_worker.send_bincode(&p2)
    /// #     .first().values() // we use first to assert that each member gets one element
    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
    /// // - MemberId::<()>(?): [1]
    /// // - MemberId::<()>(?): [2]
    /// // - MemberId::<()>(?): [3]
    /// // - MemberId::<()>(?): [4]
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for w in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn round_robin_bincode<L2: 'a>(
        self,
        other: &Cluster<'a, L2>,
        nondet_membership: NonDet,
    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
    where
        T: Serialize + DeserializeOwned,
    {
        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
    }

    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
    /// the configuration in `via` to set up the message transport.
    ///
    /// This provides load balancing by evenly distributing work across cluster members. The
    /// distribution is deterministic based on element order - the first element goes to member 0,
    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
    ///
    /// # Non-Determinism
    /// The set of cluster members may asynchronously change over time. Each element is distributed
    /// based on the current cluster membership _at that point in time_. Depending on when cluster
    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
    /// membership is stable, the order of members in the round-robin pattern may change across runs.
    ///
    /// # Ordering Requirements
    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
    /// order of messages and retries affects the round-robin pattern.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
    /// let p1 = flow.process::<()>();
    /// let workers: Cluster<()> = flow.cluster::<()>();
    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
    /// on_worker.send(&p2, TCP.fail_stop().bincode())
    /// #     .first().values() // we use first to assert that each member gets one element
    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
    /// // - MemberId::<()>(?): [1]
    /// // - MemberId::<()>(?): [2]
    /// // - MemberId::<()>(?): [3]
    /// // - MemberId::<()>(?): [4]
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for w in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
        self,
        to: &Cluster<'a, L2>,
        via: N,
        nondet_membership: NonDet,
    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
    where
        T: Serialize + DeserializeOwned,
    {
        // Per-member presence flags derived from the target cluster's join/leave events.
        let ids = track_membership(self.location.source_cluster_members(to));
        sliced! {
            let members_snapshot = use(ids, nondet_membership);
            // `enumerate` pairs each element with its index, which picks the target below.
            let elements = use(self.enumerate(), nondet_membership);

            // Collect the ids of members whose presence flag is `true` into a single Vec.
            let current_members = members_snapshot
                .filter(q!(|b| *b))
                .keys()
                .assume_ordering(nondet_membership)
                .collect_vec();

            // NOTE(review): `data.0 % members.len()` divides by zero if `members` is empty.
            // Confirm whether an element can be paired with an empty member list here.
            elements
                .cross_singleton(current_members)
                .map(q!(|(data, members)| (
                    members[data.0 % members.len()].clone(),
                    data.1
                )))
        }
        .demux(to, via)
    }
}
637
impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
    /// [`bincode`] to serialize/deserialize messages.
    ///
    /// This provides load balancing by evenly distributing work across cluster members. The
    /// distribution is deterministic based on element order - the first element goes to member 0,
    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
    ///
    /// The result is a [`KeyedStream`] keyed by `MemberId<L>`, i.e. by members of the source
    /// cluster (presumably the sender of each element; confirm against the keyed-stream `demux`).
    ///
    /// # Non-Determinism
    /// The set of cluster members may asynchronously change over time. Each element is distributed
    /// based on the current cluster membership _at that point in time_. Depending on when cluster
    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
    /// membership is stable, the order of members in the round-robin pattern may change across runs.
    ///
    /// # Ordering Requirements
    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
    /// order of messages and retries affects the round-robin pattern.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
    /// # use hydro_lang::location::MemberId;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
    /// let p1 = flow.process::<()>();
    /// let workers1: Cluster<()> = flow.cluster::<()>();
    /// let workers2: Cluster<()> = flow.cluster::<()>();
    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
    /// on_worker2.send_bincode(&p2)
    /// # .entries()
    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # let mut locations = std::collections::HashSet::new();
    /// # for w in 0..=16 {
    /// #     let (location, v) = stream.next().await.unwrap();
    /// #     locations.insert(location);
    /// #     results.push(v);
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
    /// # assert_eq!(locations.len(), 16);
    /// # }));
    /// # }
    /// ```
    pub fn round_robin_bincode<L2: 'a>(
        self,
        other: &Cluster<'a, L2>,
        nondet_membership: NonDet,
    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
    where
        T: Serialize + DeserializeOwned,
    {
        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
    }

    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
    /// the configuration in `via` to set up the message transport.
    ///
    /// This provides load balancing by evenly distributing work across cluster members. The
    /// distribution is deterministic based on element order - the first element goes to member 0,
    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
    ///
    /// The result is a [`KeyedStream`] keyed by `MemberId<L>`, i.e. by members of the source
    /// cluster (presumably the sender of each element; confirm against the keyed-stream `demux`).
    ///
    /// # Non-Determinism
    /// The set of cluster members may asynchronously change over time. Each element is distributed
    /// based on the current cluster membership _at that point in time_. Depending on when cluster
    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
    /// membership is stable, the order of members in the round-robin pattern may change across runs.
    ///
    /// # Ordering Requirements
    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
    /// order of messages and retries affects the round-robin pattern.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
    /// # use hydro_lang::location::MemberId;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
    /// let p1 = flow.process::<()>();
    /// let workers1: Cluster<()> = flow.cluster::<()>();
    /// let workers2: Cluster<()> = flow.cluster::<()>();
    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
    /// # .entries()
    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # let mut locations = std::collections::HashSet::new();
    /// # for w in 0..=16 {
    /// #     let (location, v) = stream.next().await.unwrap();
    /// #     locations.insert(location);
    /// #     results.push(v);
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
    /// # assert_eq!(locations.len(), 16);
    /// # }));
    /// # }
    /// ```
    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
        self,
        to: &Cluster<'a, L2>,
        via: N,
        nondet_membership: NonDet,
    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
    where
        T: Serialize + DeserializeOwned,
    {
        // Per-member presence flags derived from the target cluster's join/leave events.
        let ids = track_membership(self.location.source_cluster_members(to));
        sliced! {
            let members_snapshot = use(ids, nondet_membership);
            // `enumerate` pairs each element with its index, which picks the target below.
            let elements = use(self.enumerate(), nondet_membership);

            // Collect the ids of members whose presence flag is `true` into a single Vec.
            let current_members = members_snapshot
                .filter(q!(|b| *b))
                .keys()
                .assume_ordering(nondet_membership)
                .collect_vec();

            // NOTE(review): `data.0 % members.len()` divides by zero if `members` is empty.
            // Confirm whether an element can be paired with an empty member list here.
            elements
                .cross_singleton(current_members)
                .map(q!(|(data, members)| (
                    members[data.0 % members.len()].clone(),
                    data.1
                )))
        }
        .demux(to, via)
    }
}
777
778impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
779 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
780 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
781 /// using [`bincode`] to serialize/deserialize messages.
782 ///
783 /// Each cluster member sends its local stream elements, and they are collected at the destination
784 /// as a [`KeyedStream`] where keys identify the source cluster member.
785 ///
786 /// # Example
787 /// ```rust
788 /// # #[cfg(feature = "deploy")] {
789 /// # use hydro_lang::prelude::*;
790 /// # use futures::StreamExt;
791 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
792 /// let workers: Cluster<()> = flow.cluster::<()>();
793 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
794 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
795 /// # all_received.entries()
796 /// # }, |mut stream| async move {
797 /// // if there are 4 members in the cluster, we should receive 4 elements
798 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
799 /// # let mut results = Vec::new();
800 /// # for w in 0..4 {
801 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
802 /// # }
803 /// # results.sort();
804 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
805 /// # }));
806 /// # }
807 /// ```
808 ///
809 /// If you don't need to know the source for each element, you can use `.values()`
810 /// to get just the data:
811 /// ```rust
812 /// # #[cfg(feature = "deploy")] {
813 /// # use hydro_lang::prelude::*;
814 /// # use hydro_lang::live_collections::stream::NoOrder;
815 /// # use futures::StreamExt;
816 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
817 /// # let workers: Cluster<()> = flow.cluster::<()>();
818 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
819 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
820 /// # values
821 /// # }, |mut stream| async move {
822 /// # let mut results = Vec::new();
823 /// # for w in 0..4 {
824 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
825 /// # }
826 /// # results.sort();
827 /// // if there are 4 members in the cluster, we should receive 4 elements
828 /// // 1, 1, 1, 1
829 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
830 /// # }));
831 /// # }
832 /// ```
833 pub fn send_bincode<L2>(
834 self,
835 other: &Process<'a, L2>,
836 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
837 where
838 T: Serialize + DeserializeOwned,
839 {
840 self.send(other, TCP.fail_stop().bincode())
841 }
842
843 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
844 /// using the configuration in `via` to set up the message transport.
845 ///
846 /// Each cluster member sends its local stream elements, and they are collected at the destination
847 /// as a [`KeyedStream`] where keys identify the source cluster member.
848 ///
849 /// # Example
850 /// ```rust
851 /// # #[cfg(feature = "deploy")] {
852 /// # use hydro_lang::prelude::*;
853 /// # use futures::StreamExt;
854 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
855 /// let workers: Cluster<()> = flow.cluster::<()>();
856 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
857 /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
858 /// # all_received.entries()
859 /// # }, |mut stream| async move {
860 /// // if there are 4 members in the cluster, we should receive 4 elements
861 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
862 /// # let mut results = Vec::new();
863 /// # for w in 0..4 {
864 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
865 /// # }
866 /// # results.sort();
867 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
868 /// # }));
869 /// # }
870 /// ```
871 ///
872 /// If you don't need to know the source for each element, you can use `.values()`
873 /// to get just the data:
874 /// ```rust
875 /// # #[cfg(feature = "deploy")] {
876 /// # use hydro_lang::prelude::*;
877 /// # use hydro_lang::live_collections::stream::NoOrder;
878 /// # use futures::StreamExt;
879 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
880 /// # let workers: Cluster<()> = flow.cluster::<()>();
881 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
882 /// let values: Stream<i32, _, _, NoOrder> =
883 /// numbers.send(&process, TCP.fail_stop().bincode()).values();
884 /// # values
885 /// # }, |mut stream| async move {
886 /// # let mut results = Vec::new();
887 /// # for w in 0..4 {
888 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
889 /// # }
890 /// # results.sort();
891 /// // if there are 4 members in the cluster, we should receive 4 elements
892 /// // 1, 1, 1, 1
893 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
894 /// # }));
895 /// # }
896 /// ```
897 pub fn send<L2, N: NetworkFor<T>>(
898 self,
899 to: &Process<'a, L2>,
900 via: N,
901 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
902 where
903 T: Serialize + DeserializeOwned,
904 {
905 let serialize_pipeline = Some(N::serialize_thunk(false));
906
907 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
908
909 let name = via.name();
910 if to.multiversioned() && name.is_none() {
911 panic!(
912 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
913 );
914 }
915
916 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
917 to.clone(),
918 HydroNode::Network {
919 name: name.map(ToOwned::to_owned),
920 serialize_fn: serialize_pipeline.map(|e| e.into()),
921 instantiate_fn: DebugInstantiate::Building,
922 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
923 input: Box::new(self.ir_node.into_inner()),
924 metadata: to.new_node_metadata(Stream::<
925 (MemberId<L>, T),
926 Process<'a, L2>,
927 Unbounded,
928 O,
929 R,
930 >::collection_kind()),
931 },
932 );
933
934 raw_stream.into_keyed()
935 }
936
937 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
938 /// Broadcasts elements of this stream at each source member to all members of a destination
939 /// cluster, using [`bincode`] to serialize/deserialize messages.
940 ///
941 /// Each source member sends each of its stream elements to **every** member of the cluster
942 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
943 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
944 /// **only data elements** and sends each element to all cluster members.
945 ///
946 /// # Non-Determinism
947 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
948 /// to the current cluster members known _at that point in time_ at the source member. Depending
949 /// on when each source member is notified of membership changes, it will broadcast each element
950 /// to different members.
951 ///
952 /// # Example
953 /// ```rust
954 /// # #[cfg(feature = "deploy")] {
955 /// # use hydro_lang::prelude::*;
956 /// # use hydro_lang::location::MemberId;
957 /// # use futures::StreamExt;
958 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
959 /// # type Source = ();
960 /// # type Destination = ();
961 /// let source: Cluster<Source> = flow.cluster::<Source>();
962 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
963 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
964 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
965 /// # on_destination.entries().send_bincode(&p2).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
967 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
968 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
969 /// // - ...
970 /// # }, |mut stream| async move {
971 /// # let mut results = Vec::new();
972 /// # for w in 0..16 {
973 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
974 /// # }
975 /// # results.sort();
976 /// # assert_eq!(results, vec![
977 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
978 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
979 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
980 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
981 /// # ]);
982 /// # }));
983 /// # }
984 /// ```
985 pub fn broadcast_bincode<L2: 'a>(
986 self,
987 other: &Cluster<'a, L2>,
988 nondet_membership: NonDet,
989 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
990 where
991 T: Clone + Serialize + DeserializeOwned,
992 {
993 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
994 }
995
996 /// Broadcasts elements of this stream at each source member to all members of a destination
997 /// cluster, using the configuration in `via` to set up the message transport.
998 ///
999 /// Each source member sends each of its stream elements to **every** member of the cluster
1000 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1001 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1002 /// **only data elements** and sends each element to all cluster members.
1003 ///
1004 /// # Non-Determinism
1005 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1006 /// to the current cluster members known _at that point in time_ at the source member. Depending
1007 /// on when each source member is notified of membership changes, it will broadcast each element
1008 /// to different members.
1009 ///
1010 /// # Example
1011 /// ```rust
1012 /// # #[cfg(feature = "deploy")] {
1013 /// # use hydro_lang::prelude::*;
1014 /// # use hydro_lang::location::MemberId;
1015 /// # use futures::StreamExt;
1016 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1017 /// # type Source = ();
1018 /// # type Destination = ();
1019 /// let source: Cluster<Source> = flow.cluster::<Source>();
1020 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1021 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1022 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1023 /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
1025 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1026 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1027 /// // - ...
1028 /// # }, |mut stream| async move {
1029 /// # let mut results = Vec::new();
1030 /// # for w in 0..16 {
1031 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1032 /// # }
1033 /// # results.sort();
1034 /// # assert_eq!(results, vec![
1035 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1036 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1037 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1038 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1039 /// # ]);
1040 /// # }));
1041 /// # }
1042 /// ```
1043 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1044 self,
1045 to: &Cluster<'a, L2>,
1046 via: N,
1047 nondet_membership: NonDet,
1048 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1049 where
1050 T: Clone + Serialize + DeserializeOwned,
1051 {
1052 let ids = track_membership(self.location.source_cluster_members(to));
1053 sliced! {
1054 let members_snapshot = use(ids, nondet_membership);
1055 let elements = use(self, nondet_membership);
1056
1057 let current_members = members_snapshot.filter(q!(|b| *b));
1058 elements.repeat_with_keys(current_members)
1059 }
1060 .demux(to, via)
1061 }
1062}
1063
1064impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1065 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1066{
1067 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1068 /// Sends elements of this stream at each source member to specific members of a destination
1069 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1070 ///
1071 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1072 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1073 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1074 /// all members.
1075 ///
1076 /// Each cluster member sends its local stream elements, and they are collected at each
1077 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1078 ///
1079 /// # Example
1080 /// ```rust
1081 /// # #[cfg(feature = "deploy")] {
1082 /// # use hydro_lang::prelude::*;
1083 /// # use futures::StreamExt;
1084 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1085 /// # type Source = ();
1086 /// # type Destination = ();
1087 /// let source: Cluster<Source> = flow.cluster::<Source>();
1088 /// let to_send: Stream<_, Cluster<_>, _> = source
1089 /// .source_iter(q!(vec![0, 1, 2, 3]))
1090 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1091 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1092 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1093 /// # all_received.entries().send_bincode(&p2).entries()
1094 /// # }, |mut stream| async move {
1095 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1096 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1097 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1098 /// // - ...
1099 /// # let mut results = Vec::new();
1100 /// # for w in 0..16 {
1101 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1102 /// # }
1103 /// # results.sort();
1104 /// # assert_eq!(results, vec![
1105 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1106 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1107 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1108 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1109 /// # ]);
1110 /// # }));
1111 /// # }
1112 /// ```
1113 pub fn demux_bincode(
1114 self,
1115 other: &Cluster<'a, L2>,
1116 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1117 where
1118 T: Serialize + DeserializeOwned,
1119 {
1120 self.demux(other, TCP.fail_stop().bincode())
1121 }
1122
1123 /// Sends elements of this stream at each source member to specific members of a destination
1124 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1125 /// message transport.
1126 ///
1127 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1128 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1129 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1130 /// all members.
1131 ///
1132 /// Each cluster member sends its local stream elements, and they are collected at each
1133 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1134 ///
1135 /// # Example
1136 /// ```rust
1137 /// # #[cfg(feature = "deploy")] {
1138 /// # use hydro_lang::prelude::*;
1139 /// # use futures::StreamExt;
1140 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1141 /// # type Source = ();
1142 /// # type Destination = ();
1143 /// let source: Cluster<Source> = flow.cluster::<Source>();
1144 /// let to_send: Stream<_, Cluster<_>, _> = source
1145 /// .source_iter(q!(vec![0, 1, 2, 3]))
1146 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1147 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1148 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1149 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1150 /// # }, |mut stream| async move {
1151 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1152 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1153 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1154 /// // - ...
1155 /// # let mut results = Vec::new();
1156 /// # for w in 0..16 {
1157 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1158 /// # }
1159 /// # results.sort();
1160 /// # assert_eq!(results, vec![
1161 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1162 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1163 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1164 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1165 /// # ]);
1166 /// # }));
1167 /// # }
1168 /// ```
1169 pub fn demux<N: NetworkFor<T>>(
1170 self,
1171 to: &Cluster<'a, L2>,
1172 via: N,
1173 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1174 where
1175 T: Serialize + DeserializeOwned,
1176 {
1177 self.into_keyed().demux(to, via)
1178 }
1179}
1180
1181#[cfg(test)]
1182mod tests {
1183 #[cfg(feature = "sim")]
1184 use stageleft::q;
1185
1186 #[cfg(feature = "sim")]
1187 use crate::location::{Location, MemberId};
1188 #[cfg(feature = "sim")]
1189 use crate::networking::TCP;
1190 #[cfg(feature = "sim")]
1191 use crate::nondet::nondet;
1192 #[cfg(feature = "sim")]
1193 use crate::prelude::FlowBuilder;
1194
1195 #[cfg(feature = "sim")]
1196 #[test]
1197 fn sim_send_bincode_o2o() {
1198 use crate::networking::TCP;
1199
1200 let mut flow = FlowBuilder::new();
1201 let node = flow.process::<()>();
1202 let node2 = flow.process::<()>();
1203
1204 let (in_send, input) = node.sim_input();
1205
1206 let out_recv = input
1207 .send(&node2, TCP.fail_stop().bincode())
1208 .batch(&node2.tick(), nondet!(/** test */))
1209 .count()
1210 .all_ticks()
1211 .sim_output();
1212
1213 let instances = flow.sim().exhaustive(async || {
1214 in_send.send(());
1215 in_send.send(());
1216 in_send.send(());
1217
1218 let received = out_recv.collect::<Vec<_>>().await;
1219 assert!(received.into_iter().sum::<usize>() == 3);
1220 });
1221
1222 assert_eq!(instances, 4); // 2^{3 - 1}
1223 }
1224
1225 #[cfg(feature = "sim")]
1226 #[test]
1227 fn sim_send_bincode_m2o() {
1228 let mut flow = FlowBuilder::new();
1229 let cluster = flow.cluster::<()>();
1230 let node = flow.process::<()>();
1231
1232 let input = cluster.source_iter(q!(vec![1]));
1233
1234 let out_recv = input
1235 .send(&node, TCP.fail_stop().bincode())
1236 .entries()
1237 .batch(&node.tick(), nondet!(/** test */))
1238 .all_ticks()
1239 .sim_output();
1240
1241 let instances = flow
1242 .sim()
1243 .with_cluster_size(&cluster, 4)
1244 .exhaustive(async || {
1245 out_recv
1246 .assert_yields_only_unordered(vec![
1247 (MemberId::from_raw_id(0), 1),
1248 (MemberId::from_raw_id(1), 1),
1249 (MemberId::from_raw_id(2), 1),
1250 (MemberId::from_raw_id(3), 1),
1251 ])
1252 .await
1253 });
1254
1255 assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1256 }
1257
1258 #[cfg(feature = "sim")]
1259 #[test]
1260 fn sim_send_bincode_multiple_m2o() {
1261 let mut flow = FlowBuilder::new();
1262 let cluster1 = flow.cluster::<()>();
1263 let cluster2 = flow.cluster::<()>();
1264 let node = flow.process::<()>();
1265
1266 let out_recv_1 = cluster1
1267 .source_iter(q!(vec![1]))
1268 .send(&node, TCP.fail_stop().bincode())
1269 .entries()
1270 .sim_output();
1271
1272 let out_recv_2 = cluster2
1273 .source_iter(q!(vec![2]))
1274 .send(&node, TCP.fail_stop().bincode())
1275 .entries()
1276 .sim_output();
1277
1278 let instances = flow
1279 .sim()
1280 .with_cluster_size(&cluster1, 3)
1281 .with_cluster_size(&cluster2, 4)
1282 .exhaustive(async || {
1283 out_recv_1
1284 .assert_yields_only_unordered(vec![
1285 (MemberId::from_raw_id(0), 1),
1286 (MemberId::from_raw_id(1), 1),
1287 (MemberId::from_raw_id(2), 1),
1288 ])
1289 .await;
1290
1291 out_recv_2
1292 .assert_yields_only_unordered(vec![
1293 (MemberId::from_raw_id(0), 2),
1294 (MemberId::from_raw_id(1), 2),
1295 (MemberId::from_raw_id(2), 2),
1296 (MemberId::from_raw_id(3), 2),
1297 ])
1298 .await;
1299 });
1300
1301 assert_eq!(instances, 1);
1302 }
1303
1304 #[cfg(feature = "sim")]
1305 #[test]
1306 fn sim_send_bincode_o2m() {
1307 let mut flow = FlowBuilder::new();
1308 let cluster = flow.cluster::<()>();
1309 let node = flow.process::<()>();
1310
1311 let input = node.source_iter(q!(vec![
1312 (MemberId::from_raw_id(0), 123),
1313 (MemberId::from_raw_id(1), 456),
1314 ]));
1315
1316 let out_recv = input
1317 .demux(&cluster, TCP.fail_stop().bincode())
1318 .map(q!(|x| x + 1))
1319 .send(&node, TCP.fail_stop().bincode())
1320 .entries()
1321 .sim_output();
1322
1323 flow.sim()
1324 .with_cluster_size(&cluster, 4)
1325 .exhaustive(async || {
1326 out_recv
1327 .assert_yields_only_unordered(vec![
1328 (MemberId::from_raw_id(0), 124),
1329 (MemberId::from_raw_id(1), 457),
1330 ])
1331 .await
1332 });
1333 }
1334
1335 #[cfg(feature = "sim")]
1336 #[test]
1337 fn sim_broadcast_bincode_o2m() {
1338 let mut flow = FlowBuilder::new();
1339 let cluster = flow.cluster::<()>();
1340 let node = flow.process::<()>();
1341
1342 let input = node.source_iter(q!(vec![123, 456]));
1343
1344 let out_recv = input
1345 .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1346 .map(q!(|x| x + 1))
1347 .send(&node, TCP.fail_stop().bincode())
1348 .entries()
1349 .sim_output();
1350
1351 let mut c_1_produced = false;
1352 let mut c_2_produced = false;
1353
1354 flow.sim()
1355 .with_cluster_size(&cluster, 2)
1356 .exhaustive(async || {
1357 let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1358
1359 // check that order is preserved
1360 if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1361 assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1362 c_1_produced = true;
1363 }
1364
1365 if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1366 assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1367 c_2_produced = true;
1368 }
1369 });
1370
1371 assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1372 }
1373
1374 #[cfg(feature = "sim")]
1375 #[test]
1376 fn sim_send_bincode_m2m() {
1377 let mut flow = FlowBuilder::new();
1378 let cluster = flow.cluster::<()>();
1379 let node = flow.process::<()>();
1380
1381 let input = node.source_iter(q!(vec![
1382 (MemberId::from_raw_id(0), 123),
1383 (MemberId::from_raw_id(1), 456),
1384 ]));
1385
1386 let out_recv = input
1387 .demux(&cluster, TCP.fail_stop().bincode())
1388 .map(q!(|x| x + 1))
1389 .flat_map_ordered(q!(|x| vec![
1390 (MemberId::from_raw_id(0), x),
1391 (MemberId::from_raw_id(1), x),
1392 ]))
1393 .demux(&cluster, TCP.fail_stop().bincode())
1394 .entries()
1395 .send(&node, TCP.fail_stop().bincode())
1396 .entries()
1397 .sim_output();
1398
1399 flow.sim()
1400 .with_cluster_size(&cluster, 4)
1401 .exhaustive(async || {
1402 out_recv
1403 .assert_yields_only_unordered(vec![
1404 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1405 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1406 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1407 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1408 ])
1409 .await
1410 });
1411 }
1412}