hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7use super::{Cluster, Location, LocationId, Process};
8use crate::builder::FlowState;
9use crate::cycle::{
10    CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, ForwardRefMarker,
11    TickCycle, TickCycleMarker,
12};
13use crate::ir::{HydroNode, HydroSource};
14use crate::stream::ExactlyOnce;
15use crate::{Bounded, Optional, Singleton, Stream, TotalOrder, nondet};
16
17#[sealed]
18pub trait NoTick {}
19#[sealed]
20impl<T> NoTick for Process<'_, T> {}
21#[sealed]
22impl<T> NoTick for Cluster<'_, T> {}
23
24#[sealed]
25pub trait NoAtomic {}
26#[sealed]
27impl<T> NoAtomic for Process<'_, T> {}
28#[sealed]
29impl<T> NoAtomic for Cluster<'_, T> {}
30#[sealed]
31impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
32
33#[derive(Clone)]
34pub struct Atomic<Loc> {
35    pub(crate) tick: Tick<Loc>,
36}
37
38impl<'a, L> Location<'a> for Atomic<L>
39where
40    L: Location<'a>,
41{
42    type Root = L::Root;
43
44    fn root(&self) -> Self::Root {
45        self.tick.root()
46    }
47
48    fn id(&self) -> LocationId {
49        self.tick.id()
50    }
51
52    fn flow_state(&self) -> &FlowState {
53        self.tick.flow_state()
54    }
55
56    fn is_top_level() -> bool {
57        L::is_top_level()
58    }
59}
60
61#[sealed]
62impl<L> NoTick for Atomic<L> {}
63
/// Marks the stream as being inside the single global clock domain.
///
/// A `Tick` pairs a numeric id with the location it wraps; both are combined
/// into `LocationId::Tick` by its [`Location`] implementation.
#[derive(Clone)]
pub struct Tick<L> {
    /// Numeric id of this tick, used as the first component of its `LocationId`.
    pub(crate) id: usize,
    /// The wrapped location, exposed through `Tick::outer`.
    pub(crate) l: L,
}
70
71impl<'a, L> Location<'a> for Tick<L>
72where
73    L: Location<'a>,
74{
75    type Root = L::Root;
76
77    fn root(&self) -> Self::Root {
78        self.l.root()
79    }
80
81    fn id(&self) -> LocationId {
82        LocationId::Tick(self.id, Box::new(self.l.id()))
83    }
84
85    fn flow_state(&self) -> &FlowState {
86        self.l.flow_state()
87    }
88
89    fn is_top_level() -> bool {
90        false
91    }
92
93    fn next_node_id(&self) -> usize {
94        self.l.next_node_id()
95    }
96}
97
98impl<'a, L> Tick<L>
99where
100    L: Location<'a>,
101{
102    pub fn outer(&self) -> &L {
103        &self.l
104    }
105
106    pub fn spin_batch(
107        &self,
108        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
109    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
110    where
111        L: NoTick + NoAtomic,
112    {
113        let out = self
114            .l
115            .spin()
116            .flat_map_ordered(q!(move |_| 0..batch_size))
117            .map(q!(|_| ()));
118
119        out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */))
120    }
121
122    pub fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, L>) -> Singleton<T, Self, Bounded>
123    where
124        T: Clone,
125        L: NoTick + NoAtomic,
126    {
127        self.outer().singleton(e).snapshot(
128            self,
129            nondet!(/** a top-level singleton produces the same value each tick */),
130        )
131    }
132
133    pub fn optional_first_tick<T: Clone>(
134        &self,
135        e: impl QuotedWithContext<'a, T, Tick<L>>,
136    ) -> Optional<T, Self, Bounded> {
137        let e_arr = q!([e]);
138        let e = e_arr.splice_untyped_ctx(self);
139
140        Optional::new(
141            self.clone(),
142            HydroNode::Source {
143                source: HydroSource::Iter(e.into()),
144                metadata: self.new_node_metadata::<T>(),
145            },
146        )
147    }
148
149    pub fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
150    where
151        S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
152        L: NoTick,
153    {
154        let next_id = self.flow_state().borrow_mut().next_cycle_id();
155        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
156
157        (
158            ForwardRef {
159                completed: false,
160                ident: ident.clone(),
161                expected_location: self.id(),
162                _phantom: PhantomData,
163            },
164            S::create_source(ident, self.clone()),
165        )
166    }
167
168    pub fn forward_ref_atomic<S>(&self) -> (ForwardRef<'a, S>, S)
169    where
170        S: CycleCollection<'a, ForwardRefMarker, Location = Atomic<L>>,
171    {
172        let next_id = self.flow_state().borrow_mut().next_cycle_id();
173        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
174
175        (
176            ForwardRef {
177                completed: false,
178                ident: ident.clone(),
179                expected_location: self.id(),
180                _phantom: PhantomData,
181            },
182            S::create_source(ident, Atomic { tick: self.clone() }),
183        )
184    }
185
186    pub fn cycle<S>(&self) -> (TickCycle<'a, S>, S)
187    where
188        S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick,
189        L: NoTick,
190    {
191        let next_id = self.flow_state().borrow_mut().next_cycle_id();
192        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
193
194        (
195            TickCycle {
196                completed: false,
197                ident: ident.clone(),
198                expected_location: self.id(),
199                _phantom: PhantomData,
200            },
201            S::create_source(ident, self.clone()),
202        )
203    }
204
205    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycle<'a, S>, S)
206    where
207        S: CycleCollectionWithInitial<'a, TickCycleMarker, Location = Self> + DeferTick,
208        L: NoTick,
209    {
210        let next_id = self.flow_state().borrow_mut().next_cycle_id();
211        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
212
213        (
214            TickCycle {
215                completed: false,
216                ident: ident.clone(),
217                expected_location: self.id(),
218                _phantom: PhantomData,
219            },
220            S::create_source(ident, initial, self.clone()),
221        )
222    }
223}