1use std::marker::PhantomData;
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6use syn::parse_quote;
7
8use crate::boundedness::Boundedness;
9use crate::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
10use crate::keyed_singleton::KeyedSingleton;
11use crate::keyed_stream::KeyedStream;
12use crate::location::external_process::ExternalBincodeStream;
13use crate::location::tick::NoAtomic;
14use crate::location::{MembershipEvent, NoTick};
15use crate::staging_util::get_this_crate;
16use crate::stream::ExactlyOnce;
17use crate::{
18 Cluster, External, Location, MemberId, NonDet, Process, Stream, TotalOrder, Unbounded, nondet,
19};
20
21fn track_membership<'a, C, L: Location<'a> + NoTick + NoAtomic>(
23 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
24) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
25 membership
26 .fold(
27 q!(|| false),
28 q!(|present, event| {
29 match event {
30 MembershipEvent::Joined => *present = true,
31 MembershipEvent::Left => *present = false,
32 }
33 }),
34 )
35 .filter_map(q!(|v| if v { Some(()) } else { None }))
36}
37
38pub fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
39 let root = get_this_crate();
40
41 if is_demux {
42 parse_quote! {
43 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::MemberId<_>, #t_type), _>(
44 |(id, data)| {
45 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
46 }
47 )
48 }
49 } else {
50 parse_quote! {
51 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
52 |data| {
53 #root::runtime_support::bincode::serialize(&data).unwrap().into()
54 }
55 )
56 }
57 }
58}
59
60fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
61 serialize_bincode_with_type(is_demux, "e_type::<T>())
62}
63
64pub fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
65 let root = get_this_crate();
66
67 if let Some(c_type) = tagged {
68 parse_quote! {
69 |res| {
70 let (id, b) = res.unwrap();
71 (#root::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
72 }
73 }
74 } else {
75 parse_quote! {
76 |res| {
77 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
78 }
79 }
80 }
81}
82
83pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
84 deserialize_bincode_with_type(tagged, "e_type::<T>())
85}
86
87impl<'a, T, L, B: Boundedness, O, R> Stream<T, Cluster<'a, L>, B, O, R> {
88 pub fn send_bincode<L2>(
89 self,
90 other: &Process<'a, L2>,
91 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
92 where
93 T: Serialize + DeserializeOwned,
94 {
95 let serialize_pipeline = Some(serialize_bincode::<T>(false));
96
97 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
98
99 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
100 other.clone(),
101 HydroNode::Network {
102 serialize_fn: serialize_pipeline.map(|e| e.into()),
103 instantiate_fn: DebugInstantiate::Building,
104 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
105 input: Box::new(self.ir_node.into_inner()),
106 metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
107 },
108 );
109
110 raw_stream.into_keyed()
111 }
112
113 pub fn broadcast_bincode<L2: 'a>(
114 self,
115 other: &Cluster<'a, L2>,
116 nondet_membership: NonDet,
117 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
118 where
119 T: Clone + Serialize + DeserializeOwned,
120 {
121 let ids = track_membership(self.location.source_cluster_members(other));
122 let join_tick = self.location.tick();
123 let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
124
125 current_members
126 .weaker_retries()
127 .assume_ordering::<TotalOrder>(
128 nondet!(),
129 )
130 .cross_product_nested_loop(
131 self.batch(&join_tick, nondet_membership)
132 .assume_ordering::<TotalOrder>(
133 nondet!(),
134 ),
135 )
136 .assume_ordering::<O>(nondet!())
137 .all_ticks()
138 .demux_bincode(other)
139 }
140}
141
142impl<'a, T, L, L2, B: Boundedness, O, R> Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R> {
143 pub fn demux_bincode(
144 self,
145 other: &Cluster<'a, L2>,
146 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
147 where
148 T: Serialize + DeserializeOwned,
149 {
150 self.into_keyed().demux_bincode(other)
151 }
152}
153
154impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R> {
155 pub fn demux_bincode(
156 self,
157 other: &Cluster<'a, L2>,
158 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
159 where
160 T: Serialize + DeserializeOwned,
161 {
162 let serialize_pipeline = Some(serialize_bincode::<T>(true));
163
164 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
165
166 Stream::new(
167 other.clone(),
168 HydroNode::Network {
169 serialize_fn: serialize_pipeline.map(|e| e.into()),
170 instantiate_fn: DebugInstantiate::Building,
171 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
172 input: Box::new(self.underlying.ir_node.into_inner()),
173 metadata: other.new_node_metadata::<T>(),
174 },
175 )
176 }
177}
178
179impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
180 pub fn round_robin_bincode<L2: 'a>(
181 self,
182 other: &Cluster<'a, L2>,
183 nondet_membership: NonDet,
184 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
185 where
186 T: Serialize + DeserializeOwned,
187 {
188 let ids = track_membership(self.location.source_cluster_members(other));
189 let join_tick = self.location.tick();
190 let current_members = ids
191 .snapshot(&join_tick, nondet_membership)
192 .keys()
193 .assume_ordering(
194 nondet!(),
195 )
196 .collect_vec();
197
198 self.enumerate()
199 .batch(&join_tick, nondet_membership)
200 .cross_singleton(current_members)
201 .map(q!(|(data, members)| (
202 members[data.0 % members.len()],
203 data.1
204 )))
205 .all_ticks()
206 .demux_bincode(other)
207 }
208}
209
210impl<'a, T, L, L2, B: Boundedness, O, R> Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R> {
211 pub fn demux_bincode(
212 self,
213 other: &Cluster<'a, L2>,
214 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
215 where
216 T: Serialize + DeserializeOwned,
217 {
218 self.into_keyed().demux_bincode(other)
219 }
220}
221
222impl<'a, T, L, L2, B: Boundedness, O, R> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R> {
223 pub fn demux_bincode(
224 self,
225 other: &Cluster<'a, L2>,
226 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
227 where
228 T: Serialize + DeserializeOwned,
229 {
230 let serialize_pipeline = Some(serialize_bincode::<T>(true));
231
232 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
233
234 let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
235 other.clone(),
236 HydroNode::Network {
237 serialize_fn: serialize_pipeline.map(|e| e.into()),
238 instantiate_fn: DebugInstantiate::Building,
239 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
240 input: Box::new(self.underlying.ir_node.into_inner()),
241 metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
242 },
243 );
244
245 raw_stream.into_keyed()
246 }
247}
248
249impl<'a, T, L, B: Boundedness, O, R> Stream<T, Process<'a, L>, B, O, R> {
250 pub fn send_bincode<L2>(
251 self,
252 other: &Process<'a, L2>,
253 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
254 where
255 T: Serialize + DeserializeOwned,
256 {
257 let serialize_pipeline = Some(serialize_bincode::<T>(false));
258
259 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
260
261 Stream::new(
262 other.clone(),
263 HydroNode::Network {
264 serialize_fn: serialize_pipeline.map(|e| e.into()),
265 instantiate_fn: DebugInstantiate::Building,
266 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
267 input: Box::new(self.ir_node.into_inner()),
268 metadata: other.new_node_metadata::<T>(),
269 },
270 )
271 }
272
273 pub fn broadcast_bincode<L2: 'a>(
274 self,
275 other: &Cluster<'a, L2>,
276 nondet_membership: NonDet,
277 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
278 where
279 T: Clone + Serialize + DeserializeOwned,
280 {
281 let ids = track_membership(self.location.source_cluster_members(other));
282 let join_tick = self.location.tick();
283 let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
284
285 current_members
286 .weaker_retries()
287 .assume_ordering::<TotalOrder>(
288 nondet!(),
289 )
290 .cross_product_nested_loop(
291 self.batch(&join_tick, nondet_membership)
292 .assume_ordering::<TotalOrder>(
293 nondet!(),
294 ),
295 )
296 .assume_ordering::<O>(nondet!())
297 .all_ticks()
298 .demux_bincode(other)
299 }
300
301 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
302 where
303 T: Serialize + DeserializeOwned,
304 {
305 let serialize_pipeline = Some(serialize_bincode::<T>(false));
306
307 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
308
309 let external_key = flow_state_borrow.next_external_out;
310 flow_state_borrow.next_external_out += 1;
311
312 let roots = flow_state_borrow.roots.as_mut().expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled()");
313
314 roots.push(HydroRoot::SendExternal {
315 to_external_id: other.id,
316 to_key: external_key,
317 to_many: false,
318 serialize_fn: serialize_pipeline.map(|e| e.into()),
319 instantiate_fn: DebugInstantiate::Building,
320 input: Box::new(HydroNode::Unpersist {
321 inner: Box::new(self.ir_node.into_inner()),
322 metadata: self.location.new_node_metadata::<T>(),
323 }),
324 op_metadata: HydroIrOpMetadata::new(),
325 });
326
327 ExternalBincodeStream {
328 process_id: other.id,
329 port_id: external_key,
330 _phantom: PhantomData,
331 }
332 }
333}