1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7#[cfg(stageleft_runtime)]
8use super::dynamic::DynLocation;
9use super::{Cluster, Location, LocationId, Process};
10use crate::compile::builder::FlowState;
11use crate::compile::ir::{HydroNode, HydroSource};
12#[cfg(stageleft_runtime)]
13use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
14use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
15use crate::live_collections::boundedness::{Bounded, Unbounded};
16use crate::live_collections::optional::Optional;
17use crate::live_collections::singleton::Singleton;
18use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
19use crate::nondet::nondet;
20
/// Sealed marker trait for location types that are *not* a [`Tick`].
///
/// Used as a bound (e.g. on [`Tick::spin_batch`], [`Tick::forward_ref`],
/// [`Tick::cycle`]) to restrict APIs to non-tick underlying locations.
#[sealed]
pub trait NoTick {}
#[sealed]
impl<T> NoTick for Process<'_, T> {}
#[sealed]
impl<T> NoTick for Cluster<'_, T> {}
27
/// Sealed marker trait for location types that are *not* an [`Atomic`] region.
///
/// Implemented for processes, clusters, and ticks; notably NOT implemented
/// for [`Atomic`] itself.
#[sealed]
pub trait NoAtomic {}
#[sealed]
impl<T> NoAtomic for Process<'_, T> {}
#[sealed]
impl<T> NoAtomic for Cluster<'_, T> {}
#[sealed]
impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
36
/// A location wrapping a [`Tick`], marking computation whose visibility is
/// tied to that tick's iterations (its [`LocationId`] wraps the tick's id).
///
/// NOTE(review): precise atomicity semantics (writes visible before acks are
/// released, per the `sim_atomic_stream` test below) live in the collection
/// APIs, not in this struct itself.
#[derive(Clone)]
pub struct Atomic<Loc> {
    // The tick whose iteration boundary defines this atomic region.
    pub(crate) tick: Tick<Loc>,
}
41
42impl<L: DynLocation> DynLocation for Atomic<L> {
43 fn id(&self) -> LocationId {
44 LocationId::Atomic(Box::new(self.tick.id()))
45 }
46
47 fn flow_state(&self) -> &FlowState {
48 self.tick.flow_state()
49 }
50
51 fn is_top_level() -> bool {
52 L::is_top_level()
53 }
54}
55
56impl<'a, L> Location<'a> for Atomic<L>
57where
58 L: Location<'a>,
59{
60 type Root = L::Root;
61
62 fn root(&self) -> Self::Root {
63 self.tick.root()
64 }
65}
66
// An atomic region is not itself a tick, so tick-free APIs remain available on it.
#[sealed]
impl<L> NoTick for Atomic<L> {}
69
/// Trait for collections that can delay their contents by one tick iteration.
///
/// Used by [`Tick::cycle`]: the fed-back collection is `defer_tick`-ed so the
/// value written in one tick is observed in the next.
pub trait DeferTick {
    /// Returns a collection whose contents appear one tick later.
    fn defer_tick(self) -> Self;
}
73
/// A logical-clock scope on top of an underlying location `L`.
///
/// Collections placed on a `Tick` are [`Bounded`] batches processed one
/// tick iteration at a time (see the constructors in `impl Tick<L>` below).
#[derive(Clone)]
pub struct Tick<L> {
    // Unique id of this tick within the flow; embedded in `LocationId::Tick`.
    pub(crate) id: usize,
    // The location this tick runs on; accessible via `Tick::outer`.
    pub(crate) l: L,
}
80
81impl<L: DynLocation> DynLocation for Tick<L> {
82 fn id(&self) -> LocationId {
83 LocationId::Tick(self.id, Box::new(self.l.id()))
84 }
85
86 fn flow_state(&self) -> &FlowState {
87 self.l.flow_state()
88 }
89
90 fn is_top_level() -> bool {
91 false
92 }
93}
94
95impl<'a, L> Location<'a> for Tick<L>
96where
97 L: Location<'a>,
98{
99 type Root = L::Root;
100
101 fn root(&self) -> Self::Root {
102 self.l.root()
103 }
104}
105
impl<'a, L> Tick<L>
where
    L: Location<'a>,
{
    /// Returns a reference to the location this tick runs on.
    pub fn outer(&self) -> &L {
        &self.l
    }

    /// Produces a bounded batch of `batch_size` unit values inside this tick.
    ///
    /// `batch_size` is a quoted expression evaluated at runtime on the
    /// underlying location `L`.
    pub fn spin_batch(
        &self,
        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
    where
        L: NoTick,
    {
        // Fan each `spin()` element out into `batch_size` units, then batch
        // the resulting stream into this tick (batch boundaries are
        // nondeterministic, hence `nondet!()`).
        let out = self
            .l
            .spin()
            .flat_map_ordered(q!(move |_| 0..batch_size))
            .map(q!(|_| ()));

        out.batch(self, nondet!())
    }

    /// Creates a bounded [`Singleton`] on this tick holding the value of the
    /// quoted expression `e`.
    pub fn singleton<T>(
        &self,
        e: impl QuotedWithContext<'a, T, Tick<L>>,
    ) -> Singleton<T, Self, Bounded>
    where
        T: Clone,
    {
        // Splice the quoted value in this tick's context so any captured
        // variables resolve here.
        let e = e.splice_untyped_ctx(self);

        Singleton::new(
            self.clone(),
            HydroNode::SingletonSource {
                value: e.into(),
                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
            },
        )
    }

    /// Creates an [`Optional`] on this tick that is always empty.
    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
        // Source over a zero-length array: the optional is never populated.
        let e = q!([]);
        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);

        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
            self.clone(),
            HydroNode::Source {
                source: HydroSource::Iter(e.into()),
                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
            },
        );

        // The map is never executed (the source is empty); it only converts
        // the element type from `()` to `T`.
        unit_optional.map(q!(|_| unreachable!()))
    }

    /// Creates an [`Optional`] populated with `e` on the first tick only: the
    /// single-element iterator source is exhausted after its one batch, so
    /// subsequent ticks see an empty optional.
    pub fn optional_first_tick<T: Clone>(
        &self,
        e: impl QuotedWithContext<'a, T, Tick<L>>,
    ) -> Optional<T, Self, Bounded> {
        // Wrap the value in a one-element array so it can act as an iterator source.
        let e_arr = q!([e]);
        let e = e_arr.splice_untyped_ctx(self);

        Optional::new(
            self.clone(),
            HydroNode::Batch {
                // The source lives on the outer (unbounded) location...
                inner: Box::new(HydroNode::Source {
                    source: HydroSource::Iter(e.into()),
                    metadata: self
                        .outer()
                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
                }),
                // ...and is batched into this tick.
                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
            },
        )
    }

    /// Creates a forward reference on this tick: returns a placeholder
    /// collection `S` plus a [`ForwardHandle`] that must later be completed
    /// with the collection's actual definition.
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
    where
        S: CycleCollection<'a, ForwardRef, Location = Self>,
        L: NoTick,
    {
        // Fresh identifier linking the placeholder source to its eventual definition.
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            ForwardHandle {
                completed: false,
                ident: ident.clone(),
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            S::create_source(ident, self.clone()),
        )
    }

    /// Creates a tick-to-tick cycle: the returned collection `S` replays, on
    /// each tick, the value completed into the handle on the *previous* tick
    /// (note the `defer_tick()` on the created source).
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
    where
        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
        L: NoTick,
    {
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            TickCycleHandle {
                completed: false,
                ident: ident.clone(),
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            // Defer so the fed-back value is observed one tick later.
            S::create_source(ident, self.clone()).defer_tick(),
        )
    }

    /// Like [`Tick::cycle`], but the collection starts with `initial` before
    /// any value has been fed back.
    ///
    /// NOTE(review): unlike `cycle`, no explicit `defer_tick()` here —
    /// presumably `create_source_with_initial` handles the deferral
    /// internally; confirm against its implementation.
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
    where
        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
    {
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            TickCycleHandle {
                completed: false,
                ident: ident.clone(),
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            S::create_source_with_initial(ident, initial, self.clone()),
        )
    }
}
292
#[cfg(test)]
mod tests {
    use stageleft::q;

