hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7#[cfg(stageleft_runtime)]
8use super::dynamic::DynLocation;
9use super::{Cluster, Location, LocationId, Process};
10use crate::compile::builder::FlowState;
11use crate::compile::ir::{HydroNode, HydroSource};
12#[cfg(stageleft_runtime)]
13use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
14use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
15use crate::live_collections::boundedness::{Bounded, Unbounded};
16use crate::live_collections::optional::Optional;
17use crate::live_collections::singleton::Singleton;
18use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
19use crate::nondet::nondet;
20
21#[sealed]
22pub trait NoTick {}
23#[sealed]
24impl<T> NoTick for Process<'_, T> {}
25#[sealed]
26impl<T> NoTick for Cluster<'_, T> {}
27
28#[sealed]
29pub trait NoAtomic {}
30#[sealed]
31impl<T> NoAtomic for Process<'_, T> {}
32#[sealed]
33impl<T> NoAtomic for Cluster<'_, T> {}
34#[sealed]
35impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
36
37#[derive(Clone)]
38pub struct Atomic<Loc> {
39    pub(crate) tick: Tick<Loc>,
40}
41
42impl<L: DynLocation> DynLocation for Atomic<L> {
43    fn id(&self) -> LocationId {
44        LocationId::Atomic(Box::new(self.tick.id()))
45    }
46
47    fn flow_state(&self) -> &FlowState {
48        self.tick.flow_state()
49    }
50
51    fn is_top_level() -> bool {
52        L::is_top_level()
53    }
54}
55
56impl<'a, L> Location<'a> for Atomic<L>
57where
58    L: Location<'a>,
59{
60    type Root = L::Root;
61
62    fn root(&self) -> Self::Root {
63        self.tick.root()
64    }
65}
66
67#[sealed]
68impl<L> NoTick for Atomic<L> {}
69
/// Implemented by collections that can be deferred across a tick boundary.
///
/// [`Tick::cycle`] requires this bound and calls [`DeferTick::defer_tick`] on the source
/// collection it creates.
pub trait DeferTick {
    /// Consumes the collection and returns its deferred form, of the same type.
    ///
    /// NOTE(review): presumably the returned collection yields this collection's contents one
    /// tick later — confirm against the implementors.
    fn defer_tick(self) -> Self;
}
73
/// Location marker stating that a stream lives inside the single global clock domain (a
/// "tick") of the outer location `L`.
#[derive(Clone)]
pub struct Tick<L> {
    /// Numeric identifier of this tick; it becomes part of the tick's `LocationId`.
    pub(crate) id: usize,
    /// The outer location in which this tick is nested.
    pub(crate) l: L,
}
80
81impl<L: DynLocation> DynLocation for Tick<L> {
82    fn id(&self) -> LocationId {
83        LocationId::Tick(self.id, Box::new(self.l.id()))
84    }
85
86    fn flow_state(&self) -> &FlowState {
87        self.l.flow_state()
88    }
89
90    fn is_top_level() -> bool {
91        false
92    }
93}
94
95impl<'a, L> Location<'a> for Tick<L>
96where
97    L: Location<'a>,
98{
99    type Root = L::Root;
100
101    fn root(&self) -> Self::Root {
102        self.l.root()
103    }
104}
105
106impl<'a, L> Tick<Atomic<L>>
107where
108    L: Location<'a>,
109{
110    pub fn as_regular_tick(&self) -> Tick<L> {
111        self.l.tick.clone()
112    }
113}
114
115impl<'a, L> Tick<L>
116where
117    L: Location<'a>,
118{
119    pub fn outer(&self) -> &L {
120        &self.l
121    }
122
123    pub fn spin_batch(
124        &self,
125        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
126    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
127    where
128        L: NoTick,
129    {
130        let out = self
131            .l
132            .spin()
133            .flat_map_ordered(q!(move |_| 0..batch_size))
134            .map(q!(|_| ()));
135
136        out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */))
137    }
138
139    pub fn singleton<T>(
140        &self,
141        e: impl QuotedWithContext<'a, T, Tick<L>>,
142    ) -> Singleton<T, Self, Bounded>
143    where
144        T: Clone,
145    {
146        let e = e.splice_untyped_ctx(self);
147
148        Singleton::new(
149            self.clone(),
150            HydroNode::SingletonSource {
151                value: e.into(),
152                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
153            },
154        )
155    }
156
157    /// Creates an [`Optional`] which has a null value on every tick.
158    ///
159    /// # Example
160    /// ```rust
161    /// # #[cfg(feature = "deploy")] {
162    /// # use hydro_lang::prelude::*;
163    /// # use futures::StreamExt;
164    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
165    /// let tick = process.tick();
166    /// let optional = tick.none::<i32>();
167    /// optional.unwrap_or(tick.singleton(q!(123)))
168    /// # .all_ticks()
169    /// # }, |mut stream| async move {
170    /// // 123
171    /// # assert_eq!(stream.next().await.unwrap(), 123);
172    /// # }));
173    /// # }
174    /// ```
175    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
176        let e = q!([]);
177        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
178
179        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
180            self.clone(),
181            HydroNode::Source {
182                source: HydroSource::Iter(e.into()),
183                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
184            },
185        );
186
187        unit_optional.map(q!(|_| unreachable!())) // always empty
188    }
189
190    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
191    /// null on all subsequent ticks.
192    ///
193    /// This is useful for bootstrapping stateful computations which need an initial value.
194    ///
195    /// # Example
196    /// ```rust
197    /// # #[cfg(feature = "deploy")] {
198    /// # use hydro_lang::prelude::*;
199    /// # use futures::StreamExt;
200    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
201    /// let tick = process.tick();
202    /// // ticks are lazy by default, forces the second tick to run
203    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
204    /// let optional = tick.optional_first_tick(q!(5));
205    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
206    /// # }, |mut stream| async move {
207    /// // 5, 123, 123, 123, ...
208    /// # assert_eq!(stream.next().await.unwrap(), 5);
209    /// # assert_eq!(stream.next().await.unwrap(), 123);
210    /// # assert_eq!(stream.next().await.unwrap(), 123);
211    /// # assert_eq!(stream.next().await.unwrap(), 123);
212    /// # }));
213    /// # }
214    /// ```
215    pub fn optional_first_tick<T: Clone>(
216        &self,
217        e: impl QuotedWithContext<'a, T, Tick<L>>,
218    ) -> Optional<T, Self, Bounded> {
219        let e_arr = q!([e]);
220        let e = e_arr.splice_untyped_ctx(self);
221
222        Optional::new(
223            self.clone(),
224            HydroNode::Batch {
225                inner: Box::new(HydroNode::Source {
226                    source: HydroSource::Iter(e.into()),
227                    metadata: self
228                        .outer()
229                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
230                }),
231                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
232            },
233        )
234    }
235
236    #[expect(
237        private_bounds,
238        reason = "only Hydro collections can implement ReceiverComplete"
239    )]
240    pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
241    where
242        S: CycleCollection<'a, ForwardRef, Location = Self>,
243        L: NoTick,
244    {
245        let next_id = self.flow_state().borrow_mut().next_cycle_id();
246        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
247
248        (
249            ForwardHandle {
250                completed: false,
251                ident: ident.clone(),
252                expected_location: Location::id(self),
253                _phantom: PhantomData,
254            },
255            S::create_source(ident, self.clone()),
256        )
257    }
258
259    #[expect(
260        private_bounds,
261        reason = "only Hydro collections can implement ReceiverComplete"
262    )]
263    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
264    where
265        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
266        L: NoTick,
267    {
268        let next_id = self.flow_state().borrow_mut().next_cycle_id();
269        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
270
271        (
272            TickCycleHandle {
273                completed: false,
274                ident: ident.clone(),
275                expected_location: Location::id(self),
276                _phantom: PhantomData,
277            },
278            S::create_source(ident, self.clone()).defer_tick(),
279        )
280    }
281
282    #[expect(
283        private_bounds,
284        reason = "only Hydro collections can implement ReceiverComplete"
285    )]
286    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
287    where
288        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
289    {
290        let next_id = self.flow_state().borrow_mut().next_cycle_id();
291        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
292
293        (
294            TickCycleHandle {
295                completed: false,
296                ident: ident.clone(),
297                expected_location: Location::id(self),
298                _phantom: PhantomData,
299            },
300            // no need to defer_tick, create_source_with_initial does it for us
301            S::create_source_with_initial(ident, initial, self.clone()),
302        )
303    }
304}
305
#[cfg(test)]
mod tests {
    // Every test below needs the simulator, so each import is gated on the `sim` feature.
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::live_collections::sliced::sliced;
    #[cfg(feature = "sim")]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// With writes processed atomically, a read issued after a write has been acknowledged
    /// always observes that write.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_stream() {
        let builder = FlowBuilder::new();
        let process = builder.process::<()>();

