1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6#[cfg(feature = "build")]
7use compiled::CompiledFlow;
8#[cfg(feature = "build")]
9use deploy::{DeployFlow, DeployResult};
10use stageleft::*;
11
12#[cfg(feature = "build")]
13use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use crate::ir::HydroLeaf;
15use crate::location::{Cluster, External, Process};
16use crate::staging_util::Invariant;
17
18#[cfg(feature = "build")]
19#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
20pub mod built;
21#[cfg(feature = "build")]
22#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
23pub mod compiled;
24#[cfg(feature = "build")]
25#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
26pub mod deploy;
27
28pub struct FlowStateInner {
29 pub(crate) leaves: Option<Vec<HydroLeaf>>,
33
34 pub(crate) next_external_out: usize,
36
37 pub(crate) cycle_counts: usize,
39
40 pub(crate) next_clock_id: usize,
42
43 pub(crate) next_node_id: usize,
45}
46
47impl FlowStateInner {
48 pub fn next_cycle_id(&mut self) -> usize {
49 let id = self.cycle_counts;
50 self.cycle_counts += 1;
51 id
52 }
53}
54
55pub type FlowState = Rc<RefCell<FlowStateInner>>;
56
57pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.";
58
59pub struct RewriteIrFlowBuilder<'a> {
60 builder: FlowBuilder<'a>,
61}
62
63impl<'a> RewriteIrFlowBuilder<'a> {
64 pub fn build_with(
65 self,
66 thunk: impl FnOnce(&FlowBuilder<'a>) -> Vec<HydroLeaf>,
67 ) -> FlowBuilder<'a> {
68 let leaves = thunk(&self.builder);
69 self.builder.flow_state().borrow_mut().leaves = Some(leaves);
70 self.builder
71 }
72}
73
74pub struct FlowBuilder<'a> {
75 flow_state: FlowState,
76 processes: RefCell<Vec<(usize, String)>>,
77 clusters: RefCell<Vec<(usize, String)>>,
78 externals: RefCell<Vec<(usize, String)>>,
79
80 next_location_id: RefCell<usize>,
81
82 finalized: bool,
85
86 _phantom: Invariant<'a>,
91}
92
93impl Drop for FlowBuilder<'_> {
94 fn drop(&mut self) {
95 if !self.finalized {
96 panic!(
97 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
98 );
99 }
100 }
101}
102
103impl QuotedContext for FlowBuilder<'_> {
104 fn create() -> Self {
105 FlowBuilder::new()
106 }
107}
108
109impl<'a> FlowBuilder<'a> {
110 #[expect(
111 clippy::new_without_default,
112 reason = "call `new` explicitly, not `default`"
113 )]
114 pub fn new() -> FlowBuilder<'a> {
115 FlowBuilder {
116 flow_state: Rc::new(RefCell::new(FlowStateInner {
117 leaves: Some(vec![]),
118 next_external_out: 0,
119 cycle_counts: 0,
120 next_clock_id: 0,
121 next_node_id: 0,
122 })),
123 processes: RefCell::new(vec![]),
124 clusters: RefCell::new(vec![]),
125 externals: RefCell::new(vec![]),
126 next_location_id: RefCell::new(0),
127 finalized: false,
128 _phantom: PhantomData,
129 }
130 }
131
132 pub fn rewritten_ir_builder<'b>(&self) -> RewriteIrFlowBuilder<'b> {
133 let processes = self.processes.borrow().clone();
134 let clusters = self.clusters.borrow().clone();
135 let externals = self.externals.borrow().clone();
136 let next_location_id = *self.next_location_id.borrow();
137 RewriteIrFlowBuilder {
138 builder: FlowBuilder {
139 flow_state: Rc::new(RefCell::new(FlowStateInner {
140 leaves: None,
141 next_external_out: 0,
142 cycle_counts: 0,
143 next_clock_id: 0,
144 next_node_id: 0,
145 })),
146 processes: RefCell::new(processes),
147 clusters: RefCell::new(clusters),
148 externals: RefCell::new(externals),
149 next_location_id: RefCell::new(next_location_id),
150 finalized: false,
151 _phantom: PhantomData,
152 },
153 }
154 }
155
156 pub fn flow_state(&self) -> &FlowState {
157 &self.flow_state
158 }
159
160 pub fn process<P>(&self) -> Process<'a, P> {
161 let mut next_location_id = self.next_location_id.borrow_mut();
162 let id = *next_location_id;
163 *next_location_id += 1;
164
165 self.processes
166 .borrow_mut()
167 .push((id, type_name::<P>().to_string()));
168
169 Process {
170 id,
171 flow_state: self.flow_state().clone(),
172 _phantom: PhantomData,
173 }
174 }
175
176 pub fn external<P>(&self) -> External<'a, P> {
177 let mut next_location_id = self.next_location_id.borrow_mut();
178 let id = *next_location_id;
179 *next_location_id += 1;
180
181 self.externals
182 .borrow_mut()
183 .push((id, type_name::<P>().to_string()));
184
185 External {
186 id,
187 flow_state: self.flow_state().clone(),
188 _phantom: PhantomData,
189 }
190 }
191
192 pub fn cluster<C>(&self) -> Cluster<'a, C> {
193 let mut next_location_id = self.next_location_id.borrow_mut();
194 let id = *next_location_id;
195 *next_location_id += 1;
196
197 self.clusters
198 .borrow_mut()
199 .push((id, type_name::<C>().to_string()));
200
201 Cluster {
202 id,
203 flow_state: self.flow_state().clone(),
204 _phantom: PhantomData,
205 }
206 }
207}
208
209#[cfg(feature = "build")]
210#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
211impl<'a> FlowBuilder<'a> {
212 pub fn finalize(mut self) -> built::BuiltFlow<'a> {
213 self.finalized = true;
214
215 built::BuiltFlow {
216 ir: self.flow_state.borrow_mut().leaves.take().unwrap(),
217 process_id_name: self.processes.replace(vec![]),
218 cluster_id_name: self.clusters.replace(vec![]),
219 external_id_name: self.externals.replace(vec![]),
220 _phantom: PhantomData,
221 }
222 }
223
224 pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
225 self.finalize().with_default_optimize()
226 }
227
228 pub fn optimize_with(self, f: impl FnOnce(&mut [HydroLeaf])) -> built::BuiltFlow<'a> {
229 self.finalize().optimize_with(f)
230 }
231
232 pub fn with_process<P, D: Deploy<'a>>(
233 self,
234 process: &Process<P>,
235 spec: impl IntoProcessSpec<'a, D>,
236 ) -> DeployFlow<'a, D> {
237 self.with_default_optimize().with_process(process, spec)
238 }
239
240 pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
241 self,
242 spec: impl Fn() -> S,
243 ) -> DeployFlow<'a, D> {
244 self.with_default_optimize().with_remaining_processes(spec)
245 }
246
247 pub fn with_external<P, D: Deploy<'a>>(
248 self,
249 process: &External<P>,
250 spec: impl ExternalSpec<'a, D>,
251 ) -> DeployFlow<'a, D> {
252 self.with_default_optimize().with_external(process, spec)
253 }
254
255 pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
256 self,
257 spec: impl Fn() -> S,
258 ) -> DeployFlow<'a, D> {
259 self.with_default_optimize().with_remaining_externals(spec)
260 }
261
262 pub fn with_cluster<C, D: Deploy<'a>>(
263 self,
264 cluster: &Cluster<C>,
265 spec: impl ClusterSpec<'a, D>,
266 ) -> DeployFlow<'a, D> {
267 self.with_default_optimize().with_cluster(cluster, spec)
268 }
269
270 pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
271 self,
272 spec: impl Fn() -> S,
273 ) -> DeployFlow<'a, D> {
274 self.with_default_optimize().with_remaining_clusters(spec)
275 }
276
277 pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
278 self.with_default_optimize::<D>().compile(env)
279 }
280
281 pub fn compile_no_network<D: Deploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
282 self.with_default_optimize::<D>().compile_no_network()
283 }
284
285 pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
286 self,
287 env: &mut D::InstantiateEnv,
288 ) -> DeployResult<'a, D> {
289 self.with_default_optimize().deploy(env)
290 }
291}