Skip to main content

hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11use crate::location::cluster::{Consistency, NoConsistency};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15use crate::networking::{NetworkFor, TCP};
16
17impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
18    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
19{
20    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
21    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
22    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
23    ///
24    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
25    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
26    /// API allows precise targeting of specific cluster members rather than broadcasting to
27    /// all members.
28    ///
29    /// # Example
30    /// ```rust
31    /// # #[cfg(feature = "deploy")] {
32    /// # use hydro_lang::prelude::*;
33    /// # use futures::StreamExt;
34    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
35    /// let p1 = flow.process::<()>();
36    /// let workers: Cluster<()> = flow.cluster::<()>();
37    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
38    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
39    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
40    ///     .into_keyed()
41    ///     .demux_bincode(&workers);
42    /// # on_worker.send_bincode(&p2).entries()
43    /// // if there are 4 members in the cluster, each receives one element
44    /// // - MemberId::<()>(0): [0]
45    /// // - MemberId::<()>(1): [1]
46    /// // - MemberId::<()>(2): [2]
47    /// // - MemberId::<()>(3): [3]
48    /// # }, |mut stream| async move {
49    /// # let mut results = Vec::new();
50    /// # for w in 0..4 {
51    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
52    /// # }
53    /// # results.sort();
54    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
55    /// # }));
56    /// # }
57    /// ```
58    pub fn demux_bincode(
59        self,
60        other: &Cluster<'a, L2>,
61    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
62    where
63        T: Serialize + DeserializeOwned,
64    {
65        self.demux(other, TCP.fail_stop().bincode())
66    }
67
68    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
69    /// identifying the recipient for each group and using the configuration in `via` to set up the
70    /// message transport.
71    ///
72    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
73    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
74    /// API allows precise targeting of specific cluster members rather than broadcasting to
75    /// all members.
76    ///
77    /// # Example
78    /// ```rust
79    /// # #[cfg(feature = "deploy")] {
80    /// # use hydro_lang::prelude::*;
81    /// # use futures::StreamExt;
82    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
83    /// let p1 = flow.process::<()>();
84    /// let workers: Cluster<()> = flow.cluster::<()>();
85    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
86    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
87    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
88    ///     .into_keyed()
89    ///     .demux(&workers, TCP.fail_stop().bincode());
90    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
91    /// // if there are 4 members in the cluster, each receives one element
92    /// // - MemberId::<()>(0): [0]
93    /// // - MemberId::<()>(1): [1]
94    /// // - MemberId::<()>(2): [2]
95    /// // - MemberId::<()>(3): [3]
96    /// # }, |mut stream| async move {
97    /// # let mut results = Vec::new();
98    /// # for w in 0..4 {
99    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
100    /// # }
101    /// # results.sort();
102    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
103    /// # }));
104    /// # }
105    /// ```
106    #[expect(clippy::type_complexity, reason = "DropConsistency type")]
107    pub fn demux<N: NetworkFor<T>>(
108        self,
109        to: &Cluster<'a, L2>,
110        via: N,
111    ) -> Stream<
112        T,
113        // NoConsistency because there each replica member may receive different streams
114        Cluster<'a, L2, NoConsistency>,
115        Unbounded,
116        <O as MinOrder<N::OrderingGuarantee>>::Min,
117        R,
118    >
119    where
120        T: Serialize + DeserializeOwned,
121        O: MinOrder<N::OrderingGuarantee>,
122    {
123        let serialize_pipeline = Some(N::serialize_thunk(true));
124
125        let deserialize_pipeline = Some(N::deserialize_thunk(None));
126
127        let name = via.name();
128        if to.multiversioned() && name.is_none() {
129            panic!(
130                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
131            );
132        }
133
134        Stream::new(
135            to.clone(),
136            HydroNode::Network {
137                name: name.map(ToOwned::to_owned),
138                networking_info: N::networking_info(),
139                serialize_fn: serialize_pipeline.map(|e| e.into()),
140                instantiate_fn: DebugInstantiate::Building,
141                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
142                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
143                metadata: to.new_node_metadata(Stream::<
144                    T,
145                    Cluster<'a, L2>,
146                    Unbounded,
147                    <O as MinOrder<N::OrderingGuarantee>>::Min,
148                    R,
149                >::collection_kind()),
150            },
151        )
152    }
153}
154
155impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
156    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
157{
158    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
159    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
160    /// compound key where the first element is the recipient's [`MemberId`] and the second element
161    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
162    /// messages.
163    ///
164    /// # Example
165    /// ```rust
166    /// # #[cfg(feature = "deploy")] {
167    /// # use hydro_lang::prelude::*;
168    /// # use futures::StreamExt;
169    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
170    /// let p1 = flow.process::<()>();
171    /// let workers: Cluster<()> = flow.cluster::<()>();
172    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
173    ///     .source_iter(q!(vec![0, 1, 2, 3]))
174    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
175    ///     .into_keyed();
176    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
177    /// # on_worker.entries().send_bincode(&p2).entries()
178    /// // if there are 4 members in the cluster, each receives one element
179    /// // - MemberId::<()>(0): { 0: [123] }
180    /// // - MemberId::<()>(1): { 1: [124] }
181    /// // - ...
182    /// # }, |mut stream| async move {
183    /// # let mut results = Vec::new();
184    /// # for w in 0..4 {
185    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
186    /// # }
187    /// # results.sort();
188    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
189    /// # }));
190    /// # }
191    /// ```
192    pub fn demux_bincode(
193        self,
194        other: &Cluster<'a, L2>,
195    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
196    where
197        K: Serialize + DeserializeOwned,
198        T: Serialize + DeserializeOwned,
199    {
200        self.demux(other, TCP.fail_stop().bincode())
201    }
202
203    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
204    /// compound key where the first element is the recipient's [`MemberId`] and the second element
205    /// is a key that will be sent along with the value, using the configuration in `via` to set up
206    /// the message transport.
207    ///
208    /// # Example
209    /// ```rust
210    /// # #[cfg(feature = "deploy")] {
211    /// # use hydro_lang::prelude::*;
212    /// # use futures::StreamExt;
213    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
214    /// let p1 = flow.process::<()>();
215    /// let workers: Cluster<()> = flow.cluster::<()>();
216    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
217    ///     .source_iter(q!(vec![0, 1, 2, 3]))
218    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
219    ///     .into_keyed();
220    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
221    /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
222    /// // if there are 4 members in the cluster, each receives one element
223    /// // - MemberId::<()>(0): { 0: [123] }
224    /// // - MemberId::<()>(1): { 1: [124] }
225    /// // - ...
226    /// # }, |mut stream| async move {
227    /// # let mut results = Vec::new();
228    /// # for w in 0..4 {
229    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
230    /// # }
231    /// # results.sort();
232    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
233    /// # }));
234    /// # }
235    /// ```
236    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
237    pub fn demux<N: NetworkFor<(K, T)>>(
238        self,
239        to: &Cluster<'a, L2>,
240        via: N,
241    ) -> KeyedStream<
242        K,
243        T,
244        Cluster<'a, L2, NoConsistency>,
245        Unbounded,
246        <O as MinOrder<N::OrderingGuarantee>>::Min,
247        R,
248    >
249    where
250        K: Serialize + DeserializeOwned,
251        T: Serialize + DeserializeOwned,
252        O: MinOrder<N::OrderingGuarantee>,
253    {
254        let serialize_pipeline = Some(N::serialize_thunk(true));
255
256        let deserialize_pipeline = Some(N::deserialize_thunk(None));
257
258        let name = via.name();
259        if to.multiversioned() && name.is_none() {
260            panic!(
261                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
262            );
263        }
264
265        KeyedStream::new(
266            to.clone(),
267            HydroNode::Network {
268                name: name.map(ToOwned::to_owned),
269                networking_info: N::networking_info(),
270                serialize_fn: serialize_pipeline.map(|e| e.into()),
271                instantiate_fn: DebugInstantiate::Building,
272                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
273                input: Box::new(
274                    self.entries()
275                        .map(q!(|((id, k), v)| (id, (k, v))))
276                        .ir_node
277                        .replace(HydroNode::Placeholder),
278                ),
279                metadata: to.new_node_metadata(KeyedStream::<
280                    K,
281                    T,
282                    Cluster<'a, L2>,
283                    Unbounded,
284                    <O as MinOrder<N::OrderingGuarantee>>::Min,
285                    R,
286                >::collection_kind()),
287            },
288        )
289    }
290}
291
292impl<'a, T, L, L2, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
293    KeyedStream<MemberId<L2>, T, Cluster<'a, L, C>, B, O, R>
294{
295    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
296    /// Sends each group of this stream at each source member to a specific member of a destination
297    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
298    /// [`bincode`] to serialize/deserialize messages.
299    ///
300    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
301    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
302    /// API allows precise targeting of specific cluster members rather than broadcasting to all
303    /// members.
304    ///
305    /// Each cluster member sends its local stream elements, and they are collected at each
306    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
307    ///
308    /// # Example
309    /// ```rust
310    /// # #[cfg(feature = "deploy")] {
311    /// # use hydro_lang::prelude::*;
312    /// # use futures::StreamExt;
313    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
314    /// # type Source = ();
315    /// # type Destination = ();
316    /// let source: Cluster<Source> = flow.cluster::<Source>();
317    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
318    ///     .source_iter(q!(vec![0, 1, 2, 3]))
319    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
320    ///     .into_keyed();
321    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
322    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
323    /// # all_received.entries().send_bincode(&p2).entries()
324    /// # }, |mut stream| async move {
325    /// // if there are 4 members in the destination cluster, each receives one message from each source member
326    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
327    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
328    /// // - ...
329    /// # let mut results = Vec::new();
330    /// # for w in 0..16 {
331    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
332    /// # }
333    /// # results.sort();
334    /// # assert_eq!(results, vec![
335    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
336    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
337    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
338    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
339    /// # ]);
340    /// # }));
341    /// # }
342    /// ```
343    pub fn demux_bincode(
344        self,
345        other: &Cluster<'a, L2>,
346    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
347    where
348        T: Serialize + DeserializeOwned,
349    {
350        self.demux(other, TCP.fail_stop().bincode())
351    }
352
353    /// Sends each group of this stream at each source member to a specific member of a destination
354    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
355    /// configuration in `via` to set up the message transport.
356    ///
357    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
358    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
359    /// API allows precise targeting of specific cluster members rather than broadcasting to all
360    /// members.
361    ///
362    /// Each cluster member sends its local stream elements, and they are collected at each
363    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
364    ///
365    /// # Example
366    /// ```rust
367    /// # #[cfg(feature = "deploy")] {
368    /// # use hydro_lang::prelude::*;
369    /// # use futures::StreamExt;
370    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
371    /// # type Source = ();
372    /// # type Destination = ();
373    /// let source: Cluster<Source> = flow.cluster::<Source>();
374    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
375    ///     .source_iter(q!(vec![0, 1, 2, 3]))
376    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
377    ///     .into_keyed();
378    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
379    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
380    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
381    /// # }, |mut stream| async move {
382    /// // if there are 4 members in the destination cluster, each receives one message from each source member
383    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
384    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
385    /// // - ...
386    /// # let mut results = Vec::new();
387    /// # for w in 0..16 {
388    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
389    /// # }
390    /// # results.sort();
391    /// # assert_eq!(results, vec![
392    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
393    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
394    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
395    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
396    /// # ]);
397    /// # }));
398    /// # }
399    /// ```
400    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
401    pub fn demux<N: NetworkFor<T>>(
402        self,
403        to: &Cluster<'a, L2>,
404        via: N,
405    ) -> KeyedStream<
406        MemberId<L>,
407        T,
408        Cluster<'a, L2, NoConsistency>,
409        Unbounded,
410        <O as MinOrder<N::OrderingGuarantee>>::Min,
411        R,
412    >
413    where
414        T: Serialize + DeserializeOwned,
415        O: MinOrder<N::OrderingGuarantee>,
416    {
417        let serialize_pipeline = Some(N::serialize_thunk(true));
418
419        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
420
421        let name = via.name();
422        if to.multiversioned() && name.is_none() {
423            panic!(
424                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
425            );
426        }
427
428        KeyedStream::new(
429            to.clone(),
430            HydroNode::Network {
431                name: name.map(ToOwned::to_owned),
432                networking_info: N::networking_info(),
433                serialize_fn: serialize_pipeline.map(|e| e.into()),
434                instantiate_fn: DebugInstantiate::Building,
435                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
436                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
437                metadata: to.new_node_metadata(KeyedStream::<
438                    MemberId<L>,
439                    T,
440                    Cluster<'a, L2>,
441                    Unbounded,
442                    <O as MinOrder<N::OrderingGuarantee>>::Min,
443                    R,
444                >::collection_kind()),
445            },
446        )
447    }
448}
449
450impl<'a, K, V, L, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
451    KeyedStream<K, V, Cluster<'a, L, C>, B, O, R>
452{
453    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
454    #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
455    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
456    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
457    /// has a compound key where the first element is the sender's [`MemberId`] and the second
458    /// element is the original key.
459    ///
460    /// # Example
461    /// ```rust
462    /// # #[cfg(feature = "deploy")] {
463    /// # use hydro_lang::prelude::*;
464    /// # use futures::StreamExt;
465    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
466    /// # type Source = ();
467    /// # type Destination = ();
468    /// let source: Cluster<Source> = flow.cluster::<Source>();
469    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
470    ///     .source_iter(q!(vec![0, 1, 2, 3]))
471    ///     .map(q!(|x| (x, x + 123)))
472    ///     .into_keyed();
473    /// let destination_process = flow.process::<Destination>();
474    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
475    /// # all_received.entries().send_bincode(&p2)
476    /// # }, |mut stream| async move {
477    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
478    /// // {
479    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
480    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
481    /// //     ...
482    /// // }
483    /// # let mut results = Vec::new();
484    /// # for w in 0..16 {
485    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
486    /// # }
487    /// # results.sort();
488    /// # assert_eq!(results, vec![
489    /// #   "((MemberId::<()>(0), 0), 123)",
490    /// #   "((MemberId::<()>(0), 1), 124)",
491    /// #   "((MemberId::<()>(0), 2), 125)",
492    /// #   "((MemberId::<()>(0), 3), 126)",
493    /// #   "((MemberId::<()>(1), 0), 123)",
494    /// #   "((MemberId::<()>(1), 1), 124)",
495    /// #   "((MemberId::<()>(1), 2), 125)",
496    /// #   "((MemberId::<()>(1), 3), 126)",
497    /// #   "((MemberId::<()>(2), 0), 123)",
498    /// #   "((MemberId::<()>(2), 1), 124)",
499    /// #   "((MemberId::<()>(2), 2), 125)",
500    /// #   "((MemberId::<()>(2), 3), 126)",
501    /// #   "((MemberId::<()>(3), 0), 123)",
502    /// #   "((MemberId::<()>(3), 1), 124)",
503    /// #   "((MemberId::<()>(3), 2), 125)",
504    /// #   "((MemberId::<()>(3), 3), 126)",
505    /// # ]);
506    /// # }));
507    /// # }
508    /// ```
509    pub fn send_bincode<L2>(
510        self,
511        other: &Process<'a, L2>,
512    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
513    where
514        K: Serialize + DeserializeOwned,
515        V: Serialize + DeserializeOwned,
516    {
517        self.send(other, TCP.fail_stop().bincode())
518    }
519
520    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
521    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
522    /// network, using the configuration in `via` to set up the message transport. The resulting
523    /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
524    /// the second element is the original key.
525    ///
526    /// # Example
527    /// ```rust
528    /// # #[cfg(feature = "deploy")] {
529    /// # use hydro_lang::prelude::*;
530    /// # use futures::StreamExt;
531    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
532    /// # type Source = ();
533    /// # type Destination = ();
534    /// let source: Cluster<Source> = flow.cluster::<Source>();
535    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
536    ///     .source_iter(q!(vec![0, 1, 2, 3]))
537    ///     .map(q!(|x| (x, x + 123)))
538    ///     .into_keyed();
539    /// let destination_process = flow.process::<Destination>();
540    /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
541    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
542    /// # }, |mut stream| async move {
543    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
544    /// // {
545    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
546    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
547    /// //     ...
548    /// // }
549    /// # let mut results = Vec::new();
550    /// # for w in 0..16 {
551    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
552    /// # }
553    /// # results.sort();
554    /// # assert_eq!(results, vec![
555    /// #   "((MemberId::<()>(0), 0), 123)",
556    /// #   "((MemberId::<()>(0), 1), 124)",
557    /// #   "((MemberId::<()>(0), 2), 125)",
558    /// #   "((MemberId::<()>(0), 3), 126)",
559    /// #   "((MemberId::<()>(1), 0), 123)",
560    /// #   "((MemberId::<()>(1), 1), 124)",
561    /// #   "((MemberId::<()>(1), 2), 125)",
562    /// #   "((MemberId::<()>(1), 3), 126)",
563    /// #   "((MemberId::<()>(2), 0), 123)",
564    /// #   "((MemberId::<()>(2), 1), 124)",
565    /// #   "((MemberId::<()>(2), 2), 125)",
566    /// #   "((MemberId::<()>(2), 3), 126)",
567    /// #   "((MemberId::<()>(3), 0), 123)",
568    /// #   "((MemberId::<()>(3), 1), 124)",
569    /// #   "((MemberId::<()>(3), 2), 125)",
570    /// #   "((MemberId::<()>(3), 3), 126)",
571    /// # ]);
572    /// # }));
573    /// # }
574    /// ```
575    pub fn send<L2, N: NetworkFor<(K, V)>>(
576        self,
577        to: &Process<'a, L2>,
578        via: N,
579    ) -> KeyedStream<
580        (MemberId<L>, K),
581        V,
582        Process<'a, L2>,
583        Unbounded,
584        <O as MinOrder<N::OrderingGuarantee>>::Min,
585        R,
586    >
587    where
588        K: Serialize + DeserializeOwned,
589        V: Serialize + DeserializeOwned,
590        O: MinOrder<N::OrderingGuarantee>,
591    {
592        let serialize_pipeline = Some(N::serialize_thunk(false));
593
594        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
595
596        let name = via.name();
597        if to.multiversioned() && name.is_none() {
598            panic!(
599                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
600            );
601        }
602
603        let raw_stream: Stream<
604            (MemberId<L>, (K, V)),
605            Process<'a, L2>,
606            Unbounded,
607            <O as MinOrder<N::OrderingGuarantee>>::Min,
608            R,
609        > = Stream::new(
610            to.clone(),
611            HydroNode::Network {
612                name: name.map(ToOwned::to_owned),
613                networking_info: N::networking_info(),
614                serialize_fn: serialize_pipeline.map(|e| e.into()),
615                instantiate_fn: DebugInstantiate::Building,
616                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
617                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
618                metadata: to.new_node_metadata(Stream::<
619                    (MemberId<L>, (K, V)),
620                    Cluster<'a, L2>,
621                    Unbounded,
622                    <O as MinOrder<N::OrderingGuarantee>>::Min,
623                    R,
624                >::collection_kind()),
625            },
626        );
627
628        raw_stream
629            .map(q!(|(sender, (k, v))| ((sender, k), v)))
630            .into_keyed()
631    }
632}