1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7use super::{Cluster, Location, LocationId, Process};
8use crate::builder::FlowState;
9use crate::cycle::{
10 CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, ForwardRefMarker,
11 TickCycle, TickCycleMarker,
12};
13use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
14use crate::{Bounded, Optional, Singleton, Stream};
15
16#[sealed]
17pub trait NoTick {}
18#[sealed]
19impl<T> NoTick for Process<'_, T> {}
20#[sealed]
21impl<T> NoTick for Cluster<'_, T> {}
22
23#[sealed]
24pub trait NoAtomic {}
25#[sealed]
26impl<T> NoAtomic for Process<'_, T> {}
27#[sealed]
28impl<T> NoAtomic for Cluster<'_, T> {}
29#[sealed]
30impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
31
32#[derive(Clone)]
33pub struct Atomic<Loc> {
34 pub(crate) tick: Tick<Loc>,
35}
36
37impl<'a, L> Location<'a> for Atomic<L>
38where
39 L: Location<'a>,
40{
41 type Root = L::Root;
42
43 fn root(&self) -> Self::Root {
44 self.tick.root()
45 }
46
47 fn id(&self) -> LocationId {
48 self.tick.id()
49 }
50
51 fn flow_state(&self) -> &FlowState {
52 self.tick.flow_state()
53 }
54
55 fn is_top_level() -> bool {
56 L::is_top_level()
57 }
58}
59
60#[sealed]
61impl<L> NoTick for Atomic<L> {}
62
/// A tick nested inside another location.
///
/// Its `Location::id` is `LocationId::Tick(id, <id of l>)`, i.e. the numeric
/// `id` below paired with the boxed id of the enclosing location.
#[derive(Clone)]
pub struct Tick<L> {
    /// Numeric identifier of this tick; becomes the first field of `LocationId::Tick`.
    pub(crate) id: usize,
    /// The enclosing location this tick lives in (exposed through `Tick::outer`).
    pub(crate) l: L,
}
69
70impl<'a, L> Location<'a> for Tick<L>
71where
72 L: Location<'a>,
73{
74 type Root = L::Root;
75
76 fn root(&self) -> Self::Root {
77 self.l.root()
78 }
79
80 fn id(&self) -> LocationId {
81 LocationId::Tick(self.id, Box::new(self.l.id()))
82 }
83
84 fn flow_state(&self) -> &FlowState {
85 self.l.flow_state()
86 }
87
88 fn is_top_level() -> bool {
89 false
90 }
91
92 fn next_node_id(&self) -> usize {
93 self.l.next_node_id()
94 }
95
96 fn new_node_metadata<T>(&self) -> HydroIrMetadata {
97 self.l.new_node_metadata::<T>()
98 }
99}
100
101impl<'a, L> Tick<L>
102where
103 L: Location<'a>,
104{
105 pub fn outer(&self) -> &L {
106 &self.l
107 }
108
109 pub fn spin_batch(
110 &self,
111 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
112 ) -> Stream<(), Self, Bounded>
113 where
114 L: NoTick + NoAtomic,
115 {
116 let out = self
117 .l
118 .spin()
119 .flat_map_ordered(q!(move |_| 0..batch_size))
120 .map(q!(|_| ()));
121
122 unsafe {
123 out.tick_batch(self)
126 }
127 }
128
129 pub fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, L>) -> Singleton<T, Self, Bounded>
130 where
131 T: Clone,
132 L: NoTick + NoAtomic,
133 {
134 unsafe {
135 self.outer().singleton(e).latest_tick(self)
137 }
138 }
139
140 pub fn optional_first_tick<T: Clone>(
141 &self,
142 e: impl QuotedWithContext<'a, T, Tick<L>>,
143 ) -> Optional<T, Self, Bounded>
144 where
145 L: NoTick + NoAtomic,
146 {
147 let e_arr = q!([e]);
148 let e = e_arr.splice_untyped_ctx(self);
149
150 Optional::new(
151 self.clone(),
152 HydroNode::Source {
153 source: HydroSource::Iter(e.into()),
154 location_kind: self.l.id(),
155 metadata: self.new_node_metadata::<T>(),
156 },
157 )
158 }
159
160 pub fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
161 where
162 S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
163 L: NoTick,
164 {
165 let next_id = {
166 let on_id = match self.l.id() {
167 LocationId::Process(id) => id,
168 LocationId::Cluster(id) => id,
169 LocationId::Tick(_, _) => panic!(),
170 LocationId::ExternalProcess(_) => panic!(),
171 };
172
173 let mut flow_state = self.flow_state().borrow_mut();
174 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
175
176 let id = *next_id_entry;
177 *next_id_entry += 1;
178 id
179 };
180
181 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
182
183 (
184 ForwardRef {
185 completed: false,
186 ident: ident.clone(),
187 expected_location: self.id(),
188 _phantom: PhantomData,
189 },
190 S::create_source(ident, self.clone()),
191 )
192 }
193
194 pub fn forward_ref_atomic<S>(&self) -> (ForwardRef<'a, S>, S)
195 where
196 S: CycleCollection<'a, ForwardRefMarker, Location = Atomic<L>>,
197 {
198 let next_id = {
199 let on_id = match self.l.id() {
200 LocationId::Process(id) => id,
201 LocationId::Cluster(id) => id,
202 LocationId::Tick(_, _) => panic!(),
203 LocationId::ExternalProcess(_) => panic!(),
204 };
205
206 let mut flow_state = self.flow_state().borrow_mut();
207 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
208
209 let id = *next_id_entry;
210 *next_id_entry += 1;
211 id
212 };
213
214 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
215
216 (
217 ForwardRef {
218 completed: false,
219 ident: ident.clone(),
220 expected_location: self.id(),
221 _phantom: PhantomData,
222 },
223 S::create_source(ident, Atomic { tick: self.clone() }),
224 )
225 }
226
227 pub fn cycle<S>(&self) -> (TickCycle<'a, S>, S)
228 where
229 S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick,
230 L: NoTick,
231 {
232 let next_id = {
233 let on_id = match self.l.id() {
234 LocationId::Process(id) => id,
235 LocationId::Cluster(id) => id,
236 LocationId::Tick(_, _) => panic!(),
237 LocationId::ExternalProcess(_) => panic!(),
238 };
239
240 let mut flow_state = self.flow_state().borrow_mut();
241 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
242
243 let id = *next_id_entry;
244 *next_id_entry += 1;
245 id
246 };
247
248 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
249
250 (
251 TickCycle {
252 completed: false,
253 ident: ident.clone(),
254 expected_location: self.id(),
255 _phantom: PhantomData,
256 },
257 S::create_source(ident, self.clone()),
258 )
259 }
260
261 pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycle<'a, S>, S)
262 where
263 S: CycleCollectionWithInitial<'a, TickCycleMarker, Location = Self> + DeferTick,
264 L: NoTick,
265 {
266 let next_id = {
267 let on_id = match self.l.id() {
268 LocationId::Process(id) => id,
269 LocationId::Cluster(id) => id,
270 LocationId::Tick(_, _) => panic!(),
271 LocationId::ExternalProcess(_) => panic!(),
272 };
273
274 let mut flow_state = self.flow_state().borrow_mut();
275 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
276
277 let id = *next_id_entry;
278 *next_id_entry += 1;
279 id
280 };
281
282 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
283
284 (
285 TickCycle {
286 completed: false,
287 ident: ident.clone(),
288 expected_location: self.id(),
289 _phantom: PhantomData,
290 },
291 S::create_source(ident, initial, self.clone()),
292 )
293 }
294}