hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7use super::{Cluster, Location, LocationId, Process};
8use crate::builder::FlowState;
9use crate::cycle::{
10    CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, ForwardRefMarker,
11    TickCycle, TickCycleMarker,
12};
13use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
14use crate::stream::ExactlyOnce;
15use crate::{Bounded, Optional, Singleton, Stream, TotalOrder};
16
17#[sealed]
18pub trait NoTick {}
19#[sealed]
20impl<T> NoTick for Process<'_, T> {}
21#[sealed]
22impl<T> NoTick for Cluster<'_, T> {}
23
24#[sealed]
25pub trait NoAtomic {}
26#[sealed]
27impl<T> NoAtomic for Process<'_, T> {}
28#[sealed]
29impl<T> NoAtomic for Cluster<'_, T> {}
30#[sealed]
31impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
32
33#[derive(Clone)]
34pub struct Atomic<Loc> {
35    pub(crate) tick: Tick<Loc>,
36}
37
38impl<'a, L> Location<'a> for Atomic<L>
39where
40    L: Location<'a>,
41{
42    type Root = L::Root;
43
44    fn root(&self) -> Self::Root {
45        self.tick.root()
46    }
47
48    fn id(&self) -> LocationId {
49        self.tick.id()
50    }
51
52    fn flow_state(&self) -> &FlowState {
53        self.tick.flow_state()
54    }
55
56    fn is_top_level() -> bool {
57        L::is_top_level()
58    }
59}
60
61#[sealed]
62impl<L> NoTick for Atomic<L> {}
63
64/// Marks the stream as being inside the single global clock domain.
65#[derive(Clone)]
66pub struct Tick<Loc> {
67    pub(crate) id: usize,
68    pub(crate) l: Loc,
69}
70
71impl<'a, L> Location<'a> for Tick<L>
72where
73    L: Location<'a>,
74{
75    type Root = L::Root;
76
77    fn root(&self) -> Self::Root {
78        self.l.root()
79    }
80
81    fn id(&self) -> LocationId {
82        LocationId::Tick(self.id, Box::new(self.l.id()))
83    }
84
85    fn flow_state(&self) -> &FlowState {
86        self.l.flow_state()
87    }
88
89    fn is_top_level() -> bool {
90        false
91    }
92
93    fn next_node_id(&self) -> usize {
94        self.l.next_node_id()
95    }
96
97    fn new_node_metadata<T>(&self) -> HydroIrMetadata {
98        self.l.new_node_metadata::<T>()
99    }
100}
101
102impl<'a, L> Tick<L>
103where
104    L: Location<'a>,
105{
106    pub fn outer(&self) -> &L {
107        &self.l
108    }
109
110    pub fn spin_batch(
111        &self,
112        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
113    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
114    where
115        L: NoTick + NoAtomic,
116    {
117        let out = self
118            .l
119            .spin()
120            .flat_map_ordered(q!(move |_| 0..batch_size))
121            .map(q!(|_| ()));
122
123        unsafe {
124            // SAFETY: at runtime, `spin` produces a single value per tick,
125            // so each batch is guaranteed to be the same size.
126            out.tick_batch(self)
127        }
128    }
129
130    pub fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, L>) -> Singleton<T, Self, Bounded>
131    where
132        T: Clone,
133        L: NoTick + NoAtomic,
134    {
135        unsafe {
136            // SAFETY: a top-level singleton produces the same value each tick
137            self.outer().singleton(e).latest_tick(self)
138        }
139    }
140
141    pub fn optional_first_tick<T: Clone>(
142        &self,
143        e: impl QuotedWithContext<'a, T, Tick<L>>,
144    ) -> Optional<T, Self, Bounded>
145    where
146        L: NoTick + NoAtomic,
147    {
148        let e_arr = q!([e]);
149        let e = e_arr.splice_untyped_ctx(self);
150
151        Optional::new(
152            self.clone(),
153            HydroNode::Source {
154                source: HydroSource::Iter(e.into()),
155                location_kind: self.l.id(),
156                metadata: self.new_node_metadata::<T>(),
157            },
158        )
159    }
160
161    pub fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
162    where
163        S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
164        L: NoTick,
165    {
166        let next_id = self.flow_state().borrow_mut().next_cycle_id();
167        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
168
169        (
170            ForwardRef {
171                completed: false,
172                ident: ident.clone(),
173                expected_location: self.id(),
174                _phantom: PhantomData,
175            },
176            S::create_source(ident, self.clone()),
177        )
178    }
179
180    pub fn forward_ref_atomic<S>(&self) -> (ForwardRef<'a, S>, S)
181    where
182        S: CycleCollection<'a, ForwardRefMarker, Location = Atomic<L>>,
183    {
184        let next_id = self.flow_state().borrow_mut().next_cycle_id();
185        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
186
187        (
188            ForwardRef {
189                completed: false,
190                ident: ident.clone(),
191                expected_location: self.id(),
192                _phantom: PhantomData,
193            },
194            S::create_source(ident, Atomic { tick: self.clone() }),
195        )
196    }
197
198    pub fn cycle<S>(&self) -> (TickCycle<'a, S>, S)
199    where
200        S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick,
201        L: NoTick,
202    {
203        let next_id = self.flow_state().borrow_mut().next_cycle_id();
204        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
205
206        (
207            TickCycle {
208                completed: false,
209                ident: ident.clone(),
210                expected_location: self.id(),
211                _phantom: PhantomData,
212            },
213            S::create_source(ident, self.clone()),
214        )
215    }
216
217    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycle<'a, S>, S)
218    where
219        S: CycleCollectionWithInitial<'a, TickCycleMarker, Location = Self> + DeferTick,
220        L: NoTick,
221    {
222        let next_id = self.flow_state().borrow_mut().next_cycle_id();
223        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
224
225        (
226            TickCycle {
227                completed: false,
228                ident: ident.clone(),
229                expected_location: self.id(),
230                _phantom: PhantomData,
231            },
232            S::create_source(ident, initial, self.clone()),
233        )
234    }
235}