hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
11use crate::live_collections::stream::{Ordering, Retries, Stream};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
20    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
21    ///
22    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
23    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
24    /// API allows precise targeting of specific cluster members rather than broadcasting to
25    /// all members.
26    ///
27    /// # Example
28    /// ```rust
29    /// # #[cfg(feature = "deploy")] {
30    /// # use hydro_lang::prelude::*;
31    /// # use futures::StreamExt;
32    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
33    /// let p1 = flow.process::<()>();
34    /// let workers: Cluster<()> = flow.cluster::<()>();
35    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
36    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
37    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
38    ///     .into_keyed()
39    ///     .demux_bincode(&workers);
40    /// # on_worker.send_bincode(&p2).entries()
41    /// // if there are 4 members in the cluster, each receives one element
42    /// // - MemberId::<()>(0): [0]
43    /// // - MemberId::<()>(1): [1]
44    /// // - MemberId::<()>(2): [2]
45    /// // - MemberId::<()>(3): [3]
46    /// # }, |mut stream| async move {
47    /// # let mut results = Vec::new();
48    /// # for w in 0..4 {
49    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
50    /// # }
51    /// # results.sort();
52    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
53    /// # }));
54    /// # }
55    /// ```
56    pub fn demux_bincode(
57        self,
58        other: &Cluster<'a, L2>,
59    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
60    where
61        T: Serialize + DeserializeOwned,
62    {
63        let serialize_pipeline = Some(serialize_bincode::<T>(true));
64
65        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
66
67        Stream::new(
68            other.clone(),
69            HydroNode::Network {
70                serialize_fn: serialize_pipeline.map(|e| e.into()),
71                instantiate_fn: DebugInstantiate::Building,
72                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
73                input: Box::new(self.ir_node.into_inner()),
74                metadata: other.new_node_metadata(
75                    Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
76                ),
77            },
78        )
79    }
80}
81
82impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
83    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
84{
85    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
86    /// compound key where the first element is the recipient's [`MemberId`] and the second element
87    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
88    /// messages.
89    ///
90    /// # Example
91    /// ```rust
92    /// # #[cfg(feature = "deploy")] {
93    /// # use hydro_lang::prelude::*;
94    /// # use futures::StreamExt;
95    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
96    /// let p1 = flow.process::<()>();
97    /// let workers: Cluster<()> = flow.cluster::<()>();
98    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
99    ///     .source_iter(q!(vec![0, 1, 2, 3]))
100    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
101    ///     .into_keyed();
102    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
103    /// # on_worker.entries().send_bincode(&p2).entries()
104    /// // if there are 4 members in the cluster, each receives one element
105    /// // - MemberId::<()>(0): { 0: [123] }
106    /// // - MemberId::<()>(1): { 1: [124] }
107    /// // - ...
108    /// # }, |mut stream| async move {
109    /// # let mut results = Vec::new();
110    /// # for w in 0..4 {
111    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
112    /// # }
113    /// # results.sort();
114    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
115    /// # }));
116    /// # }
117    /// ```
118    pub fn demux_bincode(
119        self,
120        other: &Cluster<'a, L2>,
121    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
122    where
123        K: Serialize + DeserializeOwned,
124        T: Serialize + DeserializeOwned,
125    {
126        let serialize_pipeline = Some(serialize_bincode::<(K, T)>(true));
127
128        let deserialize_pipeline = Some(deserialize_bincode::<(K, T)>(None));
129
130        KeyedStream::new(
131            other.clone(),
132            HydroNode::Network {
133                serialize_fn: serialize_pipeline.map(|e| e.into()),
134                instantiate_fn: DebugInstantiate::Building,
135                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
136                input: Box::new(
137                    self.entries()
138                        .map(q!(|((id, k), v)| (id, (k, v))))
139                        .ir_node
140                        .into_inner(),
141                ),
142                metadata: other.new_node_metadata(KeyedStream::<
143                    K,
144                    T,
145                    Cluster<'a, L2>,
146                    Unbounded,
147                    O,
148                    R,
149                >::collection_kind()),
150            },
151        )
152    }
153}
154
155impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
156    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
157{
158    /// Sends each group of this stream at each source member to a specific member of a destination
159    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
160    /// [`bincode`] to serialize/deserialize messages.
161    ///
162    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
163    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
164    /// API allows precise targeting of specific cluster members rather than broadcasting to all
165    /// members.
166    ///
167    /// Each cluster member sends its local stream elements, and they are collected at each
168    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
169    ///
170    /// # Example
171    /// ```rust
172    /// # #[cfg(feature = "deploy")] {
173    /// # use hydro_lang::prelude::*;
174    /// # use futures::StreamExt;
175    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
176    /// # type Source = ();
177    /// # type Destination = ();
178    /// let source: Cluster<Source> = flow.cluster::<Source>();
179    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
180    ///     .source_iter(q!(vec![0, 1, 2, 3]))
181    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
182    ///     .into_keyed();
183    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
184    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
185    /// # all_received.entries().send_bincode(&p2).entries()
186    /// # }, |mut stream| async move {
187    /// // if there are 4 members in the destination cluster, each receives one message from each source member
188    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
189    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
190    /// // - ...
191    /// # let mut results = Vec::new();
192    /// # for w in 0..16 {
193    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
194    /// # }
195    /// # results.sort();
196    /// # assert_eq!(results, vec![
197    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
198    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
199    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
200    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
201    /// # ]);
202    /// # }));
203    /// # }
204    /// ```
205    pub fn demux_bincode(
206        self,
207        other: &Cluster<'a, L2>,
208    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
209    where
210        T: Serialize + DeserializeOwned,
211    {
212        let serialize_pipeline = Some(serialize_bincode::<T>(true));
213
214        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
215
216        let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
217            other.clone(),
218            HydroNode::Network {
219                serialize_fn: serialize_pipeline.map(|e| e.into()),
220                instantiate_fn: DebugInstantiate::Building,
221                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
222                input: Box::new(self.ir_node.into_inner()),
223                metadata: other.new_node_metadata(Stream::<
224                    (MemberId<L>, T),
225                    Cluster<'a, L2>,
226                    Unbounded,
227                    O,
228                    R,
229                >::collection_kind()),
230            },
231        );
232
233        raw_stream.into_keyed()
234    }
235}
236
237impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
238    KeyedStream<K, V, Cluster<'a, L>, B, O, R>
239{
240    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
241    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
242    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
243    /// has a compound key where the first element is the sender's [`MemberId`] and the second
244    /// element is the original key.
245    ///
246    /// # Example
247    /// ```rust
248    /// # #[cfg(feature = "deploy")] {
249    /// # use hydro_lang::prelude::*;
250    /// # use futures::StreamExt;
251    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
252    /// # type Source = ();
253    /// # type Destination = ();
254    /// let source: Cluster<Source> = flow.cluster::<Source>();
255    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
256    ///     .source_iter(q!(vec![0, 1, 2, 3]))
257    ///     .map(q!(|x| (x, x + 123)))
258    ///     .into_keyed();
259    /// let destination_process = flow.process::<Destination>();
260    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
261    /// # all_received.entries().send_bincode(&p2)
262    /// # }, |mut stream| async move {
263    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
264    /// // {
265    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
266    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
267    /// //     ...
268    /// // }
269    /// # let mut results = Vec::new();
270    /// # for w in 0..16 {
271    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
272    /// # }
273    /// # results.sort();
274    /// # assert_eq!(results, vec![
275    /// #   "((MemberId::<()>(0), 0), 123)",
276    /// #   "((MemberId::<()>(0), 1), 124)",
277    /// #   "((MemberId::<()>(0), 2), 125)",
278    /// #   "((MemberId::<()>(0), 3), 126)",
279    /// #   "((MemberId::<()>(1), 0), 123)",
280    /// #   "((MemberId::<()>(1), 1), 124)",
281    /// #   "((MemberId::<()>(1), 2), 125)",
282    /// #   "((MemberId::<()>(1), 3), 126)",
283    /// #   "((MemberId::<()>(2), 0), 123)",
284    /// #   "((MemberId::<()>(2), 1), 124)",
285    /// #   "((MemberId::<()>(2), 2), 125)",
286    /// #   "((MemberId::<()>(2), 3), 126)",
287    /// #   "((MemberId::<()>(3), 0), 123)",
288    /// #   "((MemberId::<()>(3), 1), 124)",
289    /// #   "((MemberId::<()>(3), 2), 125)",
290    /// #   "((MemberId::<()>(3), 3), 126)",
291    /// # ]);
292    /// # }));
293    /// # }
294    /// ```
295    pub fn send_bincode<L2>(
296        self,
297        other: &Process<'a, L2>,
298    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
299    where
300        K: Serialize + DeserializeOwned,
301        V: Serialize + DeserializeOwned,
302    {
303        let serialize_pipeline = Some(serialize_bincode::<(K, V)>(false));
304
305        let deserialize_pipeline = Some(deserialize_bincode::<(K, V)>(Some(&quote_type::<L>())));
306
307        let raw_stream: Stream<(MemberId<L>, (K, V)), Process<'a, L2>, Unbounded, O, R> =
308            Stream::new(
309                other.clone(),
310                HydroNode::Network {
311                    serialize_fn: serialize_pipeline.map(|e| e.into()),
312                    instantiate_fn: DebugInstantiate::Building,
313                    deserialize_fn: deserialize_pipeline.map(|e| e.into()),
314                    input: Box::new(self.ir_node.into_inner()),
315                    metadata: other.new_node_metadata(Stream::<
316                        (MemberId<L>, (K, V)),
317                        Cluster<'a, L2>,
318                        Unbounded,
319                        O,
320                        R,
321                    >::collection_kind()),
322                },
323            );
324
325        raw_stream
326            .map(q!(|(sender, (k, v))| ((sender, k), v)))
327            .into_keyed()
328    }
329}