1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6#[cfg(feature = "build")]
7use super::compiled::CompiledFlow;
8#[cfg(feature = "build")]
9use super::deploy::{DeployFlow, DeployResult};
10#[cfg(feature = "build")]
11use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
12use super::ir::HydroRoot;
13use crate::location::{Cluster, External, Process};
14use crate::staging_util::Invariant;
15
/// Shared, single-threaded handle to the mutable state of a flow under construction.
///
/// The builder owns one handle and gives a clone of it to every location it creates, so
/// all of them observe the same [`FlowStateInner`].
pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
17
/// Mutable state shared between a `FlowBuilder` and the locations it hands out.
pub(crate) struct FlowStateInner {
    /// Roots of the dataflow IR collected so far.
    ///
    /// This is `Some` while the flow is still being built. `FlowBuilder::finalize` takes the
    /// vector and leaves `None` behind, after which `push_root` panics. A builder created by
    /// `FlowBuilder::rewritten_ir_builder` starts out with `None` until
    /// `RewriteIrFlowBuilder::build_with` installs the roots.
    pub(crate) roots: Option<Vec<HydroRoot>>,

    /// Counter that starts at zero for every new builder.
    ///
    /// NOTE(review): not read anywhere in this file; presumably the next unused ID for an
    /// external output. Confirm against the code that consumes it.
    pub(crate) next_external_out: usize,

    /// Number of cycle IDs handed out so far, which is also the ID that the next call to
    /// `next_cycle_id` returns.
    pub(crate) cycle_counts: usize,

