hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7use super::{Cluster, Location, LocationId, Process};
8use crate::builder::FlowState;
9use crate::cycle::{
10    CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, ForwardRefMarker,
11    TickCycle, TickCycleMarker,
12};
13use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
14use crate::{Bounded, Optional, Singleton, Stream};
15
16#[sealed]
17pub trait NoTick {}
18#[sealed]
19impl<T> NoTick for Process<'_, T> {}
20#[sealed]
21impl<T> NoTick for Cluster<'_, T> {}
22
23#[sealed]
24pub trait NoAtomic {}
25#[sealed]
26impl<T> NoAtomic for Process<'_, T> {}
27#[sealed]
28impl<T> NoAtomic for Cluster<'_, T> {}
29#[sealed]
30impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
31
32#[derive(Clone)]
33pub struct Atomic<Loc> {
34    pub(crate) tick: Tick<Loc>,
35}
36
37impl<'a, L> Location<'a> for Atomic<L>
38where
39    L: Location<'a>,
40{
41    type Root = L::Root;
42
43    fn root(&self) -> Self::Root {
44        self.tick.root()
45    }
46
47    fn id(&self) -> LocationId {
48        self.tick.id()
49    }
50
51    fn flow_state(&self) -> &FlowState {
52        self.tick.flow_state()
53    }
54
55    fn is_top_level() -> bool {
56        L::is_top_level()
57    }
58}
59
60#[sealed]
61impl<L> NoTick for Atomic<L> {}
62
63/// Marks the stream as being inside the single global clock domain.
64#[derive(Clone)]
65pub struct Tick<Loc> {
66    pub(crate) id: usize,
67    pub(crate) l: Loc,
68}
69
70impl<'a, L> Location<'a> for Tick<L>
71where
72    L: Location<'a>,
73{
74    type Root = L::Root;
75
76    fn root(&self) -> Self::Root {
77        self.l.root()
78    }
79
80    fn id(&self) -> LocationId {
81        LocationId::Tick(self.id, Box::new(self.l.id()))
82    }
83
84    fn flow_state(&self) -> &FlowState {
85        self.l.flow_state()
86    }
87
88    fn is_top_level() -> bool {
89        false
90    }
91
92    fn next_node_id(&self) -> usize {
93        self.l.next_node_id()
94    }
95
96    fn new_node_metadata<T>(&self) -> HydroIrMetadata {
97        self.l.new_node_metadata::<T>()
98    }
99}
100
101impl<'a, L> Tick<L>
102where
103    L: Location<'a>,
104{
105    pub fn outer(&self) -> &L {
106        &self.l
107    }
108
109    pub fn spin_batch(
110        &self,
111        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
112    ) -> Stream<(), Self, Bounded>
113    where
114        L: NoTick + NoAtomic,
115    {
116        let out = self
117            .l
118            .spin()
119            .flat_map_ordered(q!(move |_| 0..batch_size))
120            .map(q!(|_| ()));
121
122        unsafe {
123            // SAFETY: at runtime, `spin` produces a single value per tick,
124            // so each batch is guaranteed to be the same size.
125            out.tick_batch(self)
126        }
127    }
128
129    pub fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, L>) -> Singleton<T, Self, Bounded>
130    where
131        T: Clone,
132        L: NoTick + NoAtomic,
133    {
134        unsafe {
135            // SAFETY: a top-level singleton produces the same value each tick
136            self.outer().singleton(e).latest_tick(self)
137        }
138    }
139
140    pub fn optional_first_tick<T: Clone>(
141        &self,
142        e: impl QuotedWithContext<'a, T, Tick<L>>,
143    ) -> Optional<T, Self, Bounded>
144    where
145        L: NoTick + NoAtomic,
146    {
147        let e_arr = q!([e]);
148        let e = e_arr.splice_untyped_ctx(self);
149
150        Optional::new(
151            self.clone(),
152            HydroNode::Source {
153                source: HydroSource::Iter(e.into()),
154                location_kind: self.l.id(),
155                metadata: self.new_node_metadata::<T>(),
156            },
157        )
158    }
159
160    pub fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
161    where
162        S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
163        L: NoTick,
164    {
165        let next_id = {
166            let on_id = match self.l.id() {
167                LocationId::Process(id) => id,
168                LocationId::Cluster(id) => id,
169                LocationId::Tick(_, _) => panic!(),
170                LocationId::ExternalProcess(_) => panic!(),
171            };
172
173            let mut flow_state = self.flow_state().borrow_mut();
174            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
175
176            let id = *next_id_entry;
177            *next_id_entry += 1;
178            id
179        };
180
181        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
182
183        (
184            ForwardRef {
185                completed: false,
186                ident: ident.clone(),
187                expected_location: self.id(),
188                _phantom: PhantomData,
189            },
190            S::create_source(ident, self.clone()),
191        )
192    }
193
194    pub fn forward_ref_atomic<S>(&self) -> (ForwardRef<'a, S>, S)
195    where
196        S: CycleCollection<'a, ForwardRefMarker, Location = Atomic<L>>,
197    {
198        let next_id = {
199            let on_id = match self.l.id() {
200                LocationId::Process(id) => id,
201                LocationId::Cluster(id) => id,
202                LocationId::Tick(_, _) => panic!(),
203                LocationId::ExternalProcess(_) => panic!(),
204            };
205
206            let mut flow_state = self.flow_state().borrow_mut();
207            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
208
209            let id = *next_id_entry;
210            *next_id_entry += 1;
211            id
212        };
213
214        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
215
216        (
217            ForwardRef {
218                completed: false,
219                ident: ident.clone(),
220                expected_location: self.id(),
221                _phantom: PhantomData,
222            },
223            S::create_source(ident, Atomic { tick: self.clone() }),
224        )
225    }
226
227    pub fn cycle<S>(&self) -> (TickCycle<'a, S>, S)
228    where
229        S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick,
230        L: NoTick,
231    {
232        let next_id = {
233            let on_id = match self.l.id() {
234                LocationId::Process(id) => id,
235                LocationId::Cluster(id) => id,
236                LocationId::Tick(_, _) => panic!(),
237                LocationId::ExternalProcess(_) => panic!(),
238            };
239
240            let mut flow_state = self.flow_state().borrow_mut();
241            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
242
243            let id = *next_id_entry;
244            *next_id_entry += 1;
245            id
246        };
247
248        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
249
250        (
251            TickCycle {
252                completed: false,
253                ident: ident.clone(),
254                expected_location: self.id(),
255                _phantom: PhantomData,
256            },
257            S::create_source(ident, self.clone()),
258        )
259    }
260
261    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycle<'a, S>, S)
262    where
263        S: CycleCollectionWithInitial<'a, TickCycleMarker, Location = Self> + DeferTick,
264        L: NoTick,
265    {
266        let next_id = {
267            let on_id = match self.l.id() {
268                LocationId::Process(id) => id,
269                LocationId::Cluster(id) => id,
270                LocationId::Tick(_, _) => panic!(),
271                LocationId::ExternalProcess(_) => panic!(),
272            };
273
274            let mut flow_state = self.flow_state().borrow_mut();
275            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
276
277            let id = *next_id_entry;
278            *next_id_entry += 1;
279            id
280        };
281
282        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
283
284        (
285            TickCycle {
286                completed: false,
287                ident: ident.clone(),
288                expected_location: self.id(),
289                _phantom: PhantomData,
290            },
291            S::create_source(ident, initial, self.clone()),
292        )
293    }
294}