Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33    membership.fold(
34        q!(|| false),
35        q!(|present, event| {
36            match event {
37                MembershipEvent::Joined => *present = true,
38                MembershipEvent::Left => *present = false,
39            }
40        }),
41    )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45    let root = get_this_crate();
46
47    if is_demux {
48        parse_quote! {
49            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50                |(id, data)| {
51                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52                }
53            )
54        }
55    } else {
56        parse_quote! {
57            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58                |data| {
59                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
60                }
61            )
62        }
63    }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67    serialize_bincode_with_type(is_demux, &quote_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71    let root = get_this_crate();
72    if let Some(c_type) = tagged {
73        parse_quote! {
74            |res| {
75                let (id, b) = res.unwrap();
76                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77            }
78        }
79    } else {
80        parse_quote! {
81            |res| {
82                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83            }
84        }
85    }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89    deserialize_bincode_with_type(tagged, &quote_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95    /// using [`bincode`] to serialize/deserialize messages.
96    ///
97    /// The returned stream captures the elements received at the destination, where values will
98    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
100    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
101    /// dropped no further messages will be sent.
102    ///
103    /// # Example
104    /// ```rust
105    /// # #[cfg(feature = "deploy")] {
106    /// # use hydro_lang::prelude::*;
107    /// # use futures::StreamExt;
108    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109    /// let p1 = flow.process::<()>();
110    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111    /// let p2 = flow.process::<()>();
112    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113    /// // 1, 2, 3
114    /// # on_p2.send_bincode(&p_out)
115    /// # }, |mut stream| async move {
116    /// # for w in 1..=3 {
117    /// #     assert_eq!(stream.next().await, Some(w));
118    /// # }
119    /// # }));
120    /// # }
121    /// ```
122    pub fn send_bincode<L2>(
123        self,
124        other: &Process<'a, L2>,
125    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126    where
127        T: Serialize + DeserializeOwned,
128    {
129        self.send(other, TCP.fail_stop().bincode())
130    }
131
132    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133    /// using the configuration in `via` to set up the message transport.
134    ///
135    /// The returned stream captures the elements received at the destination, where values will
136    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
138    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
139    /// dropped no further messages will be sent.
140    ///
141    /// # Example
142    /// ```rust
143    /// # #[cfg(feature = "deploy")] {
144    /// # use hydro_lang::prelude::*;
145    /// # use futures::StreamExt;
146    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147    /// let p1 = flow.process::<()>();
148    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149    /// let p2 = flow.process::<()>();
150    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151    /// // 1, 2, 3
152    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153    /// # }, |mut stream| async move {
154    /// # for w in 1..=3 {
155    /// #     assert_eq!(stream.next().await, Some(w));
156    /// # }
157    /// # }));
158    /// # }
159    /// ```
160    pub fn send<L2, N: NetworkFor<T>>(
161        self,
162        to: &Process<'a, L2>,
163        via: N,
164    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
165    where
166        T: Serialize + DeserializeOwned,
167    {
168        let serialize_pipeline = Some(N::serialize_thunk(false));
169        let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171        let name = via.name();
172        if to.multiversioned() && name.is_none() {
173            panic!(
174                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
175            );
176        }
177
178        Stream::new(
179            to.clone(),
180            HydroNode::Network {
181                name: name.map(ToOwned::to_owned),
182                networking_info: N::networking_info(),
183                serialize_fn: serialize_pipeline.map(|e| e.into()),
184                instantiate_fn: DebugInstantiate::Building,
185                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
186                input: Box::new(self.ir_node.into_inner()),
187                metadata: to.new_node_metadata(
188                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
189                ),
190            },
191        )
192    }
193
194    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
195    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
196    /// using [`bincode`] to serialize/deserialize messages.
197    ///
198    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
199    /// membership information. This is a common pattern in distributed systems for broadcasting data to
200    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
201    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
202    /// each element to all cluster members.
203    ///
204    /// # Non-Determinism
205    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
206    /// to the current cluster members _at that point in time_. Depending on when we are notified of
207    /// membership changes, we will broadcast each element to different members.
208    ///
209    /// # Example
210    /// ```rust
211    /// # #[cfg(feature = "deploy")] {
212    /// # use hydro_lang::prelude::*;
213    /// # use futures::StreamExt;
214    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
215    /// let p1 = flow.process::<()>();
216    /// let workers: Cluster<()> = flow.cluster::<()>();
217    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
218    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
219    /// # on_worker.send_bincode(&p2).entries()
220    /// // if there are 4 members in the cluster, each receives one element
221    /// // - MemberId::<()>(0): [123]
222    /// // - MemberId::<()>(1): [123]
223    /// // - MemberId::<()>(2): [123]
224    /// // - MemberId::<()>(3): [123]
225    /// # }, |mut stream| async move {
226    /// # let mut results = Vec::new();
227    /// # for w in 0..4 {
228    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
229    /// # }
230    /// # results.sort();
231    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
232    /// # }));
233    /// # }
234    /// ```
235    pub fn broadcast_bincode<L2: 'a>(
236        self,
237        other: &Cluster<'a, L2>,
238        nondet_membership: NonDet,
239    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
240    where
241        T: Clone + Serialize + DeserializeOwned,
242    {
243        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
244    }
245
246    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
247    /// using the configuration in `via` to set up the message transport.
248    ///
249    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
250    /// membership information. This is a common pattern in distributed systems for broadcasting data to
251    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
252    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
253    /// each element to all cluster members.
254    ///
255    /// # Non-Determinism
256    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
257    /// to the current cluster members _at that point in time_. Depending on when we are notified of
258    /// membership changes, we will broadcast each element to different members.
259    ///
260    /// # Example
261    /// ```rust
262    /// # #[cfg(feature = "deploy")] {
263    /// # use hydro_lang::prelude::*;
264    /// # use futures::StreamExt;
265    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
266    /// let p1 = flow.process::<()>();
267    /// let workers: Cluster<()> = flow.cluster::<()>();
268    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
269    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
270    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
271    /// // if there are 4 members in the cluster, each receives one element
272    /// // - MemberId::<()>(0): [123]
273    /// // - MemberId::<()>(1): [123]
274    /// // - MemberId::<()>(2): [123]
275    /// // - MemberId::<()>(3): [123]
276    /// # }, |mut stream| async move {
277    /// # let mut results = Vec::new();
278    /// # for w in 0..4 {
279    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
280    /// # }
281    /// # results.sort();
282    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
283    /// # }));
284    /// # }
285    /// ```
286    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
287        self,
288        to: &Cluster<'a, L2>,
289        via: N,
290        nondet_membership: NonDet,
291    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
292    where
293        T: Clone + Serialize + DeserializeOwned,
294    {
295        let ids = track_membership(self.location.source_cluster_members(to));
296        sliced! {
297            let members_snapshot = use(ids, nondet_membership);
298            let elements = use(self, nondet_membership);
299
300            let current_members = members_snapshot.filter(q!(|b| *b));
301            elements.repeat_with_keys(current_members)
302        }
303        .demux(to, via)
304    }
305
306    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
307    /// serialization. The external process can receive these elements by establishing a TCP
308    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
309    ///
310    /// # Example
311    /// ```rust
312    /// # #[cfg(feature = "deploy")] {
313    /// # use hydro_lang::prelude::*;
314    /// # use futures::StreamExt;
315    /// # tokio_test::block_on(async move {
316    /// let mut flow = FlowBuilder::new();
317    /// let process = flow.process::<()>();
318    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
319    /// let external = flow.external::<()>();
320    /// let external_handle = numbers.send_bincode_external(&external);
321    ///
322    /// let mut deployment = hydro_deploy::Deployment::new();
323    /// let nodes = flow
324    ///     .with_process(&process, deployment.Localhost())
325    ///     .with_external(&external, deployment.Localhost())
326    ///     .deploy(&mut deployment);
327    ///
328    /// deployment.deploy().await.unwrap();
329    /// // establish the TCP connection
330    /// let mut external_recv_stream = nodes.connect(external_handle).await;
331    /// deployment.start().await.unwrap();
332    ///
333    /// for w in 1..=3 {
334    ///     assert_eq!(external_recv_stream.next().await, Some(w));
335    /// }
336    /// # });
337    /// # }
338    /// ```
339    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
340    where
341        T: Serialize + DeserializeOwned,
342    {
343        let serialize_pipeline = Some(serialize_bincode::<T>(false));
344
345        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
346
347        let external_port_id = flow_state_borrow.next_external_port();
348
349        flow_state_borrow.push_root(HydroRoot::SendExternal {
350            to_external_key: other.key,
351            to_port_id: external_port_id,
352            to_many: false,
353            unpaired: true,
354            serialize_fn: serialize_pipeline.map(|e| e.into()),
355            instantiate_fn: DebugInstantiate::Building,
356            input: Box::new(self.ir_node.into_inner()),
357            op_metadata: HydroIrOpMetadata::new(),
358        });
359
360        ExternalBincodeStream {
361            process_key: other.key,
362            port_id: external_port_id,
363            _phantom: PhantomData,
364        }
365    }
366
367    #[cfg(feature = "sim")]
368    /// Sets up a simulation output port for this stream, allowing test code to receive elements
369    /// sent to this stream during simulation.
370    pub fn sim_output(self) -> SimReceiver<T, O, R>
371    where
372        T: Serialize + DeserializeOwned,
373    {
374        let external_location: External<'a, ()> = External {
375            key: LocationKey::FIRST,
376            flow_state: self.location.flow_state().clone(),
377            _phantom: PhantomData,
378        };
379
380        let external = self.send_bincode_external(&external_location);
381
382        SimReceiver(external.port_id, PhantomData)
383    }
384}
385
386impl<'a, T, L: Location<'a> + NoTick, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
387    /// Creates an external output for embedded deployment mode.
388    ///
389    /// The `name` parameter specifies the name of the field in the generated
390    /// `EmbeddedOutputs` struct that will receive elements from this stream.
391    /// The generated function will accept an `EmbeddedOutputs` struct with an
392    /// `impl FnMut(T)` field with this name.
393    pub fn embedded_output(self, name: impl Into<String>) {
394        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
395
396        self.location
397            .flow_state()
398            .borrow_mut()
399            .push_root(HydroRoot::EmbeddedOutput {
400                ident,
401                input: Box::new(self.ir_node.into_inner()),
402                op_metadata: HydroIrOpMetadata::new(),
403            });
404    }
405}
406
407impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
408    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
409{
410    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
411    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
412    /// using [`bincode`] to serialize/deserialize messages.
413    ///
414    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
415    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
416    /// this API allows precise targeting of specific cluster members rather than broadcasting to
417    /// all members.
418    ///
419    /// # Example
420    /// ```rust
421    /// # #[cfg(feature = "deploy")] {
422    /// # use hydro_lang::prelude::*;
423    /// # use futures::StreamExt;
424    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
425    /// let p1 = flow.process::<()>();
426    /// let workers: Cluster<()> = flow.cluster::<()>();
427    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
428    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
429    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
430    ///     .demux_bincode(&workers);
431    /// # on_worker.send_bincode(&p2).entries()
432    /// // if there are 4 members in the cluster, each receives one element
433    /// // - MemberId::<()>(0): [0]
434    /// // - MemberId::<()>(1): [1]
435    /// // - MemberId::<()>(2): [2]
436    /// // - MemberId::<()>(3): [3]
437    /// # }, |mut stream| async move {
438    /// # let mut results = Vec::new();
439    /// # for w in 0..4 {
440    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
441    /// # }
442    /// # results.sort();
443    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
444    /// # }));
445    /// # }
446    /// ```
447    pub fn demux_bincode(
448        self,
449        other: &Cluster<'a, L2>,
450    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
451    where
452        T: Serialize + DeserializeOwned,
453    {
454        self.demux(other, TCP.fail_stop().bincode())
455    }
456
457    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
458    /// using the configuration in `via` to set up the message transport.
459    ///
460    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
461    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
462    /// this API allows precise targeting of specific cluster members rather than broadcasting to
463    /// all members.
464    ///
465    /// # Example
466    /// ```rust
467    /// # #[cfg(feature = "deploy")] {
468    /// # use hydro_lang::prelude::*;
469    /// # use futures::StreamExt;
470    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
471    /// let p1 = flow.process::<()>();
472    /// let workers: Cluster<()> = flow.cluster::<()>();
473    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
474    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
475    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
476    ///     .demux(&workers, TCP.fail_stop().bincode());
477    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
478    /// // if there are 4 members in the cluster, each receives one element
479    /// // - MemberId::<()>(0): [0]
480    /// // - MemberId::<()>(1): [1]
481    /// // - MemberId::<()>(2): [2]
482    /// // - MemberId::<()>(3): [3]
483    /// # }, |mut stream| async move {
484    /// # let mut results = Vec::new();
485    /// # for w in 0..4 {
486    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
487    /// # }
488    /// # results.sort();
489    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
490    /// # }));
491    /// # }
492    /// ```
493    pub fn demux<N: NetworkFor<T>>(
494        self,
495        to: &Cluster<'a, L2>,
496        via: N,
497    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
498    where
499        T: Serialize + DeserializeOwned,
500    {
501        self.into_keyed().demux(to, via)
502    }
503}
504
505impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
506    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
507    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
508    /// [`bincode`] to serialize/deserialize messages.
509    ///
510    /// This provides load balancing by evenly distributing work across cluster members. The
511    /// distribution is deterministic based on element order - the first element goes to member 0,
512    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
513    ///
514    /// # Non-Determinism
515    /// The set of cluster members may asynchronously change over time. Each element is distributed
516    /// based on the current cluster membership _at that point in time_. Depending on when cluster
517    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
518    /// membership is stable, the order of members in the round-robin pattern may change across runs.
519    ///
520    /// # Ordering Requirements
521    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
522    /// order of messages and retries affects the round-robin pattern.
523    ///
524    /// # Example
525    /// ```rust
526    /// # #[cfg(feature = "deploy")] {
527    /// # use hydro_lang::prelude::*;
528    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
529    /// # use futures::StreamExt;
530    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
531    /// let p1 = flow.process::<()>();
532    /// let workers: Cluster<()> = flow.cluster::<()>();
533    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
534    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
535    /// on_worker.send_bincode(&p2)
536    /// # .first().values() // we use first to assert that each member gets one element
537    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
538    /// // - MemberId::<()>(?): [1]
539    /// // - MemberId::<()>(?): [2]
540    /// // - MemberId::<()>(?): [3]
541    /// // - MemberId::<()>(?): [4]
542    /// # }, |mut stream| async move {
543    /// # let mut results = Vec::new();
544    /// # for w in 0..4 {
545    /// #     results.push(stream.next().await.unwrap());
546    /// # }
547    /// # results.sort();
548    /// # assert_eq!(results, vec![1, 2, 3, 4]);
549    /// # }));
550    /// # }
551    /// ```
552    pub fn round_robin_bincode<L2: 'a>(
553        self,
554        other: &Cluster<'a, L2>,
555        nondet_membership: NonDet,
556    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
557    where
558        T: Serialize + DeserializeOwned,
559    {
560        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
561    }
562
563    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
564    /// the configuration in `via` to set up the message transport.
565    ///
566    /// This provides load balancing by evenly distributing work across cluster members. The
567    /// distribution is deterministic based on element order - the first element goes to member 0,
568    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
569    ///
570    /// # Non-Determinism
571    /// The set of cluster members may asynchronously change over time. Each element is distributed
572    /// based on the current cluster membership _at that point in time_. Depending on when cluster
573    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
574    /// membership is stable, the order of members in the round-robin pattern may change across runs.
575    ///
576    /// # Ordering Requirements
577    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
578    /// order of messages and retries affects the round-robin pattern.
579    ///
580    /// # Example
581    /// ```rust
582    /// # #[cfg(feature = "deploy")] {
583    /// # use hydro_lang::prelude::*;
584    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
585    /// # use futures::StreamExt;
586    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
587    /// let p1 = flow.process::<()>();
588    /// let workers: Cluster<()> = flow.cluster::<()>();
589    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
590    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
591    /// on_worker.send(&p2, TCP.fail_stop().bincode())
592    /// # .first().values() // we use first to assert that each member gets one element
593    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
594    /// // - MemberId::<()>(?): [1]
595    /// // - MemberId::<()>(?): [2]
596    /// // - MemberId::<()>(?): [3]
597    /// // - MemberId::<()>(?): [4]
598    /// # }, |mut stream| async move {
599    /// # let mut results = Vec::new();
600    /// # for w in 0..4 {
601    /// #     results.push(stream.next().await.unwrap());
602    /// # }
603    /// # results.sort();
604    /// # assert_eq!(results, vec![1, 2, 3, 4]);
605    /// # }));
606    /// # }
607    /// ```
608    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
609        self,
610        to: &Cluster<'a, L2>,
611        via: N,
612        nondet_membership: NonDet,
613    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
614    where
615        T: Serialize + DeserializeOwned,
616    {
617        let ids = track_membership(self.location.source_cluster_members(to));
618        sliced! {
619            let members_snapshot = use(ids, nondet_membership);
620            let elements = use(self.enumerate(), nondet_membership);
621
622            let current_members = members_snapshot
623                .filter(q!(|b| *b))
624                .keys()
625                .assume_ordering::<TotalOrder>(nondet_membership)
626                .collect_vec();
627
628            elements
629                .cross_singleton(current_members)
630                .map(q!(|(data, members)| (
631                    members[data.0 % members.len()].clone(),
632                    data.1
633                )))
634        }
635        .demux(to, via)
636    }
637}
638
639impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
640    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
641    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
642    /// [`bincode`] to serialize/deserialize messages.
643    ///
644    /// This provides load balancing by evenly distributing work across cluster members. The
645    /// distribution is deterministic based on element order - the first element goes to member 0,
646    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
647    ///
648    /// # Non-Determinism
649    /// The set of cluster members may asynchronously change over time. Each element is distributed
650    /// based on the current cluster membership _at that point in time_. Depending on when cluster
651    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
652    /// membership is stable, the order of members in the round-robin pattern may change across runs.
653    ///
654    /// # Ordering Requirements
655    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
656    /// order of messages and retries affects the round-robin pattern.
657    ///
658    /// # Example
659    /// ```rust
660    /// # #[cfg(feature = "deploy")] {
661    /// # use hydro_lang::prelude::*;
662    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
663    /// # use hydro_lang::location::MemberId;
664    /// # use futures::StreamExt;
665    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
666    /// let p1 = flow.process::<()>();
667    /// let workers1: Cluster<()> = flow.cluster::<()>();
668    /// let workers2: Cluster<()> = flow.cluster::<()>();
669    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
670    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
671    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
672    /// on_worker2.send_bincode(&p2)
673    /// # .entries()
674    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
675    /// # }, |mut stream| async move {
676    /// # let mut results = Vec::new();
677    /// # let mut locations = std::collections::HashSet::new();
678    /// # for w in 0..=16 {
679    /// #     let (location, v) = stream.next().await.unwrap();
680    /// #     locations.insert(location);
681    /// #     results.push(v);
682    /// # }
683    /// # results.sort();
684    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
685    /// # assert_eq!(locations.len(), 16);
686    /// # }));
687    /// # }
688    /// ```
689    pub fn round_robin_bincode<L2: 'a>(
690        self,
691        other: &Cluster<'a, L2>,
692        nondet_membership: NonDet,
693    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
694    where
695        T: Serialize + DeserializeOwned,
696    {
697        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
698    }
699
700    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
701    /// the configuration in `via` to set up the message transport.
702    ///
703    /// This provides load balancing by evenly distributing work across cluster members. The
704    /// distribution is deterministic based on element order - the first element goes to member 0,
705    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
706    ///
707    /// # Non-Determinism
708    /// The set of cluster members may asynchronously change over time. Each element is distributed
709    /// based on the current cluster membership _at that point in time_. Depending on when cluster
710    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
711    /// membership is stable, the order of members in the round-robin pattern may change across runs.
712    ///
713    /// # Ordering Requirements
714    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
715    /// order of messages and retries affects the round-robin pattern.
716    ///
717    /// # Example
718    /// ```rust
719    /// # #[cfg(feature = "deploy")] {
720    /// # use hydro_lang::prelude::*;
721    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
722    /// # use hydro_lang::location::MemberId;
723    /// # use futures::StreamExt;
724    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
725    /// let p1 = flow.process::<()>();
726    /// let workers1: Cluster<()> = flow.cluster::<()>();
727    /// let workers2: Cluster<()> = flow.cluster::<()>();
728    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
729    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
730    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
731    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
732    /// # .entries()
733    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
734    /// # }, |mut stream| async move {
735    /// # let mut results = Vec::new();
736    /// # let mut locations = std::collections::HashSet::new();
737    /// # for w in 0..=16 {
738    /// #     let (location, v) = stream.next().await.unwrap();
739    /// #     locations.insert(location);
740    /// #     results.push(v);
741    /// # }
742    /// # results.sort();
743    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
744    /// # assert_eq!(locations.len(), 16);
745    /// # }));
746    /// # }
747    /// ```
748    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
749        self,
750        to: &Cluster<'a, L2>,
751        via: N,
752        nondet_membership: NonDet,
753    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
754    where
755        T: Serialize + DeserializeOwned,
756    {
757        let ids = track_membership(self.location.source_cluster_members(to));
758        sliced! {
759            let members_snapshot = use(ids, nondet_membership);
760            let elements = use(self.enumerate(), nondet_membership);
761
762            let current_members = members_snapshot
763                .filter(q!(|b| *b))
764                .keys()
765                .assume_ordering::<TotalOrder>(nondet_membership)
766                .collect_vec();
767
768            elements
769                .cross_singleton(current_members)
770                .map(q!(|(data, members)| (
771                    members[data.0 % members.len()].clone(),
772                    data.1
773                )))
774        }
775        .demux(to, via)
776    }
777}
778
779impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
780    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
781    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
782    /// using [`bincode`] to serialize/deserialize messages.
783    ///
784    /// Each cluster member sends its local stream elements, and they are collected at the destination
785    /// as a [`KeyedStream`] where keys identify the source cluster member.
786    ///
787    /// # Example
788    /// ```rust
789    /// # #[cfg(feature = "deploy")] {
790    /// # use hydro_lang::prelude::*;
791    /// # use futures::StreamExt;
792    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
793    /// let workers: Cluster<()> = flow.cluster::<()>();
794    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
795    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
796    /// # all_received.entries()
797    /// # }, |mut stream| async move {
798    /// // if there are 4 members in the cluster, we should receive 4 elements
799    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
800    /// # let mut results = Vec::new();
801    /// # for w in 0..4 {
802    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
803    /// # }
804    /// # results.sort();
805    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
806    /// # }));
807    /// # }
808    /// ```
809    ///
810    /// If you don't need to know the source for each element, you can use `.values()`
811    /// to get just the data:
812    /// ```rust
813    /// # #[cfg(feature = "deploy")] {
814    /// # use hydro_lang::prelude::*;
815    /// # use hydro_lang::live_collections::stream::NoOrder;
816    /// # use futures::StreamExt;
817    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
818    /// # let workers: Cluster<()> = flow.cluster::<()>();
819    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
820    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
821    /// # values
822    /// # }, |mut stream| async move {
823    /// # let mut results = Vec::new();
824    /// # for w in 0..4 {
825    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
826    /// # }
827    /// # results.sort();
828    /// // if there are 4 members in the cluster, we should receive 4 elements
829    /// // 1, 1, 1, 1
830    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
831    /// # }));
832    /// # }
833    /// ```
834    pub fn send_bincode<L2>(
835        self,
836        other: &Process<'a, L2>,
837    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
838    where
839        T: Serialize + DeserializeOwned,
840    {
841        self.send(other, TCP.fail_stop().bincode())
842    }
843
844    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
845    /// using the configuration in `via` to set up the message transport.
846    ///
847    /// Each cluster member sends its local stream elements, and they are collected at the destination
848    /// as a [`KeyedStream`] where keys identify the source cluster member.
849    ///
850    /// # Example
851    /// ```rust
852    /// # #[cfg(feature = "deploy")] {
853    /// # use hydro_lang::prelude::*;
854    /// # use futures::StreamExt;
855    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
856    /// let workers: Cluster<()> = flow.cluster::<()>();
857    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
858    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
859    /// # all_received.entries()
860    /// # }, |mut stream| async move {
861    /// // if there are 4 members in the cluster, we should receive 4 elements
862    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
863    /// # let mut results = Vec::new();
864    /// # for w in 0..4 {
865    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
866    /// # }
867    /// # results.sort();
868    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
869    /// # }));
870    /// # }
871    /// ```
872    ///
873    /// If you don't need to know the source for each element, you can use `.values()`
874    /// to get just the data:
875    /// ```rust
876    /// # #[cfg(feature = "deploy")] {
877    /// # use hydro_lang::prelude::*;
878    /// # use hydro_lang::live_collections::stream::NoOrder;
879    /// # use futures::StreamExt;
880    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
881    /// # let workers: Cluster<()> = flow.cluster::<()>();
882    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
883    /// let values: Stream<i32, _, _, NoOrder> =
884    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
885    /// # values
886    /// # }, |mut stream| async move {
887    /// # let mut results = Vec::new();
888    /// # for w in 0..4 {
889    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
890    /// # }
891    /// # results.sort();
892    /// // if there are 4 members in the cluster, we should receive 4 elements
893    /// // 1, 1, 1, 1
894    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
895    /// # }));
896    /// # }
897    /// ```
898    pub fn send<L2, N: NetworkFor<T>>(
899        self,
900        to: &Process<'a, L2>,
901        via: N,
902    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
903    where
904        T: Serialize + DeserializeOwned,
905    {
906        let serialize_pipeline = Some(N::serialize_thunk(false));
907
908        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
909
910        let name = via.name();
911        if to.multiversioned() && name.is_none() {
912            panic!(
913                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
914            );
915        }
916
917        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
918            to.clone(),
919            HydroNode::Network {
920                name: name.map(ToOwned::to_owned),
921                networking_info: N::networking_info(),
922                serialize_fn: serialize_pipeline.map(|e| e.into()),
923                instantiate_fn: DebugInstantiate::Building,
924                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
925                input: Box::new(self.ir_node.into_inner()),
926                metadata: to.new_node_metadata(Stream::<
927                    (MemberId<L>, T),
928                    Process<'a, L2>,
929                    Unbounded,
930                    O,
931                    R,
932                >::collection_kind()),
933            },
934        );
935
936        raw_stream.into_keyed()
937    }
938
939    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
940    /// Broadcasts elements of this stream at each source member to all members of a destination
941    /// cluster, using [`bincode`] to serialize/deserialize messages.
942    ///
943    /// Each source member sends each of its stream elements to **every** member of the cluster
944    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
945    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
946    /// **only data elements** and sends each element to all cluster members.
947    ///
948    /// # Non-Determinism
949    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
950    /// to the current cluster members known _at that point in time_ at the source member. Depending
951    /// on when each source member is notified of membership changes, it will broadcast each element
952    /// to different members.
953    ///
954    /// # Example
955    /// ```rust
956    /// # #[cfg(feature = "deploy")] {
957    /// # use hydro_lang::prelude::*;
958    /// # use hydro_lang::location::MemberId;
959    /// # use futures::StreamExt;
960    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
961    /// # type Source = ();
962    /// # type Destination = ();
963    /// let source: Cluster<Source> = flow.cluster::<Source>();
964    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
965    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
966    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
967    /// # on_destination.entries().send_bincode(&p2).entries()
968    /// // if there are 4 members in the desination, each receives one element from each source member
969    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
970    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
971    /// // - ...
972    /// # }, |mut stream| async move {
973    /// # let mut results = Vec::new();
974    /// # for w in 0..16 {
975    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
976    /// # }
977    /// # results.sort();
978    /// # assert_eq!(results, vec![
979    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
980    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
981    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
982    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
983    /// # ]);
984    /// # }));
985    /// # }
986    /// ```
987    pub fn broadcast_bincode<L2: 'a>(
988        self,
989        other: &Cluster<'a, L2>,
990        nondet_membership: NonDet,
991    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
992    where
993        T: Clone + Serialize + DeserializeOwned,
994    {
995        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
996    }
997
998    /// Broadcasts elements of this stream at each source member to all members of a destination
999    /// cluster, using the configuration in `via` to set up the message transport.
1000    ///
1001    /// Each source member sends each of its stream elements to **every** member of the cluster
1002    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1003    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1004    /// **only data elements** and sends each element to all cluster members.
1005    ///
1006    /// # Non-Determinism
1007    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1008    /// to the current cluster members known _at that point in time_ at the source member. Depending
1009    /// on when each source member is notified of membership changes, it will broadcast each element
1010    /// to different members.
1011    ///
1012    /// # Example
1013    /// ```rust
1014    /// # #[cfg(feature = "deploy")] {
1015    /// # use hydro_lang::prelude::*;
1016    /// # use hydro_lang::location::MemberId;
1017    /// # use futures::StreamExt;
1018    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1019    /// # type Source = ();
1020    /// # type Destination = ();
1021    /// let source: Cluster<Source> = flow.cluster::<Source>();
1022    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1023    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1024    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1025    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1026    /// // if there are 4 members in the desination, each receives one element from each source member
1027    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1028    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1029    /// // - ...
1030    /// # }, |mut stream| async move {
1031    /// # let mut results = Vec::new();
1032    /// # for w in 0..16 {
1033    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1034    /// # }
1035    /// # results.sort();
1036    /// # assert_eq!(results, vec![
1037    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1038    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1039    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1040    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1041    /// # ]);
1042    /// # }));
1043    /// # }
1044    /// ```
1045    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1046        self,
1047        to: &Cluster<'a, L2>,
1048        via: N,
1049        nondet_membership: NonDet,
1050    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1051    where
1052        T: Clone + Serialize + DeserializeOwned,
1053    {
1054        let ids = track_membership(self.location.source_cluster_members(to));
1055        sliced! {
1056            let members_snapshot = use(ids, nondet_membership);
1057            let elements = use(self, nondet_membership);
1058
1059            let current_members = members_snapshot.filter(q!(|b| *b));
1060            elements.repeat_with_keys(current_members)
1061        }
1062        .demux(to, via)
1063    }
1064}
1065
1066impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1067    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1068{
1069    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1070    /// Sends elements of this stream at each source member to specific members of a destination
1071    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1072    ///
1073    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1074    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1075    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1076    /// all members.
1077    ///
1078    /// Each cluster member sends its local stream elements, and they are collected at each
1079    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1080    ///
1081    /// # Example
1082    /// ```rust
1083    /// # #[cfg(feature = "deploy")] {
1084    /// # use hydro_lang::prelude::*;
1085    /// # use futures::StreamExt;
1086    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1087    /// # type Source = ();
1088    /// # type Destination = ();
1089    /// let source: Cluster<Source> = flow.cluster::<Source>();
1090    /// let to_send: Stream<_, Cluster<_>, _> = source
1091    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1092    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1093    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1094    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1095    /// # all_received.entries().send_bincode(&p2).entries()
1096    /// # }, |mut stream| async move {
1097    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1098    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1099    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1100    /// // - ...
1101    /// # let mut results = Vec::new();
1102    /// # for w in 0..16 {
1103    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1104    /// # }
1105    /// # results.sort();
1106    /// # assert_eq!(results, vec![
1107    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1108    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1109    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1110    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1111    /// # ]);
1112    /// # }));
1113    /// # }
1114    /// ```
1115    pub fn demux_bincode(
1116        self,
1117        other: &Cluster<'a, L2>,
1118    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1119    where
1120        T: Serialize + DeserializeOwned,
1121    {
1122        self.demux(other, TCP.fail_stop().bincode())
1123    }
1124
1125    /// Sends elements of this stream at each source member to specific members of a destination
1126    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1127    /// message transport.
1128    ///
1129    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1130    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1131    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1132    /// all members.
1133    ///
1134    /// Each cluster member sends its local stream elements, and they are collected at each
1135    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1136    ///
1137    /// # Example
1138    /// ```rust
1139    /// # #[cfg(feature = "deploy")] {
1140    /// # use hydro_lang::prelude::*;
1141    /// # use futures::StreamExt;
1142    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1143    /// # type Source = ();
1144    /// # type Destination = ();
1145    /// let source: Cluster<Source> = flow.cluster::<Source>();
1146    /// let to_send: Stream<_, Cluster<_>, _> = source
1147    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1148    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1149    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1150    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1151    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1152    /// # }, |mut stream| async move {
1153    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1154    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1155    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1156    /// // - ...
1157    /// # let mut results = Vec::new();
1158    /// # for w in 0..16 {
1159    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1160    /// # }
1161    /// # results.sort();
1162    /// # assert_eq!(results, vec![
1163    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1164    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1165    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1166    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1167    /// # ]);
1168    /// # }));
1169    /// # }
1170    /// ```
1171    pub fn demux<N: NetworkFor<T>>(
1172        self,
1173        to: &Cluster<'a, L2>,
1174        via: N,
1175    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1176    where
1177        T: Serialize + DeserializeOwned,
1178    {
1179        self.into_keyed().demux(to, via)
1180    }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185    #[cfg(feature = "sim")]
1186    use stageleft::q;
1187
1188    #[cfg(feature = "sim")]
1189    use crate::location::{Location, MemberId};
1190    #[cfg(feature = "sim")]
1191    use crate::networking::TCP;
1192    #[cfg(feature = "sim")]
1193    use crate::nondet::nondet;
1194    #[cfg(feature = "sim")]
1195    use crate::prelude::FlowBuilder;
1196
1197    #[cfg(feature = "sim")]
1198    #[test]
1199    fn sim_send_bincode_o2o() {
1200        use crate::networking::TCP;
1201
1202        let mut flow = FlowBuilder::new();
1203        let node = flow.process::<()>();
1204        let node2 = flow.process::<()>();
1205
1206        let (in_send, input) = node.sim_input();
1207
1208        let out_recv = input
1209            .send(&node2, TCP.fail_stop().bincode())
1210            .batch(&node2.tick(), nondet!(/** test */))
1211            .count()
1212            .all_ticks()
1213            .sim_output();
1214
1215        let instances = flow.sim().exhaustive(async || {
1216            in_send.send(());
1217            in_send.send(());
1218            in_send.send(());
1219
1220            let received = out_recv.collect::<Vec<_>>().await;
1221            assert!(received.into_iter().sum::<usize>() == 3);
1222        });
1223
1224        assert_eq!(instances, 4); // 2^{3 - 1}
1225    }
1226
1227    #[cfg(feature = "sim")]
1228    #[test]
1229    fn sim_send_bincode_m2o() {
1230        let mut flow = FlowBuilder::new();
1231        let cluster = flow.cluster::<()>();
1232        let node = flow.process::<()>();
1233
1234        let input = cluster.source_iter(q!(vec![1]));
1235
1236        let out_recv = input
1237            .send(&node, TCP.fail_stop().bincode())
1238            .entries()
1239            .batch(&node.tick(), nondet!(/** test */))
1240            .all_ticks()
1241            .sim_output();
1242
1243        let instances = flow
1244            .sim()
1245            .with_cluster_size(&cluster, 4)
1246            .exhaustive(async || {
1247                out_recv
1248                    .assert_yields_only_unordered(vec![
1249                        (MemberId::from_raw_id(0), 1),
1250                        (MemberId::from_raw_id(1), 1),
1251                        (MemberId::from_raw_id(2), 1),
1252                        (MemberId::from_raw_id(3), 1),
1253                    ])
1254                    .await
1255            });
1256
1257        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1258    }
1259
1260    #[cfg(feature = "sim")]
1261    #[test]
1262    fn sim_send_bincode_multiple_m2o() {
1263        let mut flow = FlowBuilder::new();
1264        let cluster1 = flow.cluster::<()>();
1265        let cluster2 = flow.cluster::<()>();
1266        let node = flow.process::<()>();
1267
1268        let out_recv_1 = cluster1
1269            .source_iter(q!(vec![1]))
1270            .send(&node, TCP.fail_stop().bincode())
1271            .entries()
1272            .sim_output();
1273
1274        let out_recv_2 = cluster2
1275            .source_iter(q!(vec![2]))
1276            .send(&node, TCP.fail_stop().bincode())
1277            .entries()
1278            .sim_output();
1279
1280        let instances = flow
1281            .sim()
1282            .with_cluster_size(&cluster1, 3)
1283            .with_cluster_size(&cluster2, 4)
1284            .exhaustive(async || {
1285                out_recv_1
1286                    .assert_yields_only_unordered(vec![
1287                        (MemberId::from_raw_id(0), 1),
1288                        (MemberId::from_raw_id(1), 1),
1289                        (MemberId::from_raw_id(2), 1),
1290                    ])
1291                    .await;
1292
1293                out_recv_2
1294                    .assert_yields_only_unordered(vec![
1295                        (MemberId::from_raw_id(0), 2),
1296                        (MemberId::from_raw_id(1), 2),
1297                        (MemberId::from_raw_id(2), 2),
1298                        (MemberId::from_raw_id(3), 2),
1299                    ])
1300                    .await;
1301            });
1302
1303        assert_eq!(instances, 1);
1304    }
1305
1306    #[cfg(feature = "sim")]
1307    #[test]
1308    fn sim_send_bincode_o2m() {
1309        let mut flow = FlowBuilder::new();
1310        let cluster = flow.cluster::<()>();
1311        let node = flow.process::<()>();
1312
1313        let input = node.source_iter(q!(vec![
1314            (MemberId::from_raw_id(0), 123),
1315            (MemberId::from_raw_id(1), 456),
1316        ]));
1317
1318        let out_recv = input
1319            .demux(&cluster, TCP.fail_stop().bincode())
1320            .map(q!(|x| x + 1))
1321            .send(&node, TCP.fail_stop().bincode())
1322            .entries()
1323            .sim_output();
1324
1325        flow.sim()
1326            .with_cluster_size(&cluster, 4)
1327            .exhaustive(async || {
1328                out_recv
1329                    .assert_yields_only_unordered(vec![
1330                        (MemberId::from_raw_id(0), 124),
1331                        (MemberId::from_raw_id(1), 457),
1332                    ])
1333                    .await
1334            });
1335    }
1336
1337    #[cfg(feature = "sim")]
1338    #[test]
1339    fn sim_broadcast_bincode_o2m() {
1340        let mut flow = FlowBuilder::new();
1341        let cluster = flow.cluster::<()>();
1342        let node = flow.process::<()>();
1343
1344        let input = node.source_iter(q!(vec![123, 456]));
1345
1346        let out_recv = input
1347            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1348            .map(q!(|x| x + 1))
1349            .send(&node, TCP.fail_stop().bincode())
1350            .entries()
1351            .sim_output();
1352
1353        let mut c_1_produced = false;
1354        let mut c_2_produced = false;
1355
1356        flow.sim()
1357            .with_cluster_size(&cluster, 2)
1358            .exhaustive(async || {
1359                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1360
1361                // check that order is preserved
1362                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1363                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1364                    c_1_produced = true;
1365                }
1366
1367                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1368                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1369                    c_2_produced = true;
1370                }
1371            });
1372
1373        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1374    }
1375
1376    #[cfg(feature = "sim")]
1377    #[test]
1378    fn sim_send_bincode_m2m() {
1379        let mut flow = FlowBuilder::new();
1380        let cluster = flow.cluster::<()>();
1381        let node = flow.process::<()>();
1382
1383        let input = node.source_iter(q!(vec![
1384            (MemberId::from_raw_id(0), 123),
1385            (MemberId::from_raw_id(1), 456),
1386        ]));
1387
1388        let out_recv = input
1389            .demux(&cluster, TCP.fail_stop().bincode())
1390            .map(q!(|x| x + 1))
1391            .flat_map_ordered(q!(|x| vec![
1392                (MemberId::from_raw_id(0), x),
1393                (MemberId::from_raw_id(1), x),
1394            ]))
1395            .demux(&cluster, TCP.fail_stop().bincode())
1396            .entries()
1397            .send(&node, TCP.fail_stop().bincode())
1398            .entries()
1399            .sim_output();
1400
1401        flow.sim()
1402            .with_cluster_size(&cluster, 4)
1403            .exhaustive(async || {
1404                out_recv
1405                    .assert_yields_only_unordered(vec![
1406                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1407                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1408                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1409                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1410                    ])
1411                    .await
1412            });
1413    }
1414}