hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7use super::{Cluster, Location, LocationId, Process};
8use crate::builder::FlowState;
9use crate::cycle::{
10    CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, ForwardRefMarker,
11    TickCycle, TickCycleMarker,
12};
13use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
14use crate::{Bounded, Optional, Singleton, Stream};
15
16#[sealed]
17pub trait NoTick {}
18#[sealed]
19impl<T> NoTick for Process<'_, T> {}
20#[sealed]
21impl<T> NoTick for Cluster<'_, T> {}
22
23#[sealed]
24pub trait NoAtomic {}
25#[sealed]
26impl<T> NoAtomic for Process<'_, T> {}
27#[sealed]
28impl<T> NoAtomic for Cluster<'_, T> {}
29#[sealed]
30impl<'a, L: Location<'a>> NoAtomic for Tick<L> {}
31
32#[derive(Clone)]
33pub struct Atomic<L> {
34    pub(crate) tick: Tick<L>,
35}
36
37impl<'a, L: Location<'a>> Location<'a> for Atomic<L> {
38    type Root = L::Root;
39
40    fn root(&self) -> Self::Root {
41        self.tick.root()
42    }
43
44    fn id(&self) -> LocationId {
45        self.tick.id()
46    }
47
48    fn flow_state(&self) -> &FlowState {
49        self.tick.flow_state()
50    }
51
52    fn is_top_level() -> bool {
53        L::is_top_level()
54    }
55}
56
57#[sealed]
58impl<L> NoTick for Atomic<L> {}
59
60/// Marks the stream as being inside the single global clock domain.
61#[derive(Clone)]
62pub struct Tick<L> {
63    pub(crate) id: usize,
64    pub(crate) l: L,
65}
66
67impl<'a, L: Location<'a>> Location<'a> for Tick<L> {
68    type Root = L::Root;
69
70    fn root(&self) -> Self::Root {
71        self.l.root()
72    }
73
74    fn id(&self) -> LocationId {
75        LocationId::Tick(self.id, Box::new(self.l.id()))
76    }
77
78    fn flow_state(&self) -> &FlowState {
79        self.l.flow_state()
80    }
81
82    fn is_top_level() -> bool {
83        false
84    }
85
86    fn next_node_id(&self) -> usize {
87        self.l.next_node_id()
88    }
89
90    fn new_node_metadata<T>(&self) -> HydroIrMetadata {
91        self.l.new_node_metadata::<T>()
92    }
93}
94
95impl<'a, L: Location<'a>> Tick<L> {
96    pub fn outer(&self) -> &L {
97        &self.l
98    }
99
100    pub fn spin_batch(
101        &self,
102        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
103    ) -> Stream<(), Self, Bounded>
104    where
105        L: NoTick + NoAtomic,
106    {
107        let out = self
108            .l
109            .spin()
110            .flat_map_ordered(q!(move |_| 0..batch_size))
111            .map(q!(|_| ()));
112
113        unsafe {
114            // SAFETY: at runtime, `spin` produces a single value per tick,
115            // so each batch is guaranteed to be the same size.
116            out.tick_batch(self)
117        }
118    }
119
120    pub fn singleton<T: Clone>(
121        &self,
122        e: impl QuotedWithContext<'a, T, L>,
123    ) -> Singleton<T, Self, Bounded>
124    where
125        L: NoTick + NoAtomic,
126    {
127        unsafe {
128            // SAFETY: a top-level singleton produces the same value each tick
129            self.outer().singleton(e).latest_tick(self)
130        }
131    }
132
133    pub fn optional_first_tick<T: Clone>(
134        &self,
135        e: impl QuotedWithContext<'a, T, Tick<L>>,
136    ) -> Optional<T, Self, Bounded>
137    where
138        L: NoTick + NoAtomic,
139    {
140        let e_arr = q!([e]);
141        let e = e_arr.splice_untyped_ctx(self);
142
143        Optional::new(
144            self.clone(),
145            HydroNode::Source {
146                source: HydroSource::Iter(e.into()),
147                location_kind: self.l.id(),
148                metadata: self.new_node_metadata::<T>(),
149            },
150        )
151    }
152
153    pub fn forward_ref<S: CycleCollection<'a, ForwardRefMarker, Location = Self>>(
154        &self,
155    ) -> (ForwardRef<'a, S>, S)
156    where
157        L: NoTick,
158    {
159        let next_id = {
160            let on_id = match self.l.id() {
161                LocationId::Process(id) => id,
162                LocationId::Cluster(id) => id,
163                LocationId::Tick(_, _) => panic!(),
164                LocationId::ExternalProcess(_) => panic!(),
165            };
166
167            let mut flow_state = self.flow_state().borrow_mut();
168            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
169
170            let id = *next_id_entry;
171            *next_id_entry += 1;
172            id
173        };
174
175        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
176
177        (
178            ForwardRef {
179                completed: false,
180                ident: ident.clone(),
181                expected_location: self.id(),
182                _phantom: PhantomData,
183            },
184            S::create_source(ident, self.clone()),
185        )
186    }
187
188    pub fn forward_ref_atomic<S: CycleCollection<'a, ForwardRefMarker, Location = Atomic<L>>>(
189        &self,
190    ) -> (ForwardRef<'a, S>, S) {
191        let next_id = {
192            let on_id = match self.l.id() {
193                LocationId::Process(id) => id,
194                LocationId::Cluster(id) => id,
195                LocationId::Tick(_, _) => panic!(),
196                LocationId::ExternalProcess(_) => panic!(),
197            };
198
199            let mut flow_state = self.flow_state().borrow_mut();
200            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
201
202            let id = *next_id_entry;
203            *next_id_entry += 1;
204            id
205        };
206
207        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
208
209        (
210            ForwardRef {
211                completed: false,
212                ident: ident.clone(),
213                expected_location: self.id(),
214                _phantom: PhantomData,
215            },
216            S::create_source(ident, Atomic { tick: self.clone() }),
217        )
218    }
219
220    pub fn cycle<S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick>(
221        &self,
222    ) -> (TickCycle<'a, S>, S)
223    where
224        L: NoTick,
225    {
226        let next_id = {
227            let on_id = match self.l.id() {
228                LocationId::Process(id) => id,
229                LocationId::Cluster(id) => id,
230                LocationId::Tick(_, _) => panic!(),
231                LocationId::ExternalProcess(_) => panic!(),
232            };
233
234            let mut flow_state = self.flow_state().borrow_mut();
235            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
236
237            let id = *next_id_entry;
238            *next_id_entry += 1;
239            id
240        };
241
242        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
243
244        (
245            TickCycle {
246                completed: false,
247                ident: ident.clone(),
248                expected_location: self.id(),
249                _phantom: PhantomData,
250            },
251            S::create_source(ident, self.clone()),
252        )
253    }
254
255    pub fn cycle_with_initial<
256        S: CycleCollectionWithInitial<'a, TickCycleMarker, Location = Self> + DeferTick,
257    >(
258        &self,
259        initial: S,
260    ) -> (TickCycle<'a, S>, S)
261    where
262        L: NoTick,
263    {
264        let next_id = {
265            let on_id = match self.l.id() {
266                LocationId::Process(id) => id,
267                LocationId::Cluster(id) => id,
268                LocationId::Tick(_, _) => panic!(),
269                LocationId::ExternalProcess(_) => panic!(),
270            };
271
272            let mut flow_state = self.flow_state().borrow_mut();
273            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
274
275            let id = *next_id_entry;
276            *next_id_entry += 1;
277            id
278        };
279
280        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
281
282        (
283            TickCycle {
284                completed: false,
285                ident: ident.clone(),
286                expected_location: self.id(),
287                _phantom: PhantomData,
288            },
289            S::create_source(ident, initial, self.clone()),
290        )
291    }
292}