hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21#[cfg(feature = "sim")]
22use crate::sim::SimReceiver;
23use crate::staging_util::get_this_crate;
24
25// same as the one in `hydro_std`, but internal use only
26fn track_membership<'a, C, L: Location<'a> + NoTick>(
27    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
28) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
29    membership.fold(
30        q!(|| false),
31        q!(|present, event| {
32            match event {
33                MembershipEvent::Joined => *present = true,
34                MembershipEvent::Left => *present = false,
35            }
36        }),
37    )
38}
39
40fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
41    let root = get_this_crate();
42
43    if is_demux {
44        parse_quote! {
45            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
46                |(id, data)| {
47                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
48                }
49            )
50        }
51    } else {
52        parse_quote! {
53            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
54                |data| {
55                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
56                }
57            )
58        }
59    }
60}
61
62pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
63    serialize_bincode_with_type(is_demux, &quote_type::<T>())
64}
65
66fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
67    let root = get_this_crate();
68
69    if let Some(c_type) = tagged {
70        parse_quote! {
71            |res| {
72                let (id, b) = res.unwrap();
73                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
74            }
75        }
76    } else {
77        parse_quote! {
78            |res| {
79                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
80            }
81        }
82    }
83}
84
85pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
86    deserialize_bincode_with_type(tagged, &quote_type::<T>())
87}
88
89impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
90    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
91    /// using [`bincode`] to serialize/deserialize messages.
92    ///
93    /// The returned stream captures the elements received at the destination, where values will
94    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
95    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
    /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
