hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11use crate::location::cluster::{Consistency, NoConsistency};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15use crate::networking::{NetworkFor, TCP};
16
17impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
18 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
19{
20 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
21 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
22 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
23 ///
24 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
25 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
26 /// API allows precise targeting of specific cluster members rather than broadcasting to
27 /// all members.
28 ///
29 /// # Example
30 /// ```rust
31 /// # #[cfg(feature = "deploy")] {
32 /// # use hydro_lang::prelude::*;
33 /// # use futures::StreamExt;
34 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
35 /// let p1 = flow.process::<()>();
36 /// let workers: Cluster<()> = flow.cluster::<()>();
37 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
38 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
39 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
40 /// .into_keyed()
41 /// .demux_bincode(&workers);
42 /// # on_worker.send_bincode(&p2).entries()
43 /// // if there are 4 members in the cluster, each receives one element
44 /// // - MemberId::<()>(0): [0]
45 /// // - MemberId::<()>(1): [1]
46 /// // - MemberId::<()>(2): [2]
47 /// // - MemberId::<()>(3): [3]
48 /// # }, |mut stream| async move {
49 /// # let mut results = Vec::new();
50 /// # for w in 0..4 {
51 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
52 /// # }
53 /// # results.sort();
54 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
55 /// # }));
56 /// # }
57 /// ```
58 pub fn demux_bincode(
59 self,
60 other: &Cluster<'a, L2>,
61 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
62 where
63 T: Serialize + DeserializeOwned,
64 {
65 self.demux(other, TCP.fail_stop().bincode())
66 }
67
68 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
69 /// identifying the recipient for each group and using the configuration in `via` to set up the
70 /// message transport.
71 ///
72 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
73 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
74 /// API allows precise targeting of specific cluster members rather than broadcasting to
75 /// all members.
76 ///
77 /// # Example
78 /// ```rust
79 /// # #[cfg(feature = "deploy")] {
80 /// # use hydro_lang::prelude::*;
81 /// # use futures::StreamExt;
82 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
83 /// let p1 = flow.process::<()>();
84 /// let workers: Cluster<()> = flow.cluster::<()>();
85 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
86 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
87 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
88 /// .into_keyed()
89 /// .demux(&workers, TCP.fail_stop().bincode());
90 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
91 /// // if there are 4 members in the cluster, each receives one element
92 /// // - MemberId::<()>(0): [0]
93 /// // - MemberId::<()>(1): [1]
94 /// // - MemberId::<()>(2): [2]
95 /// // - MemberId::<()>(3): [3]
96 /// # }, |mut stream| async move {
97 /// # let mut results = Vec::new();
98 /// # for w in 0..4 {
99 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
100 /// # }
101 /// # results.sort();
102 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
103 /// # }));
104 /// # }
105 /// ```
106 #[expect(clippy::type_complexity, reason = "DropConsistency type")]
107 pub fn demux<N: NetworkFor<T>>(
108 self,
109 to: &Cluster<'a, L2>,
110 via: N,
111 ) -> Stream<
112 T,
113 // NoConsistency because there each replica member may receive different streams
114 Cluster<'a, L2, NoConsistency>,
115 Unbounded,
116 <O as MinOrder<N::OrderingGuarantee>>::Min,
117 R,
118 >
119 where
120 T: Serialize + DeserializeOwned,
121 O: MinOrder<N::OrderingGuarantee>,
122 {
123 let serialize_pipeline = Some(N::serialize_thunk(true));
124
125 let deserialize_pipeline = Some(N::deserialize_thunk(None));
126
127 let name = via.name();
128 if to.multiversioned() && name.is_none() {
129 panic!(
130 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
131 );
132 }
133
134 Stream::new(
135 to.clone(),
136 HydroNode::Network {
137 name: name.map(ToOwned::to_owned),
138 networking_info: N::networking_info(),
139 serialize_fn: serialize_pipeline.map(|e| e.into()),
140 instantiate_fn: DebugInstantiate::Building,
141 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
142 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
143 metadata: to.new_node_metadata(Stream::<
144 T,
145 Cluster<'a, L2>,
146 Unbounded,
147 <O as MinOrder<N::OrderingGuarantee>>::Min,
148 R,
149 >::collection_kind()),
150 },
151 )
152 }
153}
154
155impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
156 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
157{
158 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
159 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
160 /// compound key where the first element is the recipient's [`MemberId`] and the second element
161 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
162 /// messages.
163 ///
164 /// # Example
165 /// ```rust
166 /// # #[cfg(feature = "deploy")] {
167 /// # use hydro_lang::prelude::*;
168 /// # use futures::StreamExt;
169 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
170 /// let p1 = flow.process::<()>();
171 /// let workers: Cluster<()> = flow.cluster::<()>();
172 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
173 /// .source_iter(q!(vec![0, 1, 2, 3]))
174 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
175 /// .into_keyed();
176 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
177 /// # on_worker.entries().send_bincode(&p2).entries()
178 /// // if there are 4 members in the cluster, each receives one element
179 /// // - MemberId::<()>(0): { 0: [123] }
180 /// // - MemberId::<()>(1): { 1: [124] }
181 /// // - ...
182 /// # }, |mut stream| async move {
183 /// # let mut results = Vec::new();
184 /// # for w in 0..4 {
185 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
186 /// # }
187 /// # results.sort();
188 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
189 /// # }));
190 /// # }
191 /// ```
192 pub fn demux_bincode(
193 self,
194 other: &Cluster<'a, L2>,
195 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
196 where
197 K: Serialize + DeserializeOwned,
198 T: Serialize + DeserializeOwned,
199 {
200 self.demux(other, TCP.fail_stop().bincode())
201 }
202
203 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
204 /// compound key where the first element is the recipient's [`MemberId`] and the second element
205 /// is a key that will be sent along with the value, using the configuration in `via` to set up
206 /// the message transport.
207 ///
208 /// # Example
209 /// ```rust
210 /// # #[cfg(feature = "deploy")] {
211 /// # use hydro_lang::prelude::*;
212 /// # use futures::StreamExt;
213 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
214 /// let p1 = flow.process::<()>();
215 /// let workers: Cluster<()> = flow.cluster::<()>();
216 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
217 /// .source_iter(q!(vec![0, 1, 2, 3]))
218 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
219 /// .into_keyed();
220 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
221 /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
222 /// // if there are 4 members in the cluster, each receives one element
223 /// // - MemberId::<()>(0): { 0: [123] }
224 /// // - MemberId::<()>(1): { 1: [124] }
225 /// // - ...
226 /// # }, |mut stream| async move {
227 /// # let mut results = Vec::new();
228 /// # for w in 0..4 {
229 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
230 /// # }
231 /// # results.sort();
232 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
233 /// # }));
234 /// # }
235 /// ```
236 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
237 pub fn demux<N: NetworkFor<(K, T)>>(
238 self,
239 to: &Cluster<'a, L2>,
240 via: N,
241 ) -> KeyedStream<
242 K,
243 T,
244 Cluster<'a, L2, NoConsistency>,
245 Unbounded,
246 <O as MinOrder<N::OrderingGuarantee>>::Min,
247 R,
248 >
249 where
250 K: Serialize + DeserializeOwned,
251 T: Serialize + DeserializeOwned,
252 O: MinOrder<N::OrderingGuarantee>,
253 {
254 let serialize_pipeline = Some(N::serialize_thunk(true));
255
256 let deserialize_pipeline = Some(N::deserialize_thunk(None));
257
258 let name = via.name();
259 if to.multiversioned() && name.is_none() {
260 panic!(
261 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
262 );
263 }
264
265 KeyedStream::new(
266 to.clone(),
267 HydroNode::Network {
268 name: name.map(ToOwned::to_owned),
269 networking_info: N::networking_info(),
270 serialize_fn: serialize_pipeline.map(|e| e.into()),
271 instantiate_fn: DebugInstantiate::Building,
272 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
273 input: Box::new(
274 self.entries()
275 .map(q!(|((id, k), v)| (id, (k, v))))
276 .ir_node
277 .replace(HydroNode::Placeholder),
278 ),
279 metadata: to.new_node_metadata(KeyedStream::<
280 K,
281 T,
282 Cluster<'a, L2>,
283 Unbounded,
284 <O as MinOrder<N::OrderingGuarantee>>::Min,
285 R,
286 >::collection_kind()),
287 },
288 )
289 }
290}
291
292impl<'a, T, L, L2, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
293 KeyedStream<MemberId<L2>, T, Cluster<'a, L, C>, B, O, R>
294{
295 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
296 /// Sends each group of this stream at each source member to a specific member of a destination
297 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
298 /// [`bincode`] to serialize/deserialize messages.
299 ///
300 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
301 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
302 /// API allows precise targeting of specific cluster members rather than broadcasting to all
303 /// members.
304 ///
305 /// Each cluster member sends its local stream elements, and they are collected at each
306 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
307 ///
308 /// # Example
309 /// ```rust
310 /// # #[cfg(feature = "deploy")] {
311 /// # use hydro_lang::prelude::*;
312 /// # use futures::StreamExt;
313 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
314 /// # type Source = ();
315 /// # type Destination = ();
316 /// let source: Cluster<Source> = flow.cluster::<Source>();
317 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
318 /// .source_iter(q!(vec![0, 1, 2, 3]))
319 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
320 /// .into_keyed();
321 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
322 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
323 /// # all_received.entries().send_bincode(&p2).entries()
324 /// # }, |mut stream| async move {
325 /// // if there are 4 members in the destination cluster, each receives one message from each source member
326 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
327 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
328 /// // - ...
329 /// # let mut results = Vec::new();
330 /// # for w in 0..16 {
331 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
332 /// # }
333 /// # results.sort();
334 /// # assert_eq!(results, vec![
335 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
336 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
337 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
338 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
339 /// # ]);
340 /// # }));
341 /// # }
342 /// ```
343 pub fn demux_bincode(
344 self,
345 other: &Cluster<'a, L2>,
346 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
347 where
348 T: Serialize + DeserializeOwned,
349 {
350 self.demux(other, TCP.fail_stop().bincode())
351 }
352
353 /// Sends each group of this stream at each source member to a specific member of a destination
354 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
355 /// configuration in `via` to set up the message transport.
356 ///
357 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
358 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
359 /// API allows precise targeting of specific cluster members rather than broadcasting to all
360 /// members.
361 ///
362 /// Each cluster member sends its local stream elements, and they are collected at each
363 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
364 ///
365 /// # Example
366 /// ```rust
367 /// # #[cfg(feature = "deploy")] {
368 /// # use hydro_lang::prelude::*;
369 /// # use futures::StreamExt;
370 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
371 /// # type Source = ();
372 /// # type Destination = ();
373 /// let source: Cluster<Source> = flow.cluster::<Source>();
374 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
375 /// .source_iter(q!(vec![0, 1, 2, 3]))
376 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
377 /// .into_keyed();
378 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
379 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
380 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
381 /// # }, |mut stream| async move {
382 /// // if there are 4 members in the destination cluster, each receives one message from each source member
383 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
384 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
385 /// // - ...
386 /// # let mut results = Vec::new();
387 /// # for w in 0..16 {
388 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
389 /// # }
390 /// # results.sort();
391 /// # assert_eq!(results, vec![
392 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
393 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
394 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
395 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
396 /// # ]);
397 /// # }));
398 /// # }
399 /// ```
400 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
401 pub fn demux<N: NetworkFor<T>>(
402 self,
403 to: &Cluster<'a, L2>,
404 via: N,
405 ) -> KeyedStream<
406 MemberId<L>,
407 T,
408 Cluster<'a, L2, NoConsistency>,
409 Unbounded,
410 <O as MinOrder<N::OrderingGuarantee>>::Min,
411 R,
412 >
413 where
414 T: Serialize + DeserializeOwned,
415 O: MinOrder<N::OrderingGuarantee>,
416 {
417 let serialize_pipeline = Some(N::serialize_thunk(true));
418
419 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
420
421 let name = via.name();
422 if to.multiversioned() && name.is_none() {
423 panic!(
424 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
425 );
426 }
427
428 KeyedStream::new(
429 to.clone(),
430 HydroNode::Network {
431 name: name.map(ToOwned::to_owned),
432 networking_info: N::networking_info(),
433 serialize_fn: serialize_pipeline.map(|e| e.into()),
434 instantiate_fn: DebugInstantiate::Building,
435 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
436 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
437 metadata: to.new_node_metadata(KeyedStream::<
438 MemberId<L>,
439 T,
440 Cluster<'a, L2>,
441 Unbounded,
442 <O as MinOrder<N::OrderingGuarantee>>::Min,
443 R,
444 >::collection_kind()),
445 },
446 )
447 }
448}
449
450impl<'a, K, V, L, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
451 KeyedStream<K, V, Cluster<'a, L, C>, B, O, R>
452{
453 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
454 #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
455 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
456 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
457 /// has a compound key where the first element is the sender's [`MemberId`] and the second
458 /// element is the original key.
459 ///
460 /// # Example
461 /// ```rust
462 /// # #[cfg(feature = "deploy")] {
463 /// # use hydro_lang::prelude::*;
464 /// # use futures::StreamExt;
465 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
466 /// # type Source = ();
467 /// # type Destination = ();
468 /// let source: Cluster<Source> = flow.cluster::<Source>();
469 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
470 /// .source_iter(q!(vec![0, 1, 2, 3]))
471 /// .map(q!(|x| (x, x + 123)))
472 /// .into_keyed();
473 /// let destination_process = flow.process::<Destination>();
474 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
475 /// # all_received.entries().send_bincode(&p2)
476 /// # }, |mut stream| async move {
477 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
478 /// // {
479 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
480 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
481 /// // ...
482 /// // }
483 /// # let mut results = Vec::new();
484 /// # for w in 0..16 {
485 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
486 /// # }
487 /// # results.sort();
488 /// # assert_eq!(results, vec![
489 /// # "((MemberId::<()>(0), 0), 123)",
490 /// # "((MemberId::<()>(0), 1), 124)",
491 /// # "((MemberId::<()>(0), 2), 125)",
492 /// # "((MemberId::<()>(0), 3), 126)",
493 /// # "((MemberId::<()>(1), 0), 123)",
494 /// # "((MemberId::<()>(1), 1), 124)",
495 /// # "((MemberId::<()>(1), 2), 125)",
496 /// # "((MemberId::<()>(1), 3), 126)",
497 /// # "((MemberId::<()>(2), 0), 123)",
498 /// # "((MemberId::<()>(2), 1), 124)",
499 /// # "((MemberId::<()>(2), 2), 125)",
500 /// # "((MemberId::<()>(2), 3), 126)",
501 /// # "((MemberId::<()>(3), 0), 123)",
502 /// # "((MemberId::<()>(3), 1), 124)",
503 /// # "((MemberId::<()>(3), 2), 125)",
504 /// # "((MemberId::<()>(3), 3), 126)",
505 /// # ]);
506 /// # }));
507 /// # }
508 /// ```
509 pub fn send_bincode<L2>(
510 self,
511 other: &Process<'a, L2>,
512 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
513 where
514 K: Serialize + DeserializeOwned,
515 V: Serialize + DeserializeOwned,
516 {
517 self.send(other, TCP.fail_stop().bincode())
518 }
519
520 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
521 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
522 /// network, using the configuration in `via` to set up the message transport. The resulting
523 /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
524 /// the second element is the original key.
525 ///
526 /// # Example
527 /// ```rust
528 /// # #[cfg(feature = "deploy")] {
529 /// # use hydro_lang::prelude::*;
530 /// # use futures::StreamExt;
531 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
532 /// # type Source = ();
533 /// # type Destination = ();
534 /// let source: Cluster<Source> = flow.cluster::<Source>();
535 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
536 /// .source_iter(q!(vec![0, 1, 2, 3]))
537 /// .map(q!(|x| (x, x + 123)))
538 /// .into_keyed();
539 /// let destination_process = flow.process::<Destination>();
540 /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
541 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
542 /// # }, |mut stream| async move {
543 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
544 /// // {
545 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
546 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
547 /// // ...
548 /// // }
549 /// # let mut results = Vec::new();
550 /// # for w in 0..16 {
551 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
552 /// # }
553 /// # results.sort();
554 /// # assert_eq!(results, vec![
555 /// # "((MemberId::<()>(0), 0), 123)",
556 /// # "((MemberId::<()>(0), 1), 124)",
557 /// # "((MemberId::<()>(0), 2), 125)",
558 /// # "((MemberId::<()>(0), 3), 126)",
559 /// # "((MemberId::<()>(1), 0), 123)",
560 /// # "((MemberId::<()>(1), 1), 124)",
561 /// # "((MemberId::<()>(1), 2), 125)",
562 /// # "((MemberId::<()>(1), 3), 126)",
563 /// # "((MemberId::<()>(2), 0), 123)",
564 /// # "((MemberId::<()>(2), 1), 124)",
565 /// # "((MemberId::<()>(2), 2), 125)",
566 /// # "((MemberId::<()>(2), 3), 126)",
567 /// # "((MemberId::<()>(3), 0), 123)",
568 /// # "((MemberId::<()>(3), 1), 124)",
569 /// # "((MemberId::<()>(3), 2), 125)",
570 /// # "((MemberId::<()>(3), 3), 126)",
571 /// # ]);
572 /// # }));
573 /// # }
574 /// ```
575 pub fn send<L2, N: NetworkFor<(K, V)>>(
576 self,
577 to: &Process<'a, L2>,
578 via: N,
579 ) -> KeyedStream<
580 (MemberId<L>, K),
581 V,
582 Process<'a, L2>,
583 Unbounded,
584 <O as MinOrder<N::OrderingGuarantee>>::Min,
585 R,
586 >
587 where
588 K: Serialize + DeserializeOwned,
589 V: Serialize + DeserializeOwned,
590 O: MinOrder<N::OrderingGuarantee>,
591 {
592 let serialize_pipeline = Some(N::serialize_thunk(false));
593
594 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
595
596 let name = via.name();
597 if to.multiversioned() && name.is_none() {
598 panic!(
599 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
600 );
601 }
602
603 let raw_stream: Stream<
604 (MemberId<L>, (K, V)),
605 Process<'a, L2>,
606 Unbounded,
607 <O as MinOrder<N::OrderingGuarantee>>::Min,
608 R,
609 > = Stream::new(
610 to.clone(),
611 HydroNode::Network {
612 name: name.map(ToOwned::to_owned),
613 networking_info: N::networking_info(),
614 serialize_fn: serialize_pipeline.map(|e| e.into()),
615 instantiate_fn: DebugInstantiate::Building,
616 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
617 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
618 metadata: to.new_node_metadata(Stream::<
619 (MemberId<L>, (K, V)),
620 Cluster<'a, L2>,
621 Unbounded,
622 <O as MinOrder<N::OrderingGuarantee>>::Min,
623 R,
624 >::collection_kind()),
625 },
626 );
627
628 raw_stream
629 .map(q!(|(sender, (k, v))| ((sender, k), v)))
630 .into_keyed()
631 }
632}