        let (write_send, write_req) = process.sim_input();
        let (read_send, read_req) = process.sim_input::<(), _, _>();

        let tick = process.tick();
        let atomic_write = write_req.atomic(&tick);
        // Running sum of all written values.
        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|acc: &mut i32, delta: i32| {
                *acc += delta;
            }),
        );

        let write_ack_recv = atomic_write.end_atomic().sim_output();
        let read_response_recv = sliced! {
            let pending_reads = use(read_req, nondet!(/** test */));
            let state_snapshot = use::atomic(current_state, nondet!(/** test */));
            pending_reads.cross_singleton(state_snapshot)
        }
        .sim_output();

        let sim_compiled = builder.sim().compiled();

        // Read is sent only after the write was acknowledged.
        let instances = sim_compiled.exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            assert!(read_response_recv.next().await.is_some_and(|(_, sum)| sum >= 1));
        });

        assert_eq!(instances, 1);

        // Read is sent before waiting for the write acknowledgement.
        let instances_read_before_write = sim_compiled.exhaustive(async || {
            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
    }

    /// Counterpart of `sim_atomic_stream` without `atomic`: the assertion is expected to fail
    /// in some explored execution, hence `should_panic`.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        // shows that atomic is necessary
        let builder = FlowBuilder::new();
        let process = builder.process::<()>();

        let (write_send, write_req) = process.sim_input();
        let (read_send, read_req) = process.sim_input::<(), _, _>();

        // Running sum of all written values.
        let current_state = write_req.clone().fold(
            q!(|| 0),
            q!(|acc: &mut i32, delta: i32| {
                *acc += delta;
            }),
        );

        let write_ack_recv = write_req.sim_output();

        let read_response_recv = sliced! {
            let pending_reads = use(read_req, nondet!(/** test */));
            let state_snapshot = use(current_state, nondet!(/** test */));
            pending_reads.cross_singleton(state_snapshot)
        }
        .sim_output();

        builder.sim().exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            if let Some((_, sum)) = read_response_recv.next().await {
                assert_eq!(sum, 1);
            }
        });
    }
}