97    /// dropped no further messages will be sent.
98    ///
99    /// # Example
100    /// ```rust
101    /// # #[cfg(feature = "deploy")] {
102    /// # use hydro_lang::prelude::*;
103    /// # use futures::StreamExt;
104    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
105    /// let p1 = flow.process::<()>();
106    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
107    /// let p2 = flow.process::<()>();
108    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
109    /// // 1, 2, 3
110    /// # on_p2.send_bincode(&p_out)
111    /// # }, |mut stream| async move {
112    /// # for w in 1..=3 {
113    /// #     assert_eq!(stream.next().await, Some(w));
114    /// # }
115    /// # }));
116    /// # }
117    /// ```
118    pub fn send_bincode<L2>(
119        self,
120        other: &Process<'a, L2>,
121    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
122    where
123        T: Serialize + DeserializeOwned,
124    {
125        let serialize_pipeline = Some(serialize_bincode::<T>(false));
126
127        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
128
129        Stream::new(
130            other.clone(),
131            HydroNode::Network {
132                serialize_fn: serialize_pipeline.map(|e| e.into()),
133                instantiate_fn: DebugInstantiate::Building,
134                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
135                input: Box::new(self.ir_node.into_inner()),
136                metadata: other.new_node_metadata(
137                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
138                ),
139            },
140        )
141    }
142
143    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
144    /// using [`bincode`] to serialize/deserialize messages.
145    ///
146    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
147    /// membership information. This is a common pattern in distributed systems for broadcasting data to
148    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
149    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
150    /// each element to all cluster members.
151    ///
152    /// # Non-Determinism
153    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
154    /// to the current cluster members _at that point in time_. Depending on when we are notified of
155    /// membership changes, we will broadcast each element to different members.
156    ///
157    /// # Example
158    /// ```rust
159    /// # #[cfg(feature = "deploy")] {
160    /// # use hydro_lang::prelude::*;
161    /// # use futures::StreamExt;
162    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
163    /// let p1 = flow.process::<()>();
164    /// let workers: Cluster<()> = flow.cluster::<()>();
165    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
166    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
167    /// # on_worker.send_bincode(&p2).entries()
168    /// // if there are 4 members in the cluster, each receives one element
169    /// // - MemberId::<()>(0): [123]
170    /// // - MemberId::<()>(1): [123]
171    /// // - MemberId::<()>(2): [123]
172    /// // - MemberId::<()>(3): [123]
173    /// # }, |mut stream| async move {
174    /// # let mut results = Vec::new();
175    /// # for w in 0..4 {
176    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
177    /// # }
178    /// # results.sort();
179    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
180    /// # }));
181    /// # }
182    /// ```
183    pub fn broadcast_bincode<L2: 'a>(
184        self,
185        other: &Cluster<'a, L2>,
186        nondet_membership: NonDet,
187    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
188    where
189        T: Clone + Serialize + DeserializeOwned,
190    {
191        let ids = track_membership(self.location.source_cluster_members(other));
192        let join_tick = self.location.tick();
193        let current_members = ids
194            .snapshot(&join_tick, nondet_membership)
195            .filter(q!(|b| *b));
196
197        self.batch(&join_tick, nondet_membership)
198            .repeat_with_keys(current_members)
199            .all_ticks()
200            .demux_bincode(other)
201    }
202
203    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
204    /// serialization. The external process can receive these elements by establishing a TCP
205    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
206    ///
207    /// # Example
208    /// ```rust
209    /// # #[cfg(feature = "deploy")] {
210    /// # use hydro_lang::prelude::*;
211    /// # use futures::StreamExt;
212    /// # tokio_test::block_on(async move {
213    /// let flow = FlowBuilder::new();
214    /// let process = flow.process::<()>();
215    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
216    /// let external = flow.external::<()>();
217    /// let external_handle = numbers.send_bincode_external(&external);
218    ///
219    /// let mut deployment = hydro_deploy::Deployment::new();
220    /// let nodes = flow
221    ///     .with_process(&process, deployment.Localhost())
222    ///     .with_external(&external, deployment.Localhost())
223    ///     .deploy(&mut deployment);
224    ///
225    /// deployment.deploy().await.unwrap();
226    /// // establish the TCP connection
227    /// let mut external_recv_stream = nodes.connect(external_handle).await;
228    /// deployment.start().await.unwrap();
229    ///
230    /// for w in 1..=3 {
231    ///     assert_eq!(external_recv_stream.next().await, Some(w));
232    /// }
233    /// # });
234    /// # }
235    /// ```
236    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
237    where
238        T: Serialize + DeserializeOwned,
239    {
240        let serialize_pipeline = Some(serialize_bincode::<T>(false));
241
242        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
243
244        let external_key = flow_state_borrow.next_external_out;
245        flow_state_borrow.next_external_out += 1;
246
247        flow_state_borrow.push_root(HydroRoot::SendExternal {
248            to_external_id: other.id,
249            to_key: external_key,
250            to_many: false,
251            unpaired: true,
252            serialize_fn: serialize_pipeline.map(|e| e.into()),
253            instantiate_fn: DebugInstantiate::Building,
254            input: Box::new(self.ir_node.into_inner()),
255            op_metadata: HydroIrOpMetadata::new(),
256        });
257
258        ExternalBincodeStream {
259            process_id: other.id,
260            port_id: external_key,
261            _phantom: PhantomData,
262        }
263    }
264
265    #[cfg(feature = "sim")]
266    /// Sets up a simulation output port for this stream, allowing test code to receive elements
267    /// sent to this stream during simulation.
268    pub fn sim_output(self) -> SimReceiver<T, O, R>
269    where
270        T: Serialize + DeserializeOwned,
271    {
272        let external_location: External<'a, ()> = External {
273            id: 0,
274            flow_state: self.location.flow_state().clone(),
275            _phantom: PhantomData,
276        };
277
278        let external = self.send_bincode_external(&external_location);
279
280        SimReceiver(external.port_id, PhantomData)
281    }
282}
283
284impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
285    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
286{
287    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
288    /// using [`bincode`] to serialize/deserialize messages.
289    ///
290    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
291    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
292    /// this API allows precise targeting of specific cluster members rather than broadcasting to
293    /// all members.
294    ///
295    /// # Example
296    /// ```rust
297    /// # #[cfg(feature = "deploy")] {
298    /// # use hydro_lang::prelude::*;
299    /// # use futures::StreamExt;
300    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
301    /// let p1 = flow.process::<()>();
302    /// let workers: Cluster<()> = flow.cluster::<()>();
303    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
304    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
305    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
306    ///     .demux_bincode(&workers);
307    /// # on_worker.send_bincode(&p2).entries()
308    /// // if there are 4 members in the cluster, each receives one element
309    /// // - MemberId::<()>(0): [0]
310    /// // - MemberId::<()>(1): [1]
311    /// // - MemberId::<()>(2): [2]
312    /// // - MemberId::<()>(3): [3]
313    /// # }, |mut stream| async move {
314    /// # let mut results = Vec::new();
315    /// # for w in 0..4 {
316    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
317    /// # }
318    /// # results.sort();
319    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
320    /// # }));
321    /// # }
322    /// ```
323    pub fn demux_bincode(
324        self,
325        other: &Cluster<'a, L2>,
326    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
327    where
328        T: Serialize + DeserializeOwned,
329    {
330        self.into_keyed().demux_bincode(other)
331    }
332}
333
334impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
335    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
336    /// [`bincode`] to serialize/deserialize messages.
337    ///
338    /// This provides load balancing by evenly distributing work across cluster members. The
339    /// distribution is deterministic based on element order - the first element goes to member 0,
340    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
341    ///
342    /// # Non-Determinism
343    /// The set of cluster members may asynchronously change over time. Each element is distributed
344    /// based on the current cluster membership _at that point in time_. Depending on when cluster
345    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
346    /// membership is stable, the order of members in the round-robin pattern may change across runs.
347    ///
348    /// # Ordering Requirements
349    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
350    /// order of messages and retries affects the round-robin pattern.
351    ///
352    /// # Example
353    /// ```rust
354    /// # #[cfg(feature = "deploy")] {
355    /// # use hydro_lang::prelude::*;
356    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
357    /// # use futures::StreamExt;
358    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
359    /// let p1 = flow.process::<()>();
360    /// let workers: Cluster<()> = flow.cluster::<()>();
361    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
362    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
363    /// on_worker.send_bincode(&p2)
364    /// # .first().values() // we use first to assert that each member gets one element
365    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
366    /// // - MemberId::<()>(?): [1]
367    /// // - MemberId::<()>(?): [2]
368    /// // - MemberId::<()>(?): [3]
369    /// // - MemberId::<()>(?): [4]
370    /// # }, |mut stream| async move {
371    /// # let mut results = Vec::new();
372    /// # for w in 0..4 {
373    /// #     results.push(stream.next().await.unwrap());
374    /// # }
375    /// # results.sort();
376    /// # assert_eq!(results, vec![1, 2, 3, 4]);
377    /// # }));
378    /// # }
379    /// ```
380    pub fn round_robin_bincode<L2: 'a>(
381        self,
382        other: &Cluster<'a, L2>,
383        nondet_membership: NonDet,
384    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
385    where
386        T: Serialize + DeserializeOwned,
387    {
388        let ids = track_membership(self.location.source_cluster_members(other));
389        let join_tick = self.location.tick();
390        let current_members = ids
391            .snapshot(&join_tick, nondet_membership)
392            .filter(q!(|b| *b))
393            .keys()
394            .assume_ordering(nondet_membership)
395            .collect_vec();
396
397        self.enumerate()
398            .batch(&join_tick, nondet_membership)
399            .cross_singleton(current_members)
400            .map(q!(|(data, members)| (
401                members[data.0 % members.len()].clone(),
402                data.1
403            )))
404            .all_ticks()
405            .demux_bincode(other)
406    }
407}
408
409impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
410    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
411    /// [`bincode`] to serialize/deserialize messages.
412    ///
413    /// This provides load balancing by evenly distributing work across cluster members. The
414    /// distribution is deterministic based on element order - the first element goes to member 0,
415    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
416    ///
417    /// # Non-Determinism
418    /// The set of cluster members may asynchronously change over time. Each element is distributed
419    /// based on the current cluster membership _at that point in time_. Depending on when cluster
420    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
421    /// membership is stable, the order of members in the round-robin pattern may change across runs.
422    ///
423    /// # Ordering Requirements
424    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
425    /// order of messages and retries affects the round-robin pattern.
426    ///
427    /// # Example
428    /// ```rust
429    /// # #[cfg(feature = "deploy")] {
430    /// # use hydro_lang::prelude::*;
431    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
432    /// # use hydro_lang::location::MemberId;
433    /// # use futures::StreamExt;
434    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
435    /// let p1 = flow.process::<()>();
436    /// let workers1: Cluster<()> = flow.cluster::<()>();
437    /// let workers2: Cluster<()> = flow.cluster::<()>();
438    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
439    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
440    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
441    /// on_worker2.send_bincode(&p2)
442    /// # .entries()
443    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
444    /// # }, |mut stream| async move {
445    /// # let mut results = Vec::new();
446    /// # let mut locations = std::collections::HashSet::new();
447    /// # for w in 0..=16 {
448    /// #     let (location, v) = stream.next().await.unwrap();
449    /// #     locations.insert(location);
450    /// #     results.push(v);
451    /// # }
452    /// # results.sort();
453    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
454    /// # assert_eq!(locations.len(), 16);
455    /// # }));
456    /// # }
457    /// ```
458    pub fn round_robin_bincode<L2: 'a>(
459        self,
460        other: &Cluster<'a, L2>,
461        nondet_membership: NonDet,
462    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
463    where
464        T: Serialize + DeserializeOwned,
465    {
466        let ids = track_membership(self.location.source_cluster_members(other));
467        let join_tick = self.location.tick();
468        let current_members = ids
469            .snapshot(&join_tick, nondet_membership)
470            .filter(q!(|b| *b))
471            .keys()
472            .assume_ordering(nondet_membership)
473            .collect_vec();
474
475        self.enumerate()
476            .batch(&join_tick, nondet_membership)
477            .cross_singleton(current_members)
478            .map(q!(|(data, members)| (
479                members[data.0 % members.len()].clone(),
480                data.1
481            )))
482            .all_ticks()
483            .demux_bincode(other)
484    }
485}
486
487impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
488    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
489    /// using [`bincode`] to serialize/deserialize messages.
490    ///
491    /// Each cluster member sends its local stream elements, and they are collected at the destination
492    /// as a [`KeyedStream`] where keys identify the source cluster member.
493    ///
494    /// # Example
495    /// ```rust
496    /// # #[cfg(feature = "deploy")] {
497    /// # use hydro_lang::prelude::*;
498    /// # use futures::StreamExt;
499    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
500    /// let workers: Cluster<()> = flow.cluster::<()>();
501    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
502    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
503    /// # all_received.entries()
504    /// # }, |mut stream| async move {
505    /// // if there are 4 members in the cluster, we should receive 4 elements
506    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
507    /// # let mut results = Vec::new();
508    /// # for w in 0..4 {
509    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
510    /// # }
511    /// # results.sort();
512    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
513    /// # }));
514    /// # }
515    /// ```
516    ///
517    /// If you don't need to know the source for each element, you can use `.values()`
518    /// to get just the data:
519    /// ```rust
520    /// # #[cfg(feature = "deploy")] {
521    /// # use hydro_lang::prelude::*;
522    /// # use hydro_lang::live_collections::stream::NoOrder;
523    /// # use futures::StreamExt;
524    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
525    /// # let workers: Cluster<()> = flow.cluster::<()>();
526    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
527    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
528    /// # values
529    /// # }, |mut stream| async move {
530    /// # let mut results = Vec::new();
531    /// # for w in 0..4 {
532    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
533    /// # }
534    /// # results.sort();
535    /// // if there are 4 members in the cluster, we should receive 4 elements
536    /// // 1, 1, 1, 1
537    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
538    /// # }));
539    /// # }
540    /// ```
541    pub fn send_bincode<L2>(
542        self,
543        other: &Process<'a, L2>,
544    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
545    where
546        T: Serialize + DeserializeOwned,
547    {
548        let serialize_pipeline = Some(serialize_bincode::<T>(false));
549
550        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
551
552        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
553            other.clone(),
554            HydroNode::Network {
555                serialize_fn: serialize_pipeline.map(|e| e.into()),
556                instantiate_fn: DebugInstantiate::Building,
557                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
558                input: Box::new(self.ir_node.into_inner()),
559                metadata: other.new_node_metadata(Stream::<
560                    (MemberId<L>, T),
561                    Process<'a, L2>,
562                    Unbounded,
563                    O,
564                    R,
565                >::collection_kind()),
566            },
567        );
568
569        raw_stream.into_keyed()
570    }
571
572    /// Broadcasts elements of this stream at each source member to all members of a destination
573    /// cluster, using [`bincode`] to serialize/deserialize messages.
574    ///
575    /// Each source member sends each of its stream elements to **every** member of the cluster
576    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
577    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
578    /// **only data elements** and sends each element to all cluster members.
579    ///
580    /// # Non-Determinism
581    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
582    /// to the current cluster members known _at that point in time_ at the source member. Depending
583    /// on when each source member is notified of membership changes, it will broadcast each element
584    /// to different members.
585    ///
586    /// # Example
587    /// ```rust
588    /// # #[cfg(feature = "deploy")] {
589    /// # use hydro_lang::prelude::*;
590    /// # use hydro_lang::location::MemberId;
591    /// # use futures::StreamExt;
592    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
593    /// # type Source = ();
594    /// # type Destination = ();
595    /// let source: Cluster<Source> = flow.cluster::<Source>();
596    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
597    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
598    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
599    /// # on_destination.entries().send_bincode(&p2).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
601    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
602    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
603    /// // - ...
604    /// # }, |mut stream| async move {
605    /// # let mut results = Vec::new();
606    /// # for w in 0..16 {
607    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
608    /// # }
609    /// # results.sort();
610    /// # assert_eq!(results, vec![
611    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
612    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
613    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
614    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
615    /// # ]);
616    /// # }));
617    /// # }
618    /// ```
619    pub fn broadcast_bincode<L2: 'a>(
620        self,
621        other: &Cluster<'a, L2>,
622        nondet_membership: NonDet,
623    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
624    where
625        T: Clone + Serialize + DeserializeOwned,
626    {
627        let ids = track_membership(self.location.source_cluster_members(other));
628        let join_tick = self.location.tick();
629        let current_members = ids
630            .snapshot(&join_tick, nondet_membership)
631            .filter(q!(|b| *b));
632
633        self.batch(&join_tick, nondet_membership)
634            .repeat_with_keys(current_members)
635            .all_ticks()
636            .demux_bincode(other)
637    }
638}
639
640impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
641    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
642{
643    /// Sends elements of this stream at each source member to specific members of a destination
644    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
645    ///
646    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
647    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
648    /// this API allows precise targeting of specific cluster members rather than broadcasting to
649    /// all members.
650    ///
651    /// Each cluster member sends its local stream elements, and they are collected at each
652    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::prelude::*;
658    /// # use futures::StreamExt;
659    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
660    /// # type Source = ();
661    /// # type Destination = ();
662    /// let source: Cluster<Source> = flow.cluster::<Source>();
663    /// let to_send: Stream<_, Cluster<_>, _> = source
664    ///     .source_iter(q!(vec![0, 1, 2, 3]))
665    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
666    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
667    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
668    /// # all_received.entries().send_bincode(&p2).entries()
669    /// # }, |mut stream| async move {
670    /// // if there are 4 members in the destination cluster, each receives one message from each source member
671    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
672    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
673    /// // - ...
674    /// # let mut results = Vec::new();
675    /// # for w in 0..16 {
676    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
677    /// # }
678    /// # results.sort();
679    /// # assert_eq!(results, vec![
680    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
681    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
682    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
683    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
684    /// # ]);
685    /// # }));
686    /// # }
687    /// ```
688    pub fn demux_bincode(
689        self,
690        other: &Cluster<'a, L2>,
691    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
692    where
693        T: Serialize + DeserializeOwned,
694    {
695        self.into_keyed().demux_bincode(other)
696    }
697}
698
699#[cfg(test)]
700mod tests {
701    #[cfg(feature = "sim")]
702    use stageleft::q;
703
704    #[cfg(feature = "sim")]
705    use crate::location::{Location, MemberId};
706    #[cfg(feature = "sim")]
707    use crate::nondet::nondet;
708    #[cfg(feature = "sim")]
709    use crate::prelude::FlowBuilder;
710
711    #[cfg(feature = "sim")]
712    #[test]
713    fn sim_send_bincode_o2o() {
714        let flow = FlowBuilder::new();
715        let node = flow.process::<()>();
716        let node2 = flow.process::<()>();
717
718        let (in_send, input) = node.sim_input();
719
720        let out_recv = input
721            .send_bincode(&node2)
722            .batch(&node2.tick(), nondet!(/** test */))
723            .count()
724            .all_ticks()
725            .sim_output();
726
727        let instances = flow.sim().exhaustive(async || {
728            in_send.send(());
729            in_send.send(());
730            in_send.send(());
731
732            let received = out_recv.collect::<Vec<_>>().await;
733            assert!(received.into_iter().sum::<usize>() == 3);
734        });
735
736        assert_eq!(instances, 4); // 2^{3 - 1}
737    }
738
739    #[cfg(feature = "sim")]
740    #[test]
741    fn sim_send_bincode_m2o() {
742        let flow = FlowBuilder::new();
743        let cluster = flow.cluster::<()>();
744        let node = flow.process::<()>();
745
746        let input = cluster.source_iter(q!(vec![1]));
747
748        let out_recv = input
749            .send_bincode(&node)
750            .entries()
751            .batch(&node.tick(), nondet!(/** test */))
752            .all_ticks()
753            .sim_output();
754
755        let instances = flow
756            .sim()
757            .with_cluster_size(&cluster, 4)
758            .exhaustive(async || {
759                out_recv
760                    .assert_yields_only_unordered(vec![
761                        (MemberId::from_raw_id(0), 1),
762                        (MemberId::from_raw_id(1), 1),
763                        (MemberId::from_raw_id(2), 1),
764                        (MemberId::from_raw_id(3), 1),
765                    ])
766                    .await
767            });
768
769        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
770    }
771
772    #[cfg(feature = "sim")]
773    #[test]
774    fn sim_send_bincode_multiple_m2o() {
775        let flow = FlowBuilder::new();
776        let cluster1 = flow.cluster::<()>();
777        let cluster2 = flow.cluster::<()>();
778        let node = flow.process::<()>();
779
780        let out_recv_1 = cluster1
781            .source_iter(q!(vec![1]))
782            .send_bincode(&node)
783            .entries()
784            .sim_output();
785
786        let out_recv_2 = cluster2
787            .source_iter(q!(vec![2]))
788            .send_bincode(&node)
789            .entries()
790            .sim_output();
791
792        let instances = flow
793            .sim()
794            .with_cluster_size(&cluster1, 3)
795            .with_cluster_size(&cluster2, 4)
796            .exhaustive(async || {
797                out_recv_1
798                    .assert_yields_only_unordered(vec![
799                        (MemberId::from_raw_id(0), 1),
800                        (MemberId::from_raw_id(1), 1),
801                        (MemberId::from_raw_id(2), 1),
802                    ])
803                    .await;
804
805                out_recv_2
806                    .assert_yields_only_unordered(vec![
807                        (MemberId::from_raw_id(0), 2),
808                        (MemberId::from_raw_id(1), 2),
809                        (MemberId::from_raw_id(2), 2),
810                        (MemberId::from_raw_id(3), 2),
811                    ])
812                    .await;
813            });
814
815        assert_eq!(instances, 1);
816    }
817
818    #[cfg(feature = "sim")]
819    #[test]
820    fn sim_send_bincode_o2m() {
821        let flow = FlowBuilder::new();
822        let cluster = flow.cluster::<()>();
823        let node = flow.process::<()>();
824
825        let input = node.source_iter(q!(vec![
826            (MemberId::from_raw_id(0), 123),
827            (MemberId::from_raw_id(1), 456),
828        ]));
829
830        let out_recv = input
831            .demux_bincode(&cluster)
832            .map(q!(|x| x + 1))
833            .send_bincode(&node)
834            .entries()
835            .sim_output();
836
837        flow.sim()
838            .with_cluster_size(&cluster, 4)
839            .exhaustive(async || {
840                out_recv
841                    .assert_yields_only_unordered(vec![
842                        (MemberId::from_raw_id(0), 124),
843                        (MemberId::from_raw_id(1), 457),
844                    ])
845                    .await
846            });
847    }
848
849    #[cfg(feature = "sim")]
850    #[test]
851    fn sim_broadcast_bincode_o2m() {
852        let flow = FlowBuilder::new();
853        let cluster = flow.cluster::<()>();
854        let node = flow.process::<()>();
855
856        let input = node.source_iter(q!(vec![123, 456]));
857
858        let out_recv = input
859            .broadcast_bincode(&cluster, nondet!(/** test */))
860            .map(q!(|x| x + 1))
861            .send_bincode(&node)
862            .entries()
863            .sim_output();
864
865        let mut c_1_produced = false;
866        let mut c_2_produced = false;
867
868        flow.sim()
869            .with_cluster_size(&cluster, 2)
870            .exhaustive(async || {
871                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
872
873                // check that order is preserved
874                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
875                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
876                    c_1_produced = true;
877                }
878
879                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
880                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
881                    c_2_produced = true;
882                }
883            });
884
885        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
886    }
887
888    #[cfg(feature = "sim")]
889    #[test]
890    fn sim_send_bincode_m2m() {
891        let flow = FlowBuilder::new();
892        let cluster = flow.cluster::<()>();
893        let node = flow.process::<()>();
894
895        let input = node.source_iter(q!(vec![
896            (MemberId::from_raw_id(0), 123),
897            (MemberId::from_raw_id(1), 456),
898        ]));
899
900        let out_recv = input
901            .demux_bincode(&cluster)
902            .map(q!(|x| x + 1))
903            .flat_map_ordered(q!(|x| vec![
904                (MemberId::from_raw_id(0), x),
905                (MemberId::from_raw_id(1), x),
906            ]))
907            .demux_bincode(&cluster)
908            .entries()
909            .send_bincode(&node)
910            .entries()
911            .sim_output();
912
913        flow.sim()
914            .with_cluster_size(&cluster, 4)
915            .exhaustive(async || {
916                out_recv
917                    .assert_yields_only_unordered(vec![
918                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
919                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
920                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
921                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
922                    ])
923                    .await
924            });
925    }
926}