1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7use super::{Cluster, Location, LocationId, Process};
8use crate::builder::FlowState;
9use crate::cycle::{
10 CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, ForwardRefMarker,
11 TickCycle, TickCycleMarker,
12};
13use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
14use crate::stream::ExactlyOnce;
15use crate::{Bounded, Optional, Singleton, Stream, TotalOrder};
16
17#[sealed]
18pub trait NoTick {}
19#[sealed]
20impl<T> NoTick for Process<'_, T> {}
21#[sealed]
22impl<T> NoTick for Cluster<'_, T> {}
23
24#[sealed]
25pub trait NoAtomic {}
26#[sealed]
27impl<T> NoAtomic for Process<'_, T> {}
28#[sealed]
29impl<T> NoAtomic for Cluster<'_, T> {}
30#[sealed]
31impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
32
/// A location that wraps a [`Tick`].
///
/// Its [`Location`] implementation forwards `root`, `id` and `flow_state` to
/// the wrapped tick, so an `Atomic` reports the same [`LocationId`] as that
/// tick.
#[derive(Clone)]
pub struct Atomic<Loc> {
    /// The tick that this location forwards to.
    pub(crate) tick: Tick<Loc>,
}
37
38impl<'a, L> Location<'a> for Atomic<L>
39where
40 L: Location<'a>,
41{
42 type Root = L::Root;
43
44 fn root(&self) -> Self::Root {
45 self.tick.root()
46 }
47
48 fn id(&self) -> LocationId {
49 self.tick.id()
50 }
51
52 fn flow_state(&self) -> &FlowState {
53 self.tick.flow_state()
54 }
55
56 fn is_top_level() -> bool {
57 L::is_top_level()
58 }
59}
60
61#[sealed]
62impl<L> NoTick for Atomic<L> {}
63
/// A location nested inside another location.
///
/// Its [`LocationId`] is `LocationId::Tick(id, ..)` wrapping the id of the
/// enclosing location `l`.
#[derive(Clone)]
pub struct Tick<Loc> {
    /// Numeric identifier stored in this tick's `LocationId::Tick`.
    pub(crate) id: usize,
    /// The enclosing location that this tick is nested in.
    pub(crate) l: Loc,
}
70
71impl<'a, L> Location<'a> for Tick<L>
72where
73 L: Location<'a>,
74{
75 type Root = L::Root;
76
77 fn root(&self) -> Self::Root {
78 self.l.root()
79 }
80
81 fn id(&self) -> LocationId {
82 LocationId::Tick(self.id, Box::new(self.l.id()))
83 }
84
85 fn flow_state(&self) -> &FlowState {
86 self.l.flow_state()
87 }
88
89 fn is_top_level() -> bool {
90 false
91 }
92
93 fn next_node_id(&self) -> usize {
94 self.l.next_node_id()
95 }
96
97 fn new_node_metadata<T>(&self) -> HydroIrMetadata {
98 self.l.new_node_metadata::<T>()
99 }
100}
101
102impl<'a, L> Tick<L>
103where
104 L: Location<'a>,
105{
106 pub fn outer(&self) -> &L {
107 &self.l
108 }
109
110 pub fn spin_batch(
111 &self,
112 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
113 ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
114 where
115 L: NoTick + NoAtomic,
116 {
117 let out = self
118 .l
119 .spin()
120 .flat_map_ordered(q!(move |_| 0..batch_size))
121 .map(q!(|_| ()));
122
123 unsafe {
124 out.tick_batch(self)
127 }
128 }
129
130 pub fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, L>) -> Singleton<T, Self, Bounded>
131 where
132 T: Clone,
133 L: NoTick + NoAtomic,
134 {
135 unsafe {
136 self.outer().singleton(e).latest_tick(self)
138 }
139 }
140
141 pub fn optional_first_tick<T: Clone>(
142 &self,
143 e: impl QuotedWithContext<'a, T, Tick<L>>,
144 ) -> Optional<T, Self, Bounded>
145 where
146 L: NoTick + NoAtomic,
147 {
148 let e_arr = q!([e]);
149 let e = e_arr.splice_untyped_ctx(self);
150
151 Optional::new(
152 self.clone(),
153 HydroNode::Source {
154 source: HydroSource::Iter(e.into()),
155 location_kind: self.l.id(),
156 metadata: self.new_node_metadata::<T>(),
157 },
158 )
159 }
160
161 pub fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
162 where
163 S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
164 L: NoTick,
165 {
166 let next_id = self.flow_state().borrow_mut().next_cycle_id();
167 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
168
169 (
170 ForwardRef {
171 completed: false,
172 ident: ident.clone(),
173 expected_location: self.id(),
174 _phantom: PhantomData,
175 },
176 S::create_source(ident, self.clone()),
177 )
178 }
179
180 pub fn forward_ref_atomic<S>(&self) -> (ForwardRef<'a, S>, S)
181 where
182 S: CycleCollection<'a, ForwardRefMarker, Location = Atomic<L>>,
183 {
184 let next_id = self.flow_state().borrow_mut().next_cycle_id();
185 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
186
187 (
188 ForwardRef {
189 completed: false,
190 ident: ident.clone(),
191 expected_location: self.id(),
192 _phantom: PhantomData,
193 },
194 S::create_source(ident, Atomic { tick: self.clone() }),
195 )
196 }
197
198 pub fn cycle<S>(&self) -> (TickCycle<'a, S>, S)
199 where
200 S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick,
201 L: NoTick,
202 {
203 let next_id = self.flow_state().borrow_mut().next_cycle_id();
204 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
205
206 (
207 TickCycle {
208 completed: false,
209 ident: ident.clone(),
210 expected_location: self.id(),
211 _phantom: PhantomData,
212 },
213 S::create_source(ident, self.clone()),
214 )
215 }
216
217 pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycle<'a, S>, S)
218 where
219 S: CycleCollectionWithInitial<'a, TickCycleMarker, Location = Self> + DeferTick,
220 L: NoTick,
221 {
222 let next_id = self.flow_state().borrow_mut().next_cycle_id();
223 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
224
225 (
226 TickCycle {
227 completed: false,
228 ident: ident.clone(),
229 expected_location: self.id(),
230 _phantom: PhantomData,
231 },
232 S::create_source(ident, initial, self.clone()),
233 )
234 }
235}