Skip to main content

hydro_lang/location/
tick.rs

1//! Clock domains for batching streaming data into discrete time steps.
2//!
3//! In Hydro, a [`Tick`] represents a logical clock that can be used to batch
4//! unbounded streaming data into discrete, bounded time steps. This is essential
5//! for implementing iterative algorithms, synchronizing data across multiple
6//! streams, and performing aggregations over windows of data.
7//!
8//! A tick is created from a top-level location (such as [`super::Process`] or [`super::Cluster`])
9//! using [`Location::tick`]. Once inside a tick, bounded live collections can be
10//! manipulated with operations like fold, reduce, and cross-product, and the
11//! results can be emitted back to the unbounded stream using methods like
12//! `all_ticks()`.
13//!
14//! The [`Atomic`] wrapper provides atomicity guarantees within a tick, ensuring
15//! that reads and writes within a tick are serialized.
16
17use stageleft::{QuotedWithContext, q};
18
19#[cfg(stageleft_runtime)]
20use super::dynamic::DynLocation;
21use super::{Location, LocationId};
22use crate::compile::builder::{ClockId, FlowState};
23use crate::compile::ir::{HydroNode, HydroSource};
24#[cfg(stageleft_runtime)]
25use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
26use crate::forward_handle::{TickCycle, TickCycleHandle};
27use crate::live_collections::Singleton;
28use crate::live_collections::boundedness::Bounded;
29use crate::live_collections::optional::Optional;
30use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
31use crate::location::TopLevel;
32use crate::nondet::{NonDet, nondet};
33
34/// A location wrapper that provides atomicity guarantees within a [`Tick`].
35///
36/// An `Atomic` context establishes a happens-before relationship between operations:
37/// - Downstream computations from `atomic()` are associated with an internal tick
38/// - Outputs from `end_atomic()` are held until all computations in the tick complete
39/// - Snapshots via `use::atomic` are guaranteed to reflect all updates from associated `end_atomic()`
40///
41/// This ensures read-after-write consistency: if a client receives an acknowledgement
42/// from `end_atomic()`, any subsequent `use::atomic` snapshot will include the effects
43/// of that acknowledged operation.
44#[derive(Clone)]
45pub struct Atomic<Loc> {
46    pub(crate) tick: Tick<Loc>,
47}
48
49impl<L: DynLocation> DynLocation for Atomic<L> {
50    fn dyn_id(&self) -> LocationId {
51        LocationId::Atomic(Box::new(self.tick.dyn_id()))
52    }
53
54    fn flow_state(&self) -> &FlowState {
55        self.tick.flow_state()
56    }
57
58    fn is_top_level() -> bool {
59        L::is_top_level()
60    }
61
62    fn multiversioned(&self) -> bool {
63        self.tick.multiversioned()
64    }
65
66    fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
67        L::cluster_consistency()
68    }
69}
70
71impl<'a, L> Location<'a> for Atomic<L>
72where
73    L: Location<'a>,
74{
75    type Root = L::Root;
76
77    type DropConsistency = Atomic<L::DropConsistency>;
78
79    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
80        L::consistency()
81    }
82
83    fn root(&self) -> Self::Root {
84        self.tick.root()
85    }
86
87    fn drop_consistency(&self) -> Self::DropConsistency {
88        Atomic {
89            tick: self.tick.drop_consistency(),
90        }
91    }
92
93    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
94        Atomic {
95            tick: Tick::from_drop_consistency(l2.tick),
96        }
97    }
98}
99
100/// Trait for live collections that can be deferred by one tick.
101///
102/// When a collection implements `DeferTick`, calling `defer_tick` delays its
103/// values by one clock cycle. This is primarily used internally to implement
104/// tick-based cycles ([`Tick::cycle`]), ensuring that feedback loops advance
105/// by one tick to avoid infinite recursion within a single tick.
106pub trait DeferTick {
107    /// Returns a new collection whose values are delayed by one tick.
108    fn defer_tick(self) -> Self;
109}
110
111/// Marks the stream as being inside the single global clock domain.
112#[derive(Clone)]
113pub struct Tick<L> {
114    pub(crate) id: ClockId,
115    /// Location.
116    pub(crate) l: L,
117}
118
119impl<L: DynLocation> DynLocation for Tick<L> {
120    fn dyn_id(&self) -> LocationId {
121        LocationId::Tick(self.id, Box::new(self.l.dyn_id()))
122    }
123
124    fn flow_state(&self) -> &FlowState {
125        self.l.flow_state()
126    }
127
128    fn is_top_level() -> bool {
129        false
130    }
131
132    fn multiversioned(&self) -> bool {
133        self.l.multiversioned()
134    }
135
136    fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
137        L::cluster_consistency()
138    }
139}
140
141impl<'a, L> Location<'a> for Tick<L>
142where
143    L: Location<'a>,
144{
145    type Root = L::Root;
146
147    type DropConsistency = Tick<L::DropConsistency>;
148
149    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
150        L::consistency()
151    }
152
153    fn root(&self) -> Self::Root {
154        self.l.root()
155    }
156
157    fn drop_consistency(&self) -> Self::DropConsistency {
158        Tick {
159            id: self.id,
160            l: self.l.drop_consistency(),
161        }
162    }
163
164    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
165        Tick {
166            id: l2.id,
167            l: L::from_drop_consistency(l2.l),
168        }
169    }
170}
171
172impl<'a, L> Tick<L>
173where
174    L: Location<'a>,
175{
176    /// Returns a reference to the outer (parent) location that this tick is nested within.
177    ///
178    /// For example, if a `Tick` was created from a `Process`, this returns a reference
179    /// to that `Process`.
180    pub fn outer(&self) -> &L {
181        &self.l
182    }
183
184    /// Creates a bounded stream of `()` values inside this tick, with a fixed batch size.
185    ///
186    /// This is useful for driving computations inside a tick that need to process
187    /// a specific number of elements per tick. Each tick will produce exactly
188    /// `batch_size` unit values.
189    pub fn spin_batch(
190        &self,
191        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
192    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
193    where
194        L: TopLevel<'a>,
195    {
196        let out = self
197            .l
198            .spin()
199            .flat_map_ordered(q!(move |_| 0..batch_size))
200            .map(q!(|_| ()));
201
202        let inner = out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */));
203        Stream::new(self.clone(), inner.ir_node.replace(HydroNode::Placeholder))
204    }
205
206    /// Creates an [`Optional`] which has a null value on every tick.
207    ///
208    /// # Example
209    /// ```rust
210    /// # #[cfg(feature = "deploy")] {
211    /// # use hydro_lang::prelude::*;
212    /// # use futures::StreamExt;
213    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
214    /// let tick = process.tick();
215    /// let optional = tick.none::<i32>();
216    /// optional.unwrap_or(tick.singleton(q!(123)))
217    /// # .all_ticks()
218    /// # }, |mut stream| async move {
219    /// // 123
220    /// # assert_eq!(stream.next().await.unwrap(), 123);
221    /// # }));
222    /// # }
223    /// ```
224    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
225        let e = q!([]);
226        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
227
228        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
229            self.clone(),
230            HydroNode::Source {
231                source: HydroSource::Iter(e.into()),
232                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
233            },
234        );
235
236        unit_optional.map(q!(|_| unreachable!())) // always empty
237    }
238
239    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
240    /// null on all subsequent ticks.
241    ///
242    /// This is useful for bootstrapping stateful computations which need an initial value.
243    ///
244    /// # Example
245    /// ```rust
246    /// # #[cfg(feature = "deploy")] {
247    /// # use hydro_lang::prelude::*;
248    /// # use futures::StreamExt;
249    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
250    /// let tick = process.tick();
251    /// // ticks are lazy by default, forces the second tick to run
252    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
253    /// let optional = tick.optional_first_tick(q!(5));
254    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
255    /// # }, |mut stream| async move {
256    /// // 5, 123, 123, 123, ...
257    /// # assert_eq!(stream.next().await.unwrap(), 5);
258    /// # assert_eq!(stream.next().await.unwrap(), 123);
259    /// # assert_eq!(stream.next().await.unwrap(), 123);
260    /// # assert_eq!(stream.next().await.unwrap(), 123);
261    /// # }));
262    /// # }
263    /// ```
264    pub fn optional_first_tick<T: Clone>(
265        &self,
266        e: impl QuotedWithContext<'a, T, Tick<L>>,
267    ) -> Optional<T, Self, Bounded> {
268        let e = e.splice_untyped_ctx(self);
269
270        Optional::new(
271            self.clone(),
272            HydroNode::SingletonSource {
273                value: e.into(),
274                first_tick_only: true,
275                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
276            },
277        )
278    }
279
280    /// Returns the current wall-clock time as a [`Singleton`] containing a
281    /// [`tokio::time::Instant`].
282    ///
283    /// # Non-Determinism
284    /// Reading wall-clock time is inherently non-deterministic because the
285    /// value depends on when the tick executes. A [`NonDet`] guard is required
286    /// to acknowledge this.
287    pub fn current_tick_instant(
288        &self,
289        _nondet: NonDet,
290    ) -> Singleton<tokio::time::Instant, Tick<L::DropConsistency>, Bounded>
291    where
292        Self: Sized,
293    {
294        // TODO(shadaj): this is a simulator hole, should be reported as unsupported until it is
295        self.singleton(q!(tokio::time::Instant::now()))
296    }
297
298    /// Creates a feedback cycle within this tick for implementing iterative computations.
299    ///
300    /// Returns a handle that must be completed with the actual collection, and a placeholder
301    /// collection that represents the output of the previous tick (deferred by one tick).
302    /// This is useful for implementing fixed-point computations where the output of one
303    /// tick feeds into the input of the next.
304    ///
305    /// The cycle automatically defers values by one tick to prevent infinite recursion.
306    #[expect(
307        private_bounds,
308        reason = "only Hydro collections can implement ReceiverComplete"
309    )]
310    pub fn cycle<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
311        &self,
312    ) -> (TickCycleHandle<'a, S>, S)
313    where
314        S: CycleCollection<'a, TickCycle, Location = L2> + DeferTick,
315    {
316        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
317        (
318            TickCycleHandle::new(cycle_id, Location::id(self)),
319            S::create_source(cycle_id, self.clone().with_consistency_of()).defer_tick(),
320        )
321    }
322
323    /// Creates a feedback cycle with an initial value for the first tick.
324    ///
325    /// Similar to [`Tick::cycle`], but allows providing an initial collection
326    /// that will be used as the value on the first tick before any feedback
327    /// is available. This is useful for bootstrapping iterative computations
328    /// that need a starting state.
329    #[expect(
330        private_bounds,
331        reason = "only Hydro collections can implement ReceiverComplete"
332    )]
333    pub fn cycle_with_initial<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
334        &self,
335        initial: S,
336    ) -> (TickCycleHandle<'a, S>, S)
337    where
338        S: CycleCollectionWithInitial<'a, TickCycle, Location = L2>,
339    {
340        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
341        (
342            TickCycleHandle::new(cycle_id, Location::id(self)),
343            // no need to defer_tick, create_source_with_initial does it for us
344            S::create_source_with_initial(cycle_id, initial, self.clone().with_consistency_of()),
345        )
346    }
347}
348
349#[cfg(test)]
350mod tests {
351    #[cfg(feature = "sim")]
352    use stageleft::q;
353
354    #[cfg(feature = "sim")]
355    use crate::live_collections::sliced::sliced;
356    #[cfg(feature = "sim")]
357    use crate::location::Location;
358    #[cfg(feature = "sim")]
359    use crate::nondet::nondet;
360    #[cfg(feature = "sim")]
361    use crate::prelude::FlowBuilder;
362
363    #[cfg(feature = "sim")]
364    #[test]
365    fn sim_atomic_stream() {
366        let mut flow = FlowBuilder::new();
367        let node = flow.process::<()>();
368
369        let (write_send, write_req) = node.sim_input();
370        let (read_send, read_req) = node.sim_input::<(), _, _>();
371
372        let atomic_write = write_req.atomic();
373        let current_state = atomic_write.clone().fold(
374            q!(|| 0),
375            q!(|state: &mut i32, v: i32| {
376                *state += v;
377            }),
378        );
379
380        let write_ack_recv = atomic_write.end_atomic().sim_output();
381        let read_response_recv = sliced! {
382            let batch_of_req = use(read_req, nondet!(/** test */));
383            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
384            batch_of_req.cross_singleton(latest_singleton)
385        }
386        .sim_output();
387
388        let sim_compiled = flow.sim().compiled();
389        let instances = sim_compiled.exhaustive(async || {
390            write_send.send(1);
391            write_ack_recv.assert_yields([1]).await;
392            read_send.send(());
393            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
394        });
395
396        assert_eq!(instances, 1);
397
398        let instances_read_before_write = sim_compiled.exhaustive(async || {
399            write_send.send(1);
400            read_send.send(());
401            write_ack_recv.assert_yields([1]).await;
402            let _ = read_response_recv.next().await;
403        });
404
405        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
406    }
407
408    #[cfg(feature = "sim")]
409    #[test]
410    #[should_panic]
411    fn sim_non_atomic_stream() {
412        // shows that atomic is necessary
413        let mut flow = FlowBuilder::new();
414        let node = flow.process::<()>();
415
416        let (write_send, write_req) = node.sim_input();
417        let (read_send, read_req) = node.sim_input::<(), _, _>();
418
419        let current_state = write_req.clone().fold(
420            q!(|| 0),
421            q!(|state: &mut i32, v: i32| {
422                *state += v;
423            }),
424        );
425
426        let write_ack_recv = write_req.sim_output();
427
428        let read_response_recv = sliced! {
429            let batch_of_req = use(read_req, nondet!(/** test */));
430            let latest_singleton = use(current_state, nondet!(/** test */));
431            batch_of_req.cross_singleton(latest_singleton)
432        }
433        .sim_output();
434
435        flow.sim().exhaustive(async || {
436            write_send.send(1);
437            write_ack_recv.assert_yields([1]).await;
438            read_send.send(());
439
440            if let Some((_, v)) = read_response_recv.next().await {
441                assert_eq!(v, 1);
442            }
443        });
444    }
445}