Skip to main content

hydro_lang/location/
tick.rs

1//! Clock domains for batching streaming data into discrete time steps.
2//!
3//! In Hydro, a [`Tick`] represents a logical clock that can be used to batch
4//! unbounded streaming data into discrete, bounded time steps. This is essential
5//! for implementing iterative algorithms, synchronizing data across multiple
6//! streams, and performing aggregations over windows of data.
7//!
8//! A tick is created from a top-level location (such as [`Process`] or [`Cluster`])
9//! using [`Location::tick`]. Once inside a tick, bounded live collections can be
10//! manipulated with operations like fold, reduce, and cross-product, and the
11//! results can be emitted back to the unbounded stream using methods like
12//! `all_ticks()`.
13//!
14//! The [`Atomic`] wrapper provides atomicity guarantees within a tick, ensuring
15//! that reads and writes within a tick are serialized.
16//!
17//! The [`NoTick`] marker trait is used to constrain APIs that should only be
18//! called on top-level locations (not inside a tick), while [`NoAtomic`] constrains
19//! APIs that should not be called inside an atomic context.
20
21use sealed::sealed;
22use stageleft::{QuotedWithContext, q};
23
24#[cfg(stageleft_runtime)]
25use super::dynamic::DynLocation;
26use super::{Cluster, Location, LocationId, Process};
27use crate::compile::builder::{ClockId, FlowState};
28use crate::compile::ir::{HydroNode, HydroSource};
29#[cfg(stageleft_runtime)]
30use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
31use crate::forward_handle::{TickCycle, TickCycleHandle};
32use crate::live_collections::boundedness::{Bounded, Unbounded};
33use crate::live_collections::optional::Optional;
34use crate::live_collections::singleton::Singleton;
35use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
36use crate::nondet::nondet;
37
38/// Marker trait for locations that are **not** inside a [`Tick`] clock domain.
39///
40/// This trait is implemented by top-level locations such as [`Process`] and [`Cluster`],
41/// as well as [`Atomic`]. It is used to constrain APIs that should only be called
42/// outside of a tick context (e.g., creating a new tick or sourcing external data).
43#[sealed]
44pub trait NoTick {}
45#[sealed]
46impl<T> NoTick for Process<'_, T> {}
47#[sealed]
48impl<T> NoTick for Cluster<'_, T> {}
49
50/// Marker trait for locations that are **not** inside an [`Atomic`] context.
51///
52/// This trait is implemented by top-level locations ([`Process`], [`Cluster`]) and
53/// by [`Tick`]. It is used to constrain APIs that should not be called from within
54/// an atomic block.
55#[sealed]
56pub trait NoAtomic {}
57#[sealed]
58impl<T> NoAtomic for Process<'_, T> {}
59#[sealed]
60impl<T> NoAtomic for Cluster<'_, T> {}
61#[sealed]
62impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
63
64/// A location wrapper that provides atomicity guarantees within a [`Tick`].
65///
66/// An `Atomic` context establishes a happens-before relationship between operations:
67/// - Downstream computations from `atomic(&tick)` are associated with that tick
68/// - Outputs from `end_atomic()` are held until all computations in the tick complete
69/// - Snapshots via `use::atomic` are guaranteed to reflect all updates from associated `end_atomic()`
70///
71/// This ensures read-after-write consistency: if a client receives an acknowledgement
72/// from `end_atomic()`, any subsequent `use::atomic` snapshot will include the effects
73/// of that acknowledged operation.
74#[derive(Clone)]
75pub struct Atomic<Loc> {
76    pub(crate) tick: Tick<Loc>,
77}
78
79impl<L: DynLocation> DynLocation for Atomic<L> {
80    fn id(&self) -> LocationId {
81        LocationId::Atomic(Box::new(self.tick.id()))
82    }
83
84    fn flow_state(&self) -> &FlowState {
85        self.tick.flow_state()
86    }
87
88    fn is_top_level() -> bool {
89        L::is_top_level()
90    }
91
92    fn multiversioned(&self) -> bool {
93        self.tick.multiversioned()
94    }
95}
96
97impl<'a, L> Location<'a> for Atomic<L>
98where
99    L: Location<'a>,
100{
101    type Root = L::Root;
102
103    fn root(&self) -> Self::Root {
104        self.tick.root()
105    }
106}
107
108#[sealed]
109impl<L> NoTick for Atomic<L> {}
110
111/// Trait for live collections that can be deferred by one tick.
112///
113/// When a collection implements `DeferTick`, calling `defer_tick` delays its
114/// values by one clock cycle. This is primarily used internally to implement
115/// tick-based cycles ([`Tick::cycle`]), ensuring that feedback loops advance
116/// by one tick to avoid infinite recursion within a single tick.
117pub trait DeferTick {
118    /// Returns a new collection whose values are delayed by one tick.
119    fn defer_tick(self) -> Self;
120}
121
122/// Marks the stream as being inside the single global clock domain.
123#[derive(Clone)]
124pub struct Tick<L> {
125    pub(crate) id: ClockId,
126    /// Location.
127    pub(crate) l: L,
128}
129
130impl<L: DynLocation> DynLocation for Tick<L> {
131    fn id(&self) -> LocationId {
132        LocationId::Tick(self.id, Box::new(self.l.id()))
133    }
134
135    fn flow_state(&self) -> &FlowState {
136        self.l.flow_state()
137    }
138
139    fn is_top_level() -> bool {
140        false
141    }
142
143    fn multiversioned(&self) -> bool {
144        self.l.multiversioned()
145    }
146}
147
148impl<'a, L> Location<'a> for Tick<L>
149where
150    L: Location<'a>,
151{
152    type Root = L::Root;
153
154    fn root(&self) -> Self::Root {
155        self.l.root()
156    }
157}
158
159impl<'a, L> Tick<L>
160where
161    L: Location<'a>,
162{
163    /// Returns a reference to the outer (parent) location that this tick is nested within.
164    ///
165    /// For example, if a `Tick` was created from a `Process`, this returns a reference
166    /// to that `Process`.
167    pub fn outer(&self) -> &L {
168        &self.l
169    }
170
171    /// Creates a bounded stream of `()` values inside this tick, with a fixed batch size.
172    ///
173    /// This is useful for driving computations inside a tick that need to process
174    /// a specific number of elements per tick. Each tick will produce exactly
175    /// `batch_size` unit values.
176    pub fn spin_batch(
177        &self,
178        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
179    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
180    where
181        L: NoTick,
182    {
183        let out = self
184            .l
185            .spin()
186            .flat_map_ordered(q!(move |_| 0..batch_size))
187            .map(q!(|_| ()));
188
189        out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */))
190    }
191
192    /// Constructs a [`Singleton`] materialized inside this tick with the given static value.
193    ///
194    /// The singleton will have the provided value on every tick. This is useful
195    /// for providing constant values to computations inside a tick.
196    pub fn singleton<T>(
197        &self,
198        e: impl QuotedWithContext<'a, T, Tick<L>>,
199    ) -> Singleton<T, Self, Bounded>
200    where
201        T: Clone,
202    {
203        let e = e.splice_untyped_ctx(self);
204
205        Singleton::new(
206            self.clone(),
207            HydroNode::SingletonSource {
208                value: e.into(),
209                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
210            },
211        )
212    }
213
214    /// Creates an [`Optional`] which has a null value on every tick.
215    ///
216    /// # Example
217    /// ```rust
218    /// # #[cfg(feature = "deploy")] {
219    /// # use hydro_lang::prelude::*;
220    /// # use futures::StreamExt;
221    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
222    /// let tick = process.tick();
223    /// let optional = tick.none::<i32>();
224    /// optional.unwrap_or(tick.singleton(q!(123)))
225    /// # .all_ticks()
226    /// # }, |mut stream| async move {
227    /// // 123
228    /// # assert_eq!(stream.next().await.unwrap(), 123);
229    /// # }));
230    /// # }
231    /// ```
232    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
233        let e = q!([]);
234        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
235
236        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
237            self.clone(),
238            HydroNode::Source {
239                source: HydroSource::Iter(e.into()),
240                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
241            },
242        );
243
244        unit_optional.map(q!(|_| unreachable!())) // always empty
245    }
246
247    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
248    /// null on all subsequent ticks.
249    ///
250    /// This is useful for bootstrapping stateful computations which need an initial value.
251    ///
252    /// # Example
253    /// ```rust
254    /// # #[cfg(feature = "deploy")] {
255    /// # use hydro_lang::prelude::*;
256    /// # use futures::StreamExt;
257    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
258    /// let tick = process.tick();
259    /// // ticks are lazy by default, forces the second tick to run
260    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
261    /// let optional = tick.optional_first_tick(q!(5));
262    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
263    /// # }, |mut stream| async move {
264    /// // 5, 123, 123, 123, ...
265    /// # assert_eq!(stream.next().await.unwrap(), 5);
266    /// # assert_eq!(stream.next().await.unwrap(), 123);
267    /// # assert_eq!(stream.next().await.unwrap(), 123);
268    /// # assert_eq!(stream.next().await.unwrap(), 123);
269    /// # }));
270    /// # }
271    /// ```
272    pub fn optional_first_tick<T: Clone>(
273        &self,
274        e: impl QuotedWithContext<'a, T, Tick<L>>,
275    ) -> Optional<T, Self, Bounded> {
276        let e_arr = q!([e]);
277        let e = e_arr.splice_untyped_ctx(self);
278
279        Optional::new(
280            self.clone(),
281            HydroNode::Batch {
282                inner: Box::new(HydroNode::Source {
283                    source: HydroSource::Iter(e.into()),
284                    metadata: self
285                        .outer()
286                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
287                }),
288                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
289            },
290        )
291    }
292
293    /// Creates a feedback cycle within this tick for implementing iterative computations.
294    ///
295    /// Returns a handle that must be completed with the actual collection, and a placeholder
296    /// collection that represents the output of the previous tick (deferred by one tick).
297    /// This is useful for implementing fixed-point computations where the output of one
298    /// tick feeds into the input of the next.
299    ///
300    /// The cycle automatically defers values by one tick to prevent infinite recursion.
301    #[expect(
302        private_bounds,
303        reason = "only Hydro collections can implement ReceiverComplete"
304    )]
305    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
306    where
307        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
308        L: NoTick,
309    {
310        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
311        (
312            TickCycleHandle::new(cycle_id, Location::id(self)),
313            S::create_source(cycle_id, self.clone()).defer_tick(),
314        )
315    }
316
317    /// Creates a feedback cycle with an initial value for the first tick.
318    ///
319    /// Similar to [`Tick::cycle`], but allows providing an initial collection
320    /// that will be used as the value on the first tick before any feedback
321    /// is available. This is useful for bootstrapping iterative computations
322    /// that need a starting state.
323    #[expect(
324        private_bounds,
325        reason = "only Hydro collections can implement ReceiverComplete"
326    )]
327    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
328    where
329        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
330    {
331        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
332        (
333            TickCycleHandle::new(cycle_id, Location::id(self)),
334            // no need to defer_tick, create_source_with_initial does it for us
335            S::create_source_with_initial(cycle_id, initial, self.clone()),
336        )
337    }
338}
339
340#[cfg(test)]
341mod tests {
342    #[cfg(feature = "sim")]
343    use stageleft::q;
344
345    #[cfg(feature = "sim")]
346    use crate::live_collections::sliced::sliced;
347    #[cfg(feature = "sim")]
348    use crate::location::Location;
349    #[cfg(feature = "sim")]
350    use crate::nondet::nondet;
351    #[cfg(feature = "sim")]
352    use crate::prelude::FlowBuilder;
353
354    #[cfg(feature = "sim")]
355    #[test]
356    fn sim_atomic_stream() {
357        let mut flow = FlowBuilder::new();
358        let node = flow.process::<()>();
359
360        let (write_send, write_req) = node.sim_input();
361        let (read_send, read_req) = node.sim_input::<(), _, _>();
362
363        let tick = node.tick();
364        let atomic_write = write_req.atomic(&tick);
365        let current_state = atomic_write.clone().fold(
366            q!(|| 0),
367            q!(|state: &mut i32, v: i32| {
368                *state += v;
369            }),
370        );
371
372        let write_ack_recv = atomic_write.end_atomic().sim_output();
373        let read_response_recv = sliced! {
374            let batch_of_req = use(read_req, nondet!(/** test */));
375            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
376            batch_of_req.cross_singleton(latest_singleton)
377        }
378        .sim_output();
379
380        let sim_compiled = flow.sim().compiled();
381        let instances = sim_compiled.exhaustive(async || {
382            write_send.send(1);
383            write_ack_recv.assert_yields([1]).await;
384            read_send.send(());
385            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
386        });
387
388        assert_eq!(instances, 1);
389
390        let instances_read_before_write = sim_compiled.exhaustive(async || {
391            write_send.send(1);
392            read_send.send(());
393            write_ack_recv.assert_yields([1]).await;
394            let _ = read_response_recv.next().await;
395        });
396
397        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
398    }
399
400    #[cfg(feature = "sim")]
401    #[test]
402    #[should_panic]
403    fn sim_non_atomic_stream() {
404        // shows that atomic is necessary
405        let mut flow = FlowBuilder::new();
406        let node = flow.process::<()>();
407
408        let (write_send, write_req) = node.sim_input();
409        let (read_send, read_req) = node.sim_input::<(), _, _>();
410
411        let current_state = write_req.clone().fold(
412            q!(|| 0),
413            q!(|state: &mut i32, v: i32| {
414                *state += v;
415            }),
416        );
417
418        let write_ack_recv = write_req.sim_output();
419
420        let read_response_recv = sliced! {
421            let batch_of_req = use(read_req, nondet!(/** test */));
422            let latest_singleton = use(current_state, nondet!(/** test */));
423            batch_of_req.cross_singleton(latest_singleton)
424        }
425        .sim_output();
426
427        flow.sim().exhaustive(async || {
428            write_send.send(1);
429            write_ack_recv.assert_yields([1]).await;
430            read_send.send(());
431
432            if let Some((_, v)) = read_response_recv.next().await {
433                assert_eq!(v, 1);
434            }
435        });
436    }
437}