hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::quote_type;
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
11use crate::live_collections::stream::{Ordering, Retries, Stream};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
20 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
21 ///
22 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
23 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
24 /// API allows precise targeting of specific cluster members rather than broadcasting to
25 /// all members.
26 ///
27 /// # Example
28 /// ```rust
29 /// # use hydro_lang::prelude::*;
30 /// # use futures::StreamExt;
31 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
32 /// let p1 = flow.process::<()>();
33 /// let workers: Cluster<()> = flow.cluster::<()>();
34 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
35 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
36 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
37 /// .into_keyed()
38 /// .demux_bincode(&workers);
39 /// # on_worker.send_bincode(&p2).entries()
40 /// // if there are 4 members in the cluster, each receives one element
41 /// // - MemberId::<()>(0): [0]
42 /// // - MemberId::<()>(1): [1]
43 /// // - MemberId::<()>(2): [2]
44 /// // - MemberId::<()>(3): [3]
45 /// # }, |mut stream| async move {
46 /// # let mut results = Vec::new();
47 /// # for w in 0..4 {
48 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
49 /// # }
50 /// # results.sort();
51 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
52 /// # }));
53 /// ```
54 pub fn demux_bincode(
55 self,
56 other: &Cluster<'a, L2>,
57 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
58 where
59 T: Serialize + DeserializeOwned,
60 {
61 let serialize_pipeline = Some(serialize_bincode::<T>(true));
62
63 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
64
65 Stream::new(
66 other.clone(),
67 HydroNode::Network {
68 serialize_fn: serialize_pipeline.map(|e| e.into()),
69 instantiate_fn: DebugInstantiate::Building,
70 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
71 input: Box::new(self.underlying.ir_node.into_inner()),
72 metadata: other.new_node_metadata::<T>(),
73 },
74 )
75 }
76}
77
78impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
79 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
80{
81 /// Sends each group of this stream at each source member to a specific member of a destination
82 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
83 /// [`bincode`] to serialize/deserialize messages.
84 ///
85 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
86 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
87 /// API allows precise targeting of specific cluster members rather than broadcasting to all
88 /// members.
89 ///
90 /// Each cluster member sends its local stream elements, and they are collected at each
91 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
92 ///
93 /// # Example
94 /// ```rust
95 /// # use hydro_lang::prelude::*;
96 /// # use futures::StreamExt;
97 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
98 /// # type Source = ();
99 /// # type Destination = ();
100 /// let source: Cluster<Source> = flow.cluster::<Source>();
101 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
102 /// .source_iter(q!(vec![0, 1, 2, 3]))
103 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
104 /// .into_keyed();
105 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
106 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
107 /// # all_received.entries().send_bincode(&p2).entries()
108 /// # }, |mut stream| async move {
109 /// // if there are 4 members in the destination cluster, each receives one message from each source member
110 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
111 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
112 /// // - ...
113 /// # let mut results = Vec::new();
114 /// # for w in 0..16 {
115 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
116 /// # }
117 /// # results.sort();
118 /// # assert_eq!(results, vec![
119 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
120 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
121 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
122 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
123 /// # ]);
124 /// # }));
125 /// ```
126 pub fn demux_bincode(
127 self,
128 other: &Cluster<'a, L2>,
129 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
130 where
131 T: Serialize + DeserializeOwned,
132 {
133 let serialize_pipeline = Some(serialize_bincode::<T>(true));
134
135 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
136
137 let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
138 other.clone(),
139 HydroNode::Network {
140 serialize_fn: serialize_pipeline.map(|e| e.into()),
141 instantiate_fn: DebugInstantiate::Building,
142 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
143 input: Box::new(self.underlying.ir_node.into_inner()),
144 metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
145 },
146 );
147
148 raw_stream.into_keyed()
149 }
150}