1use std::fmt::{Debug, Formatter};
2use std::marker::PhantomData;
3
4use proc_macro2::Span;
5use quote::quote;
6use stageleft::runtime_support::{FreeVariableWithContext, QuoteTokens};
7use stageleft::{QuotedWithContext, quote_type};
8
9use super::dynamic::LocationId;
10use super::{Location, MemberId};
11use crate::compile::builder::FlowState;
12use crate::staging_util::{Invariant, get_this_crate};
13
14pub struct Cluster<'a, ClusterTag> {
15 pub(crate) id: usize,
16 pub(crate) flow_state: FlowState,
17 pub(crate) _phantom: Invariant<'a, ClusterTag>,
18}
19
20impl<C> Debug for Cluster<'_, C> {
21 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
22 write!(f, "Cluster({})", self.id)
23 }
24}
25
26impl<C> Eq for Cluster<'_, C> {}
27impl<C> PartialEq for Cluster<'_, C> {
28 fn eq(&self, other: &Self) -> bool {
29 self.id == other.id && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
30 }
31}
32
33impl<C> Clone for Cluster<'_, C> {
34 fn clone(&self) -> Self {
35 Cluster {
36 id: self.id,
37 flow_state: self.flow_state.clone(),
38 _phantom: PhantomData,
39 }
40 }
41}
42
43impl<'a, C> super::dynamic::DynLocation for Cluster<'a, C> {
44 fn id(&self) -> LocationId {
45 LocationId::Cluster(self.id)
46 }
47
48 fn flow_state(&self) -> &FlowState {
49 &self.flow_state
50 }
51
52 fn is_top_level() -> bool {
53 true
54 }
55}
56
57impl<'a, C> Location<'a> for Cluster<'a, C> {
58 type Root = Cluster<'a, C>;
59
60 fn root(&self) -> Self::Root {
61 self.clone()
62 }
63}
64
65pub struct ClusterIds<'a, C> {
66 pub(crate) id: usize,
67 pub(crate) _phantom: Invariant<'a, C>,
68}
69
70impl<C> Clone for ClusterIds<'_, C> {
71 fn clone(&self) -> Self {
72 *self
73 }
74}
75
76impl<C> Copy for ClusterIds<'_, C> {}
77
78impl<'a, C: 'a, Ctx> FreeVariableWithContext<Ctx> for ClusterIds<'a, C> {
79 type O = &'a [MemberId<C>];
80
81 fn to_tokens(self, _ctx: &Ctx) -> QuoteTokens
82 where
83 Self: Sized,
84 {
85 let ident = syn::Ident::new(
86 &format!("__hydro_lang_cluster_ids_{}", self.id),
87 Span::call_site(),
88 );
89 let root = get_this_crate();
90 let c_type = quote_type::<C>();
91
92 QuoteTokens {
93 prelude: None,
94 expr: Some(
95 quote! { unsafe { ::std::mem::transmute::<_, &[#root::__staged::location::MemberId<#c_type>]>(#ident) } },
96 ),
97 }
98 }
99}
100
/// Marks [`ClusterIds`] as a quoted expression of type `&'a [MemberId<C>]` usable in any
/// context `Ctx`. The impl body is empty, so all behavior comes from the trait's own items.
impl<'a, C, Ctx> QuotedWithContext<'a, &'a [MemberId<C>], Ctx> for ClusterIds<'a, C> {}
102
/// Type-level marker for locations that are clusters.
///
/// Used as a bound (e.g. on `Location::Root`) to recover the cluster's tag type.
pub trait IsCluster {
    /// The tag type that parameterizes the cluster (and its `MemberId`s).
    type Tag;
}
106
107impl<C> IsCluster for Cluster<'_, C> {
108 type Tag = C;
109}
110
111pub static CLUSTER_SELF_ID: ClusterSelfId = ClusterSelfId { _private: &() };
114
/// Type of the [`CLUSTER_SELF_ID`] free variable.
///
/// The private field prevents construction outside this module; the lifetime parameter is the
/// `'a` that the `FreeVariableWithContext` / `QuotedWithContext` impls tie to the location.
#[derive(Copy, Clone)]
pub struct ClusterSelfId<'a> {
    /// Unit reference that exists only to carry `'a` and keep the struct unconstructible
    /// by outside code.
    _private: &'a (),
}
119
120impl<'a, L> FreeVariableWithContext<L> for ClusterSelfId<'a>
121where
122 L: Location<'a>,
123 <L as Location<'a>>::Root: IsCluster,
124{
125 type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
126
127 fn to_tokens(self, ctx: &L) -> QuoteTokens
128 where
129 Self: Sized,
130 {
131 let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() {
132 id
133 } else {
134 unreachable!()
135 };
136
137 let ident = syn::Ident::new(
138 &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
139 Span::call_site(),
140 );
141 let root = get_this_crate();
142 let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
143
144 QuoteTokens {
145 prelude: None,
146 expr: Some(quote! { #root::location::MemberId::<#c_type>::from_raw(#ident) }),
147 }
148 }
149}
150
/// Marks [`ClusterSelfId`] as a quoted expression of type `MemberId<Tag>`, where `Tag` is the
/// tag of the cluster at the root of the context location `L`. The impl body is empty, so all
/// behavior comes from the trait's own items.
impl<'a, L> QuotedWithContext<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L>
    for ClusterSelfId<'a>
where
    L: Location<'a>,
    <L as Location<'a>>::Root: IsCluster,
{
}
158
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use stageleft::q;

    use super::CLUSTER_SELF_ID;
    use crate::location::{Location, MembershipEvent};
    use crate::nondet::nondet;
    use crate::prelude::FlowBuilder;

    /// Two clusters (3 and 4 members) each emit their own raw member id to a process; the
    /// interleaved output must be exactly the ids `0..3` and `0..4`, in any order.
    #[test]
    fn sim_cluster_self_id() {
        let flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let out_port = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID.raw_id]))
            .send_bincode(&node)
            .values()
            .interleave(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID.raw_id]))
                    .send_bincode(&node)
                    .values(),
            )
            .send_bincode_external(&external);

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                out_recv
                    .assert_yields_only_unordered(vec![0, 1, 2, 0, 1, 2, 3])
                    .await
            });
    }

    /// Each of two cluster members batches the three source elements across ticks and sends
    /// per-tick counts to a process; summed per member, the counts must total 3.
    #[test]
    fn sim_cluster_with_tick() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let out_port = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(&cluster.tick(), nondet!())
            .count()
            .all_ticks()
            .send_bincode(&node)
            .entries()
            .map(q!(|(id, v)| (id.raw_id, v)))
            .send_bincode_external(&external);

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                // Sum the per-tick counts for each member id.
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<u32, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` (rather than `assert!(a == b)`) reports the actual values on
                // failure.
                assert_eq!(grouped.len(), 2, "expected output from exactly two members");
                for (id, v) in grouped {
                    assert_eq!(v, 3, "member {id} should have counted all three elements");
                }
            });

        // NOTE(review): 106 is presumably the number of executions explored by `exhaustive`
        // for this graph -- confirm against the simulator's documentation.
        assert_eq!(count, 106);
    }

    /// A process observing a two-member cluster must see exactly one `Joined` event per
    /// member (ids 0 and 1), in any order.
    #[test]
    fn sim_cluster_membership() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let out_port = node
            .source_cluster_members(&cluster)
            .entries()
            .map(q!(|(id, v)| (id.raw_id, v)))
            .send_bincode_external(&external);

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async |mut compiled| {
                let out_recv = compiled.connect(&out_port);
                compiled.launch();

                out_recv
                    .assert_yields_only_unordered(vec![
                        (0, MembershipEvent::Joined),
                        (1, MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}