hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::DynLocation;
19use crate::location::external_process::ExternalBincodeStream;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::networking::{NetworkFor, TCP};
22use crate::nondet::NonDet;
23#[cfg(feature = "sim")]
24use crate::sim::SimReceiver;
25use crate::staging_util::get_this_crate;
26
27// same as the one in `hydro_std`, but internal use only
28fn track_membership<'a, C, L: Location<'a> + NoTick>(
29    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
30) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
31    membership.fold(
32        q!(|| false),
33        q!(|present, event| {
34            match event {
35                MembershipEvent::Joined => *present = true,
36                MembershipEvent::Left => *present = false,
37            }
38        }),
39    )
40}
41
42fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
43    let root = get_this_crate();
44
45    if is_demux {
46        parse_quote! {
47            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
48                |(id, data)| {
49                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
50                }
51            )
52        }
53    } else {
54        parse_quote! {
55            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
56                |data| {
57                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
58                }
59            )
60        }
61    }
62}
63
64pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
65    serialize_bincode_with_type(is_demux, &quote_type::<T>())
66}
67
68fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
69    let root = get_this_crate();
70    if let Some(c_type) = tagged {
71        parse_quote! {
72            |res| {
73                let (id, b) = res.unwrap();
74                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
75            }
76        }
77    } else {
78        parse_quote! {
79            |res| {
80                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
81            }
82        }
83    }
84}
85
86pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
87    deserialize_bincode_with_type(tagged, &quote_type::<T>())
88}
89
90impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
91    #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
92    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
93    /// using [`bincode`] to serialize/deserialize messages.
94    ///
95    /// The returned stream captures the elements received at the destination, where values will
96    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
97    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
98    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
99    /// dropped no further messages will be sent.
100    ///
101    /// # Example
102    /// ```rust
103    /// # #[cfg(feature = "deploy")] {
104    /// # use hydro_lang::prelude::*;
105    /// # use futures::StreamExt;
106    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
107    /// let p1 = flow.process::<()>();
108    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
109    /// let p2 = flow.process::<()>();
110    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
111    /// // 1, 2, 3
112    /// # on_p2.send_bincode(&p_out)
113    /// # }, |mut stream| async move {
114    /// # for w in 1..=3 {
115    /// #     assert_eq!(stream.next().await, Some(w));
116    /// # }
117    /// # }));
118    /// # }
119    /// ```
120    pub fn send_bincode<L2>(
121        self,
122        other: &Process<'a, L2>,
123    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
124    where
125        T: Serialize + DeserializeOwned,
126    {
127        self.send(other, TCP.bincode())
128    }
129
130    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
131    /// using the configuration in `via` to set up the message transport.
132    ///
133    /// The returned stream captures the elements received at the destination, where values will
134    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
135    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
136    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
137    /// dropped no further messages will be sent.
138    ///
139    /// # Example
140    /// ```rust
141    /// # #[cfg(feature = "deploy")] {
142    /// # use hydro_lang::prelude::*;
143    /// # use futures::StreamExt;
144    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
145    /// let p1 = flow.process::<()>();
146    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
147    /// let p2 = flow.process::<()>();
148    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
149    /// // 1, 2, 3
150    /// # on_p2.send(&p_out, TCP.bincode())
151    /// # }, |mut stream| async move {
152    /// # for w in 1..=3 {
153    /// #     assert_eq!(stream.next().await, Some(w));
154    /// # }
155    /// # }));
156    /// # }
157    /// ```
158    pub fn send<L2, N: NetworkFor<T>>(
159        self,
160        to: &Process<'a, L2>,
161        via: N,
162    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
163    where
164        T: Serialize + DeserializeOwned,
165    {
166        let _ = via;
167        let serialize_pipeline = Some(N::serialize_thunk(false));
168        let deserialize_pipeline = Some(N::deserialize_thunk(None));
169
170        Stream::new(
171            to.clone(),
172            HydroNode::Network {
173                serialize_fn: serialize_pipeline.map(|e| e.into()),
174                instantiate_fn: DebugInstantiate::Building,
175                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
176                input: Box::new(self.ir_node.into_inner()),
177                metadata: to.new_node_metadata(
178                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
179                ),
180            },
181        )
182    }
183
184    #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
185    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
186    /// using [`bincode`] to serialize/deserialize messages.
187    ///
188    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
189    /// membership information. This is a common pattern in distributed systems for broadcasting data to
190    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
191    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
192    /// each element to all cluster members.
193    ///
194    /// # Non-Determinism
195    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
196    /// to the current cluster members _at that point in time_. Depending on when we are notified of
197    /// membership changes, we will broadcast each element to different members.
198    ///
199    /// # Example
200    /// ```rust
201    /// # #[cfg(feature = "deploy")] {
202    /// # use hydro_lang::prelude::*;
203    /// # use futures::StreamExt;
204    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
205    /// let p1 = flow.process::<()>();
206    /// let workers: Cluster<()> = flow.cluster::<()>();
207    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
208    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
209    /// # on_worker.send_bincode(&p2).entries()
210    /// // if there are 4 members in the cluster, each receives one element
211    /// // - MemberId::<()>(0): [123]
212    /// // - MemberId::<()>(1): [123]
213    /// // - MemberId::<()>(2): [123]
214    /// // - MemberId::<()>(3): [123]
215    /// # }, |mut stream| async move {
216    /// # let mut results = Vec::new();
217    /// # for w in 0..4 {
218    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
219    /// # }
220    /// # results.sort();
221    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
222    /// # }));
223    /// # }
224    /// ```
225    pub fn broadcast_bincode<L2: 'a>(
226        self,
227        other: &Cluster<'a, L2>,
228        nondet_membership: NonDet,
229    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
230    where
231        T: Clone + Serialize + DeserializeOwned,
232    {
233        self.broadcast(other, TCP.bincode(), nondet_membership)
234    }
235
236    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
237    /// using the configuration in `via` to set up the message transport.
238    ///
239    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
240    /// membership information. This is a common pattern in distributed systems for broadcasting data to
241    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
242    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
243    /// each element to all cluster members.
244    ///
245    /// # Non-Determinism
246    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
247    /// to the current cluster members _at that point in time_. Depending on when we are notified of
248    /// membership changes, we will broadcast each element to different members.
249    ///
250    /// # Example
251    /// ```rust
252    /// # #[cfg(feature = "deploy")] {
253    /// # use hydro_lang::prelude::*;
254    /// # use futures::StreamExt;
255    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
256    /// let p1 = flow.process::<()>();
257    /// let workers: Cluster<()> = flow.cluster::<()>();
258    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
259    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
260    /// # on_worker.send(&p2, TCP.bincode()).entries()
261    /// // if there are 4 members in the cluster, each receives one element
262    /// // - MemberId::<()>(0): [123]
263    /// // - MemberId::<()>(1): [123]
264    /// // - MemberId::<()>(2): [123]
265    /// // - MemberId::<()>(3): [123]
266    /// # }, |mut stream| async move {
267    /// # let mut results = Vec::new();
268    /// # for w in 0..4 {
269    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
270    /// # }
271    /// # results.sort();
272    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
273    /// # }));
274    /// # }
275    /// ```
276    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
277        self,
278        to: &Cluster<'a, L2>,
279        via: N,
280        nondet_membership: NonDet,
281    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
282    where
283        T: Clone + Serialize + DeserializeOwned,
284    {
285        let ids = track_membership(self.location.source_cluster_members(to));
286        sliced! {
287            let members_snapshot = use(ids, nondet_membership);
288            let elements = use(self, nondet_membership);
289
290            let current_members = members_snapshot.filter(q!(|b| *b));
291            elements.repeat_with_keys(current_members)
292        }
293        .demux(to, via)
294    }
295
296    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
297    /// serialization. The external process can receive these elements by establishing a TCP
298    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
299    ///
300    /// # Example
301    /// ```rust
302    /// # #[cfg(feature = "deploy")] {
303    /// # use hydro_lang::prelude::*;
304    /// # use futures::StreamExt;
305    /// # tokio_test::block_on(async move {
306    /// let flow = FlowBuilder::new();
307    /// let process = flow.process::<()>();
308    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
309    /// let external = flow.external::<()>();
310    /// let external_handle = numbers.send_bincode_external(&external);
311    ///
312    /// let mut deployment = hydro_deploy::Deployment::new();
313    /// let nodes = flow
314    ///     .with_process(&process, deployment.Localhost())
315    ///     .with_external(&external, deployment.Localhost())
316    ///     .deploy(&mut deployment);
317    ///
318    /// deployment.deploy().await.unwrap();
319    /// // establish the TCP connection
320    /// let mut external_recv_stream = nodes.connect(external_handle).await;
321    /// deployment.start().await.unwrap();
322    ///
323    /// for w in 1..=3 {
324    ///     assert_eq!(external_recv_stream.next().await, Some(w));
325    /// }
326    /// # });
327    /// # }
328    /// ```
329    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
330    where
331        T: Serialize + DeserializeOwned,
332    {
333        let serialize_pipeline = Some(serialize_bincode::<T>(false));
334
335        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
336
337        let external_port_id = flow_state_borrow.next_external_port.get_and_increment();
338
339        flow_state_borrow.push_root(HydroRoot::SendExternal {
340            to_external_id: other.id,
341            to_port_id: external_port_id,
342            to_many: false,
343            unpaired: true,
344            serialize_fn: serialize_pipeline.map(|e| e.into()),
345            instantiate_fn: DebugInstantiate::Building,
346            input: Box::new(self.ir_node.into_inner()),
347            op_metadata: HydroIrOpMetadata::new(),
348        });
349
350        ExternalBincodeStream {
351            process_id: other.id,
352            port_id: external_port_id,
353            _phantom: PhantomData,
354        }
355    }
356
357    #[cfg(feature = "sim")]
358    /// Sets up a simulation output port for this stream, allowing test code to receive elements
359    /// sent to this stream during simulation.
360    pub fn sim_output(self) -> SimReceiver<T, O, R>
361    where
362        T: Serialize + DeserializeOwned,
363    {
364        let external_location: External<'a, ()> = External {
365            id: 0,
366            flow_state: self.location.flow_state().clone(),
367            _phantom: PhantomData,
368        };
369
370        let external = self.send_bincode_external(&external_location);
371
372        SimReceiver(external.port_id, PhantomData)
373    }
374}
375
376impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
377    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
378{
379    #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
380    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
381    /// using [`bincode`] to serialize/deserialize messages.
382    ///
383    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
384    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
385    /// this API allows precise targeting of specific cluster members rather than broadcasting to
386    /// all members.
387    ///
388    /// # Example
389    /// ```rust
390    /// # #[cfg(feature = "deploy")] {
391    /// # use hydro_lang::prelude::*;
392    /// # use futures::StreamExt;
393    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
394    /// let p1 = flow.process::<()>();
395    /// let workers: Cluster<()> = flow.cluster::<()>();
396    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
397    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
398    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
399    ///     .demux_bincode(&workers);
400    /// # on_worker.send_bincode(&p2).entries()
401    /// // if there are 4 members in the cluster, each receives one element
402    /// // - MemberId::<()>(0): [0]
403    /// // - MemberId::<()>(1): [1]
404    /// // - MemberId::<()>(2): [2]
405    /// // - MemberId::<()>(3): [3]
406    /// # }, |mut stream| async move {
407    /// # let mut results = Vec::new();
408    /// # for w in 0..4 {
409    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
410    /// # }
411    /// # results.sort();
412    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
413    /// # }));
414    /// # }
415    /// ```
416    pub fn demux_bincode(
417        self,
418        other: &Cluster<'a, L2>,
419    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
420    where
421        T: Serialize + DeserializeOwned,
422    {
423        self.demux(other, TCP.bincode())
424    }
425
426    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
427    /// using the configuration in `via` to set up the message transport.
428    ///
429    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
430    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
431    /// this API allows precise targeting of specific cluster members rather than broadcasting to
432    /// all members.
433    ///
434    /// # Example
435    /// ```rust
436    /// # #[cfg(feature = "deploy")] {
437    /// # use hydro_lang::prelude::*;
438    /// # use futures::StreamExt;
439    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
440    /// let p1 = flow.process::<()>();
441    /// let workers: Cluster<()> = flow.cluster::<()>();
442    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
443    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
444    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
445    ///     .demux(&workers, TCP.bincode());
446    /// # on_worker.send(&p2, TCP.bincode()).entries()
447    /// // if there are 4 members in the cluster, each receives one element
448    /// // - MemberId::<()>(0): [0]
449    /// // - MemberId::<()>(1): [1]
450    /// // - MemberId::<()>(2): [2]
451    /// // - MemberId::<()>(3): [3]
452    /// # }, |mut stream| async move {
453    /// # let mut results = Vec::new();
454    /// # for w in 0..4 {
455    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
456    /// # }
457    /// # results.sort();
458    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
459    /// # }));
460    /// # }
461    /// ```
462    pub fn demux<N: NetworkFor<T>>(
463        self,
464        to: &Cluster<'a, L2>,
465        via: N,
466    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
467    where
468        T: Serialize + DeserializeOwned,
469    {
470        self.into_keyed().demux(to, via)
471    }
472}
473
474impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
475    #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
476    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
477    /// [`bincode`] to serialize/deserialize messages.
478    ///
479    /// This provides load balancing by evenly distributing work across cluster members. The
480    /// distribution is deterministic based on element order - the first element goes to member 0,
481    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
482    ///
483    /// # Non-Determinism
484    /// The set of cluster members may asynchronously change over time. Each element is distributed
485    /// based on the current cluster membership _at that point in time_. Depending on when cluster
486    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
487    /// membership is stable, the order of members in the round-robin pattern may change across runs.
488    ///
489    /// # Ordering Requirements
490    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
491    /// order of messages and retries affects the round-robin pattern.
492    ///
493    /// # Example
494    /// ```rust
495    /// # #[cfg(feature = "deploy")] {
496    /// # use hydro_lang::prelude::*;
497    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
498    /// # use futures::StreamExt;
499    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
500    /// let p1 = flow.process::<()>();
501    /// let workers: Cluster<()> = flow.cluster::<()>();
502    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
503    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
504    /// on_worker.send_bincode(&p2)
505    /// # .first().values() // we use first to assert that each member gets one element
506    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
507    /// // - MemberId::<()>(?): [1]
508    /// // - MemberId::<()>(?): [2]
509    /// // - MemberId::<()>(?): [3]
510    /// // - MemberId::<()>(?): [4]
511    /// # }, |mut stream| async move {
512    /// # let mut results = Vec::new();
513    /// # for w in 0..4 {
514    /// #     results.push(stream.next().await.unwrap());
515    /// # }
516    /// # results.sort();
517    /// # assert_eq!(results, vec![1, 2, 3, 4]);
518    /// # }));
519    /// # }
520    /// ```
521    pub fn round_robin_bincode<L2: 'a>(
522        self,
523        other: &Cluster<'a, L2>,
524        nondet_membership: NonDet,
525    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
526    where
527        T: Serialize + DeserializeOwned,
528    {
529        self.round_robin(other, TCP.bincode(), nondet_membership)
530    }
531
532    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
533    /// the configuration in `via` to set up the message transport.
534    ///
535    /// This provides load balancing by evenly distributing work across cluster members. The
536    /// distribution is deterministic based on element order - the first element goes to member 0,
537    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
538    ///
539    /// # Non-Determinism
540    /// The set of cluster members may asynchronously change over time. Each element is distributed
541    /// based on the current cluster membership _at that point in time_. Depending on when cluster
542    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
543    /// membership is stable, the order of members in the round-robin pattern may change across runs.
544    ///
545    /// # Ordering Requirements
546    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
547    /// order of messages and retries affects the round-robin pattern.
548    ///
549    /// # Example
550    /// ```rust
551    /// # #[cfg(feature = "deploy")] {
552    /// # use hydro_lang::prelude::*;
553    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
554    /// # use futures::StreamExt;
555    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
556    /// let p1 = flow.process::<()>();
557    /// let workers: Cluster<()> = flow.cluster::<()>();
558    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
559    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
560    /// on_worker.send(&p2, TCP.bincode())
561    /// # .first().values() // we use first to assert that each member gets one element
562    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
563    /// // - MemberId::<()>(?): [1]
564    /// // - MemberId::<()>(?): [2]
565    /// // - MemberId::<()>(?): [3]
566    /// // - MemberId::<()>(?): [4]
567    /// # }, |mut stream| async move {
568    /// # let mut results = Vec::new();
569    /// # for w in 0..4 {
570    /// #     results.push(stream.next().await.unwrap());
571    /// # }
572    /// # results.sort();
573    /// # assert_eq!(results, vec![1, 2, 3, 4]);
574    /// # }));
575    /// # }
576    /// ```
577    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
578        self,
579        to: &Cluster<'a, L2>,
580        via: N,
581        nondet_membership: NonDet,
582    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
583    where
584        T: Serialize + DeserializeOwned,
585    {
586        let ids = track_membership(self.location.source_cluster_members(to));
587        sliced! {
588            let members_snapshot = use(ids, nondet_membership);
589            let elements = use(self.enumerate(), nondet_membership);
590
591            let current_members = members_snapshot
592                .filter(q!(|b| *b))
593                .keys()
594                .assume_ordering(nondet_membership)
595                .collect_vec();
596
597            elements
598                .cross_singleton(current_members)
599                .map(q!(|(data, members)| (
600                    members[data.0 % members.len()].clone(),
601                    data.1
602                )))
603        }
604        .demux(to, via)
605    }
606}
607
608impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
609    #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
610    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
611    /// [`bincode`] to serialize/deserialize messages.
612    ///
613    /// This provides load balancing by evenly distributing work across cluster members. The
614    /// distribution is deterministic based on element order - the first element goes to member 0,
615    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
616    ///
617    /// # Non-Determinism
618    /// The set of cluster members may asynchronously change over time. Each element is distributed
619    /// based on the current cluster membership _at that point in time_. Depending on when cluster
620    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
621    /// membership is stable, the order of members in the round-robin pattern may change across runs.
622    ///
623    /// # Ordering Requirements
624    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
625    /// order of messages and retries affects the round-robin pattern.
626    ///
627    /// # Example
628    /// ```rust
629    /// # #[cfg(feature = "deploy")] {
630    /// # use hydro_lang::prelude::*;
631    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
632    /// # use hydro_lang::location::MemberId;
633    /// # use futures::StreamExt;
634    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
635    /// let p1 = flow.process::<()>();
636    /// let workers1: Cluster<()> = flow.cluster::<()>();
637    /// let workers2: Cluster<()> = flow.cluster::<()>();
638    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
639    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
640    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
641    /// on_worker2.send_bincode(&p2)
642    /// # .entries()
643    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
644    /// # }, |mut stream| async move {
645    /// # let mut results = Vec::new();
646    /// # let mut locations = std::collections::HashSet::new();
647    /// # for w in 0..=16 {
648    /// #     let (location, v) = stream.next().await.unwrap();
649    /// #     locations.insert(location);
650    /// #     results.push(v);
651    /// # }
652    /// # results.sort();
653    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
654    /// # assert_eq!(locations.len(), 16);
655    /// # }));
656    /// # }
657    /// ```
658    pub fn round_robin_bincode<L2: 'a>(
659        self,
660        other: &Cluster<'a, L2>,
661        nondet_membership: NonDet,
662    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
663    where
664        T: Serialize + DeserializeOwned,
665    {
666        self.round_robin(other, TCP.bincode(), nondet_membership)
667    }
668
669    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
670    /// the configuration in `via` to set up the message transport.
671    ///
672    /// This provides load balancing by evenly distributing work across cluster members. The
673    /// distribution is deterministic based on element order - the first element goes to member 0,
674    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
675    ///
676    /// # Non-Determinism
677    /// The set of cluster members may asynchronously change over time. Each element is distributed
678    /// based on the current cluster membership _at that point in time_. Depending on when cluster
679    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
680    /// membership is stable, the order of members in the round-robin pattern may change across runs.
681    ///
682    /// # Ordering Requirements
683    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
684    /// order of messages and retries affects the round-robin pattern.
685    ///
686    /// # Example
687    /// ```rust
688    /// # #[cfg(feature = "deploy")] {
689    /// # use hydro_lang::prelude::*;
690    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
691    /// # use hydro_lang::location::MemberId;
692    /// # use futures::StreamExt;
693    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
694    /// let p1 = flow.process::<()>();
695    /// let workers1: Cluster<()> = flow.cluster::<()>();
696    /// let workers2: Cluster<()> = flow.cluster::<()>();
697    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
698    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.bincode(), nondet!(/** assuming stable membership */));
699    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
700    /// on_worker2.send(&p2, TCP.bincode())
701    /// # .entries()
702    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
703    /// # }, |mut stream| async move {
704    /// # let mut results = Vec::new();
705    /// # let mut locations = std::collections::HashSet::new();
706    /// # for w in 0..=16 {
707    /// #     let (location, v) = stream.next().await.unwrap();
708    /// #     locations.insert(location);
709    /// #     results.push(v);
710    /// # }
711    /// # results.sort();
712    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
713    /// # assert_eq!(locations.len(), 16);
714    /// # }));
715    /// # }
716    /// ```
717    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
718        self,
719        to: &Cluster<'a, L2>,
720        via: N,
721        nondet_membership: NonDet,
722    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
723    where
724        T: Serialize + DeserializeOwned,
725    {
726        let ids = track_membership(self.location.source_cluster_members(to));
727        sliced! {
728            let members_snapshot = use(ids, nondet_membership);
729            let elements = use(self.enumerate(), nondet_membership);
730
731            let current_members = members_snapshot
732                .filter(q!(|b| *b))
733                .keys()
734                .assume_ordering(nondet_membership)
735                .collect_vec();
736
737            elements
738                .cross_singleton(current_members)
739                .map(q!(|(data, members)| (
740                    members[data.0 % members.len()].clone(),
741                    data.1
742                )))
743        }
744        .demux(to, via)
745    }
746}
747
748impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
749    #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
750    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
751    /// using [`bincode`] to serialize/deserialize messages.
752    ///
753    /// Each cluster member sends its local stream elements, and they are collected at the destination
754    /// as a [`KeyedStream`] where keys identify the source cluster member.
755    ///
756    /// # Example
757    /// ```rust
758    /// # #[cfg(feature = "deploy")] {
759    /// # use hydro_lang::prelude::*;
760    /// # use futures::StreamExt;
761    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
762    /// let workers: Cluster<()> = flow.cluster::<()>();
763    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
764    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
765    /// # all_received.entries()
766    /// # }, |mut stream| async move {
767    /// // if there are 4 members in the cluster, we should receive 4 elements
768    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
769    /// # let mut results = Vec::new();
770    /// # for w in 0..4 {
771    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
772    /// # }
773    /// # results.sort();
774    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
775    /// # }));
776    /// # }
777    /// ```
778    ///
779    /// If you don't need to know the source for each element, you can use `.values()`
780    /// to get just the data:
781    /// ```rust
782    /// # #[cfg(feature = "deploy")] {
783    /// # use hydro_lang::prelude::*;
784    /// # use hydro_lang::live_collections::stream::NoOrder;
785    /// # use futures::StreamExt;
786    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
787    /// # let workers: Cluster<()> = flow.cluster::<()>();
788    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
789    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
790    /// # values
791    /// # }, |mut stream| async move {
792    /// # let mut results = Vec::new();
793    /// # for w in 0..4 {
794    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
795    /// # }
796    /// # results.sort();
797    /// // if there are 4 members in the cluster, we should receive 4 elements
798    /// // 1, 1, 1, 1
799    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
800    /// # }));
801    /// # }
802    /// ```
803    pub fn send_bincode<L2>(
804        self,
805        other: &Process<'a, L2>,
806    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
807    where
808        T: Serialize + DeserializeOwned,
809    {
810        self.send(other, TCP.bincode())
811    }
812
813    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
814    /// using the configuration in `via` to set up the message transport.
815    ///
816    /// Each cluster member sends its local stream elements, and they are collected at the destination
817    /// as a [`KeyedStream`] where keys identify the source cluster member.
818    ///
819    /// # Example
820    /// ```rust
821    /// # #[cfg(feature = "deploy")] {
822    /// # use hydro_lang::prelude::*;
823    /// # use futures::StreamExt;
824    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
825    /// let workers: Cluster<()> = flow.cluster::<()>();
826    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
827    /// let all_received = numbers.send(&process, TCP.bincode()); // KeyedStream<MemberId<()>, i32, ...>
828    /// # all_received.entries()
829    /// # }, |mut stream| async move {
830    /// // if there are 4 members in the cluster, we should receive 4 elements
831    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
832    /// # let mut results = Vec::new();
833    /// # for w in 0..4 {
834    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
835    /// # }
836    /// # results.sort();
837    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
838    /// # }));
839    /// # }
840    /// ```
841    ///
842    /// If you don't need to know the source for each element, you can use `.values()`
843    /// to get just the data:
844    /// ```rust
845    /// # #[cfg(feature = "deploy")] {
846    /// # use hydro_lang::prelude::*;
847    /// # use hydro_lang::live_collections::stream::NoOrder;
848    /// # use futures::StreamExt;
849    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
850    /// # let workers: Cluster<()> = flow.cluster::<()>();
851    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
852    /// let values: Stream<i32, _, _, NoOrder> = numbers.send(&process, TCP.bincode()).values();
853    /// # values
854    /// # }, |mut stream| async move {
855    /// # let mut results = Vec::new();
856    /// # for w in 0..4 {
857    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
858    /// # }
859    /// # results.sort();
860    /// // if there are 4 members in the cluster, we should receive 4 elements
861    /// // 1, 1, 1, 1
862    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
863    /// # }));
864    /// # }
865    /// ```
866    pub fn send<L2, N: NetworkFor<T>>(
867        self,
868        to: &Process<'a, L2>,
869        via: N,
870    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
871    where
872        T: Serialize + DeserializeOwned,
873    {
874        let _ = via;
875        let serialize_pipeline = Some(N::serialize_thunk(false));
876
877        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
878
879        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
880            to.clone(),
881            HydroNode::Network {
882                serialize_fn: serialize_pipeline.map(|e| e.into()),
883                instantiate_fn: DebugInstantiate::Building,
884                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
885                input: Box::new(self.ir_node.into_inner()),
886                metadata: to.new_node_metadata(Stream::<
887                    (MemberId<L>, T),
888                    Process<'a, L2>,
889                    Unbounded,
890                    O,
891                    R,
892                >::collection_kind()),
893            },
894        );
895
896        raw_stream.into_keyed()
897    }
898
899    #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
900    /// Broadcasts elements of this stream at each source member to all members of a destination
901    /// cluster, using [`bincode`] to serialize/deserialize messages.
902    ///
903    /// Each source member sends each of its stream elements to **every** member of the cluster
904    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
905    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
906    /// **only data elements** and sends each element to all cluster members.
907    ///
908    /// # Non-Determinism
909    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
910    /// to the current cluster members known _at that point in time_ at the source member. Depending
911    /// on when each source member is notified of membership changes, it will broadcast each element
912    /// to different members.
913    ///
914    /// # Example
915    /// ```rust
916    /// # #[cfg(feature = "deploy")] {
917    /// # use hydro_lang::prelude::*;
918    /// # use hydro_lang::location::MemberId;
919    /// # use futures::StreamExt;
920    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
921    /// # type Source = ();
922    /// # type Destination = ();
923    /// let source: Cluster<Source> = flow.cluster::<Source>();
924    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
925    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
926    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
927    /// # on_destination.entries().send_bincode(&p2).entries()
928    /// // if there are 4 members in the desination, each receives one element from each source member
929    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
930    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
931    /// // - ...
932    /// # }, |mut stream| async move {
933    /// # let mut results = Vec::new();
934    /// # for w in 0..16 {
935    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
936    /// # }
937    /// # results.sort();
938    /// # assert_eq!(results, vec![
939    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
940    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
941    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
942    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
943    /// # ]);
944    /// # }));
945    /// # }
946    /// ```
947    pub fn broadcast_bincode<L2: 'a>(
948        self,
949        other: &Cluster<'a, L2>,
950        nondet_membership: NonDet,
951    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
952    where
953        T: Clone + Serialize + DeserializeOwned,
954    {
955        self.broadcast(other, TCP.bincode(), nondet_membership)
956    }
957
958    /// Broadcasts elements of this stream at each source member to all members of a destination
959    /// cluster, using the configuration in `via` to set up the message transport.
960    ///
961    /// Each source member sends each of its stream elements to **every** member of the cluster
962    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
963    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
964    /// **only data elements** and sends each element to all cluster members.
965    ///
966    /// # Non-Determinism
967    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
968    /// to the current cluster members known _at that point in time_ at the source member. Depending
969    /// on when each source member is notified of membership changes, it will broadcast each element
970    /// to different members.
971    ///
972    /// # Example
973    /// ```rust
974    /// # #[cfg(feature = "deploy")] {
975    /// # use hydro_lang::prelude::*;
976    /// # use hydro_lang::location::MemberId;
977    /// # use futures::StreamExt;
978    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
979    /// # type Source = ();
980    /// # type Destination = ();
981    /// let source: Cluster<Source> = flow.cluster::<Source>();
982    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
983    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
984    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.bincode(), nondet!(/** assuming stable membership */));
985    /// # on_destination.entries().send(&p2, TCP.bincode()).entries()
986    /// // if there are 4 members in the desination, each receives one element from each source member
987    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
988    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
989    /// // - ...
990    /// # }, |mut stream| async move {
991    /// # let mut results = Vec::new();
992    /// # for w in 0..16 {
993    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
994    /// # }
995    /// # results.sort();
996    /// # assert_eq!(results, vec![
997    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
998    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
999    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1000    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1001    /// # ]);
1002    /// # }));
1003    /// # }
1004    /// ```
1005    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1006        self,
1007        to: &Cluster<'a, L2>,
1008        via: N,
1009        nondet_membership: NonDet,
1010    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1011    where
1012        T: Clone + Serialize + DeserializeOwned,
1013    {
1014        let ids = track_membership(self.location.source_cluster_members(to));
1015        sliced! {
1016            let members_snapshot = use(ids, nondet_membership);
1017            let elements = use(self, nondet_membership);
1018
1019            let current_members = members_snapshot.filter(q!(|b| *b));
1020            elements.repeat_with_keys(current_members)
1021        }
1022        .demux(to, via)
1023    }
1024}
1025
1026impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1027    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1028{
1029    #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
1030    /// Sends elements of this stream at each source member to specific members of a destination
1031    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1032    ///
1033    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1034    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1035    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1036    /// all members.
1037    ///
1038    /// Each cluster member sends its local stream elements, and they are collected at each
1039    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1040    ///
1041    /// # Example
1042    /// ```rust
1043    /// # #[cfg(feature = "deploy")] {
1044    /// # use hydro_lang::prelude::*;
1045    /// # use futures::StreamExt;
1046    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1047    /// # type Source = ();
1048    /// # type Destination = ();
1049    /// let source: Cluster<Source> = flow.cluster::<Source>();
1050    /// let to_send: Stream<_, Cluster<_>, _> = source
1051    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1052    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1053    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1054    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1055    /// # all_received.entries().send_bincode(&p2).entries()
1056    /// # }, |mut stream| async move {
1057    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1058    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1059    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1060    /// // - ...
1061    /// # let mut results = Vec::new();
1062    /// # for w in 0..16 {
1063    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1064    /// # }
1065    /// # results.sort();
1066    /// # assert_eq!(results, vec![
1067    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1068    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1069    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1070    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1071    /// # ]);
1072    /// # }));
1073    /// # }
1074    /// ```
1075    pub fn demux_bincode(
1076        self,
1077        other: &Cluster<'a, L2>,
1078    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1079    where
1080        T: Serialize + DeserializeOwned,
1081    {
1082        self.demux(other, TCP.bincode())
1083    }
1084
1085    /// Sends elements of this stream at each source member to specific members of a destination
1086    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1087    /// message transport.
1088    ///
1089    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1090    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1091    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1092    /// all members.
1093    ///
1094    /// Each cluster member sends its local stream elements, and they are collected at each
1095    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1096    ///
1097    /// # Example
1098    /// ```rust
1099    /// # #[cfg(feature = "deploy")] {
1100    /// # use hydro_lang::prelude::*;
1101    /// # use futures::StreamExt;
1102    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1103    /// # type Source = ();
1104    /// # type Destination = ();
1105    /// let source: Cluster<Source> = flow.cluster::<Source>();
1106    /// let to_send: Stream<_, Cluster<_>, _> = source
1107    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1108    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1109    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1110    /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1111    /// # all_received.entries().send(&p2, TCP.bincode()).entries()
1112    /// # }, |mut stream| async move {
1113    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1114    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1115    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1116    /// // - ...
1117    /// # let mut results = Vec::new();
1118    /// # for w in 0..16 {
1119    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1120    /// # }
1121    /// # results.sort();
1122    /// # assert_eq!(results, vec![
1123    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1124    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1125    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1126    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1127    /// # ]);
1128    /// # }));
1129    /// # }
1130    /// ```
1131    pub fn demux<N: NetworkFor<T>>(
1132        self,
1133        to: &Cluster<'a, L2>,
1134        via: N,
1135    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1136    where
1137        T: Serialize + DeserializeOwned,
1138    {
1139        self.into_keyed().demux(to, via)
1140    }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145    #[cfg(feature = "sim")]
1146    use stageleft::q;
1147
1148    #[cfg(feature = "sim")]
1149    use crate::location::{Location, MemberId};
1150    #[cfg(feature = "sim")]
1151    use crate::networking::TCP;
1152    #[cfg(feature = "sim")]
1153    use crate::nondet::nondet;
1154    #[cfg(feature = "sim")]
1155    use crate::prelude::FlowBuilder;
1156
1157    #[cfg(feature = "sim")]
1158    #[test]
1159    fn sim_send_bincode_o2o() {
1160        use crate::networking::TCP;
1161
1162        let flow = FlowBuilder::new();
1163        let node = flow.process::<()>();
1164        let node2 = flow.process::<()>();
1165
1166        let (in_send, input) = node.sim_input();
1167
1168        let out_recv = input
1169            .send(&node2, TCP.bincode())
1170            .batch(&node2.tick(), nondet!(/** test */))
1171            .count()
1172            .all_ticks()
1173            .sim_output();
1174
1175        let instances = flow.sim().exhaustive(async || {
1176            in_send.send(());
1177            in_send.send(());
1178            in_send.send(());
1179
1180            let received = out_recv.collect::<Vec<_>>().await;
1181            assert!(received.into_iter().sum::<usize>() == 3);
1182        });
1183
1184        assert_eq!(instances, 4); // 2^{3 - 1}
1185    }
1186
1187    #[cfg(feature = "sim")]
1188    #[test]
1189    fn sim_send_bincode_m2o() {
1190        let flow = FlowBuilder::new();
1191        let cluster = flow.cluster::<()>();
1192        let node = flow.process::<()>();
1193
1194        let input = cluster.source_iter(q!(vec![1]));
1195
1196        let out_recv = input
1197            .send(&node, TCP.bincode())
1198            .entries()
1199            .batch(&node.tick(), nondet!(/** test */))
1200            .all_ticks()
1201            .sim_output();
1202
1203        let instances = flow
1204            .sim()
1205            .with_cluster_size(&cluster, 4)
1206            .exhaustive(async || {
1207                out_recv
1208                    .assert_yields_only_unordered(vec![
1209                        (MemberId::from_raw_id(0), 1),
1210                        (MemberId::from_raw_id(1), 1),
1211                        (MemberId::from_raw_id(2), 1),
1212                        (MemberId::from_raw_id(3), 1),
1213                    ])
1214                    .await
1215            });
1216
1217        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1218    }
1219
1220    #[cfg(feature = "sim")]
1221    #[test]
1222    fn sim_send_bincode_multiple_m2o() {
1223        let flow = FlowBuilder::new();
1224        let cluster1 = flow.cluster::<()>();
1225        let cluster2 = flow.cluster::<()>();
1226        let node = flow.process::<()>();
1227
1228        let out_recv_1 = cluster1
1229            .source_iter(q!(vec![1]))
1230            .send(&node, TCP.bincode())
1231            .entries()
1232            .sim_output();
1233
1234        let out_recv_2 = cluster2
1235            .source_iter(q!(vec![2]))
1236            .send(&node, TCP.bincode())
1237            .entries()
1238            .sim_output();
1239
1240        let instances = flow
1241            .sim()
1242            .with_cluster_size(&cluster1, 3)
1243            .with_cluster_size(&cluster2, 4)
1244            .exhaustive(async || {
1245                out_recv_1
1246                    .assert_yields_only_unordered(vec![
1247                        (MemberId::from_raw_id(0), 1),
1248                        (MemberId::from_raw_id(1), 1),
1249                        (MemberId::from_raw_id(2), 1),
1250                    ])
1251                    .await;
1252
1253                out_recv_2
1254                    .assert_yields_only_unordered(vec![
1255                        (MemberId::from_raw_id(0), 2),
1256                        (MemberId::from_raw_id(1), 2),
1257                        (MemberId::from_raw_id(2), 2),
1258                        (MemberId::from_raw_id(3), 2),
1259                    ])
1260                    .await;
1261            });
1262
1263        assert_eq!(instances, 1);
1264    }
1265
1266    #[cfg(feature = "sim")]
1267    #[test]
1268    fn sim_send_bincode_o2m() {
1269        let flow = FlowBuilder::new();
1270        let cluster = flow.cluster::<()>();
1271        let node = flow.process::<()>();
1272
1273        let input = node.source_iter(q!(vec![
1274            (MemberId::from_raw_id(0), 123),
1275            (MemberId::from_raw_id(1), 456),
1276        ]));
1277
1278        let out_recv = input
1279            .demux(&cluster, TCP.bincode())
1280            .map(q!(|x| x + 1))
1281            .send(&node, TCP.bincode())
1282            .entries()
1283            .sim_output();
1284
1285        flow.sim()
1286            .with_cluster_size(&cluster, 4)
1287            .exhaustive(async || {
1288                out_recv
1289                    .assert_yields_only_unordered(vec![
1290                        (MemberId::from_raw_id(0), 124),
1291                        (MemberId::from_raw_id(1), 457),
1292                    ])
1293                    .await
1294            });
1295    }
1296
1297    #[cfg(feature = "sim")]
1298    #[test]
1299    fn sim_broadcast_bincode_o2m() {
1300        let flow = FlowBuilder::new();
1301        let cluster = flow.cluster::<()>();
1302        let node = flow.process::<()>();
1303
1304        let input = node.source_iter(q!(vec![123, 456]));
1305
1306        let out_recv = input
1307            .broadcast(&cluster, TCP.bincode(), nondet!(/** test */))
1308            .map(q!(|x| x + 1))
1309            .send(&node, TCP.bincode())
1310            .entries()
1311            .sim_output();
1312
1313        let mut c_1_produced = false;
1314        let mut c_2_produced = false;
1315
1316        flow.sim()
1317            .with_cluster_size(&cluster, 2)
1318            .exhaustive(async || {
1319                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1320
1321                // check that order is preserved
1322                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1323                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1324                    c_1_produced = true;
1325                }
1326
1327                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1328                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1329                    c_2_produced = true;
1330                }
1331            });
1332
1333        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1334    }
1335
1336    #[cfg(feature = "sim")]
1337    #[test]
1338    fn sim_send_bincode_m2m() {
1339        let flow = FlowBuilder::new();
1340        let cluster = flow.cluster::<()>();
1341        let node = flow.process::<()>();
1342
1343        let input = node.source_iter(q!(vec![
1344            (MemberId::from_raw_id(0), 123),
1345            (MemberId::from_raw_id(1), 456),
1346        ]));
1347
1348        let out_recv = input
1349            .demux(&cluster, TCP.bincode())
1350            .map(q!(|x| x + 1))
1351            .flat_map_ordered(q!(|x| vec![
1352                (MemberId::from_raw_id(0), x),
1353                (MemberId::from_raw_id(1), x),
1354            ]))
1355            .demux(&cluster, TCP.bincode())
1356            .entries()
1357            .send(&node, TCP.bincode())
1358            .entries()
1359            .sim_output();
1360
1361        flow.sim()
1362            .with_cluster_size(&cluster, 4)
1363            .exhaustive(async || {
1364                out_recv
1365                    .assert_yields_only_unordered(vec![
1366                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1367                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1368                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1369                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1370                    ])
1371                    .await
1372            });
1373    }
1374}