hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7#[cfg(stageleft_runtime)]
8use super::dynamic::DynLocation;
9use super::{Cluster, Location, LocationId, Process};
10use crate::compile::builder::FlowState;
11use crate::compile::ir::{HydroNode, HydroSource};
12#[cfg(stageleft_runtime)]
13use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
14use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
15use crate::live_collections::boundedness::{Bounded, Unbounded};
16use crate::live_collections::optional::Optional;
17use crate::live_collections::singleton::Singleton;
18use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
19use crate::nondet::nondet;
20
21#[sealed]
22pub trait NoTick {}
23#[sealed]
24impl<T> NoTick for Process<'_, T> {}
25#[sealed]
26impl<T> NoTick for Cluster<'_, T> {}
27
28#[sealed]
29pub trait NoAtomic {}
30#[sealed]
31impl<T> NoAtomic for Process<'_, T> {}
32#[sealed]
33impl<T> NoAtomic for Cluster<'_, T> {}
34#[sealed]
35impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
36
37#[derive(Clone)]
38pub struct Atomic<Loc> {
39    pub(crate) tick: Tick<Loc>,
40}
41
42impl<L: DynLocation> DynLocation for Atomic<L> {
43    fn id(&self) -> LocationId {
44        LocationId::Atomic(Box::new(self.tick.id()))
45    }
46
47    fn flow_state(&self) -> &FlowState {
48        self.tick.flow_state()
49    }
50
51    fn is_top_level() -> bool {
52        L::is_top_level()
53    }
54}
55
56impl<'a, L> Location<'a> for Atomic<L>
57where
58    L: Location<'a>,
59{
60    type Root = L::Root;
61
62    fn root(&self) -> Self::Root {
63        self.tick.root()
64    }
65}
66
67#[sealed]
68impl<L> NoTick for Atomic<L> {}
69
70pub trait DeferTick {
71    fn defer_tick(self) -> Self;
72}
73
74/// Marks the stream as being inside the single global clock domain.
75#[derive(Clone)]
76pub struct Tick<L> {
77    pub(crate) id: usize,
78    pub(crate) l: L,
79}
80
81impl<L: DynLocation> DynLocation for Tick<L> {
82    fn id(&self) -> LocationId {
83        LocationId::Tick(self.id, Box::new(self.l.id()))
84    }
85
86    fn flow_state(&self) -> &FlowState {
87        self.l.flow_state()
88    }
89
90    fn is_top_level() -> bool {
91        false
92    }
93}
94
95impl<'a, L> Location<'a> for Tick<L>
96where
97    L: Location<'a>,
98{
99    type Root = L::Root;
100
101    fn root(&self) -> Self::Root {
102        self.l.root()
103    }
104}
105
106impl<'a, L> Tick<L>
107where
108    L: Location<'a>,
109{
110    pub fn outer(&self) -> &L {
111        &self.l
112    }
113
114    pub fn spin_batch(
115        &self,
116        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
117    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
118    where
119        L: NoTick,
120    {
121        let out = self
122            .l
123            .spin()
124            .flat_map_ordered(q!(move |_| 0..batch_size))
125            .map(q!(|_| ()));
126
127        out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */))
128    }
129
130    pub fn singleton<T>(
131        &self,
132        e: impl QuotedWithContext<'a, T, Tick<L>>,
133    ) -> Singleton<T, Self, Bounded>
134    where
135        T: Clone,
136    {
137        let e = e.splice_untyped_ctx(self);
138
139        Singleton::new(
140            self.clone(),
141            HydroNode::SingletonSource {
142                value: e.into(),
143                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
144            },
145        )
146    }
147
148    /// Creates an [`Optional`] which has a null value on every tick.
149    ///
150    /// # Example
151    /// ```rust
152    /// # use hydro_lang::prelude::*;
153    /// # use futures::StreamExt;
154    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
155    /// let tick = process.tick();
156    /// let optional = tick.none::<i32>();
157    /// optional.unwrap_or(tick.singleton(q!(123)))
158    /// # .all_ticks()
159    /// # }, |mut stream| async move {
160    /// // 123
161    /// # assert_eq!(stream.next().await.unwrap(), 123);
162    /// # }));
163    /// ```
164    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
165        let e = q!([]);
166        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
167
168        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
169            self.clone(),
170            HydroNode::Source {
171                source: HydroSource::Iter(e.into()),
172                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
173            },
174        );
175
176        unit_optional.map(q!(|_| unreachable!())) // always empty
177    }
178
179    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
180    /// null on all subsequent ticks.
181    ///
182    /// This is useful for bootstrapping stateful computations which need an initial value.
183    ///
184    /// # Example
185    /// ```rust
186    /// # use hydro_lang::prelude::*;
187    /// # use futures::StreamExt;
188    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
189    /// let tick = process.tick();
190    /// // ticks are lazy by default, forces the second tick to run
191    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
192    /// let optional = tick.optional_first_tick(q!(5));
193    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
194    /// # }, |mut stream| async move {
195    /// // 5, 123, 123, 123, ...
196    /// # assert_eq!(stream.next().await.unwrap(), 5);
197    /// # assert_eq!(stream.next().await.unwrap(), 123);
198    /// # assert_eq!(stream.next().await.unwrap(), 123);
199    /// # assert_eq!(stream.next().await.unwrap(), 123);
200    /// # }));
201    /// ```
202    pub fn optional_first_tick<T: Clone>(
203        &self,
204        e: impl QuotedWithContext<'a, T, Tick<L>>,
205    ) -> Optional<T, Self, Bounded> {
206        let e_arr = q!([e]);
207        let e = e_arr.splice_untyped_ctx(self);
208
209        Optional::new(
210            self.clone(),
211            HydroNode::Batch {
212                inner: Box::new(HydroNode::Source {
213                    source: HydroSource::Iter(e.into()),
214                    metadata: self
215                        .outer()
216                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
217                }),
218                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
219            },
220        )
221    }
222
223    #[expect(
224        private_bounds,
225        reason = "only Hydro collections can implement ReceiverComplete"
226    )]
227    pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
228    where
229        S: CycleCollection<'a, ForwardRef, Location = Self>,
230        L: NoTick,
231    {
232        let next_id = self.flow_state().borrow_mut().next_cycle_id();
233        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
234
235        (
236            ForwardHandle {
237                completed: false,
238                ident: ident.clone(),
239                expected_location: Location::id(self),
240                _phantom: PhantomData,
241            },
242            S::create_source(ident, self.clone()),
243        )
244    }
245
246    #[expect(
247        private_bounds,
248        reason = "only Hydro collections can implement ReceiverComplete"
249    )]
250    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
251    where
252        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
253        L: NoTick,
254    {
255        let next_id = self.flow_state().borrow_mut().next_cycle_id();
256        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
257
258        (
259            TickCycleHandle {
260                completed: false,
261                ident: ident.clone(),
262                expected_location: Location::id(self),
263                _phantom: PhantomData,
264            },
265            S::create_source(ident, self.clone()).defer_tick(),
266        )
267    }
268
269    #[expect(
270        private_bounds,
271        reason = "only Hydro collections can implement ReceiverComplete"
272    )]
273    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
274    where
275        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
276    {
277        let next_id = self.flow_state().borrow_mut().next_cycle_id();
278        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
279
280        (
281            TickCycleHandle {
282                completed: false,
283                ident: ident.clone(),
284                expected_location: Location::id(self),
285                _phantom: PhantomData,
286            },
287            // no need to defer_tick, create_source_with_initial does it for us
288            S::create_source_with_initial(ident, initial, self.clone()),
289        )
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use stageleft::q;
296
297    use crate::location::Location;
298    use crate::nondet::nondet;
299    use crate::prelude::FlowBuilder;
300
301    #[test]
302    fn sim_atomic_stream() {
303        let flow = FlowBuilder::new();
304        let node = flow.process::<()>();
305        let external = flow.external::<()>();
306
307        let (input_write, write_req) = node.source_external_bincode(&external);
308        let (input_read, read_req) = node.source_external_bincode::<_, (), _, _>(&external);
309
310        let tick = node.tick();
311        let atomic_write = write_req.atomic(&tick);
312        let current_state = atomic_write.clone().fold(
313            q!(|| 0),
314            q!(|state: &mut i32, v: i32| {
315                *state += v;
316            }),
317        );
318
319        let write_ack = atomic_write.end_atomic().send_bincode_external(&external);
320        let read_response = read_req
321            .batch(&tick, nondet!(/** test */))
322            .cross_singleton(current_state.snapshot_atomic(nondet!(/** test */)))
323            .all_ticks()
324            .send_bincode_external(&external);
325
326        let sim_compiled = flow.sim().compiled();
327        let instances = sim_compiled.exhaustive(async |mut compiled| {
328            let write_send = compiled.connect(&input_write);
329            let read_send = compiled.connect(&input_read);
330            let mut write_ack_recv = compiled.connect(&write_ack);
331            let mut read_response_recv = compiled.connect(&read_response);
332            compiled.launch();
333
334            write_send.send(1);
335            write_ack_recv.assert_yields([1]).await;
336            read_send.send(());
337            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
338        });
339
340        assert_eq!(instances, 1);
341
342        let instances_read_before_write = sim_compiled.exhaustive(async |mut compiled| {
343            let write_send = compiled.connect(&input_write);
344            let read_send = compiled.connect(&input_read);
345            let mut write_ack_recv = compiled.connect(&write_ack);
346            let mut read_response_recv = compiled.connect(&read_response);
347            compiled.launch();
348
349            write_send.send(1);
350            read_send.send(());
351            write_ack_recv.assert_yields([1]).await;
352            let _ = read_response_recv.next().await;
353        });
354
355        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
356    }
357
358    #[test]
359    #[should_panic]
360    fn sim_non_atomic_stream() {
361        // shows that atomic is necessary
362        let flow = FlowBuilder::new();
363        let node = flow.process::<()>();
364        let external = flow.external::<()>();
365
366        let (input_write, write_req) = node.source_external_bincode(&external);
367        let (input_read, read_req) = node.source_external_bincode::<_, (), _, _>(&external);
368
369        let current_state = write_req.clone().fold(
370            q!(|| 0),
371            q!(|state: &mut i32, v: i32| {
372                *state += v;
373            }),
374        );
375
376        let write_ack = write_req.send_bincode_external(&external);
377
378        let tick = node.tick();
379        let read_response = read_req
380            .batch(&tick, nondet!(/** test */))
381            .cross_singleton(current_state.snapshot(&tick, nondet!(/** test */)))
382            .all_ticks()
383            .send_bincode_external(&external);
384
385        flow.sim().exhaustive(async |mut compiled| {
386            let write_send = compiled.connect(&input_write);
387            let read_send = compiled.connect(&input_read);
388            let mut write_ack_recv = compiled.connect(&write_ack);
389            let mut read_response_recv = compiled.connect(&read_response);
390            compiled.launch();
391
392            write_send.send(1);
393            write_ack_recv.assert_yields([1]).await;
394            read_send.send(());
395
396            if let Some((_, v)) = read_response_recv.next().await {
397                assert_eq!(v, 1);
398            }
399        });
400    }
401}