    /// Counter that starts at zero for every new builder.
    ///
    /// NOTE(review): not read anywhere in this file; presumably the next unused clock ID.
    /// Confirm against the code that consumes it.
    pub(crate) next_clock_id: usize,
}
33
34impl FlowStateInner {
35 pub fn next_cycle_id(&mut self) -> usize {
36 let id = self.cycle_counts;
37 self.cycle_counts += 1;
38 id
39 }
40
41 pub fn push_root(&mut self, root: HydroRoot) {
42 self.roots
43 .as_mut()
44 .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
45 .push(root);
46 }
47}
48
49#[expect(missing_docs, reason = "TODO")]
50pub struct FlowBuilder<'a> {
51 flow_state: FlowState,
52 processes: RefCell<Vec<(usize, String)>>,
53 clusters: RefCell<Vec<(usize, String)>>,
54 externals: RefCell<Vec<(usize, String)>>,
55
56 next_location_id: RefCell<usize>,
57
58 finalized: bool,
61
62 _phantom: Invariant<'a>,
67}
68
69impl Drop for FlowBuilder<'_> {
70 fn drop(&mut self) {
71 if !self.finalized {
72 panic!(
73 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
74 );
75 }
76 }
77}
78
79#[expect(missing_docs, reason = "TODO")]
80impl<'a> FlowBuilder<'a> {
81 #[expect(
82 clippy::new_without_default,
83 reason = "call `new` explicitly, not `default`"
84 )]
85 pub fn new() -> FlowBuilder<'a> {
86 FlowBuilder {
87 flow_state: Rc::new(RefCell::new(FlowStateInner {
88 roots: Some(vec![]),
89 next_external_out: 0,
90 cycle_counts: 0,
91 next_clock_id: 0,
92 })),
93 processes: RefCell::new(vec![]),
94 clusters: RefCell::new(vec![]),
95 externals: RefCell::new(vec![]),
96 next_location_id: RefCell::new(0),
97 finalized: false,
98 _phantom: PhantomData,
99 }
100 }
101
102 pub fn rewritten_ir_builder<'b>(&self) -> RewriteIrFlowBuilder<'b> {
103 let processes = self.processes.borrow().clone();
104 let clusters = self.clusters.borrow().clone();
105 let externals = self.externals.borrow().clone();
106 let next_location_id = *self.next_location_id.borrow();
107 RewriteIrFlowBuilder {
108 builder: FlowBuilder {
109 flow_state: Rc::new(RefCell::new(FlowStateInner {
110 roots: None,
111 next_external_out: 0,
112 cycle_counts: 0,
113 next_clock_id: 0,
114 })),
115 processes: RefCell::new(processes),
116 clusters: RefCell::new(clusters),
117 externals: RefCell::new(externals),
118 next_location_id: RefCell::new(next_location_id),
119 finalized: false,
120 _phantom: PhantomData,
121 },
122 }
123 }
124
125 pub(crate) fn flow_state(&self) -> &FlowState {
126 &self.flow_state
127 }
128
129 pub fn process<P>(&self) -> Process<'a, P> {
130 let mut next_location_id = self.next_location_id.borrow_mut();
131 let id = *next_location_id;
132 *next_location_id += 1;
133
134 self.processes
135 .borrow_mut()
136 .push((id, type_name::<P>().to_string()));
137
138 Process {
139 id,
140 flow_state: self.flow_state().clone(),
141 _phantom: PhantomData,
142 }
143 }
144
145 pub fn external<P>(&self) -> External<'a, P> {
146 let mut next_location_id = self.next_location_id.borrow_mut();
147 let id = *next_location_id;
148 *next_location_id += 1;
149
150 self.externals
151 .borrow_mut()
152 .push((id, type_name::<P>().to_string()));
153
154 External {
155 id,
156 flow_state: self.flow_state().clone(),
157 _phantom: PhantomData,
158 }
159 }
160
161 pub fn cluster<C>(&self) -> Cluster<'a, C> {
162 let mut next_location_id = self.next_location_id.borrow_mut();
163 let id = *next_location_id;
164 *next_location_id += 1;
165
166 self.clusters
167 .borrow_mut()
168 .push((id, type_name::<C>().to_string()));
169
170 Cluster {
171 id,
172 flow_state: self.flow_state().clone(),
173 _phantom: PhantomData,
174 }
175 }
176}
177
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Consumes the builder and packages what it collected into a `BuiltFlow`.
    ///
    /// The IR roots are taken out of the shared flow state (leaving `None` behind, so no
    /// further roots can be pushed), and the process, cluster and external registries are
    /// moved into the result.
    ///
    /// # Panics
    ///
    /// Panics if the roots have already been taken out of the shared flow state.
    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
        // Set the flag first so the `Drop` check stays quiet when `self` goes out of scope,
        // including when the `expect` below fails.
        self.finalized = true;

        let ir = self
            .flow_state
            .borrow_mut()
            .roots
            .take()
            .expect("FlowBuilder has no IR roots to finalize; they were already taken from the flow state");

        // `FlowBuilder` implements `Drop`, so fields cannot be moved out of `self`; empty the
        // registries in place instead.
        super::built::BuiltFlow {
            ir,
            process_id_name: self.processes.take(),
            cluster_id_name: self.clusters.take(),
            external_id_name: self.externals.take(),
            _phantom: PhantomData,
        }
    }

    /// Finalizes the builder and calls `with_default_optimize` on the resulting `BuiltFlow`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        self.finalize().with_default_optimize()
    }

    /// Finalizes the builder and passes `f` to `optimize_with` on the resulting `BuiltFlow`.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
        self.finalize().optimize_with(f)
    }

    /// Shorthand for `with_default_optimize()` followed by `with_process(process, spec)` on
    /// the resulting `DeployFlow`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_process(process, spec)
    }

    /// Shorthand for `with_default_optimize()` followed by `with_remaining_processes(spec)`
    /// on the resulting `DeployFlow`.
    ///
    /// `spec` is a closure that produces a fresh process spec each time it is called.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_processes(spec)
    }

    /// Shorthand for `with_default_optimize()` followed by `with_external(process, spec)` on
    /// the resulting `DeployFlow`.
    ///
    /// Despite its name, the `process` parameter is the external location to configure.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_external(process, spec)
    }

    /// Shorthand for `with_default_optimize()` followed by `with_remaining_externals(spec)`
    /// on the resulting `DeployFlow`.
    ///
    /// `spec` is a closure that produces a fresh external spec each time it is called.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_externals(spec)
    }

    /// Shorthand for `with_default_optimize()` followed by `with_cluster(cluster, spec)` on
    /// the resulting `DeployFlow`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_cluster(cluster, spec)
    }

    /// Shorthand for `with_default_optimize()` followed by `with_remaining_clusters(spec)`
    /// on the resulting `DeployFlow`.
    ///
    /// `spec` is a closure that produces a fresh cluster spec each time it is called.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_clusters(spec)
    }

    /// Shorthand for `with_default_optimize::<D>()` followed by `compile(env)` on the
    /// resulting `DeployFlow`.
    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
        self.with_default_optimize::<D>().compile(env)
    }

    /// Shorthand for `with_default_optimize::<D>()` followed by `compile_no_network()` on
    /// the resulting `DeployFlow`.
    pub fn compile_no_network<D: Deploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
        self.with_default_optimize::<D>().compile_no_network()
    }

    /// Shorthand for `with_default_optimize()` followed by `deploy(env)` on the resulting
    /// `DeployFlow`.
    ///
    /// Only available for deployment backends whose compile environment is `()`.
    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
        self,
        env: &mut D::InstantiateEnv,
    ) -> DeployResult<'a, D> {
        self.with_default_optimize().deploy(env)
    }
}
262
263#[expect(missing_docs, reason = "TODO")]
264pub struct RewriteIrFlowBuilder<'a> {
265 builder: FlowBuilder<'a>,
266}
267
268#[expect(missing_docs, reason = "TODO")]
269impl<'a> RewriteIrFlowBuilder<'a> {
270 pub fn build_with(
271 self,
272 thunk: impl FnOnce(&FlowBuilder<'a>) -> Vec<HydroRoot>,
273 ) -> FlowBuilder<'a> {
274 let roots = thunk(&self.builder);
275 self.builder.flow_state().borrow_mut().roots = Some(roots);
276 self.builder
277 }
278}