Skip to main content

hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
20    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22    ///
23    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25    /// API allows precise targeting of specific cluster members rather than broadcasting to
26    /// all members.
27    ///
28    /// # Example
29    /// ```rust
30    /// # #[cfg(feature = "deploy")] {
31    /// # use hydro_lang::prelude::*;
32    /// # use futures::StreamExt;
33    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34    /// let p1 = flow.process::<()>();
35    /// let workers: Cluster<()> = flow.cluster::<()>();
36    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39    ///     .into_keyed()
40    ///     .demux_bincode(&workers);
41    /// # on_worker.send_bincode(&p2).entries()
42    /// // if there are 4 members in the cluster, each receives one element
43    /// // - MemberId::<()>(0): [0]
44    /// // - MemberId::<()>(1): [1]
45    /// // - MemberId::<()>(2): [2]
46    /// // - MemberId::<()>(3): [3]
47    /// # }, |mut stream| async move {
48    /// # let mut results = Vec::new();
49    /// # for w in 0..4 {
50    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
51    /// # }
52    /// # results.sort();
53    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54    /// # }));
55    /// # }
56    /// ```
57    pub fn demux_bincode(
58        self,
59        other: &Cluster<'a, L2>,
60    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61    where
62        T: Serialize + DeserializeOwned,
63    {
64        self.demux(other, TCP.fail_stop().bincode())
65    }
66
67    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68    /// identifying the recipient for each group and using the configuration in `via` to set up the
69    /// message transport.
70    ///
71    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73    /// API allows precise targeting of specific cluster members rather than broadcasting to
74    /// all members.
75    ///
76    /// # Example
77    /// ```rust
78    /// # #[cfg(feature = "deploy")] {
79    /// # use hydro_lang::prelude::*;
80    /// # use futures::StreamExt;
81    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82    /// let p1 = flow.process::<()>();
83    /// let workers: Cluster<()> = flow.cluster::<()>();
84    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87    ///     .into_keyed()
88    ///     .demux(&workers, TCP.fail_stop().bincode());
89    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
90    /// // if there are 4 members in the cluster, each receives one element
91    /// // - MemberId::<()>(0): [0]
92    /// // - MemberId::<()>(1): [1]
93    /// // - MemberId::<()>(2): [2]
94    /// // - MemberId::<()>(3): [3]
95    /// # }, |mut stream| async move {
96    /// # let mut results = Vec::new();
97    /// # for w in 0..4 {
98    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
99    /// # }
100    /// # results.sort();
101    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102    /// # }));
103    /// # }
104    /// ```
105    pub fn demux<N: NetworkFor<T>>(
106        self,
107        to: &Cluster<'a, L2>,
108        via: N,
109    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
110    where
111        T: Serialize + DeserializeOwned,
112    {
113        let serialize_pipeline = Some(N::serialize_thunk(true));
114
115        let deserialize_pipeline = Some(N::deserialize_thunk(None));
116
117        let name = via.name();
118        if to.multiversioned() && name.is_none() {
119            panic!(
120                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
121            );
122        }
123
124        Stream::new(
125            to.clone(),
126            HydroNode::Network {
127                name: name.map(ToOwned::to_owned),
128                networking_info: N::networking_info(),
129                serialize_fn: serialize_pipeline.map(|e| e.into()),
130                instantiate_fn: DebugInstantiate::Building,
131                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
132                input: Box::new(self.ir_node.into_inner()),
133                metadata: to.new_node_metadata(
134                    Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
135                ),
136            },
137        )
138    }
139}
140
141impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
142    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
143{
144    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
145    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
146    /// compound key where the first element is the recipient's [`MemberId`] and the second element
147    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
148    /// messages.
149    ///
150    /// # Example
151    /// ```rust
152    /// # #[cfg(feature = "deploy")] {
153    /// # use hydro_lang::prelude::*;
154    /// # use futures::StreamExt;
155    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
156    /// let p1 = flow.process::<()>();
157    /// let workers: Cluster<()> = flow.cluster::<()>();
158    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
159    ///     .source_iter(q!(vec![0, 1, 2, 3]))
160    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
161    ///     .into_keyed();
162    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
163    /// # on_worker.entries().send_bincode(&p2).entries()
164    /// // if there are 4 members in the cluster, each receives one element
165    /// // - MemberId::<()>(0): { 0: [123] }
166    /// // - MemberId::<()>(1): { 1: [124] }
167    /// // - ...
168    /// # }, |mut stream| async move {
169    /// # let mut results = Vec::new();
170    /// # for w in 0..4 {
171    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
172    /// # }
173    /// # results.sort();
174    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
175    /// # }));
176    /// # }
177    /// ```
178    pub fn demux_bincode(
179        self,
180        other: &Cluster<'a, L2>,
181    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
182    where
183        K: Serialize + DeserializeOwned,
184        T: Serialize + DeserializeOwned,
185    {
186        self.demux(other, TCP.fail_stop().bincode())
187    }
188
189    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
190    /// compound key where the first element is the recipient's [`MemberId`] and the second element
191    /// is a key that will be sent along with the value, using the configuration in `via` to set up
192    /// the message transport.
193    ///
194    /// # Example
195    /// ```rust
196    /// # #[cfg(feature = "deploy")] {
197    /// # use hydro_lang::prelude::*;
198    /// # use futures::StreamExt;
199    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
200    /// let p1 = flow.process::<()>();
201    /// let workers: Cluster<()> = flow.cluster::<()>();
202    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
203    ///     .source_iter(q!(vec![0, 1, 2, 3]))
204    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
205    ///     .into_keyed();
206    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
207    /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
208    /// // if there are 4 members in the cluster, each receives one element
209    /// // - MemberId::<()>(0): { 0: [123] }
210    /// // - MemberId::<()>(1): { 1: [124] }
211    /// // - ...
212    /// # }, |mut stream| async move {
213    /// # let mut results = Vec::new();
214    /// # for w in 0..4 {
215    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
216    /// # }
217    /// # results.sort();
218    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
219    /// # }));
220    /// # }
221    /// ```
222    pub fn demux<N: NetworkFor<(K, T)>>(
223        self,
224        to: &Cluster<'a, L2>,
225        via: N,
226    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
227    where
228        K: Serialize + DeserializeOwned,
229        T: Serialize + DeserializeOwned,
230    {
231        let serialize_pipeline = Some(N::serialize_thunk(true));
232
233        let deserialize_pipeline = Some(N::deserialize_thunk(None));
234
235        let name = via.name();
236        if to.multiversioned() && name.is_none() {
237            panic!(
238                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
239            );
240        }
241
242        KeyedStream::new(
243            to.clone(),
244            HydroNode::Network {
245                name: name.map(ToOwned::to_owned),
246                networking_info: N::networking_info(),
247                serialize_fn: serialize_pipeline.map(|e| e.into()),
248                instantiate_fn: DebugInstantiate::Building,
249                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
250                input: Box::new(
251                    self.entries()
252                        .map(q!(|((id, k), v)| (id, (k, v))))
253                        .ir_node
254                        .into_inner(),
255                ),
256                metadata: to.new_node_metadata(
257                    KeyedStream::<K, T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
258                ),
259            },
260        )
261    }
262}
263
264impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
265    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
266{
267    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
268    /// Sends each group of this stream at each source member to a specific member of a destination
269    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
270    /// [`bincode`] to serialize/deserialize messages.
271    ///
272    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
273    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
274    /// API allows precise targeting of specific cluster members rather than broadcasting to all
275    /// members.
276    ///
277    /// Each cluster member sends its local stream elements, and they are collected at each
278    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
279    ///
280    /// # Example
281    /// ```rust
282    /// # #[cfg(feature = "deploy")] {
283    /// # use hydro_lang::prelude::*;
284    /// # use futures::StreamExt;
285    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
286    /// # type Source = ();
287    /// # type Destination = ();
288    /// let source: Cluster<Source> = flow.cluster::<Source>();
289    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
290    ///     .source_iter(q!(vec![0, 1, 2, 3]))
291    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
292    ///     .into_keyed();
293    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
294    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
295    /// # all_received.entries().send_bincode(&p2).entries()
296    /// # }, |mut stream| async move {
297    /// // if there are 4 members in the destination cluster, each receives one message from each source member
298    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
299    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
300    /// // - ...
301    /// # let mut results = Vec::new();
302    /// # for w in 0..16 {
303    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
304    /// # }
305    /// # results.sort();
306    /// # assert_eq!(results, vec![
307    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
308    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
309    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
310    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
311    /// # ]);
312    /// # }));
313    /// # }
314    /// ```
315    pub fn demux_bincode(
316        self,
317        other: &Cluster<'a, L2>,
318    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
319    where
320        T: Serialize + DeserializeOwned,
321    {
322        self.demux(other, TCP.fail_stop().bincode())
323    }
324
325    /// Sends each group of this stream at each source member to a specific member of a destination
326    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
327    /// configuration in `via` to set up the message transport.
328    ///
329    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
330    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
331    /// API allows precise targeting of specific cluster members rather than broadcasting to all
332    /// members.
333    ///
334    /// Each cluster member sends its local stream elements, and they are collected at each
335    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
336    ///
337    /// # Example
338    /// ```rust
339    /// # #[cfg(feature = "deploy")] {
340    /// # use hydro_lang::prelude::*;
341    /// # use futures::StreamExt;
342    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
343    /// # type Source = ();
344    /// # type Destination = ();
345    /// let source: Cluster<Source> = flow.cluster::<Source>();
346    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
347    ///     .source_iter(q!(vec![0, 1, 2, 3]))
348    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
349    ///     .into_keyed();
350    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
351    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
352    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
353    /// # }, |mut stream| async move {
354    /// // if there are 4 members in the destination cluster, each receives one message from each source member
355    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
356    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
357    /// // - ...
358    /// # let mut results = Vec::new();
359    /// # for w in 0..16 {
360    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
361    /// # }
362    /// # results.sort();
363    /// # assert_eq!(results, vec![
364    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
365    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
366    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
367    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
368    /// # ]);
369    /// # }));
370    /// # }
371    /// ```
372    pub fn demux<N: NetworkFor<T>>(
373        self,
374        to: &Cluster<'a, L2>,
375        via: N,
376    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
377    where
378        T: Serialize + DeserializeOwned,
379    {
380        let serialize_pipeline = Some(N::serialize_thunk(true));
381
382        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
383
384        let name = via.name();
385        if to.multiversioned() && name.is_none() {
386            panic!(
387                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
388            );
389        }
390
391        let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
392            to.clone(),
393            HydroNode::Network {
394                name: name.map(ToOwned::to_owned),
395                networking_info: N::networking_info(),
396                serialize_fn: serialize_pipeline.map(|e| e.into()),
397                instantiate_fn: DebugInstantiate::Building,
398                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
399                input: Box::new(self.ir_node.into_inner()),
400                metadata: to.new_node_metadata(Stream::<
401                    (MemberId<L>, T),
402                    Cluster<'a, L2>,
403                    Unbounded,
404                    O,
405                    R,
406                >::collection_kind()),
407            },
408        );
409
410        raw_stream.into_keyed()
411    }
412}
413
414impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
415    KeyedStream<K, V, Cluster<'a, L>, B, O, R>
416{
417    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
418    #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
419    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
420    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
421    /// has a compound key where the first element is the sender's [`MemberId`] and the second
422    /// element is the original key.
423    ///
424    /// # Example
425    /// ```rust
426    /// # #[cfg(feature = "deploy")] {
427    /// # use hydro_lang::prelude::*;
428    /// # use futures::StreamExt;
429    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
430    /// # type Source = ();
431    /// # type Destination = ();
432    /// let source: Cluster<Source> = flow.cluster::<Source>();
433    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
434    ///     .source_iter(q!(vec![0, 1, 2, 3]))
435    ///     .map(q!(|x| (x, x + 123)))
436    ///     .into_keyed();
437    /// let destination_process = flow.process::<Destination>();
438    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
439    /// # all_received.entries().send_bincode(&p2)
440    /// # }, |mut stream| async move {
441    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
442    /// // {
443    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
444    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
445    /// //     ...
446    /// // }
447    /// # let mut results = Vec::new();
448    /// # for w in 0..16 {
449    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
450    /// # }
451    /// # results.sort();
452    /// # assert_eq!(results, vec![
453    /// #   "((MemberId::<()>(0), 0), 123)",
454    /// #   "((MemberId::<()>(0), 1), 124)",
455    /// #   "((MemberId::<()>(0), 2), 125)",
456    /// #   "((MemberId::<()>(0), 3), 126)",
457    /// #   "((MemberId::<()>(1), 0), 123)",
458    /// #   "((MemberId::<()>(1), 1), 124)",
459    /// #   "((MemberId::<()>(1), 2), 125)",
460    /// #   "((MemberId::<()>(1), 3), 126)",
461    /// #   "((MemberId::<()>(2), 0), 123)",
462    /// #   "((MemberId::<()>(2), 1), 124)",
463    /// #   "((MemberId::<()>(2), 2), 125)",
464    /// #   "((MemberId::<()>(2), 3), 126)",
465    /// #   "((MemberId::<()>(3), 0), 123)",
466    /// #   "((MemberId::<()>(3), 1), 124)",
467    /// #   "((MemberId::<()>(3), 2), 125)",
468    /// #   "((MemberId::<()>(3), 3), 126)",
469    /// # ]);
470    /// # }));
471    /// # }
472    /// ```
473    pub fn send_bincode<L2>(
474        self,
475        other: &Process<'a, L2>,
476    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
477    where
478        K: Serialize + DeserializeOwned,
479        V: Serialize + DeserializeOwned,
480    {
481        self.send(other, TCP.fail_stop().bincode())
482    }
483
484    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
485    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
486    /// network, using the configuration in `via` to set up the message transport. The resulting
487    /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
488    /// the second element is the original key.
489    ///
490    /// # Example
491    /// ```rust
492    /// # #[cfg(feature = "deploy")] {
493    /// # use hydro_lang::prelude::*;
494    /// # use futures::StreamExt;
495    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
496    /// # type Source = ();
497    /// # type Destination = ();
498    /// let source: Cluster<Source> = flow.cluster::<Source>();
499    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
500    ///     .source_iter(q!(vec![0, 1, 2, 3]))
501    ///     .map(q!(|x| (x, x + 123)))
502    ///     .into_keyed();
503    /// let destination_process = flow.process::<Destination>();
504    /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
505    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
506    /// # }, |mut stream| async move {
507    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
508    /// // {
509    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
510    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
511    /// //     ...
512    /// // }
513    /// # let mut results = Vec::new();
514    /// # for w in 0..16 {
515    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
516    /// # }
517    /// # results.sort();
518    /// # assert_eq!(results, vec![
519    /// #   "((MemberId::<()>(0), 0), 123)",
520    /// #   "((MemberId::<()>(0), 1), 124)",
521    /// #   "((MemberId::<()>(0), 2), 125)",
522    /// #   "((MemberId::<()>(0), 3), 126)",
523    /// #   "((MemberId::<()>(1), 0), 123)",
524    /// #   "((MemberId::<()>(1), 1), 124)",
525    /// #   "((MemberId::<()>(1), 2), 125)",
526    /// #   "((MemberId::<()>(1), 3), 126)",
527    /// #   "((MemberId::<()>(2), 0), 123)",
528    /// #   "((MemberId::<()>(2), 1), 124)",
529    /// #   "((MemberId::<()>(2), 2), 125)",
530    /// #   "((MemberId::<()>(2), 3), 126)",
531    /// #   "((MemberId::<()>(3), 0), 123)",
532    /// #   "((MemberId::<()>(3), 1), 124)",
533    /// #   "((MemberId::<()>(3), 2), 125)",
534    /// #   "((MemberId::<()>(3), 3), 126)",
535    /// # ]);
536    /// # }));
537    /// # }
538    /// ```
539    pub fn send<L2, N: NetworkFor<(K, V)>>(
540        self,
541        to: &Process<'a, L2>,
542        via: N,
543    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
544    where
545        K: Serialize + DeserializeOwned,
546        V: Serialize + DeserializeOwned,
547    {
548        let serialize_pipeline = Some(N::serialize_thunk(false));
549
550        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
551
552        let name = via.name();
553        if to.multiversioned() && name.is_none() {
554            panic!(
555                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
556            );
557        }
558
559        let raw_stream: Stream<(MemberId<L>, (K, V)), Process<'a, L2>, Unbounded, O, R> =
560            Stream::new(
561                to.clone(),
562                HydroNode::Network {
563                    name: name.map(ToOwned::to_owned),
564                    networking_info: N::networking_info(),
565                    serialize_fn: serialize_pipeline.map(|e| e.into()),
566                    instantiate_fn: DebugInstantiate::Building,
567                    deserialize_fn: deserialize_pipeline.map(|e| e.into()),
568                    input: Box::new(self.ir_node.into_inner()),
569                    metadata: to.new_node_metadata(Stream::<
570                        (MemberId<L>, (K, V)),
571                        Cluster<'a, L2>,
572                        Unbounded,
573                        O,
574                        R,
575                    >::collection_kind()),
576                },
577            );
578
579        raw_stream
580            .map(q!(|(sender, (k, v))| ((sender, k), v)))
581            .into_keyed()
582    }
583}