hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
11use crate::live_collections::stream::{Ordering, Retries, Stream};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
20 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
21 ///
22 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
23 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
24 /// API allows precise targeting of specific cluster members rather than broadcasting to
25 /// all members.
26 ///
27 /// # Example
28 /// ```rust
29 /// # #[cfg(feature = "deploy")] {
30 /// # use hydro_lang::prelude::*;
31 /// # use futures::StreamExt;
32 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
33 /// let p1 = flow.process::<()>();
34 /// let workers: Cluster<()> = flow.cluster::<()>();
35 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
36 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
37 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
38 /// .into_keyed()
39 /// .demux_bincode(&workers);
40 /// # on_worker.send_bincode(&p2).entries()
41 /// // if there are 4 members in the cluster, each receives one element
42 /// // - MemberId::<()>(0): [0]
43 /// // - MemberId::<()>(1): [1]
44 /// // - MemberId::<()>(2): [2]
45 /// // - MemberId::<()>(3): [3]
46 /// # }, |mut stream| async move {
47 /// # let mut results = Vec::new();
48 /// # for w in 0..4 {
49 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
50 /// # }
51 /// # results.sort();
52 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
53 /// # }));
54 /// # }
55 /// ```
56 pub fn demux_bincode(
57 self,
58 other: &Cluster<'a, L2>,
59 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
60 where
61 T: Serialize + DeserializeOwned,
62 {
63 let serialize_pipeline = Some(serialize_bincode::<T>(true));
64
65 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
66
67 Stream::new(
68 other.clone(),
69 HydroNode::Network {
70 serialize_fn: serialize_pipeline.map(|e| e.into()),
71 instantiate_fn: DebugInstantiate::Building,
72 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
73 input: Box::new(self.ir_node.into_inner()),
74 metadata: other.new_node_metadata(
75 Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
76 ),
77 },
78 )
79 }
80}
81
82impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
83 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
84{
85 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
86 /// compound key where the first element is the recipient's [`MemberId`] and the second element
87 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
88 /// messages.
89 ///
90 /// # Example
91 /// ```rust
92 /// # #[cfg(feature = "deploy")] {
93 /// # use hydro_lang::prelude::*;
94 /// # use futures::StreamExt;
95 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
96 /// let p1 = flow.process::<()>();
97 /// let workers: Cluster<()> = flow.cluster::<()>();
98 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
99 /// .source_iter(q!(vec![0, 1, 2, 3]))
100 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
101 /// .into_keyed();
102 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
103 /// # on_worker.entries().send_bincode(&p2).entries()
104 /// // if there are 4 members in the cluster, each receives one element
105 /// // - MemberId::<()>(0): { 0: [123] }
106 /// // - MemberId::<()>(1): { 1: [124] }
107 /// // - ...
108 /// # }, |mut stream| async move {
109 /// # let mut results = Vec::new();
110 /// # for w in 0..4 {
111 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
112 /// # }
113 /// # results.sort();
114 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
115 /// # }));
116 /// # }
117 /// ```
118 pub fn demux_bincode(
119 self,
120 other: &Cluster<'a, L2>,
121 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
122 where
123 K: Serialize + DeserializeOwned,
124 T: Serialize + DeserializeOwned,
125 {
126 let serialize_pipeline = Some(serialize_bincode::<(K, T)>(true));
127
128 let deserialize_pipeline = Some(deserialize_bincode::<(K, T)>(None));
129
130 KeyedStream::new(
131 other.clone(),
132 HydroNode::Network {
133 serialize_fn: serialize_pipeline.map(|e| e.into()),
134 instantiate_fn: DebugInstantiate::Building,
135 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
136 input: Box::new(
137 self.entries()
138 .map(q!(|((id, k), v)| (id, (k, v))))
139 .ir_node
140 .into_inner(),
141 ),
142 metadata: other.new_node_metadata(KeyedStream::<
143 K,
144 T,
145 Cluster<'a, L2>,
146 Unbounded,
147 O,
148 R,
149 >::collection_kind()),
150 },
151 )
152 }
153}
154
155impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
156 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
157{
158 /// Sends each group of this stream at each source member to a specific member of a destination
159 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
160 /// [`bincode`] to serialize/deserialize messages.
161 ///
162 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
163 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
164 /// API allows precise targeting of specific cluster members rather than broadcasting to all
165 /// members.
166 ///
167 /// Each cluster member sends its local stream elements, and they are collected at each
168 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
169 ///
170 /// # Example
171 /// ```rust
172 /// # #[cfg(feature = "deploy")] {
173 /// # use hydro_lang::prelude::*;
174 /// # use futures::StreamExt;
175 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
176 /// # type Source = ();
177 /// # type Destination = ();
178 /// let source: Cluster<Source> = flow.cluster::<Source>();
179 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
180 /// .source_iter(q!(vec![0, 1, 2, 3]))
181 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
182 /// .into_keyed();
183 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
184 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
185 /// # all_received.entries().send_bincode(&p2).entries()
186 /// # }, |mut stream| async move {
187 /// // if there are 4 members in the destination cluster, each receives one message from each source member
188 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
189 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
190 /// // - ...
191 /// # let mut results = Vec::new();
192 /// # for w in 0..16 {
193 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
194 /// # }
195 /// # results.sort();
196 /// # assert_eq!(results, vec![
197 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
198 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
199 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
200 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
201 /// # ]);
202 /// # }));
203 /// # }
204 /// ```
205 pub fn demux_bincode(
206 self,
207 other: &Cluster<'a, L2>,
208 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
209 where
210 T: Serialize + DeserializeOwned,
211 {
212 let serialize_pipeline = Some(serialize_bincode::<T>(true));
213
214 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
215
216 let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
217 other.clone(),
218 HydroNode::Network {
219 serialize_fn: serialize_pipeline.map(|e| e.into()),
220 instantiate_fn: DebugInstantiate::Building,
221 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
222 input: Box::new(self.ir_node.into_inner()),
223 metadata: other.new_node_metadata(Stream::<
224 (MemberId<L>, T),
225 Cluster<'a, L2>,
226 Unbounded,
227 O,
228 R,
229 >::collection_kind()),
230 },
231 );
232
233 raw_stream.into_keyed()
234 }
235}
236
237impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
238 KeyedStream<K, V, Cluster<'a, L>, B, O, R>
239{
240 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
241 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
242 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
243 /// has a compound key where the first element is the sender's [`MemberId`] and the second
244 /// element is the original key.
245 ///
246 /// # Example
247 /// ```rust
248 /// # #[cfg(feature = "deploy")] {
249 /// # use hydro_lang::prelude::*;
250 /// # use futures::StreamExt;
251 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
252 /// # type Source = ();
253 /// # type Destination = ();
254 /// let source: Cluster<Source> = flow.cluster::<Source>();
255 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
256 /// .source_iter(q!(vec![0, 1, 2, 3]))
257 /// .map(q!(|x| (x, x + 123)))
258 /// .into_keyed();
259 /// let destination_process = flow.process::<Destination>();
260 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
261 /// # all_received.entries().send_bincode(&p2)
262 /// # }, |mut stream| async move {
263 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
264 /// // {
265 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
266 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
267 /// // ...
268 /// // }
269 /// # let mut results = Vec::new();
270 /// # for w in 0..16 {
271 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
272 /// # }
273 /// # results.sort();
274 /// # assert_eq!(results, vec![
275 /// # "((MemberId::<()>(0), 0), 123)",
276 /// # "((MemberId::<()>(0), 1), 124)",
277 /// # "((MemberId::<()>(0), 2), 125)",
278 /// # "((MemberId::<()>(0), 3), 126)",
279 /// # "((MemberId::<()>(1), 0), 123)",
280 /// # "((MemberId::<()>(1), 1), 124)",
281 /// # "((MemberId::<()>(1), 2), 125)",
282 /// # "((MemberId::<()>(1), 3), 126)",
283 /// # "((MemberId::<()>(2), 0), 123)",
284 /// # "((MemberId::<()>(2), 1), 124)",
285 /// # "((MemberId::<()>(2), 2), 125)",
286 /// # "((MemberId::<()>(2), 3), 126)",
287 /// # "((MemberId::<()>(3), 0), 123)",
288 /// # "((MemberId::<()>(3), 1), 124)",
289 /// # "((MemberId::<()>(3), 2), 125)",
290 /// # "((MemberId::<()>(3), 3), 126)",
291 /// # ]);
292 /// # }));
293 /// # }
294 /// ```
295 pub fn send_bincode<L2>(
296 self,
297 other: &Process<'a, L2>,
298 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
299 where
300 K: Serialize + DeserializeOwned,
301 V: Serialize + DeserializeOwned,
302 {
303 let serialize_pipeline = Some(serialize_bincode::<(K, V)>(false));
304
305 let deserialize_pipeline = Some(deserialize_bincode::<(K, V)>(Some("e_type::<L>())));
306
307 let raw_stream: Stream<(MemberId<L>, (K, V)), Process<'a, L2>, Unbounded, O, R> =
308 Stream::new(
309 other.clone(),
310 HydroNode::Network {
311 serialize_fn: serialize_pipeline.map(|e| e.into()),
312 instantiate_fn: DebugInstantiate::Building,
313 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
314 input: Box::new(self.ir_node.into_inner()),
315 metadata: other.new_node_metadata(Stream::<
316 (MemberId<L>, (K, V)),
317 Cluster<'a, L2>,
318 Unbounded,
319 O,
320 R,
321 >::collection_kind()),
322 },
323 );
324
325 raw_stream
326 .map(q!(|(sender, (k, v))| ((sender, k), v)))
327 .into_keyed()
328 }
329}