hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21use crate::staging_util::get_this_crate;
22
23// same as the one in `hydro_std`, but internal use only
24fn track_membership<'a, C, L: Location<'a> + NoTick>(
25    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
26) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
27    membership
28        .fold(
29            q!(|| false),
30            q!(|present, event| {
31                match event {
32                    MembershipEvent::Joined => *present = true,
33                    MembershipEvent::Left => *present = false,
34                }
35            }),
36        )
37        .filter_map(q!(|v| if v { Some(()) } else { None }))
38}
39
40fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
41    let root = get_this_crate();
42
43    if is_demux {
44        parse_quote! {
45            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::location::MemberId<_>, #t_type), _>(
46                |(id, data)| {
47                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
48                }
49            )
50        }
51    } else {
52        parse_quote! {
53            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
54                |data| {
55                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
56                }
57            )
58        }
59    }
60}
61
62pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
63    serialize_bincode_with_type(is_demux, &quote_type::<T>())
64}
65
66fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
67    let root = get_this_crate();
68
69    if let Some(c_type) = tagged {
70        parse_quote! {
71            |res| {
72                let (id, b) = res.unwrap();
73                (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
74            }
75        }
76    } else {
77        parse_quote! {
78            |res| {
79                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
80            }
81        }
82    }
83}
84
85pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
86    deserialize_bincode_with_type(tagged, &quote_type::<T>())
87}
88
89impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
90    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
91    /// using [`bincode`] to serialize/deserialize messages.
92    ///
93    /// The returned stream captures the elements received at the destination, where values will
94    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
95    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
96    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
97    /// dropped no further messages will be sent.
98    ///
99    /// # Example
100    /// ```rust
101    /// # use hydro_lang::prelude::*;
102    /// # use futures::StreamExt;
103    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
104    /// let p1 = flow.process::<()>();
105    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
106    /// let p2 = flow.process::<()>();
107    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
108    /// // 1, 2, 3
109    /// # on_p2.send_bincode(&p_out)
110    /// # }, |mut stream| async move {
111    /// # for w in 1..=3 {
112    /// #     assert_eq!(stream.next().await, Some(w));
113    /// # }
114    /// # }));
115    /// ```
116    pub fn send_bincode<L2>(
117        self,
118        other: &Process<'a, L2>,
119    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
120    where
121        T: Serialize + DeserializeOwned,
122    {
123        let serialize_pipeline = Some(serialize_bincode::<T>(false));
124
125        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
126
127        Stream::new(
128            other.clone(),
129            HydroNode::Network {
130                serialize_fn: serialize_pipeline.map(|e| e.into()),
131                instantiate_fn: DebugInstantiate::Building,
132                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
133                input: Box::new(self.ir_node.into_inner()),
134                metadata: other.new_node_metadata(
135                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
136                ),
137            },
138        )
139    }
140
141    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
142    /// using [`bincode`] to serialize/deserialize messages.
143    ///
144    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
145    /// membership information. This is a common pattern in distributed systems for broadcasting data to
146    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
147    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
148    /// each element to all cluster members.
149    ///
150    /// # Non-Determinism
151    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
152    /// to the current cluster members _at that point in time_. Depending on when we are notified of
153    /// membership changes, we will broadcast each element to different members.
154    ///
155    /// # Example
156    /// ```rust
157    /// # use hydro_lang::prelude::*;
158    /// # use futures::StreamExt;
159    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
160    /// let p1 = flow.process::<()>();
161    /// let workers: Cluster<()> = flow.cluster::<()>();
162    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
163    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
164    /// # on_worker.send_bincode(&p2).entries()
165    /// // if there are 4 members in the cluster, each receives one element
166    /// // - MemberId::<()>(0): [123]
167    /// // - MemberId::<()>(1): [123]
168    /// // - MemberId::<()>(2): [123]
169    /// // - MemberId::<()>(3): [123]
170    /// # }, |mut stream| async move {
171    /// # let mut results = Vec::new();
172    /// # for w in 0..4 {
173    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
174    /// # }
175    /// # results.sort();
176    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
177    /// # }));
178    /// ```
179    pub fn broadcast_bincode<L2: 'a>(
180        self,
181        other: &Cluster<'a, L2>,
182        nondet_membership: NonDet,
183    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
184    where
185        T: Clone + Serialize + DeserializeOwned,
186    {
187        let ids = track_membership(self.location.source_cluster_members(other));
188        let join_tick = self.location.tick();
189        let current_members = ids.snapshot(&join_tick, nondet_membership);
190
191        self.batch(&join_tick, nondet_membership)
192            .repeat_with_keys(current_members)
193            .all_ticks()
194            .demux_bincode(other)
195    }
196
197    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
198    /// serialization. The external process can receive these elements by establishing a TCP
199    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
200    ///
201    /// # Example
202    /// ```rust
203    /// # use hydro_lang::prelude::*;
204    /// # use futures::StreamExt;
205    /// # tokio_test::block_on(async move {
206    /// let flow = FlowBuilder::new();
207    /// let process = flow.process::<()>();
208    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
209    /// let external = flow.external::<()>();
210    /// let external_handle = numbers.send_bincode_external(&external);
211    ///
212    /// let mut deployment = hydro_deploy::Deployment::new();
213    /// let nodes = flow
214    ///     .with_process(&process, deployment.Localhost())
215    ///     .with_external(&external, deployment.Localhost())
216    ///     .deploy(&mut deployment);
217    ///
218    /// deployment.deploy().await.unwrap();
219    /// // establish the TCP connection
220    /// let mut external_recv_stream = nodes.connect(external_handle).await;
221    /// deployment.start().await.unwrap();
222    ///
223    /// for w in 1..=3 {
224    ///     assert_eq!(external_recv_stream.next().await, Some(w));
225    /// }
226    /// # });
227    /// ```
228    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
229    where
230        T: Serialize + DeserializeOwned,
231    {
232        let serialize_pipeline = Some(serialize_bincode::<T>(false));
233
234        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
235
236        let external_key = flow_state_borrow.next_external_out;
237        flow_state_borrow.next_external_out += 1;
238
239        flow_state_borrow.push_root(HydroRoot::SendExternal {
240            to_external_id: other.id,
241            to_key: external_key,
242            to_many: false,
243            serialize_fn: serialize_pipeline.map(|e| e.into()),
244            instantiate_fn: DebugInstantiate::Building,
245            input: Box::new(self.ir_node.into_inner()),
246            op_metadata: HydroIrOpMetadata::new(),
247        });
248
249        ExternalBincodeStream {
250            process_id: other.id,
251            port_id: external_key,
252            _phantom: PhantomData,
253        }
254    }
255}
256
257impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
258    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
259{
260    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
261    /// using [`bincode`] to serialize/deserialize messages.
262    ///
263    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
264    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
265    /// this API allows precise targeting of specific cluster members rather than broadcasting to
266    /// all members.
267    ///
268    /// # Example
269    /// ```rust
270    /// # use hydro_lang::prelude::*;
271    /// # use futures::StreamExt;
272    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
273    /// let p1 = flow.process::<()>();
274    /// let workers: Cluster<()> = flow.cluster::<()>();
275    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
276    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
277    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
278    ///     .demux_bincode(&workers);
279    /// # on_worker.send_bincode(&p2).entries()
280    /// // if there are 4 members in the cluster, each receives one element
281    /// // - MemberId::<()>(0): [0]
282    /// // - MemberId::<()>(1): [1]
283    /// // - MemberId::<()>(2): [2]
284    /// // - MemberId::<()>(3): [3]
285    /// # }, |mut stream| async move {
286    /// # let mut results = Vec::new();
287    /// # for w in 0..4 {
288    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
289    /// # }
290    /// # results.sort();
291    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
292    /// # }));
293    /// ```
294    pub fn demux_bincode(
295        self,
296        other: &Cluster<'a, L2>,
297    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
298    where
299        T: Serialize + DeserializeOwned,
300    {
301        self.into_keyed().demux_bincode(other)
302    }
303}
304
305impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
306    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
307    /// [`bincode`] to serialize/deserialize messages.
308    ///
309    /// This provides load balancing by evenly distributing work across cluster members. The
310    /// distribution is deterministic based on element order - the first element goes to member 0,
311    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
312    ///
313    /// # Non-Determinism
314    /// The set of cluster members may asynchronously change over time. Each element is distributed
315    /// based on the current cluster membership _at that point in time_. Depending on when cluster
316    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
317    /// membership is stable, the order of members in the round-robin pattern may change across runs.
318    ///
319    /// # Ordering Requirements
320    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
321    /// order of messages and retries affects the round-robin pattern.
322    ///
323    /// # Example
324    /// ```rust
325    /// # use hydro_lang::prelude::*;
326    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
327    /// # use futures::StreamExt;
328    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
329    /// let p1 = flow.process::<()>();
330    /// let workers: Cluster<()> = flow.cluster::<()>();
331    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
332    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
333    /// on_worker.send_bincode(&p2)
334    /// # .first().values() // we use first to assert that each member gets one element
335    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
336    /// // - MemberId::<()>(?): [1]
337    /// // - MemberId::<()>(?): [2]
338    /// // - MemberId::<()>(?): [3]
339    /// // - MemberId::<()>(?): [4]
340    /// # }, |mut stream| async move {
341    /// # let mut results = Vec::new();
342    /// # for w in 0..4 {
343    /// #     results.push(stream.next().await.unwrap());
344    /// # }
345    /// # results.sort();
346    /// # assert_eq!(results, vec![1, 2, 3, 4]);
347    /// # }));
348    /// ```
349    pub fn round_robin_bincode<L2: 'a>(
350        self,
351        other: &Cluster<'a, L2>,
352        nondet_membership: NonDet,
353    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
354    where
355        T: Serialize + DeserializeOwned,
356    {
357        let ids = track_membership(self.location.source_cluster_members(other));
358        let join_tick = self.location.tick();
359        let current_members = ids
360            .snapshot(&join_tick, nondet_membership)
361            .keys()
362            .assume_ordering(nondet_membership)
363            .collect_vec();
364
365        self.enumerate()
366            .batch(&join_tick, nondet_membership)
367            .cross_singleton(current_members)
368            .map(q!(|(data, members)| (
369                members[data.0 % members.len()],
370                data.1
371            )))
372            .all_ticks()
373            .demux_bincode(other)
374    }
375}
376
377impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
378    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
379    /// using [`bincode`] to serialize/deserialize messages.
380    ///
381    /// Each cluster member sends its local stream elements, and they are collected at the destination
382    /// as a [`KeyedStream`] where keys identify the source cluster member.
383    ///
384    /// # Example
385    /// ```rust
386    /// # use hydro_lang::prelude::*;
387    /// # use futures::StreamExt;
388    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
389    /// let workers: Cluster<()> = flow.cluster::<()>();
390    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
391    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
392    /// # all_received.entries()
393    /// # }, |mut stream| async move {
394    /// // if there are 4 members in the cluster, we should receive 4 elements
395    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
396    /// # let mut results = Vec::new();
397    /// # for w in 0..4 {
398    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
399    /// # }
400    /// # results.sort();
401    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
402    /// # }));
403    /// ```
404    ///
405    /// If you don't need to know the source for each element, you can use `.values()`
406    /// to get just the data:
407    /// ```rust
408    /// # use hydro_lang::prelude::*;
409    /// # use hydro_lang::live_collections::stream::NoOrder;
410    /// # use futures::StreamExt;
411    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
412    /// # let workers: Cluster<()> = flow.cluster::<()>();
413    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
414    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
415    /// # values
416    /// # }, |mut stream| async move {
417    /// # let mut results = Vec::new();
418    /// # for w in 0..4 {
419    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
420    /// # }
421    /// # results.sort();
422    /// // if there are 4 members in the cluster, we should receive 4 elements
423    /// // 1, 1, 1, 1
424    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
425    /// # }));
426    /// ```
427    pub fn send_bincode<L2>(
428        self,
429        other: &Process<'a, L2>,
430    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
431    where
432        T: Serialize + DeserializeOwned,
433    {
434        let serialize_pipeline = Some(serialize_bincode::<T>(false));
435
436        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
437
438        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
439            other.clone(),
440            HydroNode::Network {
441                serialize_fn: serialize_pipeline.map(|e| e.into()),
442                instantiate_fn: DebugInstantiate::Building,
443                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
444                input: Box::new(self.ir_node.into_inner()),
445                metadata: other.new_node_metadata(Stream::<
446                    (MemberId<L>, T),
447                    Process<'a, L2>,
448                    Unbounded,
449                    O,
450                    R,
451                >::collection_kind()),
452            },
453        );
454
455        raw_stream.into_keyed()
456    }
457
458    /// Broadcasts elements of this stream at each source member to all members of a destination
459    /// cluster, using [`bincode`] to serialize/deserialize messages.
460    ///
461    /// Each source member sends each of its stream elements to **every** member of the cluster
462    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
463    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
464    /// **only data elements** and sends each element to all cluster members.
465    ///
466    /// # Non-Determinism
467    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
468    /// to the current cluster members known _at that point in time_ at the source member. Depending
469    /// on when each source member is notified of membership changes, it will broadcast each element
470    /// to different members.
471    ///
472    /// # Example
473    /// ```rust
474    /// # use hydro_lang::prelude::*;
475    /// # use hydro_lang::location::MemberId;
476    /// # use futures::StreamExt;
477    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
478    /// # type Source = ();
479    /// # type Destination = ();
480    /// let source: Cluster<Source> = flow.cluster::<Source>();
481    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
482    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
483    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
484    /// # on_destination.entries().send_bincode(&p2).entries()
485    /// // if there are 4 members in the desination, each receives one element from each source member
486    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
487    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
488    /// // - ...
489    /// # }, |mut stream| async move {
490    /// # let mut results = Vec::new();
491    /// # for w in 0..16 {
492    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
493    /// # }
494    /// # results.sort();
495    /// # assert_eq!(results, vec![
496    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
497    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
498    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
499    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
500    /// # ]);
501    /// # }));
502    /// ```
503    pub fn broadcast_bincode<L2: 'a>(
504        self,
505        other: &Cluster<'a, L2>,
506        nondet_membership: NonDet,
507    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
508    where
509        T: Clone + Serialize + DeserializeOwned,
510    {
511        let ids = track_membership(self.location.source_cluster_members(other));
512        let join_tick = self.location.tick();
513        let current_members = ids.snapshot(&join_tick, nondet_membership);
514
515        self.batch(&join_tick, nondet_membership)
516            .repeat_with_keys(current_members)
517            .all_ticks()
518            .demux_bincode(other)
519    }
520}
521
522impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
523    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
524{
525    /// Sends elements of this stream at each source member to specific members of a destination
526    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
527    ///
528    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
529    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
530    /// this API allows precise targeting of specific cluster members rather than broadcasting to
531    /// all members.
532    ///
533    /// Each cluster member sends its local stream elements, and they are collected at each
534    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
535    ///
536    /// # Example
537    /// ```rust
538    /// # use hydro_lang::prelude::*;
539    /// # use futures::StreamExt;
540    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
541    /// # type Source = ();
542    /// # type Destination = ();
543    /// let source: Cluster<Source> = flow.cluster::<Source>();
544    /// let to_send: Stream<_, Cluster<_>, _> = source
545    ///     .source_iter(q!(vec![0, 1, 2, 3]))
546    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
547    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
548    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
549    /// # all_received.entries().send_bincode(&p2).entries()
550    /// # }, |mut stream| async move {
551    /// // if there are 4 members in the destination cluster, each receives one message from each source member
552    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
553    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
554    /// // - ...
555    /// # let mut results = Vec::new();
556    /// # for w in 0..16 {
557    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
558    /// # }
559    /// # results.sort();
560    /// # assert_eq!(results, vec![
561    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
562    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
563    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
564    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
565    /// # ]);
566    /// # }));
567    /// ```
568    pub fn demux_bincode(
569        self,
570        other: &Cluster<'a, L2>,
571    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
572    where
573        T: Serialize + DeserializeOwned,
574    {
575        self.into_keyed().demux_bincode(other)
576    }
577}