hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::quote_type;
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
11use crate::live_collections::stream::{Ordering, Retries, Stream};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
20    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
21    ///
22    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
23    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
24    /// API allows precise targeting of specific cluster members rather than broadcasting to
25    /// all members.
26    ///
27    /// # Example
28    /// ```rust
29    /// # use hydro_lang::prelude::*;
30    /// # use futures::StreamExt;
31    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
32    /// let p1 = flow.process::<()>();
33    /// let workers: Cluster<()> = flow.cluster::<()>();
34    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
35    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
36    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
37    ///     .into_keyed()
38    ///     .demux_bincode(&workers);
39    /// # on_worker.send_bincode(&p2).entries()
40    /// // if there are 4 members in the cluster, each receives one element
41    /// // - MemberId::<()>(0): [0]
42    /// // - MemberId::<()>(1): [1]
43    /// // - MemberId::<()>(2): [2]
44    /// // - MemberId::<()>(3): [3]
45    /// # }, |mut stream| async move {
46    /// # let mut results = Vec::new();
47    /// # for w in 0..4 {
48    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
49    /// # }
50    /// # results.sort();
51    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
52    /// # }));
53    /// ```
54    pub fn demux_bincode(
55        self,
56        other: &Cluster<'a, L2>,
57    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
58    where
59        T: Serialize + DeserializeOwned,
60    {
61        let serialize_pipeline = Some(serialize_bincode::<T>(true));
62
63        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
64
65        Stream::new(
66            other.clone(),
67            HydroNode::Network {
68                serialize_fn: serialize_pipeline.map(|e| e.into()),
69                instantiate_fn: DebugInstantiate::Building,
70                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
71                input: Box::new(self.underlying.ir_node.into_inner()),
72                metadata: other.new_node_metadata::<T>(),
73            },
74        )
75    }
76}
77
78impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
79    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
80{
81    /// Sends each group of this stream at each source member to a specific member of a destination
82    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
83    /// [`bincode`] to serialize/deserialize messages.
84    ///
85    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
86    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
87    /// API allows precise targeting of specific cluster members rather than broadcasting to all
88    /// members.
89    ///
90    /// Each cluster member sends its local stream elements, and they are collected at each
91    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
92    ///
93    /// # Example
94    /// ```rust
95    /// # use hydro_lang::prelude::*;
96    /// # use futures::StreamExt;
97    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
98    /// # type Source = ();
99    /// # type Destination = ();
100    /// let source: Cluster<Source> = flow.cluster::<Source>();
101    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
102    ///     .source_iter(q!(vec![0, 1, 2, 3]))
103    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
104    ///     .into_keyed();
105    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
106    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
107    /// # all_received.entries().send_bincode(&p2).entries()
108    /// # }, |mut stream| async move {
109    /// // if there are 4 members in the destination cluster, each receives one message from each source member
110    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
111    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
112    /// // - ...
113    /// # let mut results = Vec::new();
114    /// # for w in 0..16 {
115    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
116    /// # }
117    /// # results.sort();
118    /// # assert_eq!(results, vec![
119    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
120    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
121    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
122    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
123    /// # ]);
124    /// # }));
125    /// ```
126    pub fn demux_bincode(
127        self,
128        other: &Cluster<'a, L2>,
129    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
130    where
131        T: Serialize + DeserializeOwned,
132    {
133        let serialize_pipeline = Some(serialize_bincode::<T>(true));
134
135        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
136
137        let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
138            other.clone(),
139            HydroNode::Network {
140                serialize_fn: serialize_pipeline.map(|e| e.into()),
141                instantiate_fn: DebugInstantiate::Building,
142                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
143                input: Box::new(self.underlying.ir_node.into_inner()),
144                metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
145            },
146        );
147
148        raw_stream.into_keyed()
149    }
150}