1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7use super::{Cluster, Location, LocationId, Process};
8use crate::builder::FlowState;
9use crate::cycle::{
10 CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, ForwardRefMarker,
11 TickCycle, TickCycleMarker,
12};
13use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
14use crate::{Bounded, Optional, Singleton, Stream};
15
16#[sealed]
17pub trait NoTick {}
18#[sealed]
19impl<T> NoTick for Process<'_, T> {}
20#[sealed]
21impl<T> NoTick for Cluster<'_, T> {}
22
23#[sealed]
24pub trait NoAtomic {}
25#[sealed]
26impl<T> NoAtomic for Process<'_, T> {}
27#[sealed]
28impl<T> NoAtomic for Cluster<'_, T> {}
29#[sealed]
30impl<'a, L: Location<'a>> NoAtomic for Tick<L> {}
31
32#[derive(Clone)]
33pub struct Atomic<L> {
34 pub(crate) tick: Tick<L>,
35}
36
37impl<'a, L: Location<'a>> Location<'a> for Atomic<L> {
38 type Root = L::Root;
39
40 fn root(&self) -> Self::Root {
41 self.tick.root()
42 }
43
44 fn id(&self) -> LocationId {
45 self.tick.id()
46 }
47
48 fn flow_state(&self) -> &FlowState {
49 self.tick.flow_state()
50 }
51
52 fn is_top_level() -> bool {
53 L::is_top_level()
54 }
55}
56
57#[sealed]
58impl<L> NoTick for Atomic<L> {}
59
60#[derive(Clone)]
62pub struct Tick<L> {
63 pub(crate) id: usize,
64 pub(crate) l: L,
65}
66
67impl<'a, L: Location<'a>> Location<'a> for Tick<L> {
68 type Root = L::Root;
69
70 fn root(&self) -> Self::Root {
71 self.l.root()
72 }
73
74 fn id(&self) -> LocationId {
75 LocationId::Tick(self.id, Box::new(self.l.id()))
76 }
77
78 fn flow_state(&self) -> &FlowState {
79 self.l.flow_state()
80 }
81
82 fn is_top_level() -> bool {
83 false
84 }
85
86 fn next_node_id(&self) -> usize {
87 self.l.next_node_id()
88 }
89
90 fn new_node_metadata<T>(&self) -> HydroIrMetadata {
91 self.l.new_node_metadata::<T>()
92 }
93}
94
95impl<'a, L: Location<'a>> Tick<L> {
96 pub fn outer(&self) -> &L {
97 &self.l
98 }
99
100 pub fn spin_batch(
101 &self,
102 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
103 ) -> Stream<(), Self, Bounded>
104 where
105 L: NoTick + NoAtomic,
106 {
107 let out = self
108 .l
109 .spin()
110 .flat_map_ordered(q!(move |_| 0..batch_size))
111 .map(q!(|_| ()));
112
113 unsafe {
114 out.tick_batch(self)
117 }
118 }
119
120 pub fn singleton<T: Clone>(
121 &self,
122 e: impl QuotedWithContext<'a, T, L>,
123 ) -> Singleton<T, Self, Bounded>
124 where
125 L: NoTick + NoAtomic,
126 {
127 unsafe {
128 self.outer().singleton(e).latest_tick(self)
130 }
131 }
132
133 pub fn optional_first_tick<T: Clone>(
134 &self,
135 e: impl QuotedWithContext<'a, T, Tick<L>>,
136 ) -> Optional<T, Self, Bounded>
137 where
138 L: NoTick + NoAtomic,
139 {
140 let e_arr = q!([e]);
141 let e = e_arr.splice_untyped_ctx(self);
142
143 Optional::new(
144 self.clone(),
145 HydroNode::Source {
146 source: HydroSource::Iter(e.into()),
147 location_kind: self.l.id(),
148 metadata: self.new_node_metadata::<T>(),
149 },
150 )
151 }
152
153 pub fn forward_ref<S: CycleCollection<'a, ForwardRefMarker, Location = Self>>(
154 &self,
155 ) -> (ForwardRef<'a, S>, S)
156 where
157 L: NoTick,
158 {
159 let next_id = {
160 let on_id = match self.l.id() {
161 LocationId::Process(id) => id,
162 LocationId::Cluster(id) => id,
163 LocationId::Tick(_, _) => panic!(),
164 LocationId::ExternalProcess(_) => panic!(),
165 };
166
167 let mut flow_state = self.flow_state().borrow_mut();
168 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
169
170 let id = *next_id_entry;
171 *next_id_entry += 1;
172 id
173 };
174
175 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
176
177 (
178 ForwardRef {
179 completed: false,
180 ident: ident.clone(),
181 expected_location: self.id(),
182 _phantom: PhantomData,
183 },
184 S::create_source(ident, self.clone()),
185 )
186 }
187
188 pub fn forward_ref_atomic<S: CycleCollection<'a, ForwardRefMarker, Location = Atomic<L>>>(
189 &self,
190 ) -> (ForwardRef<'a, S>, S) {
191 let next_id = {
192 let on_id = match self.l.id() {
193 LocationId::Process(id) => id,
194 LocationId::Cluster(id) => id,
195 LocationId::Tick(_, _) => panic!(),
196 LocationId::ExternalProcess(_) => panic!(),
197 };
198
199 let mut flow_state = self.flow_state().borrow_mut();
200 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
201
202 let id = *next_id_entry;
203 *next_id_entry += 1;
204 id
205 };
206
207 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
208
209 (
210 ForwardRef {
211 completed: false,
212 ident: ident.clone(),
213 expected_location: self.id(),
214 _phantom: PhantomData,
215 },
216 S::create_source(ident, Atomic { tick: self.clone() }),
217 )
218 }
219
220 pub fn cycle<S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick>(
221 &self,
222 ) -> (TickCycle<'a, S>, S)
223 where
224 L: NoTick,
225 {
226 let next_id = {
227 let on_id = match self.l.id() {
228 LocationId::Process(id) => id,
229 LocationId::Cluster(id) => id,
230 LocationId::Tick(_, _) => panic!(),
231 LocationId::ExternalProcess(_) => panic!(),
232 };
233
234 let mut flow_state = self.flow_state().borrow_mut();
235 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
236
237 let id = *next_id_entry;
238 *next_id_entry += 1;
239 id
240 };
241
242 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
243
244 (
245 TickCycle {
246 completed: false,
247 ident: ident.clone(),
248 expected_location: self.id(),
249 _phantom: PhantomData,
250 },
251 S::create_source(ident, self.clone()),
252 )
253 }
254
255 pub fn cycle_with_initial<
256 S: CycleCollectionWithInitial<'a, TickCycleMarker, Location = Self> + DeferTick,
257 >(
258 &self,
259 initial: S,
260 ) -> (TickCycle<'a, S>, S)
261 where
262 L: NoTick,
263 {
264 let next_id = {
265 let on_id = match self.l.id() {
266 LocationId::Process(id) => id,
267 LocationId::Cluster(id) => id,
268 LocationId::Tick(_, _) => panic!(),
269 LocationId::ExternalProcess(_) => panic!(),
270 };
271
272 let mut flow_state = self.flow_state().borrow_mut();
273 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
274
275 let id = *next_id_entry;
276 *next_id_entry += 1;
277 id
278 };
279
280 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
281
282 (
283 TickCycle {
284 completed: false,
285 ident: ident.clone(),
286 expected_location: self.id(),
287 _phantom: PhantomData,
288 },
289 S::create_source(ident, initial, self.clone()),
290 )
291 }
292}