1use stageleft::{QuotedWithContext, q};
18
19#[cfg(stageleft_runtime)]
20use super::dynamic::DynLocation;
21use super::{Location, LocationId};
22use crate::compile::builder::{ClockId, FlowState};
23use crate::compile::ir::{HydroNode, HydroSource};
24#[cfg(stageleft_runtime)]
25use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
26use crate::forward_handle::{TickCycle, TickCycleHandle};
27use crate::live_collections::Singleton;
28use crate::live_collections::boundedness::Bounded;
29use crate::live_collections::optional::Optional;
30use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
31use crate::location::TopLevel;
32use crate::nondet::{NonDet, nondet};
33
/// A location that wraps a [`Tick`] nested in `Loc`.
///
/// Its dynamic id is the wrapped tick's id placed inside `LocationId::Atomic`; every other
/// location query is forwarded to the wrapped tick or to `Loc`.
///
/// NOTE(review): presumably this marks collections that are processed atomically with
/// respect to the wrapped tick (cf. `atomic()` / `end_atomic()` in the tests) — confirm.
#[derive(Clone)]
pub struct Atomic<Loc> {
    /// The tick this atomic location is built around.
    pub(crate) tick: Tick<Loc>,
}
48
49impl<L: DynLocation> DynLocation for Atomic<L> {
50 fn dyn_id(&self) -> LocationId {
51 LocationId::Atomic(Box::new(self.tick.dyn_id()))
52 }
53
54 fn flow_state(&self) -> &FlowState {
55 self.tick.flow_state()
56 }
57
58 fn is_top_level() -> bool {
59 L::is_top_level()
60 }
61
62 fn multiversioned(&self) -> bool {
63 self.tick.multiversioned()
64 }
65
66 fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
67 L::cluster_consistency()
68 }
69}
70
71impl<'a, L> Location<'a> for Atomic<L>
72where
73 L: Location<'a>,
74{
75 type Root = L::Root;
76
77 type DropConsistency = Atomic<L::DropConsistency>;
78
79 fn consistency() -> Option<super::dynamic::ClusterConsistency> {
80 L::consistency()
81 }
82
83 fn root(&self) -> Self::Root {
84 self.tick.root()
85 }
86
87 fn drop_consistency(&self) -> Self::DropConsistency {
88 Atomic {
89 tick: self.tick.drop_consistency(),
90 }
91 }
92
93 fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
94 Atomic {
95 tick: Tick::from_drop_consistency(l2.tick),
96 }
97 }
98}
99
/// Implemented by collections that support being deferred by a tick.
///
/// `Tick::cycle` requires this bound on its cycle collection and calls
/// [`DeferTick::defer_tick`] on the source it creates.
pub trait DeferTick {
    /// Consumes the collection and returns the deferred collection of the same type.
    ///
    /// NOTE(review): presumably the result carries the values one tick later — confirm
    /// against the implementors.
    fn defer_tick(self) -> Self;
}
110
/// A tick (clock) location nested inside the location `L`.
///
/// Its dynamic id is `LocationId::Tick`, built from the clock id and the id of the
/// enclosing location.
#[derive(Clone)]
pub struct Tick<L> {
    /// Identifier of the clock; first component of `LocationId::Tick`.
    pub(crate) id: ClockId,
    /// The enclosing location, exposed through `Tick::outer`.
    pub(crate) l: L,
}
118
119impl<L: DynLocation> DynLocation for Tick<L> {
120 fn dyn_id(&self) -> LocationId {
121 LocationId::Tick(self.id, Box::new(self.l.dyn_id()))
122 }
123
124 fn flow_state(&self) -> &FlowState {
125 self.l.flow_state()
126 }
127
128 fn is_top_level() -> bool {
129 false
130 }
131
132 fn multiversioned(&self) -> bool {
133 self.l.multiversioned()
134 }
135
136 fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
137 L::cluster_consistency()
138 }
139}
140
141impl<'a, L> Location<'a> for Tick<L>
142where
143 L: Location<'a>,
144{
145 type Root = L::Root;
146
147 type DropConsistency = Tick<L::DropConsistency>;
148
149 fn consistency() -> Option<super::dynamic::ClusterConsistency> {
150 L::consistency()
151 }
152
153 fn root(&self) -> Self::Root {
154 self.l.root()
155 }
156
157 fn drop_consistency(&self) -> Self::DropConsistency {
158 Tick {
159 id: self.id,
160 l: self.l.drop_consistency(),
161 }
162 }
163
164 fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
165 Tick {
166 id: l2.id,
167 l: L::from_drop_consistency(l2.l),
168 }
169 }
170}
171
172impl<'a, L> Tick<L>
173where
174 L: Location<'a>,
175{
176 pub fn outer(&self) -> &L {
181 &self.l
182 }
183
184 pub fn spin_batch(
190 &self,
191 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
192 ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
193 where
194 L: TopLevel<'a>,
195 {
196 let out = self
197 .l
198 .spin()
199 .flat_map_ordered(q!(move |_| 0..batch_size))
200 .map(q!(|_| ()));
201
202 let inner = out.batch(self, nondet!());
203 Stream::new(self.clone(), inner.ir_node.replace(HydroNode::Placeholder))
204 }
205
206 pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
225 let e = q!([]);
226 let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
227
228 let unit_optional: Optional<(), Self, Bounded> = Optional::new(
229 self.clone(),
230 HydroNode::Source {
231 source: HydroSource::Iter(e.into()),
232 metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
233 },
234 );
235
236 unit_optional.map(q!(|_| unreachable!())) }
238
239 pub fn optional_first_tick<T: Clone>(
265 &self,
266 e: impl QuotedWithContext<'a, T, Tick<L>>,
267 ) -> Optional<T, Self, Bounded> {
268 let e = e.splice_untyped_ctx(self);
269
270 Optional::new(
271 self.clone(),
272 HydroNode::SingletonSource {
273 value: e.into(),
274 first_tick_only: true,
275 metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
276 },
277 )
278 }
279
280 pub fn current_tick_instant(
288 &self,
289 _nondet: NonDet,
290 ) -> Singleton<tokio::time::Instant, Tick<L::DropConsistency>, Bounded>
291 where
292 Self: Sized,
293 {
294 self.singleton(q!(tokio::time::Instant::now()))
296 }
297
298 #[expect(
307 private_bounds,
308 reason = "only Hydro collections can implement ReceiverComplete"
309 )]
310 pub fn cycle<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
311 &self,
312 ) -> (TickCycleHandle<'a, S>, S)
313 where
314 S: CycleCollection<'a, TickCycle, Location = L2> + DeferTick,
315 {
316 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
317 (
318 TickCycleHandle::new(cycle_id, Location::id(self)),
319 S::create_source(cycle_id, self.clone().with_consistency_of()).defer_tick(),
320 )
321 }
322
323 #[expect(
330 private_bounds,
331 reason = "only Hydro collections can implement ReceiverComplete"
332 )]
333 pub fn cycle_with_initial<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
334 &self,
335 initial: S,
336 ) -> (TickCycleHandle<'a, S>, S)
337 where
338 S: CycleCollectionWithInitial<'a, TickCycle, Location = L2>,
339 {
340 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
341 (
342 TickCycleHandle::new(cycle_id, Location::id(self)),
343 S::create_source_with_initial(cycle_id, initial, self.clone().with_consistency_of()),
345 )
346 }
347}
348
// Simulator tests contrasting a fold over an atomic stream with a fold over a plain stream.
// Everything in here is gated on the `sim` feature.
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::live_collections::sliced::sliced;
    #[cfg(feature = "sim")]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Writes go through an atomic stream that is both folded into a running sum and
    /// acknowledged via `end_atomic()`; reads snapshot the sum with `use::atomic`.
    /// Once a write has been acknowledged, a later read must observe it.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let atomic_write = write_req.atomic();
        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack_recv = atomic_write.end_atomic().sim_output();
        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        let sim_compiled = flow.sim().compiled();

        // The read is sent only after the write was acknowledged, so it must see the write.
        let instances = sim_compiled.exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
        });

        assert_eq!(instances, 1);

        // The read is sent before the acknowledgement is awaited, so its result is not
        // constrained and is ignored.
        let instances_read_before_write = sim_compiled.exhaustive(async || {
            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        // NOTE(review): presumably the three explored executions are read-before-write,
        // write-before-read, and both in the same tick — confirm against the simulator.
        assert_eq!(instances_read_before_write, 3);
    }

    /// Same program as `sim_atomic_stream`, but without `atomic()` / `use::atomic`.
    ///
    /// Marked `#[should_panic]`: the test passes only if the `assert_eq!` inside the
    /// exhaustive run fails. NOTE(review): presumably some explored execution lets the read
    /// observe the state before the acknowledged write has been folded in — confirm.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let current_state = write_req.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack_recv = write_req.sim_output();

        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            if let Some((_, v)) = read_response_recv.next().await {
                assert_eq!(v, 1);
            }
        });
    }
}