Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33    membership.fold(
34        q!(|| false),
35        q!(|present, event| {
36            match event {
37                MembershipEvent::Joined => *present = true,
38                MembershipEvent::Left => *present = false,
39            }
40        }),
41    )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45    let root = get_this_crate();
46
47    if is_demux {
48        parse_quote! {
49            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50                |(id, data)| {
51                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52                }
53            )
54        }
55    } else {
56        parse_quote! {
57            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58                |data| {
59                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
60                }
61            )
62        }
63    }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67    serialize_bincode_with_type(is_demux, &quote_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71    let root = get_this_crate();
72    if let Some(c_type) = tagged {
73        parse_quote! {
74            |res| {
75                let (id, b) = res.unwrap();
76                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77            }
78        }
79    } else {
80        parse_quote! {
81            |res| {
82                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83            }
84        }
85    }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89    deserialize_bincode_with_type(tagged, &quote_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95    /// using [`bincode`] to serialize/deserialize messages.
96    ///
97    /// The returned stream captures the elements received at the destination, where values will
98    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
    /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
101    /// dropped no further messages will be sent.
102    ///
103    /// # Example
104    /// ```rust
105    /// # #[cfg(feature = "deploy")] {
106    /// # use hydro_lang::prelude::*;
107    /// # use futures::StreamExt;
108    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109    /// let p1 = flow.process::<()>();
110    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111    /// let p2 = flow.process::<()>();
112    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113    /// // 1, 2, 3
114    /// # on_p2.send_bincode(&p_out)
115    /// # }, |mut stream| async move {
116    /// # for w in 1..=3 {
117    /// #     assert_eq!(stream.next().await, Some(w));
118    /// # }
119    /// # }));
120    /// # }
121    /// ```
122    pub fn send_bincode<L2>(
123        self,
124        other: &Process<'a, L2>,
125    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126    where
127        T: Serialize + DeserializeOwned,
128    {
129        self.send(other, TCP.fail_stop().bincode())
130    }
131
132    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133    /// using the configuration in `via` to set up the message transport.
134    ///
135    /// The returned stream captures the elements received at the destination, where values will
136    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
    /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
139    /// dropped no further messages will be sent.
140    ///
141    /// # Example
142    /// ```rust
143    /// # #[cfg(feature = "deploy")] {
144    /// # use hydro_lang::prelude::*;
145    /// # use futures::StreamExt;
146    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147    /// let p1 = flow.process::<()>();
148    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149    /// let p2 = flow.process::<()>();
150    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151    /// // 1, 2, 3
152    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153    /// # }, |mut stream| async move {
154    /// # for w in 1..=3 {
155    /// #     assert_eq!(stream.next().await, Some(w));
156    /// # }
157    /// # }));
158    /// # }
159    /// ```
160    pub fn send<L2, N: NetworkFor<T>>(
161        self,
162        to: &Process<'a, L2>,
163        via: N,
164    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
165    where
166        T: Serialize + DeserializeOwned,
167    {
168        let serialize_pipeline = Some(N::serialize_thunk(false));
169        let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171        let name = via.name();
172        if to.multiversioned() && name.is_none() {
173            panic!(
174                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
175            );
176        }
177
178        Stream::new(
179            to.clone(),
180            HydroNode::Network {
181                name: name.map(ToOwned::to_owned),
182                serialize_fn: serialize_pipeline.map(|e| e.into()),
183                instantiate_fn: DebugInstantiate::Building,
184                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
185                input: Box::new(self.ir_node.into_inner()),
186                metadata: to.new_node_metadata(
187                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
188                ),
189            },
190        )
191    }
192
193    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
194    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
195    /// using [`bincode`] to serialize/deserialize messages.
196    ///
197    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
198    /// membership information. This is a common pattern in distributed systems for broadcasting data to
199    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
200    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
201    /// each element to all cluster members.
202    ///
203    /// # Non-Determinism
204    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
205    /// to the current cluster members _at that point in time_. Depending on when we are notified of
206    /// membership changes, we will broadcast each element to different members.
207    ///
208    /// # Example
209    /// ```rust
210    /// # #[cfg(feature = "deploy")] {
211    /// # use hydro_lang::prelude::*;
212    /// # use futures::StreamExt;
213    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
214    /// let p1 = flow.process::<()>();
215    /// let workers: Cluster<()> = flow.cluster::<()>();
216    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
217    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
218    /// # on_worker.send_bincode(&p2).entries()
219    /// // if there are 4 members in the cluster, each receives one element
220    /// // - MemberId::<()>(0): [123]
221    /// // - MemberId::<()>(1): [123]
222    /// // - MemberId::<()>(2): [123]
223    /// // - MemberId::<()>(3): [123]
224    /// # }, |mut stream| async move {
225    /// # let mut results = Vec::new();
226    /// # for w in 0..4 {
227    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
228    /// # }
229    /// # results.sort();
230    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
231    /// # }));
232    /// # }
233    /// ```
234    pub fn broadcast_bincode<L2: 'a>(
235        self,
236        other: &Cluster<'a, L2>,
237        nondet_membership: NonDet,
238    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
239    where
240        T: Clone + Serialize + DeserializeOwned,
241    {
242        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
243    }
244
245    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
246    /// using the configuration in `via` to set up the message transport.
247    ///
248    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
249    /// membership information. This is a common pattern in distributed systems for broadcasting data to
250    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
251    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
252    /// each element to all cluster members.
253    ///
254    /// # Non-Determinism
255    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
256    /// to the current cluster members _at that point in time_. Depending on when we are notified of
257    /// membership changes, we will broadcast each element to different members.
258    ///
259    /// # Example
260    /// ```rust
261    /// # #[cfg(feature = "deploy")] {
262    /// # use hydro_lang::prelude::*;
263    /// # use futures::StreamExt;
264    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
265    /// let p1 = flow.process::<()>();
266    /// let workers: Cluster<()> = flow.cluster::<()>();
267    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
268    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
269    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
270    /// // if there are 4 members in the cluster, each receives one element
271    /// // - MemberId::<()>(0): [123]
272    /// // - MemberId::<()>(1): [123]
273    /// // - MemberId::<()>(2): [123]
274    /// // - MemberId::<()>(3): [123]
275    /// # }, |mut stream| async move {
276    /// # let mut results = Vec::new();
277    /// # for w in 0..4 {
278    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
279    /// # }
280    /// # results.sort();
281    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
282    /// # }));
283    /// # }
284    /// ```
285    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
286        self,
287        to: &Cluster<'a, L2>,
288        via: N,
289        nondet_membership: NonDet,
290    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
291    where
292        T: Clone + Serialize + DeserializeOwned,
293    {
294        let ids = track_membership(self.location.source_cluster_members(to));
295        sliced! {
296            let members_snapshot = use(ids, nondet_membership);
297            let elements = use(self, nondet_membership);
298
299            let current_members = members_snapshot.filter(q!(|b| *b));
300            elements.repeat_with_keys(current_members)
301        }
302        .demux(to, via)
303    }
304
305    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
306    /// serialization. The external process can receive these elements by establishing a TCP
307    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
308    ///
309    /// # Example
310    /// ```rust
311    /// # #[cfg(feature = "deploy")] {
312    /// # use hydro_lang::prelude::*;
313    /// # use futures::StreamExt;
314    /// # tokio_test::block_on(async move {
315    /// let mut flow = FlowBuilder::new();
316    /// let process = flow.process::<()>();
317    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
318    /// let external = flow.external::<()>();
319    /// let external_handle = numbers.send_bincode_external(&external);
320    ///
321    /// let mut deployment = hydro_deploy::Deployment::new();
322    /// let nodes = flow
323    ///     .with_process(&process, deployment.Localhost())
324    ///     .with_external(&external, deployment.Localhost())
325    ///     .deploy(&mut deployment);
326    ///
327    /// deployment.deploy().await.unwrap();
328    /// // establish the TCP connection
329    /// let mut external_recv_stream = nodes.connect(external_handle).await;
330    /// deployment.start().await.unwrap();
331    ///
332    /// for w in 1..=3 {
333    ///     assert_eq!(external_recv_stream.next().await, Some(w));
334    /// }
335    /// # });
336    /// # }
337    /// ```
338    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
339    where
340        T: Serialize + DeserializeOwned,
341    {
342        let serialize_pipeline = Some(serialize_bincode::<T>(false));
343
344        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
345
346        let external_port_id = flow_state_borrow.next_external_port.get_and_increment();
347
348        flow_state_borrow.push_root(HydroRoot::SendExternal {
349            to_external_key: other.key,
350            to_port_id: external_port_id,
351            to_many: false,
352            unpaired: true,
353            serialize_fn: serialize_pipeline.map(|e| e.into()),
354            instantiate_fn: DebugInstantiate::Building,
355            input: Box::new(self.ir_node.into_inner()),
356            op_metadata: HydroIrOpMetadata::new(),
357        });
358
359        ExternalBincodeStream {
360            process_key: other.key,
361            port_id: external_port_id,
362            _phantom: PhantomData,
363        }
364    }
365
366    #[cfg(feature = "sim")]
367    /// Sets up a simulation output port for this stream, allowing test code to receive elements
368    /// sent to this stream during simulation.
369    pub fn sim_output(self) -> SimReceiver<T, O, R>
370    where
371        T: Serialize + DeserializeOwned,
372    {
373        let external_location: External<'a, ()> = External {
374            key: LocationKey::FIRST,
375            flow_state: self.location.flow_state().clone(),
376            _phantom: PhantomData,
377        };
378
379        let external = self.send_bincode_external(&external_location);
380
381        SimReceiver(external.port_id, PhantomData)
382    }
383}
384
385impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
386    /// Creates an external output for embedded deployment mode.
387    ///
388    /// The `name` parameter specifies the name of the field in the generated
389    /// `EmbeddedOutputs` struct that will receive elements from this stream.
390    /// The generated function will accept an `EmbeddedOutputs` struct with an
391    /// `impl FnMut(T)` field with this name.
392    pub fn embedded_output(self, name: impl Into<String>) {
393        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
394
395        self.location
396            .flow_state()
397            .borrow_mut()
398            .push_root(HydroRoot::EmbeddedOutput {
399                ident,
400                input: Box::new(self.ir_node.into_inner()),
401                op_metadata: HydroIrOpMetadata::new(),
402            });
403    }
404}
405
406impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
407    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
408{
409    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
410    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
411    /// using [`bincode`] to serialize/deserialize messages.
412    ///
413    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
414    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
415    /// this API allows precise targeting of specific cluster members rather than broadcasting to
416    /// all members.
417    ///
418    /// # Example
419    /// ```rust
420    /// # #[cfg(feature = "deploy")] {
421    /// # use hydro_lang::prelude::*;
422    /// # use futures::StreamExt;
423    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
424    /// let p1 = flow.process::<()>();
425    /// let workers: Cluster<()> = flow.cluster::<()>();
426    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
427    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
428    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
429    ///     .demux_bincode(&workers);
430    /// # on_worker.send_bincode(&p2).entries()
431    /// // if there are 4 members in the cluster, each receives one element
432    /// // - MemberId::<()>(0): [0]
433    /// // - MemberId::<()>(1): [1]
434    /// // - MemberId::<()>(2): [2]
435    /// // - MemberId::<()>(3): [3]
436    /// # }, |mut stream| async move {
437    /// # let mut results = Vec::new();
438    /// # for w in 0..4 {
439    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
440    /// # }
441    /// # results.sort();
442    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
443    /// # }));
444    /// # }
445    /// ```
446    pub fn demux_bincode(
447        self,
448        other: &Cluster<'a, L2>,
449    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
450    where
451        T: Serialize + DeserializeOwned,
452    {
453        self.demux(other, TCP.fail_stop().bincode())
454    }
455
456    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
457    /// using the configuration in `via` to set up the message transport.
458    ///
459    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
460    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
461    /// this API allows precise targeting of specific cluster members rather than broadcasting to
462    /// all members.
463    ///
464    /// # Example
465    /// ```rust
466    /// # #[cfg(feature = "deploy")] {
467    /// # use hydro_lang::prelude::*;
468    /// # use futures::StreamExt;
469    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
470    /// let p1 = flow.process::<()>();
471    /// let workers: Cluster<()> = flow.cluster::<()>();
472    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
473    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
474    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
475    ///     .demux(&workers, TCP.fail_stop().bincode());
476    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
477    /// // if there are 4 members in the cluster, each receives one element
478    /// // - MemberId::<()>(0): [0]
479    /// // - MemberId::<()>(1): [1]
480    /// // - MemberId::<()>(2): [2]
481    /// // - MemberId::<()>(3): [3]
482    /// # }, |mut stream| async move {
483    /// # let mut results = Vec::new();
484    /// # for w in 0..4 {
485    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
486    /// # }
487    /// # results.sort();
488    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
489    /// # }));
490    /// # }
491    /// ```
492    pub fn demux<N: NetworkFor<T>>(
493        self,
494        to: &Cluster<'a, L2>,
495        via: N,
496    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
497    where
498        T: Serialize + DeserializeOwned,
499    {
500        self.into_keyed().demux(to, via)
501    }
502}
503
504impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
505    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
506    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
507    /// [`bincode`] to serialize/deserialize messages.
508    ///
509    /// This provides load balancing by evenly distributing work across cluster members. The
510    /// distribution is deterministic based on element order - the first element goes to member 0,
511    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
512    ///
513    /// # Non-Determinism
514    /// The set of cluster members may asynchronously change over time. Each element is distributed
515    /// based on the current cluster membership _at that point in time_. Depending on when cluster
516    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
517    /// membership is stable, the order of members in the round-robin pattern may change across runs.
518    ///
519    /// # Ordering Requirements
520    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
521    /// order of messages and retries affects the round-robin pattern.
522    ///
523    /// # Example
524    /// ```rust
525    /// # #[cfg(feature = "deploy")] {
526    /// # use hydro_lang::prelude::*;
527    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
528    /// # use futures::StreamExt;
529    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
530    /// let p1 = flow.process::<()>();
531    /// let workers: Cluster<()> = flow.cluster::<()>();
532    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
533    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
534    /// on_worker.send_bincode(&p2)
535    /// # .first().values() // we use first to assert that each member gets one element
536    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
537    /// // - MemberId::<()>(?): [1]
538    /// // - MemberId::<()>(?): [2]
539    /// // - MemberId::<()>(?): [3]
540    /// // - MemberId::<()>(?): [4]
541    /// # }, |mut stream| async move {
542    /// # let mut results = Vec::new();
543    /// # for w in 0..4 {
544    /// #     results.push(stream.next().await.unwrap());
545    /// # }
546    /// # results.sort();
547    /// # assert_eq!(results, vec![1, 2, 3, 4]);
548    /// # }));
549    /// # }
550    /// ```
551    pub fn round_robin_bincode<L2: 'a>(
552        self,
553        other: &Cluster<'a, L2>,
554        nondet_membership: NonDet,
555    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
556    where
557        T: Serialize + DeserializeOwned,
558    {
559        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
560    }
561
562    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
563    /// the configuration in `via` to set up the message transport.
564    ///
565    /// This provides load balancing by evenly distributing work across cluster members. The
566    /// distribution is deterministic based on element order - the first element goes to member 0,
567    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
568    ///
569    /// # Non-Determinism
570    /// The set of cluster members may asynchronously change over time. Each element is distributed
571    /// based on the current cluster membership _at that point in time_. Depending on when cluster
572    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
573    /// membership is stable, the order of members in the round-robin pattern may change across runs.
574    ///
575    /// # Ordering Requirements
576    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
577    /// order of messages and retries affects the round-robin pattern.
578    ///
579    /// # Example
580    /// ```rust
581    /// # #[cfg(feature = "deploy")] {
582    /// # use hydro_lang::prelude::*;
583    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
584    /// # use futures::StreamExt;
585    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
586    /// let p1 = flow.process::<()>();
587    /// let workers: Cluster<()> = flow.cluster::<()>();
588    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
589    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
590    /// on_worker.send(&p2, TCP.fail_stop().bincode())
591    /// # .first().values() // we use first to assert that each member gets one element
592    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
593    /// // - MemberId::<()>(?): [1]
594    /// // - MemberId::<()>(?): [2]
595    /// // - MemberId::<()>(?): [3]
596    /// // - MemberId::<()>(?): [4]
597    /// # }, |mut stream| async move {
598    /// # let mut results = Vec::new();
599    /// # for w in 0..4 {
600    /// #     results.push(stream.next().await.unwrap());
601    /// # }
602    /// # results.sort();
603    /// # assert_eq!(results, vec![1, 2, 3, 4]);
604    /// # }));
605    /// # }
606    /// ```
607    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
608        self,
609        to: &Cluster<'a, L2>,
610        via: N,
611        nondet_membership: NonDet,
612    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
613    where
614        T: Serialize + DeserializeOwned,
615    {
616        let ids = track_membership(self.location.source_cluster_members(to));
617        sliced! {
618            let members_snapshot = use(ids, nondet_membership);
619            let elements = use(self.enumerate(), nondet_membership);
620
621            let current_members = members_snapshot
622                .filter(q!(|b| *b))
623                .keys()
624                .assume_ordering(nondet_membership)
625                .collect_vec();
626
627            elements
628                .cross_singleton(current_members)
629                .map(q!(|(data, members)| (
630                    members[data.0 % members.len()].clone(),
631                    data.1
632                )))
633        }
634        .demux(to, via)
635    }
636}
637
638impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
639    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
640    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
641    /// [`bincode`] to serialize/deserialize messages.
642    ///
643    /// This provides load balancing by evenly distributing work across cluster members. The
644    /// distribution is deterministic based on element order - the first element goes to member 0,
645    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
646    ///
647    /// # Non-Determinism
648    /// The set of cluster members may asynchronously change over time. Each element is distributed
649    /// based on the current cluster membership _at that point in time_. Depending on when cluster
650    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
651    /// membership is stable, the order of members in the round-robin pattern may change across runs.
652    ///
653    /// # Ordering Requirements
654    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
655    /// order of messages and retries affects the round-robin pattern.
656    ///
657    /// # Example
658    /// ```rust
659    /// # #[cfg(feature = "deploy")] {
660    /// # use hydro_lang::prelude::*;
661    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
662    /// # use hydro_lang::location::MemberId;
663    /// # use futures::StreamExt;
664    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
665    /// let p1 = flow.process::<()>();
666    /// let workers1: Cluster<()> = flow.cluster::<()>();
667    /// let workers2: Cluster<()> = flow.cluster::<()>();
668    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
669    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
670    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
671    /// on_worker2.send_bincode(&p2)
672    /// # .entries()
673    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
674    /// # }, |mut stream| async move {
675    /// # let mut results = Vec::new();
676    /// # let mut locations = std::collections::HashSet::new();
677    /// # for w in 0..=16 {
678    /// #     let (location, v) = stream.next().await.unwrap();
679    /// #     locations.insert(location);
680    /// #     results.push(v);
681    /// # }
682    /// # results.sort();
683    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
684    /// # assert_eq!(locations.len(), 16);
685    /// # }));
686    /// # }
687    /// ```
688    pub fn round_robin_bincode<L2: 'a>(
689        self,
690        other: &Cluster<'a, L2>,
691        nondet_membership: NonDet,
692    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
693    where
694        T: Serialize + DeserializeOwned,
695    {
696        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
697    }
698
699    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
700    /// the configuration in `via` to set up the message transport.
701    ///
702    /// This provides load balancing by evenly distributing work across cluster members. The
703    /// distribution is deterministic based on element order - the first element goes to member 0,
704    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
705    ///
706    /// # Non-Determinism
707    /// The set of cluster members may asynchronously change over time. Each element is distributed
708    /// based on the current cluster membership _at that point in time_. Depending on when cluster
709    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
710    /// membership is stable, the order of members in the round-robin pattern may change across runs.
711    ///
712    /// # Ordering Requirements
713    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
714    /// order of messages and retries affects the round-robin pattern.
715    ///
716    /// # Example
717    /// ```rust
718    /// # #[cfg(feature = "deploy")] {
719    /// # use hydro_lang::prelude::*;
720    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
721    /// # use hydro_lang::location::MemberId;
722    /// # use futures::StreamExt;
723    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
724    /// let p1 = flow.process::<()>();
725    /// let workers1: Cluster<()> = flow.cluster::<()>();
726    /// let workers2: Cluster<()> = flow.cluster::<()>();
727    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
728    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
729    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
730    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
731    /// # .entries()
732    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
733    /// # }, |mut stream| async move {
734    /// # let mut results = Vec::new();
735    /// # let mut locations = std::collections::HashSet::new();
736    /// # for w in 0..=16 {
737    /// #     let (location, v) = stream.next().await.unwrap();
738    /// #     locations.insert(location);
739    /// #     results.push(v);
740    /// # }
741    /// # results.sort();
742    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
743    /// # assert_eq!(locations.len(), 16);
744    /// # }));
745    /// # }
746    /// ```
747    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
748        self,
749        to: &Cluster<'a, L2>,
750        via: N,
751        nondet_membership: NonDet,
752    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
753    where
754        T: Serialize + DeserializeOwned,
755    {
756        let ids = track_membership(self.location.source_cluster_members(to));
757        sliced! {
758            let members_snapshot = use(ids, nondet_membership);
759            let elements = use(self.enumerate(), nondet_membership);
760
761            let current_members = members_snapshot
762                .filter(q!(|b| *b))
763                .keys()
764                .assume_ordering(nondet_membership)
765                .collect_vec();
766
767            elements
768                .cross_singleton(current_members)
769                .map(q!(|(data, members)| (
770                    members[data.0 % members.len()].clone(),
771                    data.1
772                )))
773        }
774        .demux(to, via)
775    }
776}
777
778impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
779    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
780    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
781    /// using [`bincode`] to serialize/deserialize messages.
782    ///
783    /// Each cluster member sends its local stream elements, and they are collected at the destination
784    /// as a [`KeyedStream`] where keys identify the source cluster member.
785    ///
786    /// # Example
787    /// ```rust
788    /// # #[cfg(feature = "deploy")] {
789    /// # use hydro_lang::prelude::*;
790    /// # use futures::StreamExt;
791    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
792    /// let workers: Cluster<()> = flow.cluster::<()>();
793    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
794    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
795    /// # all_received.entries()
796    /// # }, |mut stream| async move {
797    /// // if there are 4 members in the cluster, we should receive 4 elements
798    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
799    /// # let mut results = Vec::new();
800    /// # for w in 0..4 {
801    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
802    /// # }
803    /// # results.sort();
804    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
805    /// # }));
806    /// # }
807    /// ```
808    ///
809    /// If you don't need to know the source for each element, you can use `.values()`
810    /// to get just the data:
811    /// ```rust
812    /// # #[cfg(feature = "deploy")] {
813    /// # use hydro_lang::prelude::*;
814    /// # use hydro_lang::live_collections::stream::NoOrder;
815    /// # use futures::StreamExt;
816    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
817    /// # let workers: Cluster<()> = flow.cluster::<()>();
818    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
819    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
820    /// # values
821    /// # }, |mut stream| async move {
822    /// # let mut results = Vec::new();
823    /// # for w in 0..4 {
824    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
825    /// # }
826    /// # results.sort();
827    /// // if there are 4 members in the cluster, we should receive 4 elements
828    /// // 1, 1, 1, 1
829    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
830    /// # }));
831    /// # }
832    /// ```
833    pub fn send_bincode<L2>(
834        self,
835        other: &Process<'a, L2>,
836    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
837    where
838        T: Serialize + DeserializeOwned,
839    {
840        self.send(other, TCP.fail_stop().bincode())
841    }
842
843    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
844    /// using the configuration in `via` to set up the message transport.
845    ///
846    /// Each cluster member sends its local stream elements, and they are collected at the destination
847    /// as a [`KeyedStream`] where keys identify the source cluster member.
848    ///
849    /// # Example
850    /// ```rust
851    /// # #[cfg(feature = "deploy")] {
852    /// # use hydro_lang::prelude::*;
853    /// # use futures::StreamExt;
854    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
855    /// let workers: Cluster<()> = flow.cluster::<()>();
856    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
857    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
858    /// # all_received.entries()
859    /// # }, |mut stream| async move {
860    /// // if there are 4 members in the cluster, we should receive 4 elements
861    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
862    /// # let mut results = Vec::new();
863    /// # for w in 0..4 {
864    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
865    /// # }
866    /// # results.sort();
867    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
868    /// # }));
869    /// # }
870    /// ```
871    ///
872    /// If you don't need to know the source for each element, you can use `.values()`
873    /// to get just the data:
874    /// ```rust
875    /// # #[cfg(feature = "deploy")] {
876    /// # use hydro_lang::prelude::*;
877    /// # use hydro_lang::live_collections::stream::NoOrder;
878    /// # use futures::StreamExt;
879    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
880    /// # let workers: Cluster<()> = flow.cluster::<()>();
881    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
882    /// let values: Stream<i32, _, _, NoOrder> =
883    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
884    /// # values
885    /// # }, |mut stream| async move {
886    /// # let mut results = Vec::new();
887    /// # for w in 0..4 {
888    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
889    /// # }
890    /// # results.sort();
891    /// // if there are 4 members in the cluster, we should receive 4 elements
892    /// // 1, 1, 1, 1
893    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
894    /// # }));
895    /// # }
896    /// ```
897    pub fn send<L2, N: NetworkFor<T>>(
898        self,
899        to: &Process<'a, L2>,
900        via: N,
901    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
902    where
903        T: Serialize + DeserializeOwned,
904    {
905        let serialize_pipeline = Some(N::serialize_thunk(false));
906
907        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
908
909        let name = via.name();
910        if to.multiversioned() && name.is_none() {
911            panic!(
912                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
913            );
914        }
915
916        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
917            to.clone(),
918            HydroNode::Network {
919                name: name.map(ToOwned::to_owned),
920                serialize_fn: serialize_pipeline.map(|e| e.into()),
921                instantiate_fn: DebugInstantiate::Building,
922                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
923                input: Box::new(self.ir_node.into_inner()),
924                metadata: to.new_node_metadata(Stream::<
925                    (MemberId<L>, T),
926                    Process<'a, L2>,
927                    Unbounded,
928                    O,
929                    R,
930                >::collection_kind()),
931            },
932        );
933
934        raw_stream.into_keyed()
935    }
936
937    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
938    /// Broadcasts elements of this stream at each source member to all members of a destination
939    /// cluster, using [`bincode`] to serialize/deserialize messages.
940    ///
941    /// Each source member sends each of its stream elements to **every** member of the cluster
942    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
943    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
944    /// **only data elements** and sends each element to all cluster members.
945    ///
946    /// # Non-Determinism
947    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
948    /// to the current cluster members known _at that point in time_ at the source member. Depending
949    /// on when each source member is notified of membership changes, it will broadcast each element
950    /// to different members.
951    ///
952    /// # Example
953    /// ```rust
954    /// # #[cfg(feature = "deploy")] {
955    /// # use hydro_lang::prelude::*;
956    /// # use hydro_lang::location::MemberId;
957    /// # use futures::StreamExt;
958    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
959    /// # type Source = ();
960    /// # type Destination = ();
961    /// let source: Cluster<Source> = flow.cluster::<Source>();
962    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
963    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
964    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
965    /// # on_destination.entries().send_bincode(&p2).entries()
966    /// // if there are 4 members in the destination, each receives one element from each source member
967    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
968    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
969    /// // - ...
970    /// # }, |mut stream| async move {
971    /// # let mut results = Vec::new();
972    /// # for w in 0..16 {
973    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
974    /// # }
975    /// # results.sort();
976    /// # assert_eq!(results, vec![
977    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
978    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
979    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
980    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
981    /// # ]);
982    /// # }));
983    /// # }
984    /// ```
985    pub fn broadcast_bincode<L2: 'a>(
986        self,
987        other: &Cluster<'a, L2>,
988        nondet_membership: NonDet,
989    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
990    where
991        T: Clone + Serialize + DeserializeOwned,
992    {
993        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
994    }
995
996    /// Broadcasts elements of this stream at each source member to all members of a destination
997    /// cluster, using the configuration in `via` to set up the message transport.
998    ///
999    /// Each source member sends each of its stream elements to **every** member of the cluster
1000    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1001    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1002    /// **only data elements** and sends each element to all cluster members.
1003    ///
1004    /// # Non-Determinism
1005    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1006    /// to the current cluster members known _at that point in time_ at the source member. Depending
1007    /// on when each source member is notified of membership changes, it will broadcast each element
1008    /// to different members.
1009    ///
1010    /// # Example
1011    /// ```rust
1012    /// # #[cfg(feature = "deploy")] {
1013    /// # use hydro_lang::prelude::*;
1014    /// # use hydro_lang::location::MemberId;
1015    /// # use futures::StreamExt;
1016    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1017    /// # type Source = ();
1018    /// # type Destination = ();
1019    /// let source: Cluster<Source> = flow.cluster::<Source>();
1020    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1021    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1022    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1023    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1024    /// // if there are 4 members in the destination, each receives one element from each source member
1025    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1026    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1027    /// // - ...
1028    /// # }, |mut stream| async move {
1029    /// # let mut results = Vec::new();
1030    /// # for w in 0..16 {
1031    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1032    /// # }
1033    /// # results.sort();
1034    /// # assert_eq!(results, vec![
1035    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1036    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1037    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1038    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1039    /// # ]);
1040    /// # }));
1041    /// # }
1042    /// ```
1043    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1044        self,
1045        to: &Cluster<'a, L2>,
1046        via: N,
1047        nondet_membership: NonDet,
1048    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1049    where
1050        T: Clone + Serialize + DeserializeOwned,
1051    {
1052        let ids = track_membership(self.location.source_cluster_members(to));
1053        sliced! {
1054            let members_snapshot = use(ids, nondet_membership);
1055            let elements = use(self, nondet_membership);
1056
1057            let current_members = members_snapshot.filter(q!(|b| *b));
1058            elements.repeat_with_keys(current_members)
1059        }
1060        .demux(to, via)
1061    }
1062}
1063
1064impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1065    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1066{
1067    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1068    /// Sends elements of this stream at each source member to specific members of a destination
1069    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1070    ///
1071    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1072    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1073    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1074    /// all members.
1075    ///
1076    /// Each cluster member sends its local stream elements, and they are collected at each
1077    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1078    ///
1079    /// # Example
1080    /// ```rust
1081    /// # #[cfg(feature = "deploy")] {
1082    /// # use hydro_lang::prelude::*;
1083    /// # use futures::StreamExt;
1084    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1085    /// # type Source = ();
1086    /// # type Destination = ();
1087    /// let source: Cluster<Source> = flow.cluster::<Source>();
1088    /// let to_send: Stream<_, Cluster<_>, _> = source
1089    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1090    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1091    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1092    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1093    /// # all_received.entries().send_bincode(&p2).entries()
1094    /// # }, |mut stream| async move {
1095    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1096    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1097    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1098    /// // - ...
1099    /// # let mut results = Vec::new();
1100    /// # for w in 0..16 {
1101    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1102    /// # }
1103    /// # results.sort();
1104    /// # assert_eq!(results, vec![
1105    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1106    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1107    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1108    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1109    /// # ]);
1110    /// # }));
1111    /// # }
1112    /// ```
1113    pub fn demux_bincode(
1114        self,
1115        other: &Cluster<'a, L2>,
1116    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1117    where
1118        T: Serialize + DeserializeOwned,
1119    {
1120        self.demux(other, TCP.fail_stop().bincode())
1121    }
1122
1123    /// Sends elements of this stream at each source member to specific members of a destination
1124    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1125    /// message transport.
1126    ///
1127    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1128    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1129    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1130    /// all members.
1131    ///
1132    /// Each cluster member sends its local stream elements, and they are collected at each
1133    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1134    ///
1135    /// # Example
1136    /// ```rust
1137    /// # #[cfg(feature = "deploy")] {
1138    /// # use hydro_lang::prelude::*;
1139    /// # use futures::StreamExt;
1140    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1141    /// # type Source = ();
1142    /// # type Destination = ();
1143    /// let source: Cluster<Source> = flow.cluster::<Source>();
1144    /// let to_send: Stream<_, Cluster<_>, _> = source
1145    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1146    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1147    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1148    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1149    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1150    /// # }, |mut stream| async move {
1151    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1152    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1153    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1154    /// // - ...
1155    /// # let mut results = Vec::new();
1156    /// # for w in 0..16 {
1157    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1158    /// # }
1159    /// # results.sort();
1160    /// # assert_eq!(results, vec![
1161    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1162    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1163    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1164    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1165    /// # ]);
1166    /// # }));
1167    /// # }
1168    /// ```
1169    pub fn demux<N: NetworkFor<T>>(
1170        self,
1171        to: &Cluster<'a, L2>,
1172        via: N,
1173    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1174    where
1175        T: Serialize + DeserializeOwned,
1176    {
1177        self.into_keyed().demux(to, via)
1178    }
1179}
1180
1181#[cfg(test)]
1182mod tests {
1183    #[cfg(feature = "sim")]
1184    use stageleft::q;
1185
1186    #[cfg(feature = "sim")]
1187    use crate::location::{Location, MemberId};
1188    #[cfg(feature = "sim")]
1189    use crate::networking::TCP;
1190    #[cfg(feature = "sim")]
1191    use crate::nondet::nondet;
1192    #[cfg(feature = "sim")]
1193    use crate::prelude::FlowBuilder;
1194
1195    #[cfg(feature = "sim")]
1196    #[test]
1197    fn sim_send_bincode_o2o() {
1198        use crate::networking::TCP;
1199
1200        let mut flow = FlowBuilder::new();
1201        let node = flow.process::<()>();
1202        let node2 = flow.process::<()>();
1203
1204        let (in_send, input) = node.sim_input();
1205
1206        let out_recv = input
1207            .send(&node2, TCP.fail_stop().bincode())
1208            .batch(&node2.tick(), nondet!(/** test */))
1209            .count()
1210            .all_ticks()
1211            .sim_output();
1212
1213        let instances = flow.sim().exhaustive(async || {
1214            in_send.send(());
1215            in_send.send(());
1216            in_send.send(());
1217
1218            let received = out_recv.collect::<Vec<_>>().await;
1219            assert!(received.into_iter().sum::<usize>() == 3);
1220        });
1221
1222        assert_eq!(instances, 4); // 2^{3 - 1}
1223    }
1224
1225    #[cfg(feature = "sim")]
1226    #[test]
1227    fn sim_send_bincode_m2o() {
1228        let mut flow = FlowBuilder::new();
1229        let cluster = flow.cluster::<()>();
1230        let node = flow.process::<()>();
1231
1232        let input = cluster.source_iter(q!(vec![1]));
1233
1234        let out_recv = input
1235            .send(&node, TCP.fail_stop().bincode())
1236            .entries()
1237            .batch(&node.tick(), nondet!(/** test */))
1238            .all_ticks()
1239            .sim_output();
1240
1241        let instances = flow
1242            .sim()
1243            .with_cluster_size(&cluster, 4)
1244            .exhaustive(async || {
1245                out_recv
1246                    .assert_yields_only_unordered(vec![
1247                        (MemberId::from_raw_id(0), 1),
1248                        (MemberId::from_raw_id(1), 1),
1249                        (MemberId::from_raw_id(2), 1),
1250                        (MemberId::from_raw_id(3), 1),
1251                    ])
1252                    .await
1253            });
1254
1255        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1256    }
1257
1258    #[cfg(feature = "sim")]
1259    #[test]
1260    fn sim_send_bincode_multiple_m2o() {
1261        let mut flow = FlowBuilder::new();
1262        let cluster1 = flow.cluster::<()>();
1263        let cluster2 = flow.cluster::<()>();
1264        let node = flow.process::<()>();
1265
1266        let out_recv_1 = cluster1
1267            .source_iter(q!(vec![1]))
1268            .send(&node, TCP.fail_stop().bincode())
1269            .entries()
1270            .sim_output();
1271
1272        let out_recv_2 = cluster2
1273            .source_iter(q!(vec![2]))
1274            .send(&node, TCP.fail_stop().bincode())
1275            .entries()
1276            .sim_output();
1277
1278        let instances = flow
1279            .sim()
1280            .with_cluster_size(&cluster1, 3)
1281            .with_cluster_size(&cluster2, 4)
1282            .exhaustive(async || {
1283                out_recv_1
1284                    .assert_yields_only_unordered(vec![
1285                        (MemberId::from_raw_id(0), 1),
1286                        (MemberId::from_raw_id(1), 1),
1287                        (MemberId::from_raw_id(2), 1),
1288                    ])
1289                    .await;
1290
1291                out_recv_2
1292                    .assert_yields_only_unordered(vec![
1293                        (MemberId::from_raw_id(0), 2),
1294                        (MemberId::from_raw_id(1), 2),
1295                        (MemberId::from_raw_id(2), 2),
1296                        (MemberId::from_raw_id(3), 2),
1297                    ])
1298                    .await;
1299            });
1300
1301        assert_eq!(instances, 1);
1302    }
1303
1304    #[cfg(feature = "sim")]
1305    #[test]
1306    fn sim_send_bincode_o2m() {
1307        let mut flow = FlowBuilder::new();
1308        let cluster = flow.cluster::<()>();
1309        let node = flow.process::<()>();
1310
1311        let input = node.source_iter(q!(vec![
1312            (MemberId::from_raw_id(0), 123),
1313            (MemberId::from_raw_id(1), 456),
1314        ]));
1315
1316        let out_recv = input
1317            .demux(&cluster, TCP.fail_stop().bincode())
1318            .map(q!(|x| x + 1))
1319            .send(&node, TCP.fail_stop().bincode())
1320            .entries()
1321            .sim_output();
1322
1323        flow.sim()
1324            .with_cluster_size(&cluster, 4)
1325            .exhaustive(async || {
1326                out_recv
1327                    .assert_yields_only_unordered(vec![
1328                        (MemberId::from_raw_id(0), 124),
1329                        (MemberId::from_raw_id(1), 457),
1330                    ])
1331                    .await
1332            });
1333    }
1334
1335    #[cfg(feature = "sim")]
1336    #[test]
1337    fn sim_broadcast_bincode_o2m() {
1338        let mut flow = FlowBuilder::new();
1339        let cluster = flow.cluster::<()>();
1340        let node = flow.process::<()>();
1341
1342        let input = node.source_iter(q!(vec![123, 456]));
1343
1344        let out_recv = input
1345            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1346            .map(q!(|x| x + 1))
1347            .send(&node, TCP.fail_stop().bincode())
1348            .entries()
1349            .sim_output();
1350
1351        let mut c_1_produced = false;
1352        let mut c_2_produced = false;
1353
1354        flow.sim()
1355            .with_cluster_size(&cluster, 2)
1356            .exhaustive(async || {
1357                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1358
1359                // check that order is preserved
1360                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1361                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1362                    c_1_produced = true;
1363                }
1364
1365                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1366                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1367                    c_2_produced = true;
1368                }
1369            });
1370
1371        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1372    }
1373
1374    #[cfg(feature = "sim")]
1375    #[test]
1376    fn sim_send_bincode_m2m() {
1377        let mut flow = FlowBuilder::new();
1378        let cluster = flow.cluster::<()>();
1379        let node = flow.process::<()>();
1380
1381        let input = node.source_iter(q!(vec![
1382            (MemberId::from_raw_id(0), 123),
1383            (MemberId::from_raw_id(1), 456),
1384        ]));
1385
1386        let out_recv = input
1387            .demux(&cluster, TCP.fail_stop().bincode())
1388            .map(q!(|x| x + 1))
1389            .flat_map_ordered(q!(|x| vec![
1390                (MemberId::from_raw_id(0), x),
1391                (MemberId::from_raw_id(1), x),
1392            ]))
1393            .demux(&cluster, TCP.fail_stop().bincode())
1394            .entries()
1395            .send(&node, TCP.fail_stop().bincode())
1396            .entries()
1397            .sim_output();
1398
1399        flow.sim()
1400            .with_cluster_size(&cluster, 4)
1401            .exhaustive(async || {
1402                out_recv
1403                    .assert_yields_only_unordered(vec![
1404                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1405                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1406                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1407                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1408                    ])
1409                    .await
1410            });
1411    }
1412}