    use crate::location::Location;
    use crate::nondet::nondet;
    use crate::prelude::FlowBuilder;

    /// With `atomic`, an acknowledged write must be visible to a read issued
    /// after the ack: the first exhaustive exploration finds exactly one
    /// schedule, and the read always sees state >= 1. When the read is sent
    /// before the ack arrives, multiple (3) interleavings are possible.
    #[test]
    fn sim_atomic_stream() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_write, write_req) = node.source_external_bincode(&external);
        let (input_read, read_req) = node.source_external_bincode::<_, (), _, _>(&external);

        let tick = node.tick();
        // Writes are folded into a counter inside the atomic region of `tick`.
        let atomic_write = write_req.atomic(&tick);
        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        // Acks leave the atomic region before being sent back out.
        let write_ack = atomic_write.end_atomic().send_bincode_external(&external);
        // Reads are batched into the same tick and paired with an atomic
        // snapshot of the current state.
        let read_response = read_req
            .batch(&tick, nondet!())
            .cross_singleton(current_state.snapshot_atomic(nondet!()))
            .all_ticks()
            .send_bincode_external(&external);

        let sim_compiled = flow.sim().compiled();
        // Scenario 1: read issued strictly after the write is acknowledged.
        let instances = sim_compiled.exhaustive(async |mut compiled| {
            let write_send = compiled.connect(&input_write);
            let read_send = compiled.connect(&input_read);
            let mut write_ack_recv = compiled.connect(&write_ack);
            let mut read_response_recv = compiled.connect(&read_response);
            compiled.launch();

            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            // The acknowledged write must already be reflected in the state.
            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
        });

        // Only one interleaving is possible once the ack orders the read.
        assert_eq!(instances, 1);

        // Scenario 2: read sent before awaiting the ack — ordering is free.
        let instances_read_before_write = sim_compiled.exhaustive(async |mut compiled| {
            let write_send = compiled.connect(&input_write);
            let read_send = compiled.connect(&input_read);
            let mut write_ack_recv = compiled.connect(&write_ack);
            let mut read_response_recv = compiled.connect(&read_response);
            compiled.launch();

            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        // Three distinct interleavings when the read races the write.
        assert_eq!(instances_read_before_write, 3);
    }

    /// Without `atomic`, the ack does not order the read against the write:
    /// some interleaving lets the read observe state 0, so the strict
    /// `assert_eq!(v, 1)` fails for at least one schedule — hence
    /// `should_panic`.
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_write, write_req) = node.source_external_bincode(&external);
        let (input_read, read_req) = node.source_external_bincode::<_, (), _, _>(&external);

        // State is folded outside any atomic region.
        let current_state = write_req.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack = write_req.send_bincode_external(&external);

        let tick = node.tick();
        // Plain (non-atomic) snapshot: no visibility guarantee w.r.t. the ack.
        let read_response = read_req
            .batch(&tick, nondet!())
            .cross_singleton(current_state.snapshot(&tick, nondet!()))
            .all_ticks()
            .send_bincode_external(&external);

        flow.sim().exhaustive(async |mut compiled| {
            let write_send = compiled.connect(&input_write);
            let read_send = compiled.connect(&input_read);
            let mut write_ack_recv = compiled.connect(&write_ack);
            let mut read_response_recv = compiled.connect(&read_response);
            compiled.launch();

            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            // Over-strict assertion: fails on schedules where the snapshot
            // has not yet folded in the write.
            if let Some((_, v)) = read_response_recv.next().await {
                assert_eq!(v, 1);
            }
        });
    }
}