1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6#[cfg(feature = "build")]
7use compiled::CompiledFlow;
8#[cfg(feature = "build")]
9use deploy::{DeployFlow, DeployResult};
10use stageleft::*;
11
12#[cfg(feature = "build")]
13use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use crate::ir::HydroLeaf;
15use crate::location::{Cluster, ExternalProcess, Process};
16use crate::staging_util::Invariant;
17
18#[cfg(feature = "build")]
19#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
20pub mod built;
21#[cfg(feature = "build")]
22#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
23pub mod compiled;
24#[cfg(feature = "build")]
25#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
26pub mod deploy;
27
28pub struct FlowStateInner {
29 pub(crate) leaves: Option<Vec<HydroLeaf>>,
33
34 pub(crate) next_external_out: usize,
36
37 pub(crate) cycle_counts: usize,
39
40 pub(crate) next_clock_id: usize,
42
43 pub(crate) next_node_id: usize,
45}
46
47impl FlowStateInner {
48 pub fn next_cycle_id(&mut self) -> usize {
49 let id = self.cycle_counts;
50 self.cycle_counts += 1;
51 id
52 }
53}
54
55pub type FlowState = Rc<RefCell<FlowStateInner>>;
56
/// Message text for reporting an attempt to add a leaf after the flow was finalized.
///
/// NOTE(review): not referenced in this file; presumably used as a panic/`expect` message by
/// code that pushes onto `FlowStateInner::leaves` — confirm at the call sites.
pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.";
58
59pub struct FlowBuilder<'a> {
60 flow_state: FlowState,
61 processes: RefCell<Vec<(usize, String)>>,
62 clusters: RefCell<Vec<(usize, String)>>,
63 externals: RefCell<Vec<(usize, String)>>,
64
65 next_location_id: RefCell<usize>,
66
67 finalized: bool,
70
71 _phantom: Invariant<'a>,
76}
77
78impl Drop for FlowBuilder<'_> {
79 fn drop(&mut self) {
80 if !self.finalized {
81 panic!(
82 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
83 );
84 }
85 }
86}
87
88impl QuotedContext for FlowBuilder<'_> {
89 fn create() -> Self {
90 FlowBuilder::new()
91 }
92}
93
94impl<'a> FlowBuilder<'a> {
95 #[expect(
96 clippy::new_without_default,
97 reason = "call `new` explicitly, not `default`"
98 )]
99 pub fn new() -> FlowBuilder<'a> {
100 FlowBuilder {
101 flow_state: Rc::new(RefCell::new(FlowStateInner {
102 leaves: Some(vec![]),
103 next_external_out: 0,
104 cycle_counts: 0,
105 next_clock_id: 0,
106 next_node_id: 0,
107 })),
108 processes: RefCell::new(vec![]),
109 clusters: RefCell::new(vec![]),
110 externals: RefCell::new(vec![]),
111 next_location_id: RefCell::new(0),
112 finalized: false,
113 _phantom: PhantomData,
114 }
115 }
116
117 pub fn flow_state(&self) -> &FlowState {
118 &self.flow_state
119 }
120
121 pub fn process<P>(&self) -> Process<'a, P> {
122 let mut next_location_id = self.next_location_id.borrow_mut();
123 let id = *next_location_id;
124 *next_location_id += 1;
125
126 self.processes
127 .borrow_mut()
128 .push((id, type_name::<P>().to_string()));
129
130 Process {
131 id,
132 flow_state: self.flow_state().clone(),
133 _phantom: PhantomData,
134 }
135 }
136
137 pub fn external_process<P>(&self) -> ExternalProcess<'a, P> {
138 let mut next_location_id = self.next_location_id.borrow_mut();
139 let id = *next_location_id;
140 *next_location_id += 1;
141
142 self.externals
143 .borrow_mut()
144 .push((id, type_name::<P>().to_string()));
145
146 ExternalProcess {
147 id,
148 flow_state: self.flow_state().clone(),
149 _phantom: PhantomData,
150 }
151 }
152
153 pub fn cluster<C>(&self) -> Cluster<'a, C> {
154 let mut next_location_id = self.next_location_id.borrow_mut();
155 let id = *next_location_id;
156 *next_location_id += 1;
157
158 self.clusters
159 .borrow_mut()
160 .push((id, type_name::<C>().to_string()));
161
162 Cluster {
163 id,
164 flow_state: self.flow_state().clone(),
165 _phantom: PhantomData,
166 }
167 }
168}
169
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Consumes the builder and packages everything it collected into a [`built::BuiltFlow`].
    ///
    /// The leaves are taken out of the shared flow state, and the recorded process, cluster,
    /// and external `(id, name)` lists are moved into the result.
    ///
    /// # Panics
    ///
    /// Panics if the leaves have already been taken out of the shared flow state.
    pub fn finalize(mut self) -> built::BuiltFlow<'a> {
        // Setting the flag first means the `Drop` impl will not raise a second panic if the
        // `unwrap` below fails.
        self.finalized = true;

        let ir = self.flow_state.borrow_mut().leaves.take().unwrap();
        // The struct implements `Drop`, so fields cannot be moved out directly; take the
        // contents out of each `RefCell` instead, leaving empty vectors behind.
        let process_id_name = self.processes.take();
        let cluster_id_name = self.clusters.take();
        let external_id_name = self.externals.take();

        built::BuiltFlow {
            ir,
            process_id_name,
            cluster_id_name,
            external_id_name,
            _phantom: PhantomData,
        }
    }

    /// Finalizes the builder and forwards to `BuiltFlow::with_default_optimize`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        let built = self.finalize();
        built.with_default_optimize()
    }

    /// Finalizes the builder and forwards `f` to `BuiltFlow::optimize_with`.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroLeaf])) -> built::BuiltFlow<'a> {
        let built = self.finalize();
        built.optimize_with(f)
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::with_process`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize::<D>()
            .with_process(process, spec)
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::with_remaining_processes`.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize::<D>()
            .with_remaining_processes(spec)
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::with_external`.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &ExternalProcess<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize::<D>()
            .with_external(process, spec)
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::with_remaining_externals`.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize::<D>()
            .with_remaining_externals(spec)
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::with_cluster`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize::<D>()
            .with_cluster(cluster, spec)
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::with_remaining_clusters`.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize::<D>()
            .with_remaining_clusters(spec)
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::compile` with `env`.
    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
        let flow = self.with_default_optimize::<D>();
        flow.compile(env)
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::compile_no_network`.
    pub fn compile_no_network<D: Deploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
        let flow = self.with_default_optimize::<D>();
        flow.compile_no_network()
    }

    /// Shortcut for `with_default_optimize` followed by `DeployFlow::deploy` with `env`.
    ///
    /// Only available for deployment backends whose `CompileEnv` is `()`.
    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
        self,
        env: &mut D::InstantiateEnv,
    ) -> DeployResult<'a, D> {
        let flow = self.with_default_optimize::<D>();
        flow.deploy(env)
    }
}