hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::quote_type;
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
11use crate::live_collections::stream::{Ordering, Retries, Stream};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
20 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
21 ///
22 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
23 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
24 /// API allows precise targeting of specific cluster members rather than broadcasting to
25 /// all members.
26 ///
27 /// # Example
28 /// ```rust
29 /// # use hydro_lang::prelude::*;
30 /// # use futures::StreamExt;
31 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
32 /// let p1 = flow.process::<()>();
33 /// let workers: Cluster<()> = flow.cluster::<()>();
34 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
35 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
36 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
37 /// .into_keyed()
38 /// .demux_bincode(&workers);
39 /// # on_worker.send_bincode(&p2).entries()
40 /// // if there are 4 members in the cluster, each receives one element
41 /// // - MemberId::<()>(0): [0]
42 /// // - MemberId::<()>(1): [1]
43 /// // - MemberId::<()>(2): [2]
44 /// // - MemberId::<()>(3): [3]
45 /// # }, |mut stream| async move {
46 /// # let mut results = Vec::new();
47 /// # for w in 0..4 {
48 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
49 /// # }
50 /// # results.sort();
51 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
52 /// # }));
53 /// ```
54 pub fn demux_bincode(
55 self,
56 other: &Cluster<'a, L2>,
57 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
58 where
59 T: Serialize + DeserializeOwned,
60 {
61 let serialize_pipeline = Some(serialize_bincode::<T>(true));
62
63 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
64
65 Stream::new(
66 other.clone(),
67 HydroNode::Network {
68 serialize_fn: serialize_pipeline.map(|e| e.into()),
69 instantiate_fn: DebugInstantiate::Building,
70 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
71 input: Box::new(self.ir_node.into_inner()),
72 metadata: other.new_node_metadata(
73 Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
74 ),
75 },
76 )
77 }
78}
79
80impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
81 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
82{
83 /// Sends each group of this stream at each source member to a specific member of a destination
84 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
85 /// [`bincode`] to serialize/deserialize messages.
86 ///
87 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
88 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
89 /// API allows precise targeting of specific cluster members rather than broadcasting to all
90 /// members.
91 ///
92 /// Each cluster member sends its local stream elements, and they are collected at each
93 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
94 ///
95 /// # Example
96 /// ```rust
97 /// # use hydro_lang::prelude::*;
98 /// # use futures::StreamExt;
99 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
100 /// # type Source = ();
101 /// # type Destination = ();
102 /// let source: Cluster<Source> = flow.cluster::<Source>();
103 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
104 /// .source_iter(q!(vec![0, 1, 2, 3]))
105 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
106 /// .into_keyed();
107 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
108 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
109 /// # all_received.entries().send_bincode(&p2).entries()
110 /// # }, |mut stream| async move {
111 /// // if there are 4 members in the destination cluster, each receives one message from each source member
112 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
113 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
114 /// // - ...
115 /// # let mut results = Vec::new();
116 /// # for w in 0..16 {
117 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
118 /// # }
119 /// # results.sort();
120 /// # assert_eq!(results, vec![
121 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
122 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
123 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
124 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
125 /// # ]);
126 /// # }));
127 /// ```
128 pub fn demux_bincode(
129 self,
130 other: &Cluster<'a, L2>,
131 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
132 where
133 T: Serialize + DeserializeOwned,
134 {
135 let serialize_pipeline = Some(serialize_bincode::<T>(true));
136
137 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
138
139 let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
140 other.clone(),
141 HydroNode::Network {
142 serialize_fn: serialize_pipeline.map(|e| e.into()),
143 instantiate_fn: DebugInstantiate::Building,
144 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
145 input: Box::new(self.ir_node.into_inner()),
146 metadata: other.new_node_metadata(Stream::<
147 (MemberId<L>, T),
148 Cluster<'a, L2>,
149 Unbounded,
150 O,
151 R,
152 >::collection_kind()),
153 },
154 );
155
156 raw_stream.into_keyed()
157 }
158}