hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21use crate::staging_util::get_this_crate;
22
23// same as the one in `hydro_std`, but internal use only
24fn track_membership<'a, C, L: Location<'a> + NoTick>(
25    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
26) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
27    membership.fold(
28        q!(|| false),
29        q!(|present, event| {
30            match event {
31                MembershipEvent::Joined => *present = true,
32                MembershipEvent::Left => *present = false,
33            }
34        }),
35    )
36}
37
38fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
39    let root = get_this_crate();
40
41    if is_demux {
42        parse_quote! {
43            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
44                |(id, data)| {
45                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
46                }
47            )
48        }
49    } else {
50        parse_quote! {
51            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
52                |data| {
53                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
54                }
55            )
56        }
57    }
58}
59
60pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
61    serialize_bincode_with_type(is_demux, &quote_type::<T>())
62}
63
64fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
65    let root = get_this_crate();
66
67    if let Some(c_type) = tagged {
68        parse_quote! {
69            |res| {
70                let (id, b) = res.unwrap();
71                (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
72            }
73        }
74    } else {
75        parse_quote! {
76            |res| {
77                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
78            }
79        }
80    }
81}
82
83pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
84    deserialize_bincode_with_type(tagged, &quote_type::<T>())
85}
86
87impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
88    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
89    /// using [`bincode`] to serialize/deserialize messages.
90    ///
91    /// The returned stream captures the elements received at the destination, where values will
92    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
93    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
94    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
95    /// dropped no further messages will be sent.
96    ///
97    /// # Example
98    /// ```rust
99    /// # use hydro_lang::prelude::*;
100    /// # use futures::StreamExt;
101    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
102    /// let p1 = flow.process::<()>();
103    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
104    /// let p2 = flow.process::<()>();
105    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
106    /// // 1, 2, 3
107    /// # on_p2.send_bincode(&p_out)
108    /// # }, |mut stream| async move {
109    /// # for w in 1..=3 {
110    /// #     assert_eq!(stream.next().await, Some(w));
111    /// # }
112    /// # }));
113    /// ```
114    pub fn send_bincode<L2>(
115        self,
116        other: &Process<'a, L2>,
117    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
118    where
119        T: Serialize + DeserializeOwned,
120    {
121        let serialize_pipeline = Some(serialize_bincode::<T>(false));
122
123        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
124
125        Stream::new(
126            other.clone(),
127            HydroNode::Network {
128                serialize_fn: serialize_pipeline.map(|e| e.into()),
129                instantiate_fn: DebugInstantiate::Building,
130                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
131                input: Box::new(self.ir_node.into_inner()),
132                metadata: other.new_node_metadata(
133                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
134                ),
135            },
136        )
137    }
138
139    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
140    /// using [`bincode`] to serialize/deserialize messages.
141    ///
142    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
143    /// membership information. This is a common pattern in distributed systems for broadcasting data to
144    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
145    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
146    /// each element to all cluster members.
147    ///
148    /// # Non-Determinism
149    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
150    /// to the current cluster members _at that point in time_. Depending on when we are notified of
151    /// membership changes, we will broadcast each element to different members.
152    ///
153    /// # Example
154    /// ```rust
155    /// # use hydro_lang::prelude::*;
156    /// # use futures::StreamExt;
157    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
158    /// let p1 = flow.process::<()>();
159    /// let workers: Cluster<()> = flow.cluster::<()>();
160    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
161    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
162    /// # on_worker.send_bincode(&p2).entries()
163    /// // if there are 4 members in the cluster, each receives one element
164    /// // - MemberId::<()>(0): [123]
165    /// // - MemberId::<()>(1): [123]
166    /// // - MemberId::<()>(2): [123]
167    /// // - MemberId::<()>(3): [123]
168    /// # }, |mut stream| async move {
169    /// # let mut results = Vec::new();
170    /// # for w in 0..4 {
171    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
172    /// # }
173    /// # results.sort();
174    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
175    /// # }));
176    /// ```
177    pub fn broadcast_bincode<L2: 'a>(
178        self,
179        other: &Cluster<'a, L2>,
180        nondet_membership: NonDet,
181    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
182    where
183        T: Clone + Serialize + DeserializeOwned,
184    {
185        let ids = track_membership(self.location.source_cluster_members(other));
186        let join_tick = self.location.tick();
187        let current_members = ids
188            .snapshot(&join_tick, nondet_membership)
189            .filter(q!(|b| *b));
190
191        self.batch(&join_tick, nondet_membership)
192            .repeat_with_keys(current_members)
193            .all_ticks()
194            .demux_bincode(other)
195    }
196
197    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
198    /// serialization. The external process can receive these elements by establishing a TCP
199    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
200    ///
201    /// # Example
202    /// ```rust
203    /// # use hydro_lang::prelude::*;
204    /// # use futures::StreamExt;
205    /// # tokio_test::block_on(async move {
206    /// let flow = FlowBuilder::new();
207    /// let process = flow.process::<()>();
208    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
209    /// let external = flow.external::<()>();
210    /// let external_handle = numbers.send_bincode_external(&external);
211    ///
212    /// let mut deployment = hydro_deploy::Deployment::new();
213    /// let nodes = flow
214    ///     .with_process(&process, deployment.Localhost())
215    ///     .with_external(&external, deployment.Localhost())
216    ///     .deploy(&mut deployment);
217    ///
218    /// deployment.deploy().await.unwrap();
219    /// // establish the TCP connection
220    /// let mut external_recv_stream = nodes.connect(external_handle).await;
221    /// deployment.start().await.unwrap();
222    ///
223    /// for w in 1..=3 {
224    ///     assert_eq!(external_recv_stream.next().await, Some(w));
225    /// }
226    /// # });
227    /// ```
228    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
229    where
230        T: Serialize + DeserializeOwned,
231    {
232        let serialize_pipeline = Some(serialize_bincode::<T>(false));
233
234        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
235
236        let external_key = flow_state_borrow.next_external_out;
237        flow_state_borrow.next_external_out += 1;
238
239        flow_state_borrow.push_root(HydroRoot::SendExternal {
240            to_external_id: other.id,
241            to_key: external_key,
242            to_many: false,
243            unpaired: true,
244            serialize_fn: serialize_pipeline.map(|e| e.into()),
245            instantiate_fn: DebugInstantiate::Building,
246            input: Box::new(self.ir_node.into_inner()),
247            op_metadata: HydroIrOpMetadata::new(),
248        });
249
250        ExternalBincodeStream {
251            process_id: other.id,
252            port_id: external_key,
253            _phantom: PhantomData,
254        }
255    }
256}
257
258impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
259    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
260{
261    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
262    /// using [`bincode`] to serialize/deserialize messages.
263    ///
264    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
265    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
266    /// this API allows precise targeting of specific cluster members rather than broadcasting to
267    /// all members.
268    ///
269    /// # Example
270    /// ```rust
271    /// # use hydro_lang::prelude::*;
272    /// # use futures::StreamExt;
273    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
274    /// let p1 = flow.process::<()>();
275    /// let workers: Cluster<()> = flow.cluster::<()>();
276    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
277    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
278    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
279    ///     .demux_bincode(&workers);
280    /// # on_worker.send_bincode(&p2).entries()
281    /// // if there are 4 members in the cluster, each receives one element
282    /// // - MemberId::<()>(0): [0]
283    /// // - MemberId::<()>(1): [1]
284    /// // - MemberId::<()>(2): [2]
285    /// // - MemberId::<()>(3): [3]
286    /// # }, |mut stream| async move {
287    /// # let mut results = Vec::new();
288    /// # for w in 0..4 {
289    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
290    /// # }
291    /// # results.sort();
292    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
293    /// # }));
294    /// ```
295    pub fn demux_bincode(
296        self,
297        other: &Cluster<'a, L2>,
298    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
299    where
300        T: Serialize + DeserializeOwned,
301    {
302        self.into_keyed().demux_bincode(other)
303    }
304}
305
306impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
307    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
308    /// [`bincode`] to serialize/deserialize messages.
309    ///
310    /// This provides load balancing by evenly distributing work across cluster members. The
311    /// distribution is deterministic based on element order - the first element goes to member 0,
312    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
313    ///
314    /// # Non-Determinism
315    /// The set of cluster members may asynchronously change over time. Each element is distributed
316    /// based on the current cluster membership _at that point in time_. Depending on when cluster
317    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
318    /// membership is stable, the order of members in the round-robin pattern may change across runs.
319    ///
320    /// # Ordering Requirements
321    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
322    /// order of messages and retries affects the round-robin pattern.
323    ///
324    /// # Example
325    /// ```rust
326    /// # use hydro_lang::prelude::*;
327    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
328    /// # use futures::StreamExt;
329    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
330    /// let p1 = flow.process::<()>();
331    /// let workers: Cluster<()> = flow.cluster::<()>();
332    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
333    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
334    /// on_worker.send_bincode(&p2)
335    /// # .first().values() // we use first to assert that each member gets one element
336    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
337    /// // - MemberId::<()>(?): [1]
338    /// // - MemberId::<()>(?): [2]
339    /// // - MemberId::<()>(?): [3]
340    /// // - MemberId::<()>(?): [4]
341    /// # }, |mut stream| async move {
342    /// # let mut results = Vec::new();
343    /// # for w in 0..4 {
344    /// #     results.push(stream.next().await.unwrap());
345    /// # }
346    /// # results.sort();
347    /// # assert_eq!(results, vec![1, 2, 3, 4]);
348    /// # }));
349    /// ```
350    pub fn round_robin_bincode<L2: 'a>(
351        self,
352        other: &Cluster<'a, L2>,
353        nondet_membership: NonDet,
354    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
355    where
356        T: Serialize + DeserializeOwned,
357    {
358        let ids = track_membership(self.location.source_cluster_members(other));
359        let join_tick = self.location.tick();
360        let current_members = ids
361            .snapshot(&join_tick, nondet_membership)
362            .filter(q!(|b| *b))
363            .keys()
364            .assume_ordering(nondet_membership)
365            .collect_vec();
366
367        self.enumerate()
368            .batch(&join_tick, nondet_membership)
369            .cross_singleton(current_members)
370            .map(q!(|(data, members)| (
371                members[data.0 % members.len()],
372                data.1
373            )))
374            .all_ticks()
375            .demux_bincode(other)
376    }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
380    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
381    /// using [`bincode`] to serialize/deserialize messages.
382    ///
383    /// Each cluster member sends its local stream elements, and they are collected at the destination
384    /// as a [`KeyedStream`] where keys identify the source cluster member.
385    ///
386    /// # Example
387    /// ```rust
388    /// # use hydro_lang::prelude::*;
389    /// # use futures::StreamExt;
390    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
391    /// let workers: Cluster<()> = flow.cluster::<()>();
392    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
393    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
394    /// # all_received.entries()
395    /// # }, |mut stream| async move {
396    /// // if there are 4 members in the cluster, we should receive 4 elements
397    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
398    /// # let mut results = Vec::new();
399    /// # for w in 0..4 {
400    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
401    /// # }
402    /// # results.sort();
403    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
404    /// # }));
405    /// ```
406    ///
407    /// If you don't need to know the source for each element, you can use `.values()`
408    /// to get just the data:
409    /// ```rust
410    /// # use hydro_lang::prelude::*;
411    /// # use hydro_lang::live_collections::stream::NoOrder;
412    /// # use futures::StreamExt;
413    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
414    /// # let workers: Cluster<()> = flow.cluster::<()>();
415    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
416    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
417    /// # values
418    /// # }, |mut stream| async move {
419    /// # let mut results = Vec::new();
420    /// # for w in 0..4 {
421    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
422    /// # }
423    /// # results.sort();
424    /// // if there are 4 members in the cluster, we should receive 4 elements
425    /// // 1, 1, 1, 1
426    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
427    /// # }));
428    /// ```
429    pub fn send_bincode<L2>(
430        self,
431        other: &Process<'a, L2>,
432    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
433    where
434        T: Serialize + DeserializeOwned,
435    {
436        let serialize_pipeline = Some(serialize_bincode::<T>(false));
437
438        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
439
440        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
441            other.clone(),
442            HydroNode::Network {
443                serialize_fn: serialize_pipeline.map(|e| e.into()),
444                instantiate_fn: DebugInstantiate::Building,
445                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
446                input: Box::new(self.ir_node.into_inner()),
447                metadata: other.new_node_metadata(Stream::<
448                    (MemberId<L>, T),
449                    Process<'a, L2>,
450                    Unbounded,
451                    O,
452                    R,
453                >::collection_kind()),
454            },
455        );
456
457        raw_stream.into_keyed()
458    }
459
460    /// Broadcasts elements of this stream at each source member to all members of a destination
461    /// cluster, using [`bincode`] to serialize/deserialize messages.
462    ///
463    /// Each source member sends each of its stream elements to **every** member of the cluster
464    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
465    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
466    /// **only data elements** and sends each element to all cluster members.
467    ///
468    /// # Non-Determinism
469    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
470    /// to the current cluster members known _at that point in time_ at the source member. Depending
471    /// on when each source member is notified of membership changes, it will broadcast each element
472    /// to different members.
473    ///
474    /// # Example
475    /// ```rust
476    /// # use hydro_lang::prelude::*;
477    /// # use hydro_lang::location::MemberId;
478    /// # use futures::StreamExt;
479    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
480    /// # type Source = ();
481    /// # type Destination = ();
482    /// let source: Cluster<Source> = flow.cluster::<Source>();
483    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
484    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
485    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
486    /// # on_destination.entries().send_bincode(&p2).entries()
487    /// // if there are 4 members in the desination, each receives one element from each source member
488    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
489    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
490    /// // - ...
491    /// # }, |mut stream| async move {
492    /// # let mut results = Vec::new();
493    /// # for w in 0..16 {
494    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
495    /// # }
496    /// # results.sort();
497    /// # assert_eq!(results, vec![
498    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
499    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
500    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
501    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
502    /// # ]);
503    /// # }));
504    /// ```
505    pub fn broadcast_bincode<L2: 'a>(
506        self,
507        other: &Cluster<'a, L2>,
508        nondet_membership: NonDet,
509    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
510    where
511        T: Clone + Serialize + DeserializeOwned,
512    {
513        let ids = track_membership(self.location.source_cluster_members(other));
514        let join_tick = self.location.tick();
515        let current_members = ids
516            .snapshot(&join_tick, nondet_membership)
517            .filter(q!(|b| *b));
518
519        self.batch(&join_tick, nondet_membership)
520            .repeat_with_keys(current_members)
521            .all_ticks()
522            .demux_bincode(other)
523    }
524}
525
526impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
527    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
528{
529    /// Sends elements of this stream at each source member to specific members of a destination
530    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
531    ///
532    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
533    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
534    /// this API allows precise targeting of specific cluster members rather than broadcasting to
535    /// all members.
536    ///
537    /// Each cluster member sends its local stream elements, and they are collected at each
538    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
539    ///
540    /// # Example
541    /// ```rust
542    /// # use hydro_lang::prelude::*;
543    /// # use futures::StreamExt;
544    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
545    /// # type Source = ();
546    /// # type Destination = ();
547    /// let source: Cluster<Source> = flow.cluster::<Source>();
548    /// let to_send: Stream<_, Cluster<_>, _> = source
549    ///     .source_iter(q!(vec![0, 1, 2, 3]))
550    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
551    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
552    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
553    /// # all_received.entries().send_bincode(&p2).entries()
554    /// # }, |mut stream| async move {
555    /// // if there are 4 members in the destination cluster, each receives one message from each source member
556    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
557    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
558    /// // - ...
559    /// # let mut results = Vec::new();
560    /// # for w in 0..16 {
561    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
562    /// # }
563    /// # results.sort();
564    /// # assert_eq!(results, vec![
565    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
566    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
567    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
568    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
569    /// # ]);
570    /// # }));
571    /// ```
572    pub fn demux_bincode(
573        self,
574        other: &Cluster<'a, L2>,
575    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
576    where
577        T: Serialize + DeserializeOwned,
578    {
579        self.into_keyed().demux_bincode(other)
580    }
581}
582
583#[cfg(test)]
584mod tests {
585    use stageleft::q;
586
587    use crate::location::{Location, MemberId};
588    use crate::nondet::nondet;
589    use crate::prelude::FlowBuilder;
590
591    #[test]
592    fn sim_send_bincode_o2o() {
593        let flow = FlowBuilder::new();
594        let external = flow.external::<()>();
595        let node = flow.process::<()>();
596        let node2 = flow.process::<()>();
597
598        let (port, input) = node.source_external_bincode(&external);
599
600        let out_port = input
601            .send_bincode(&node2)
602            .batch(&node2.tick(), nondet!(/** test */))
603            .count()
604            .all_ticks()
605            .send_bincode_external(&external);
606
607        let instances = flow.sim().exhaustive(async |mut compiled| {
608            let in_send = compiled.connect(&port);
609            let out_recv = compiled.connect(&out_port);
610            compiled.launch();
611
612            in_send.send(());
613            in_send.send(());
614            in_send.send(());
615
616            let received = out_recv.collect::<Vec<_>>().await;
617            assert!(received.into_iter().sum::<usize>() == 3);
618        });
619
620        assert_eq!(instances, 4); // 2^{3 - 1}
621    }
622
623    #[test]
624    fn sim_send_bincode_m2o() {
625        let flow = FlowBuilder::new();
626        let external = flow.external::<()>();
627        let cluster = flow.cluster::<()>();
628        let node = flow.process::<()>();
629
630        let input = cluster.source_iter(q!(vec![1]));
631
632        let out_port = input
633            .send_bincode(&node)
634            .entries()
635            .batch(&node.tick(), nondet!(/** test */))
636            .all_ticks()
637            .send_bincode_external(&external);
638
639        let instances =
640            flow.sim()
641                .with_cluster_size(&cluster, 4)
642                .exhaustive(async |mut compiled| {
643                    let out_recv = compiled.connect(&out_port);
644                    compiled.launch();
645
646                    out_recv
647                        .assert_yields_only_unordered(vec![
648                            (MemberId::from_raw(0), 1),
649                            (MemberId::from_raw(1), 1),
650                            (MemberId::from_raw(2), 1),
651                            (MemberId::from_raw(3), 1),
652                        ])
653                        .await
654                });
655
656        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
657    }
658
659    #[test]
660    fn sim_send_bincode_multiple_m2o() {
661        let flow = FlowBuilder::new();
662        let external = flow.external::<()>();
663        let cluster1 = flow.cluster::<()>();
664        let cluster2 = flow.cluster::<()>();
665        let node = flow.process::<()>();
666
667        let out_port_1 = cluster1
668            .source_iter(q!(vec![1]))
669            .send_bincode(&node)
670            .entries()
671            .send_bincode_external(&external);
672
673        let out_port_2 = cluster2
674            .source_iter(q!(vec![2]))
675            .send_bincode(&node)
676            .entries()
677            .send_bincode_external(&external);
678
679        let instances = flow
680            .sim()
681            .with_cluster_size(&cluster1, 3)
682            .with_cluster_size(&cluster2, 4)
683            .exhaustive(async |mut compiled| {
684                let out_recv_1 = compiled.connect(&out_port_1);
685                let out_recv_2 = compiled.connect(&out_port_2);
686                compiled.launch();
687
688                out_recv_1
689                    .assert_yields_only_unordered(vec![
690                        (MemberId::from_raw(0), 1),
691                        (MemberId::from_raw(1), 1),
692                        (MemberId::from_raw(2), 1),
693                    ])
694                    .await;
695
696                out_recv_2
697                    .assert_yields_only_unordered(vec![
698                        (MemberId::from_raw(0), 2),
699                        (MemberId::from_raw(1), 2),
700                        (MemberId::from_raw(2), 2),
701                        (MemberId::from_raw(3), 2),
702                    ])
703                    .await;
704            });
705
706        assert_eq!(instances, 1);
707    }
708
709    #[test]
710    fn sim_send_bincode_o2m() {
711        let flow = FlowBuilder::new();
712        let external = flow.external::<()>();
713        let cluster = flow.cluster::<()>();
714        let node = flow.process::<()>();
715
716        let input = node.source_iter(q!(vec![
717            (MemberId::from_raw(0), 123),
718            (MemberId::from_raw(1), 456),
719        ]));
720
721        let out_port = input
722            .demux_bincode(&cluster)
723            .map(q!(|x| x + 1))
724            .send_bincode(&node)
725            .entries()
726            .send_bincode_external(&external);
727
728        flow.sim()
729            .with_cluster_size(&cluster, 4)
730            .exhaustive(async |mut compiled| {
731                let out_recv = compiled.connect(&out_port);
732                compiled.launch();
733
734                out_recv
735                    .assert_yields_only_unordered(vec![
736                        (MemberId::from_raw(0), 124),
737                        (MemberId::from_raw(1), 457),
738                    ])
739                    .await
740            });
741    }
742
743    #[test]
744    fn sim_broadcast_bincode_o2m() {
745        let flow = FlowBuilder::new();
746        let external = flow.external::<()>();
747        let cluster = flow.cluster::<()>();
748        let node = flow.process::<()>();
749
750        let input = node.source_iter(q!(vec![123, 456]));
751
752        let out_port = input
753            .broadcast_bincode(&cluster, nondet!(/** test */))
754            .map(q!(|x| x + 1))
755            .send_bincode(&node)
756            .entries()
757            .send_bincode_external(&external);
758
759        let mut c_1_produced = false;
760        let mut c_2_produced = false;
761
762        flow.sim()
763            .with_cluster_size(&cluster, 2)
764            .exhaustive(async |mut compiled| {
765                let out_recv = compiled.connect(&out_port);
766                compiled.launch();
767
768                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
769
770                // check that order is preserved
771                if all_out.contains(&(MemberId::from_raw(0), 124)) {
772                    assert!(all_out.contains(&(MemberId::from_raw(0), 457)));
773                    c_1_produced = true;
774                }
775
776                if all_out.contains(&(MemberId::from_raw(1), 124)) {
777                    assert!(all_out.contains(&(MemberId::from_raw(1), 457)));
778                    c_2_produced = true;
779                }
780            });
781
782        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
783    }
784
785    #[test]
786    fn sim_send_bincode_m2m() {
787        let flow = FlowBuilder::new();
788        let external = flow.external::<()>();
789        let cluster = flow.cluster::<()>();
790        let node = flow.process::<()>();
791
792        let input = node.source_iter(q!(vec![
793            (MemberId::from_raw(0), 123),
794            (MemberId::from_raw(1), 456),
795        ]));
796
797        let out_port = input
798            .demux_bincode(&cluster)
799            .map(q!(|x| x + 1))
800            .flat_map_ordered(q!(|x| vec![
801                (MemberId::from_raw(0), x),
802                (MemberId::from_raw(1), x),
803            ]))
804            .demux_bincode(&cluster)
805            .entries()
806            .send_bincode(&node)
807            .entries()
808            .send_bincode_external(&external);
809
810        flow.sim()
811            .with_cluster_size(&cluster, 4)
812            .exhaustive(async |mut compiled| {
813                let out_recv = compiled.connect(&out_port);
814                compiled.launch();
815
816                out_recv
817                    .assert_yields_only_unordered(vec![
818                        (MemberId::from_raw(0), (MemberId::from_raw(0), 124)),
819                        (MemberId::from_raw(0), (MemberId::from_raw(1), 457)),
820                        (MemberId::from_raw(1), (MemberId::from_raw(0), 124)),
821                        (MemberId::from_raw(1), (MemberId::from_raw(1), 457)),
822                    ])
823                    .await
824            });
825    }
826}