1use std::fmt::{Debug, Formatter};
13use std::marker::PhantomData;
14
15use proc_macro2::Span;
16use quote::quote;
17use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
18use stageleft::{QuotedWithContextWithProps, quote_type};
19
20use super::dynamic::LocationId;
21use super::{Location, MemberId};
22use crate::compile::builder::FlowState;
23use crate::location::LocationKey;
24use crate::location::member_id::TaglessMemberId;
25use crate::staging_util::{Invariant, get_this_crate};
26
27pub struct Cluster<'a, ClusterTag> {
37 pub(crate) key: LocationKey,
38 pub(crate) flow_state: FlowState,
39 pub(crate) _phantom: Invariant<'a, ClusterTag>,
40}
41
42impl<C> Debug for Cluster<'_, C> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 write!(f, "Cluster({})", self.key)
45 }
46}
47
48impl<C> Eq for Cluster<'_, C> {}
49impl<C> PartialEq for Cluster<'_, C> {
50 fn eq(&self, other: &Self) -> bool {
51 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
52 }
53}
54
55impl<C> Clone for Cluster<'_, C> {
56 fn clone(&self) -> Self {
57 Cluster {
58 key: self.key,
59 flow_state: self.flow_state.clone(),
60 _phantom: PhantomData,
61 }
62 }
63}
64
65impl<'a, C> super::dynamic::DynLocation for Cluster<'a, C> {
66 fn id(&self) -> LocationId {
67 LocationId::Cluster(self.key)
68 }
69
70 fn flow_state(&self) -> &FlowState {
71 &self.flow_state
72 }
73
74 fn is_top_level() -> bool {
75 true
76 }
77
78 fn multiversioned(&self) -> bool {
79 false }
81}
82
83impl<'a, C> Location<'a> for Cluster<'a, C> {
84 type Root = Cluster<'a, C>;
85
86 fn root(&self) -> Self::Root {
87 self.clone()
88 }
89}
90
91pub struct ClusterIds<'a> {
96 pub key: LocationKey,
98 pub _phantom: PhantomData<&'a ()>,
100}
101
102impl<'a> Clone for ClusterIds<'a> {
103 fn clone(&self) -> Self {
104 Self {
105 key: self.key,
106 _phantom: Default::default(),
107 }
108 }
109}
110
111impl<'a, Ctx> FreeVariableWithContextWithProps<Ctx, ()> for ClusterIds<'a> {
112 type O = &'a [TaglessMemberId];
113
114 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
115 where
116 Self: Sized,
117 {
118 let ident = syn::Ident::new(
119 &format!("__hydro_lang_cluster_ids_{}", self.key),
120 Span::call_site(),
121 );
122
123 (
124 QuoteTokens {
125 prelude: None,
126 expr: Some(quote! { #ident }),
127 },
128 (),
129 )
130 }
131}
132
133impl<'a, Ctx> QuotedWithContextWithProps<'a, &'a [TaglessMemberId], Ctx, ()> for ClusterIds<'a> {}
134
/// Implemented by cluster location types to expose their tag type.
///
/// Used as a bound on a location's root so that code generic over locations
/// can name the cluster's tag.
pub trait IsCluster {
    /// The tag type parameter of the cluster.
    type Tag;
}
140
141impl<C> IsCluster for Cluster<'_, C> {
142 type Tag = C;
143}
144
145pub static CLUSTER_SELF_ID: ClusterSelfId = ClusterSelfId { _private: &() };
148
/// Type of the `CLUSTER_SELF_ID` free variable.
///
/// The single private field prevents construction outside this module.
#[derive(Clone, Copy)]
pub struct ClusterSelfId<'a> {
    /// Private placeholder; carries the `'a` lifetime and no data.
    _private: &'a (),
}
157
158impl<'a, L> FreeVariableWithContextWithProps<L, ()> for ClusterSelfId<'a>
159where
160 L: Location<'a>,
161 <L as Location<'a>>::Root: IsCluster,
162{
163 type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
164
165 fn to_tokens(self, ctx: &L) -> (QuoteTokens, ())
166 where
167 Self: Sized,
168 {
169 let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() {
170 id
171 } else {
172 unreachable!()
173 };
174
175 let ident = syn::Ident::new(
176 &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
177 Span::call_site(),
178 );
179 let root = get_this_crate();
180 let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
181
182 (
183 QuoteTokens {
184 prelude: None,
185 expr: Some(
186 quote! { #root::__staged::location::MemberId::<#c_type>::from_tagless((#ident).clone()) },
187 ),
188 },
189 (),
190 )
191 }
192}
193
194impl<'a, L>
195 QuotedWithContextWithProps<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L, ()>
196 for ClusterSelfId<'a>
197where
198 L: Location<'a>,
199 <L as Location<'a>>::Root: IsCluster,
200{
201}
202
#[cfg(test)]
mod tests {
    // Every import and test below is gated on the `sim` feature, so the module
    // is empty when that feature is disabled.
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use super::CLUSTER_SELF_ID;
    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId, MembershipEvent};
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Each member of two differently sized clusters sends its own
    /// `CLUSTER_SELF_ID` to a process; the process must observe IDs `0..3`
    /// from the first cluster and `0..4` from the second, in any order.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_self_id() {
        let mut flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();

        let out_recv = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID]))
            .send(&node, TCP.fail_stop().bincode())
            .values()
            .interleave(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID]))
                    .send(&node, TCP.fail_stop().bincode())
                    .values(),
            )
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered([0, 1, 2, 0, 1, 2, 3].map(MemberId::from_raw_id))
                    .await
            });
    }

    /// Each of two cluster members batches the items `[1, 2, 3]` per tick,
    /// counts each batch, and sends the counts to a process. Summed per
    /// member, the counts must total 3 for both members.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_with_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(&cluster.tick(), nondet!())
            .count()
            .all_ticks()
            .send(&node, TCP.fail_stop().bincode())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                // Sum the per-tick counts by sending member.
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<MemberId<()>, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` reports both operands if the check fails.
                assert_eq!(grouped.len(), 2);
                for (_id, v) in grouped {
                    assert_eq!(v, 3);
                }
            });

        // NOTE(review): 106 is the value returned by `exhaustive` for this
        // flow — presumably the number of explored executions; confirm before
        // changing it.
        assert_eq!(count, 106);
    }

    /// A process subscribing to the membership of a two-member cluster must
    /// observe exactly one `Joined` event for member 0 and one for member 1.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_membership() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = node
            .source_cluster_members(&cluster)
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), MembershipEvent::Joined),
                        (MemberId::from_raw_id(1), MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}