hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::tick::NoAtomic;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::nondet::NonDet;
22use crate::staging_util::get_this_crate;
23
24// same as the one in `hydro_std`, but internal use only
25fn track_membership<'a, C, L: Location<'a> + NoTick + NoAtomic>(
26    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
27) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
28    membership
29        .fold(
30            q!(|| false),
31            q!(|present, event| {
32                match event {
33                    MembershipEvent::Joined => *present = true,
34                    MembershipEvent::Left => *present = false,
35                }
36            }),
37        )
38        .filter_map(q!(|v| if v { Some(()) } else { None }))
39}
40
41fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
42    let root = get_this_crate();
43
44    if is_demux {
45        parse_quote! {
46            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::location::MemberId<_>, #t_type), _>(
47                |(id, data)| {
48                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
49                }
50            )
51        }
52    } else {
53        parse_quote! {
54            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
55                |data| {
56                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
57                }
58            )
59        }
60    }
61}
62
63pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
64    serialize_bincode_with_type(is_demux, &quote_type::<T>())
65}
66
67fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
68    let root = get_this_crate();
69
70    if let Some(c_type) = tagged {
71        parse_quote! {
72            |res| {
73                let (id, b) = res.unwrap();
74                (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
75            }
76        }
77    } else {
78        parse_quote! {
79            |res| {
80                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
81            }
82        }
83    }
84}
85
86pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
87    deserialize_bincode_with_type(tagged, &quote_type::<T>())
88}
89
90impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
91    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
92    /// using [`bincode`] to serialize/deserialize messages.
93    ///
94    /// The returned stream captures the elements received at the destination, where values will
95    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
96    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
97    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
98    /// dropped no further messages will be sent.
99    ///
100    /// # Example
101    /// ```rust
102    /// # use hydro_lang::prelude::*;
103    /// # use futures::StreamExt;
104    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
105    /// let p1 = flow.process::<()>();
106    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
107    /// let p2 = flow.process::<()>();
108    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
109    /// // 1, 2, 3
110    /// # on_p2.send_bincode(&p_out)
111    /// # }, |mut stream| async move {
112    /// # for w in 1..=3 {
113    /// #     assert_eq!(stream.next().await, Some(w));
114    /// # }
115    /// # }));
116    /// ```
117    pub fn send_bincode<L2>(
118        self,
119        other: &Process<'a, L2>,
120    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
121    where
122        T: Serialize + DeserializeOwned,
123    {
124        let serialize_pipeline = Some(serialize_bincode::<T>(false));
125
126        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
127
128        Stream::new(
129            other.clone(),
130            HydroNode::Network {
131                serialize_fn: serialize_pipeline.map(|e| e.into()),
132                instantiate_fn: DebugInstantiate::Building,
133                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
134                input: Box::new(self.ir_node.into_inner()),
135                metadata: other.new_node_metadata::<T>(),
136            },
137        )
138    }
139
140    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
141    /// using [`bincode`] to serialize/deserialize messages.
142    ///
143    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
144    /// membership information. This is a common pattern in distributed systems for broadcasting data to
145    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
146    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
147    /// each element to all cluster members.
148    ///
149    /// # Non-Determinism
150    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
151    /// to the current cluster members _at that point in time_. Depending on when we are notified of
152    /// membership changes, we will broadcast each element to different members.
153    ///
154    /// # Example
155    /// ```rust
156    /// # use hydro_lang::prelude::*;
157    /// # use futures::StreamExt;
158    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
159    /// let p1 = flow.process::<()>();
160    /// let workers: Cluster<()> = flow.cluster::<()>();
161    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
162    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
163    /// # on_worker.send_bincode(&p2).entries()
164    /// // if there are 4 members in the cluster, each receives one element
165    /// // - MemberId::<()>(0): [123]
166    /// // - MemberId::<()>(1): [123]
167    /// // - MemberId::<()>(2): [123]
168    /// // - MemberId::<()>(3): [123]
169    /// # }, |mut stream| async move {
170    /// # let mut results = Vec::new();
171    /// # for w in 0..4 {
172    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
173    /// # }
174    /// # results.sort();
175    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
176    /// # }));
177    /// ```
178    pub fn broadcast_bincode<L2: 'a>(
179        self,
180        other: &Cluster<'a, L2>,
181        nondet_membership: NonDet,
182    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
183    where
184        T: Clone + Serialize + DeserializeOwned,
185    {
186        let ids = track_membership(self.location.source_cluster_members(other));
187        let join_tick = self.location.tick();
188        let current_members = ids.snapshot(&join_tick, nondet_membership);
189
190        self.batch(&join_tick, nondet_membership)
191            .repeat_with_keys(current_members)
192            .all_ticks()
193            .demux_bincode(other)
194    }
195
196    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
197    /// serialization. The external process can receive these elements by establishing a TCP
198    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
199    ///
200    /// # Example
201    /// ```rust
202    /// # use hydro_lang::prelude::*;
203    /// # use futures::StreamExt;
204    /// # tokio_test::block_on(async move {
205    /// let flow = FlowBuilder::new();
206    /// let process = flow.process::<()>();
207    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
208    /// let external = flow.external::<()>();
209    /// let external_handle = numbers.send_bincode_external(&external);
210    ///
211    /// let mut deployment = hydro_deploy::Deployment::new();
212    /// let nodes = flow
213    ///     .with_process(&process, deployment.Localhost())
214    ///     .with_external(&external, deployment.Localhost())
215    ///     .deploy(&mut deployment);
216    ///
217    /// deployment.deploy().await.unwrap();
218    /// // establish the TCP connection
219    /// let mut external_recv_stream = nodes.connect_source_bincode(external_handle).await;
220    /// deployment.start().await.unwrap();
221    ///
222    /// for w in 1..=3 {
223    ///     assert_eq!(external_recv_stream.next().await, Some(w));
224    /// }
225    /// # });
226    /// ```
227    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
228    where
229        T: Serialize + DeserializeOwned,
230    {
231        let serialize_pipeline = Some(serialize_bincode::<T>(false));
232
233        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
234
235        let external_key = flow_state_borrow.next_external_out;
236        flow_state_borrow.next_external_out += 1;
237
238        flow_state_borrow.push_root(HydroRoot::SendExternal {
239            to_external_id: other.id,
240            to_key: external_key,
241            to_many: false,
242            serialize_fn: serialize_pipeline.map(|e| e.into()),
243            instantiate_fn: DebugInstantiate::Building,
244            input: Box::new(HydroNode::Unpersist {
245                inner: Box::new(self.ir_node.into_inner()),
246                metadata: self.location.new_node_metadata::<T>(),
247            }),
248            op_metadata: HydroIrOpMetadata::new(),
249        });
250
251        ExternalBincodeStream {
252            process_id: other.id,
253            port_id: external_key,
254            _phantom: PhantomData,
255        }
256    }
257}
258
259impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
260    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
261{
262    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
263    /// using [`bincode`] to serialize/deserialize messages.
264    ///
265    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
266    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
267    /// this API allows precise targeting of specific cluster members rather than broadcasting to
268    /// all members.
269    ///
270    /// # Example
271    /// ```rust
272    /// # use hydro_lang::prelude::*;
273    /// # use futures::StreamExt;
274    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
275    /// let p1 = flow.process::<()>();
276    /// let workers: Cluster<()> = flow.cluster::<()>();
277    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
278    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
279    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
280    ///     .demux_bincode(&workers);
281    /// # on_worker.send_bincode(&p2).entries()
282    /// // if there are 4 members in the cluster, each receives one element
283    /// // - MemberId::<()>(0): [0]
284    /// // - MemberId::<()>(1): [1]
285    /// // - MemberId::<()>(2): [2]
286    /// // - MemberId::<()>(3): [3]
287    /// # }, |mut stream| async move {
288    /// # let mut results = Vec::new();
289    /// # for w in 0..4 {
290    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
291    /// # }
292    /// # results.sort();
293    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
294    /// # }));
295    /// ```
296    pub fn demux_bincode(
297        self,
298        other: &Cluster<'a, L2>,
299    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
300    where
301        T: Serialize + DeserializeOwned,
302    {
303        self.into_keyed().demux_bincode(other)
304    }
305}
306
307impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
308    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
309    /// [`bincode`] to serialize/deserialize messages.
310    ///
311    /// This provides load balancing by evenly distributing work across cluster members. The
312    /// distribution is deterministic based on element order - the first element goes to member 0,
313    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
314    ///
315    /// # Non-Determinism
316    /// The set of cluster members may asynchronously change over time. Each element is distributed
317    /// based on the current cluster membership _at that point in time_. Depending on when cluster
318    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
319    /// membership is stable, the order of members in the round-robin pattern may change across runs.
320    ///
321    /// # Ordering Requirements
322    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
323    /// order of messages and retries affects the round-robin pattern.
324    ///
325    /// # Example
326    /// ```rust
327    /// # use hydro_lang::prelude::*;
328    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
329    /// # use futures::StreamExt;
330    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
331    /// let p1 = flow.process::<()>();
332    /// let workers: Cluster<()> = flow.cluster::<()>();
333    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
334    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
335    /// on_worker.send_bincode(&p2)
336    /// # .first().values() // we use first to assert that each member gets one element
337    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
338    /// // - MemberId::<()>(?): [1]
339    /// // - MemberId::<()>(?): [2]
340    /// // - MemberId::<()>(?): [3]
341    /// // - MemberId::<()>(?): [4]
342    /// # }, |mut stream| async move {
343    /// # let mut results = Vec::new();
344    /// # for w in 0..4 {
345    /// #     results.push(stream.next().await.unwrap());
346    /// # }
347    /// # results.sort();
348    /// # assert_eq!(results, vec![1, 2, 3, 4]);
349    /// # }));
350    /// ```
351    pub fn round_robin_bincode<L2: 'a>(
352        self,
353        other: &Cluster<'a, L2>,
354        nondet_membership: NonDet,
355    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
356    where
357        T: Serialize + DeserializeOwned,
358    {
359        let ids = track_membership(self.location.source_cluster_members(other));
360        let join_tick = self.location.tick();
361        let current_members = ids
362            .snapshot(&join_tick, nondet_membership)
363            .keys()
364            .assume_ordering(nondet_membership)
365            .collect_vec();
366
367        self.enumerate()
368            .batch(&join_tick, nondet_membership)
369            .cross_singleton(current_members)
370            .map(q!(|(data, members)| (
371                members[data.0 % members.len()],
372                data.1
373            )))
374            .all_ticks()
375            .demux_bincode(other)
376    }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
380    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
381    /// using [`bincode`] to serialize/deserialize messages.
382    ///
383    /// Each cluster member sends its local stream elements, and they are collected at the destination
384    /// as a [`KeyedStream`] where keys identify the source cluster member.
385    ///
386    /// # Example
387    /// ```rust
388    /// # use hydro_lang::prelude::*;
389    /// # use futures::StreamExt;
390    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
391    /// let workers: Cluster<()> = flow.cluster::<()>();
392    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
393    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
394    /// # all_received.entries()
395    /// # }, |mut stream| async move {
396    /// // if there are 4 members in the cluster, we should receive 4 elements
397    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
398    /// # let mut results = Vec::new();
399    /// # for w in 0..4 {
400    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
401    /// # }
402    /// # results.sort();
403    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
404    /// # }));
405    /// ```
406    ///
407    /// If you don't need to know the source for each element, you can use `.values()`
408    /// to get just the data:
409    /// ```rust
410    /// # use hydro_lang::prelude::*;
411    /// # use hydro_lang::live_collections::stream::NoOrder;
412    /// # use futures::StreamExt;
413    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
414    /// # let workers: Cluster<()> = flow.cluster::<()>();
415    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
416    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
417    /// # values
418    /// # }, |mut stream| async move {
419    /// # let mut results = Vec::new();
420    /// # for w in 0..4 {
421    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
422    /// # }
423    /// # results.sort();
424    /// // if there are 4 members in the cluster, we should receive 4 elements
425    /// // 1, 1, 1, 1
426    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
427    /// # }));
428    /// ```
429    pub fn send_bincode<L2>(
430        self,
431        other: &Process<'a, L2>,
432    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
433    where
434        T: Serialize + DeserializeOwned,
435    {
436        let serialize_pipeline = Some(serialize_bincode::<T>(false));
437
438        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
439
440        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
441            other.clone(),
442            HydroNode::Network {
443                serialize_fn: serialize_pipeline.map(|e| e.into()),
444                instantiate_fn: DebugInstantiate::Building,
445                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
446                input: Box::new(self.ir_node.into_inner()),
447                metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
448            },
449        );
450
451        raw_stream.into_keyed()
452    }
453
454    /// Broadcasts elements of this stream at each source member to all members of a destination
455    /// cluster, using [`bincode`] to serialize/deserialize messages.
456    ///
457    /// Each source member sends each of its stream elements to **every** member of the cluster
458    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
459    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
460    /// **only data elements** and sends each element to all cluster members.
461    ///
462    /// # Non-Determinism
463    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
464    /// to the current cluster members known _at that point in time_ at the source member. Depending
465    /// on when each source member is notified of membership changes, it will broadcast each element
466    /// to different members.
467    ///
468    /// # Example
469    /// ```rust
470    /// # use hydro_lang::prelude::*;
471    /// # use hydro_lang::location::MemberId;
472    /// # use futures::StreamExt;
473    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
474    /// # type Source = ();
475    /// # type Destination = ();
476    /// let source: Cluster<Source> = flow.cluster::<Source>();
477    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
478    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
479    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
480    /// # on_destination.entries().send_bincode(&p2).entries()
481    /// // if there are 4 members in the desination, each receives one element from each source member
482    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
483    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
484    /// // - ...
485    /// # }, |mut stream| async move {
486    /// # let mut results = Vec::new();
487    /// # for w in 0..16 {
488    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
489    /// # }
490    /// # results.sort();
491    /// # assert_eq!(results, vec![
492    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
493    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
494    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
495    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
496    /// # ]);
497    /// # }));
498    /// ```
499    pub fn broadcast_bincode<L2: 'a>(
500        self,
501        other: &Cluster<'a, L2>,
502        nondet_membership: NonDet,
503    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
504    where
505        T: Clone + Serialize + DeserializeOwned,
506    {
507        let ids = track_membership(self.location.source_cluster_members(other));
508        let join_tick = self.location.tick();
509        let current_members = ids.snapshot(&join_tick, nondet_membership);
510
511        self.batch(&join_tick, nondet_membership)
512            .repeat_with_keys(current_members)
513            .all_ticks()
514            .demux_bincode(other)
515    }
516}
517
518impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
519    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
520{
521    /// Sends elements of this stream at each source member to specific members of a destination
522    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
523    ///
524    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
525    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
526    /// this API allows precise targeting of specific cluster members rather than broadcasting to
527    /// all members.
528    ///
529    /// Each cluster member sends its local stream elements, and they are collected at each
530    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
531    ///
532    /// # Example
533    /// ```rust
534    /// # use hydro_lang::prelude::*;
535    /// # use futures::StreamExt;
536    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
537    /// # type Source = ();
538    /// # type Destination = ();
539    /// let source: Cluster<Source> = flow.cluster::<Source>();
540    /// let to_send: Stream<_, Cluster<_>, _> = source
541    ///     .source_iter(q!(vec![0, 1, 2, 3]))
542    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
543    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
544    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
545    /// # all_received.entries().send_bincode(&p2).entries()
546    /// # }, |mut stream| async move {
547    /// // if there are 4 members in the destination cluster, each receives one message from each source member
548    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
549    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
550    /// // - ...
551    /// # let mut results = Vec::new();
552    /// # for w in 0..16 {
553    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
554    /// # }
555    /// # results.sort();
556    /// # assert_eq!(results, vec![
557    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
558    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
559    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
560    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
561    /// # ]);
562    /// # }));
563    /// ```
564    pub fn demux_bincode(
565        self,
566        other: &Cluster<'a, L2>,
567    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
568    where
569        T: Serialize + DeserializeOwned,
570    {
571        self.into_keyed().demux_bincode(other)
572    }
573}