hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
20 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22 ///
23 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25 /// API allows precise targeting of specific cluster members rather than broadcasting to
26 /// all members.
27 ///
28 /// # Example
29 /// ```rust
30 /// # #[cfg(feature = "deploy")] {
31 /// # use hydro_lang::prelude::*;
32 /// # use futures::StreamExt;
33 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34 /// let p1 = flow.process::<()>();
35 /// let workers: Cluster<()> = flow.cluster::<()>();
36 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39 /// .into_keyed()
40 /// .demux_bincode(&workers);
41 /// # on_worker.send_bincode(&p2).entries()
42 /// // if there are 4 members in the cluster, each receives one element
43 /// // - MemberId::<()>(0): [0]
44 /// // - MemberId::<()>(1): [1]
45 /// // - MemberId::<()>(2): [2]
46 /// // - MemberId::<()>(3): [3]
47 /// # }, |mut stream| async move {
48 /// # let mut results = Vec::new();
49 /// # for w in 0..4 {
50 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
51 /// # }
52 /// # results.sort();
53 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54 /// # }));
55 /// # }
56 /// ```
57 pub fn demux_bincode(
58 self,
59 other: &Cluster<'a, L2>,
60 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61 where
62 T: Serialize + DeserializeOwned,
63 {
64 self.demux(other, TCP.fail_stop().bincode())
65 }
66
67 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68 /// identifying the recipient for each group and using the configuration in `via` to set up the
69 /// message transport.
70 ///
71 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73 /// API allows precise targeting of specific cluster members rather than broadcasting to
74 /// all members.
75 ///
76 /// # Example
77 /// ```rust
78 /// # #[cfg(feature = "deploy")] {
79 /// # use hydro_lang::prelude::*;
80 /// # use futures::StreamExt;
81 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82 /// let p1 = flow.process::<()>();
83 /// let workers: Cluster<()> = flow.cluster::<()>();
84 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87 /// .into_keyed()
88 /// .demux(&workers, TCP.fail_stop().bincode());
89 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
90 /// // if there are 4 members in the cluster, each receives one element
91 /// // - MemberId::<()>(0): [0]
92 /// // - MemberId::<()>(1): [1]
93 /// // - MemberId::<()>(2): [2]
94 /// // - MemberId::<()>(3): [3]
95 /// # }, |mut stream| async move {
96 /// # let mut results = Vec::new();
97 /// # for w in 0..4 {
98 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
99 /// # }
100 /// # results.sort();
101 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102 /// # }));
103 /// # }
104 /// ```
105 pub fn demux<N: NetworkFor<T>>(
106 self,
107 to: &Cluster<'a, L2>,
108 via: N,
109 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
110 where
111 T: Serialize + DeserializeOwned,
112 {
113 let serialize_pipeline = Some(N::serialize_thunk(true));
114
115 let deserialize_pipeline = Some(N::deserialize_thunk(None));
116
117 let name = via.name();
118 if to.multiversioned() && name.is_none() {
119 panic!(
120 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
121 );
122 }
123
124 Stream::new(
125 to.clone(),
126 HydroNode::Network {
127 name: name.map(ToOwned::to_owned),
128 networking_info: N::networking_info(),
129 serialize_fn: serialize_pipeline.map(|e| e.into()),
130 instantiate_fn: DebugInstantiate::Building,
131 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
132 input: Box::new(self.ir_node.into_inner()),
133 metadata: to.new_node_metadata(
134 Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
135 ),
136 },
137 )
138 }
139}
140
141impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
142 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
143{
144 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
145 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
146 /// compound key where the first element is the recipient's [`MemberId`] and the second element
147 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
148 /// messages.
149 ///
150 /// # Example
151 /// ```rust
152 /// # #[cfg(feature = "deploy")] {
153 /// # use hydro_lang::prelude::*;
154 /// # use futures::StreamExt;
155 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
156 /// let p1 = flow.process::<()>();
157 /// let workers: Cluster<()> = flow.cluster::<()>();
158 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
159 /// .source_iter(q!(vec![0, 1, 2, 3]))
160 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
161 /// .into_keyed();
162 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
163 /// # on_worker.entries().send_bincode(&p2).entries()
164 /// // if there are 4 members in the cluster, each receives one element
165 /// // - MemberId::<()>(0): { 0: [123] }
166 /// // - MemberId::<()>(1): { 1: [124] }
167 /// // - ...
168 /// # }, |mut stream| async move {
169 /// # let mut results = Vec::new();
170 /// # for w in 0..4 {
171 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
172 /// # }
173 /// # results.sort();
174 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
175 /// # }));
176 /// # }
177 /// ```
178 pub fn demux_bincode(
179 self,
180 other: &Cluster<'a, L2>,
181 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
182 where
183 K: Serialize + DeserializeOwned,
184 T: Serialize + DeserializeOwned,
185 {
186 self.demux(other, TCP.fail_stop().bincode())
187 }
188
189 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
190 /// compound key where the first element is the recipient's [`MemberId`] and the second element
191 /// is a key that will be sent along with the value, using the configuration in `via` to set up
192 /// the message transport.
193 ///
194 /// # Example
195 /// ```rust
196 /// # #[cfg(feature = "deploy")] {
197 /// # use hydro_lang::prelude::*;
198 /// # use futures::StreamExt;
199 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
200 /// let p1 = flow.process::<()>();
201 /// let workers: Cluster<()> = flow.cluster::<()>();
202 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
203 /// .source_iter(q!(vec![0, 1, 2, 3]))
204 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
205 /// .into_keyed();
206 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
207 /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
208 /// // if there are 4 members in the cluster, each receives one element
209 /// // - MemberId::<()>(0): { 0: [123] }
210 /// // - MemberId::<()>(1): { 1: [124] }
211 /// // - ...
212 /// # }, |mut stream| async move {
213 /// # let mut results = Vec::new();
214 /// # for w in 0..4 {
215 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
216 /// # }
217 /// # results.sort();
218 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
219 /// # }));
220 /// # }
221 /// ```
222 pub fn demux<N: NetworkFor<(K, T)>>(
223 self,
224 to: &Cluster<'a, L2>,
225 via: N,
226 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
227 where
228 K: Serialize + DeserializeOwned,
229 T: Serialize + DeserializeOwned,
230 {
231 let serialize_pipeline = Some(N::serialize_thunk(true));
232
233 let deserialize_pipeline = Some(N::deserialize_thunk(None));
234
235 let name = via.name();
236 if to.multiversioned() && name.is_none() {
237 panic!(
238 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
239 );
240 }
241
242 KeyedStream::new(
243 to.clone(),
244 HydroNode::Network {
245 name: name.map(ToOwned::to_owned),
246 networking_info: N::networking_info(),
247 serialize_fn: serialize_pipeline.map(|e| e.into()),
248 instantiate_fn: DebugInstantiate::Building,
249 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
250 input: Box::new(
251 self.entries()
252 .map(q!(|((id, k), v)| (id, (k, v))))
253 .ir_node
254 .into_inner(),
255 ),
256 metadata: to.new_node_metadata(
257 KeyedStream::<K, T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
258 ),
259 },
260 )
261 }
262}
263
264impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
265 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
266{
267 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
268 /// Sends each group of this stream at each source member to a specific member of a destination
269 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
270 /// [`bincode`] to serialize/deserialize messages.
271 ///
272 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
273 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
274 /// API allows precise targeting of specific cluster members rather than broadcasting to all
275 /// members.
276 ///
277 /// Each cluster member sends its local stream elements, and they are collected at each
278 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
279 ///
280 /// # Example
281 /// ```rust
282 /// # #[cfg(feature = "deploy")] {
283 /// # use hydro_lang::prelude::*;
284 /// # use futures::StreamExt;
285 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
286 /// # type Source = ();
287 /// # type Destination = ();
288 /// let source: Cluster<Source> = flow.cluster::<Source>();
289 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
290 /// .source_iter(q!(vec![0, 1, 2, 3]))
291 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
292 /// .into_keyed();
293 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
294 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
295 /// # all_received.entries().send_bincode(&p2).entries()
296 /// # }, |mut stream| async move {
297 /// // if there are 4 members in the destination cluster, each receives one message from each source member
298 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
299 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
300 /// // - ...
301 /// # let mut results = Vec::new();
302 /// # for w in 0..16 {
303 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
304 /// # }
305 /// # results.sort();
306 /// # assert_eq!(results, vec![
307 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
308 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
309 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
310 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
311 /// # ]);
312 /// # }));
313 /// # }
314 /// ```
315 pub fn demux_bincode(
316 self,
317 other: &Cluster<'a, L2>,
318 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
319 where
320 T: Serialize + DeserializeOwned,
321 {
322 self.demux(other, TCP.fail_stop().bincode())
323 }
324
325 /// Sends each group of this stream at each source member to a specific member of a destination
326 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
327 /// configuration in `via` to set up the message transport.
328 ///
329 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
330 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
331 /// API allows precise targeting of specific cluster members rather than broadcasting to all
332 /// members.
333 ///
334 /// Each cluster member sends its local stream elements, and they are collected at each
335 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
336 ///
337 /// # Example
338 /// ```rust
339 /// # #[cfg(feature = "deploy")] {
340 /// # use hydro_lang::prelude::*;
341 /// # use futures::StreamExt;
342 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
343 /// # type Source = ();
344 /// # type Destination = ();
345 /// let source: Cluster<Source> = flow.cluster::<Source>();
346 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
347 /// .source_iter(q!(vec![0, 1, 2, 3]))
348 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
349 /// .into_keyed();
350 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
351 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
352 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
353 /// # }, |mut stream| async move {
354 /// // if there are 4 members in the destination cluster, each receives one message from each source member
355 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
356 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
357 /// // - ...
358 /// # let mut results = Vec::new();
359 /// # for w in 0..16 {
360 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
361 /// # }
362 /// # results.sort();
363 /// # assert_eq!(results, vec![
364 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
365 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
366 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
367 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
368 /// # ]);
369 /// # }));
370 /// # }
371 /// ```
372 pub fn demux<N: NetworkFor<T>>(
373 self,
374 to: &Cluster<'a, L2>,
375 via: N,
376 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
377 where
378 T: Serialize + DeserializeOwned,
379 {
380 let serialize_pipeline = Some(N::serialize_thunk(true));
381
382 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
383
384 let name = via.name();
385 if to.multiversioned() && name.is_none() {
386 panic!(
387 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
388 );
389 }
390
391 let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
392 to.clone(),
393 HydroNode::Network {
394 name: name.map(ToOwned::to_owned),
395 networking_info: N::networking_info(),
396 serialize_fn: serialize_pipeline.map(|e| e.into()),
397 instantiate_fn: DebugInstantiate::Building,
398 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
399 input: Box::new(self.ir_node.into_inner()),
400 metadata: to.new_node_metadata(Stream::<
401 (MemberId<L>, T),
402 Cluster<'a, L2>,
403 Unbounded,
404 O,
405 R,
406 >::collection_kind()),
407 },
408 );
409
410 raw_stream.into_keyed()
411 }
412}
413
414impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
415 KeyedStream<K, V, Cluster<'a, L>, B, O, R>
416{
417 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
418 #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
419 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
420 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
421 /// has a compound key where the first element is the sender's [`MemberId`] and the second
422 /// element is the original key.
423 ///
424 /// # Example
425 /// ```rust
426 /// # #[cfg(feature = "deploy")] {
427 /// # use hydro_lang::prelude::*;
428 /// # use futures::StreamExt;
429 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
430 /// # type Source = ();
431 /// # type Destination = ();
432 /// let source: Cluster<Source> = flow.cluster::<Source>();
433 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
434 /// .source_iter(q!(vec![0, 1, 2, 3]))
435 /// .map(q!(|x| (x, x + 123)))
436 /// .into_keyed();
437 /// let destination_process = flow.process::<Destination>();
438 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
439 /// # all_received.entries().send_bincode(&p2)
440 /// # }, |mut stream| async move {
441 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
442 /// // {
443 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
444 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
445 /// // ...
446 /// // }
447 /// # let mut results = Vec::new();
448 /// # for w in 0..16 {
449 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
450 /// # }
451 /// # results.sort();
452 /// # assert_eq!(results, vec![
453 /// # "((MemberId::<()>(0), 0), 123)",
454 /// # "((MemberId::<()>(0), 1), 124)",
455 /// # "((MemberId::<()>(0), 2), 125)",
456 /// # "((MemberId::<()>(0), 3), 126)",
457 /// # "((MemberId::<()>(1), 0), 123)",
458 /// # "((MemberId::<()>(1), 1), 124)",
459 /// # "((MemberId::<()>(1), 2), 125)",
460 /// # "((MemberId::<()>(1), 3), 126)",
461 /// # "((MemberId::<()>(2), 0), 123)",
462 /// # "((MemberId::<()>(2), 1), 124)",
463 /// # "((MemberId::<()>(2), 2), 125)",
464 /// # "((MemberId::<()>(2), 3), 126)",
465 /// # "((MemberId::<()>(3), 0), 123)",
466 /// # "((MemberId::<()>(3), 1), 124)",
467 /// # "((MemberId::<()>(3), 2), 125)",
468 /// # "((MemberId::<()>(3), 3), 126)",
469 /// # ]);
470 /// # }));
471 /// # }
472 /// ```
473 pub fn send_bincode<L2>(
474 self,
475 other: &Process<'a, L2>,
476 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
477 where
478 K: Serialize + DeserializeOwned,
479 V: Serialize + DeserializeOwned,
480 {
481 self.send(other, TCP.fail_stop().bincode())
482 }
483
484 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
485 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
486 /// network, using the configuration in `via` to set up the message transport. The resulting
487 /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
488 /// the second element is the original key.
489 ///
490 /// # Example
491 /// ```rust
492 /// # #[cfg(feature = "deploy")] {
493 /// # use hydro_lang::prelude::*;
494 /// # use futures::StreamExt;
495 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
496 /// # type Source = ();
497 /// # type Destination = ();
498 /// let source: Cluster<Source> = flow.cluster::<Source>();
499 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
500 /// .source_iter(q!(vec![0, 1, 2, 3]))
501 /// .map(q!(|x| (x, x + 123)))
502 /// .into_keyed();
503 /// let destination_process = flow.process::<Destination>();
504 /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
505 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
506 /// # }, |mut stream| async move {
507 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
508 /// // {
509 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
510 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
511 /// // ...
512 /// // }
513 /// # let mut results = Vec::new();
514 /// # for w in 0..16 {
515 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
516 /// # }
517 /// # results.sort();
518 /// # assert_eq!(results, vec![
519 /// # "((MemberId::<()>(0), 0), 123)",
520 /// # "((MemberId::<()>(0), 1), 124)",
521 /// # "((MemberId::<()>(0), 2), 125)",
522 /// # "((MemberId::<()>(0), 3), 126)",
523 /// # "((MemberId::<()>(1), 0), 123)",
524 /// # "((MemberId::<()>(1), 1), 124)",
525 /// # "((MemberId::<()>(1), 2), 125)",
526 /// # "((MemberId::<()>(1), 3), 126)",
527 /// # "((MemberId::<()>(2), 0), 123)",
528 /// # "((MemberId::<()>(2), 1), 124)",
529 /// # "((MemberId::<()>(2), 2), 125)",
530 /// # "((MemberId::<()>(2), 3), 126)",
531 /// # "((MemberId::<()>(3), 0), 123)",
532 /// # "((MemberId::<()>(3), 1), 124)",
533 /// # "((MemberId::<()>(3), 2), 125)",
534 /// # "((MemberId::<()>(3), 3), 126)",
535 /// # ]);
536 /// # }));
537 /// # }
538 /// ```
539 pub fn send<L2, N: NetworkFor<(K, V)>>(
540 self,
541 to: &Process<'a, L2>,
542 via: N,
543 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
544 where
545 K: Serialize + DeserializeOwned,
546 V: Serialize + DeserializeOwned,
547 {
548 let serialize_pipeline = Some(N::serialize_thunk(false));
549
550 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
551
552 let name = via.name();
553 if to.multiversioned() && name.is_none() {
554 panic!(
555 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
556 );
557 }
558
559 let raw_stream: Stream<(MemberId<L>, (K, V)), Process<'a, L2>, Unbounded, O, R> =
560 Stream::new(
561 to.clone(),
562 HydroNode::Network {
563 name: name.map(ToOwned::to_owned),
564 networking_info: N::networking_info(),
565 serialize_fn: serialize_pipeline.map(|e| e.into()),
566 instantiate_fn: DebugInstantiate::Building,
567 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
568 input: Box::new(self.ir_node.into_inner()),
569 metadata: to.new_node_metadata(Stream::<
570 (MemberId<L>, (K, V)),
571 Cluster<'a, L2>,
572 Unbounded,
573 O,
574 R,
575 >::collection_kind()),
576 },
577 );
578
579 raw_stream
580 .map(q!(|(sender, (k, v))| ((sender, k), v)))
581 .into_keyed()
582 }
583}