hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::quote_type;
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
11use crate::live_collections::stream::{Ordering, Retries, Stream};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
20    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
21    ///
22    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
23    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
24    /// API allows precise targeting of specific cluster members rather than broadcasting to
25    /// all members.
26    ///
27    /// # Example
28    /// ```rust
29    /// # use hydro_lang::prelude::*;
30    /// # use futures::StreamExt;
31    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
32    /// let p1 = flow.process::<()>();
33    /// let workers: Cluster<()> = flow.cluster::<()>();
34    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
35    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
36    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
37    ///     .into_keyed()
38    ///     .demux_bincode(&workers);
39    /// # on_worker.send_bincode(&p2).entries()
40    /// // if there are 4 members in the cluster, each receives one element
41    /// // - MemberId::<()>(0): [0]
42    /// // - MemberId::<()>(1): [1]
43    /// // - MemberId::<()>(2): [2]
44    /// // - MemberId::<()>(3): [3]
45    /// # }, |mut stream| async move {
46    /// # let mut results = Vec::new();
47    /// # for w in 0..4 {
48    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
49    /// # }
50    /// # results.sort();
51    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
52    /// # }));
53    /// ```
54    pub fn demux_bincode(
55        self,
56        other: &Cluster<'a, L2>,
57    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
58    where
59        T: Serialize + DeserializeOwned,
60    {
61        let serialize_pipeline = Some(serialize_bincode::<T>(true));
62
63        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
64
65        Stream::new(
66            other.clone(),
67            HydroNode::Network {
68                serialize_fn: serialize_pipeline.map(|e| e.into()),
69                instantiate_fn: DebugInstantiate::Building,
70                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
71                input: Box::new(self.ir_node.into_inner()),
72                metadata: other.new_node_metadata(
73                    Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
74                ),
75            },
76        )
77    }
78}
79
80impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
81    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
82{
83    /// Sends each group of this stream at each source member to a specific member of a destination
84    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
85    /// [`bincode`] to serialize/deserialize messages.
86    ///
87    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
88    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
89    /// API allows precise targeting of specific cluster members rather than broadcasting to all
90    /// members.
91    ///
92    /// Each cluster member sends its local stream elements, and they are collected at each
93    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
94    ///
95    /// # Example
96    /// ```rust
97    /// # use hydro_lang::prelude::*;
98    /// # use futures::StreamExt;
99    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
100    /// # type Source = ();
101    /// # type Destination = ();
102    /// let source: Cluster<Source> = flow.cluster::<Source>();
103    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
104    ///     .source_iter(q!(vec![0, 1, 2, 3]))
105    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
106    ///     .into_keyed();
107    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
108    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
109    /// # all_received.entries().send_bincode(&p2).entries()
110    /// # }, |mut stream| async move {
111    /// // if there are 4 members in the destination cluster, each receives one message from each source member
112    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
113    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
114    /// // - ...
115    /// # let mut results = Vec::new();
116    /// # for w in 0..16 {
117    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
118    /// # }
119    /// # results.sort();
120    /// # assert_eq!(results, vec![
121    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
122    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
123    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
124    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
125    /// # ]);
126    /// # }));
127    /// ```
128    pub fn demux_bincode(
129        self,
130        other: &Cluster<'a, L2>,
131    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
132    where
133        T: Serialize + DeserializeOwned,
134    {
135        let serialize_pipeline = Some(serialize_bincode::<T>(true));
136
137        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
138
139        let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
140            other.clone(),
141            HydroNode::Network {
142                serialize_fn: serialize_pipeline.map(|e| e.into()),
143                instantiate_fn: DebugInstantiate::Building,
144                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
145                input: Box::new(self.ir_node.into_inner()),
146                metadata: other.new_node_metadata(Stream::<
147                    (MemberId<L>, T),
148                    Cluster<'a, L2>,
149                    Unbounded,
150                    O,
151                    R,
152                >::collection_kind()),
153            },
154        );
155
156        raw_stream.into_keyed()
157    }
158}