hydro_lang/stream/
networking.rs

1use std::marker::PhantomData;
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6use syn::parse_quote;
7
8use crate::boundedness::Boundedness;
9use crate::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
10use crate::keyed_singleton::KeyedSingleton;
11use crate::keyed_stream::KeyedStream;
12use crate::location::external_process::ExternalBincodeStream;
13use crate::location::tick::NoAtomic;
14use crate::location::{MembershipEvent, NoTick};
15use crate::staging_util::get_this_crate;
16use crate::stream::ExactlyOnce;
17use crate::{
18    Cluster, External, Location, MemberId, NonDet, Process, Stream, TotalOrder, Unbounded, nondet,
19};
20
21// same as the one in `hydro_std`, but internal use only
22fn track_membership<'a, C, L: Location<'a> + NoTick + NoAtomic>(
23    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
24) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
25    membership
26        .fold(
27            q!(|| false),
28            q!(|present, event| {
29                match event {
30                    MembershipEvent::Joined => *present = true,
31                    MembershipEvent::Left => *present = false,
32                }
33            }),
34        )
35        .filter_map(q!(|v| if v { Some(()) } else { None }))
36}
37
38pub fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
39    let root = get_this_crate();
40
41    if is_demux {
42        parse_quote! {
43            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::MemberId<_>, #t_type), _>(
44                |(id, data)| {
45                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
46                }
47            )
48        }
49    } else {
50        parse_quote! {
51            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
52                |data| {
53                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
54                }
55            )
56        }
57    }
58}
59
60fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
61    serialize_bincode_with_type(is_demux, &quote_type::<T>())
62}
63
64pub fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
65    let root = get_this_crate();
66
67    if let Some(c_type) = tagged {
68        parse_quote! {
69            |res| {
70                let (id, b) = res.unwrap();
71                (#root::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
72            }
73        }
74    } else {
75        parse_quote! {
76            |res| {
77                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
78            }
79        }
80    }
81}
82
83pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
84    deserialize_bincode_with_type(tagged, &quote_type::<T>())
85}
86
87impl<'a, T, L, B: Boundedness, O, R> Stream<T, Cluster<'a, L>, B, O, R> {
88    pub fn send_bincode<L2>(
89        self,
90        other: &Process<'a, L2>,
91    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
92    where
93        T: Serialize + DeserializeOwned,
94    {
95        let serialize_pipeline = Some(serialize_bincode::<T>(false));
96
97        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
98
99        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
100            other.clone(),
101            HydroNode::Network {
102                serialize_fn: serialize_pipeline.map(|e| e.into()),
103                instantiate_fn: DebugInstantiate::Building,
104                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
105                input: Box::new(self.ir_node.into_inner()),
106                metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
107            },
108        );
109
110        raw_stream.into_keyed()
111    }
112
113    pub fn broadcast_bincode<L2: 'a>(
114        self,
115        other: &Cluster<'a, L2>,
116        nondet_membership: NonDet,
117    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
118    where
119        T: Clone + Serialize + DeserializeOwned,
120    {
121        let ids = track_membership(self.location.source_cluster_members(other));
122        let join_tick = self.location.tick();
123        let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
124
125        current_members
126            .weaker_retries()
127            .assume_ordering::<TotalOrder>(
128                nondet!(/** we send to each member independently, order does not matter */),
129            )
130            .cross_product_nested_loop(
131                self.batch(&join_tick, nondet_membership)
132                    .assume_ordering::<TotalOrder>(
133                        nondet!(/** we weaken the ordering back later */),
134                    ),
135            )
136            .assume_ordering::<O>(nondet!(/** strictly weaker than TotalOrder */))
137            .all_ticks()
138            .demux_bincode(other)
139    }
140}
141
142impl<'a, T, L, L2, B: Boundedness, O, R> Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R> {
143    pub fn demux_bincode(
144        self,
145        other: &Cluster<'a, L2>,
146    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
147    where
148        T: Serialize + DeserializeOwned,
149    {
150        self.into_keyed().demux_bincode(other)
151    }
152}
153
154impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R> {
155    pub fn demux_bincode(
156        self,
157        other: &Cluster<'a, L2>,
158    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
159    where
160        T: Serialize + DeserializeOwned,
161    {
162        let serialize_pipeline = Some(serialize_bincode::<T>(true));
163
164        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
165
166        Stream::new(
167            other.clone(),
168            HydroNode::Network {
169                serialize_fn: serialize_pipeline.map(|e| e.into()),
170                instantiate_fn: DebugInstantiate::Building,
171                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
172                input: Box::new(self.underlying.ir_node.into_inner()),
173                metadata: other.new_node_metadata::<T>(),
174            },
175        )
176    }
177}
178
179impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
180    pub fn round_robin_bincode<L2: 'a>(
181        self,
182        other: &Cluster<'a, L2>,
183        nondet_membership: NonDet,
184    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
185    where
186        T: Serialize + DeserializeOwned,
187    {
188        let ids = track_membership(self.location.source_cluster_members(other));
189        let join_tick = self.location.tick();
190        let current_members = ids
191            .snapshot(&join_tick, nondet_membership)
192            .keys()
193            .assume_ordering(
194                nondet!(/** safe to assume ordering because each output is independent */),
195            )
196            .collect_vec();
197
198        self.enumerate()
199            .batch(&join_tick, nondet_membership)
200            .cross_singleton(current_members)
201            .map(q!(|(data, members)| (
202                members[data.0 % members.len()],
203                data.1
204            )))
205            .all_ticks()
206            .demux_bincode(other)
207    }
208}
209
210impl<'a, T, L, L2, B: Boundedness, O, R> Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R> {
211    pub fn demux_bincode(
212        self,
213        other: &Cluster<'a, L2>,
214    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
215    where
216        T: Serialize + DeserializeOwned,
217    {
218        self.into_keyed().demux_bincode(other)
219    }
220}
221
222impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R> {
223    pub fn demux_bincode(
224        self,
225        other: &Cluster<'a, L2>,
226    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
227    where
228        T: Serialize + DeserializeOwned,
229    {
230        let serialize_pipeline = Some(serialize_bincode::<T>(true));
231
232        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
233
234        let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
235            other.clone(),
236            HydroNode::Network {
237                serialize_fn: serialize_pipeline.map(|e| e.into()),
238                instantiate_fn: DebugInstantiate::Building,
239                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
240                input: Box::new(self.underlying.ir_node.into_inner()),
241                metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
242            },
243        );
244
245        raw_stream.into_keyed()
246    }
247}
248
249impl<'a, T, L, B: Boundedness, O, R> Stream<T, Process<'a, L>, B, O, R> {
250    pub fn send_bincode<L2>(
251        self,
252        other: &Process<'a, L2>,
253    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
254    where
255        T: Serialize + DeserializeOwned,
256    {
257        let serialize_pipeline = Some(serialize_bincode::<T>(false));
258
259        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
260
261        Stream::new(
262            other.clone(),
263            HydroNode::Network {
264                serialize_fn: serialize_pipeline.map(|e| e.into()),
265                instantiate_fn: DebugInstantiate::Building,
266                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
267                input: Box::new(self.ir_node.into_inner()),
268                metadata: other.new_node_metadata::<T>(),
269            },
270        )
271    }
272
273    pub fn broadcast_bincode<L2: 'a>(
274        self,
275        other: &Cluster<'a, L2>,
276        nondet_membership: NonDet,
277    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
278    where
279        T: Clone + Serialize + DeserializeOwned,
280    {
281        let ids = track_membership(self.location.source_cluster_members(other));
282        let join_tick = self.location.tick();
283        let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
284
285        current_members
286            .weaker_retries()
287            .assume_ordering::<TotalOrder>(
288                nondet!(/** we send to each member independently, order does not matter */),
289            )
290            .cross_product_nested_loop(
291                self.batch(&join_tick, nondet_membership)
292                    .assume_ordering::<TotalOrder>(
293                        nondet!(/** we weaken the ordering back later */),
294                    ),
295            )
296            .assume_ordering::<O>(nondet!(/** strictly weaker than TotalOrder */))
297            .all_ticks()
298            .demux_bincode(other)
299    }
300
301    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
302    where
303        T: Serialize + DeserializeOwned,
304    {
305        let serialize_pipeline = Some(serialize_bincode::<T>(false));
306
307        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
308
309        let external_key = flow_state_borrow.next_external_out;
310        flow_state_borrow.next_external_out += 1;
311
312        let roots = flow_state_borrow.roots.as_mut().expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled()");
313
314        roots.push(HydroRoot::SendExternal {
315            to_external_id: other.id,
316            to_key: external_key,
317            to_many: false,
318            serialize_fn: serialize_pipeline.map(|e| e.into()),
319            instantiate_fn: DebugInstantiate::Building,
320            input: Box::new(HydroNode::Unpersist {
321                inner: Box::new(self.ir_node.into_inner()),
322                metadata: self.location.new_node_metadata::<T>(),
323            }),
324            op_metadata: HydroIrOpMetadata::new(),
325        });
326
327        ExternalBincodeStream {
328            process_id: other.id,
329            port_id: external_key,
330            _phantom: PhantomData,
331        }
332    }
333}