1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16#[cfg(feature = "sim")]
17#[cfg(stageleft_runtime)]
18use crate::sim::flow::SimFlow;
19use crate::staging_util::Invariant;
20
21#[stageleft::export(ExternalPortId, CycleId, ClockId)]
22crate::newtype_counter! {
23 pub struct ExternalPortId(usize);
25
26 pub struct CycleId(usize);
28
29 pub struct ClockId(usize);
31}
32
33impl CycleId {
34 #[cfg(feature = "build")]
35 pub(crate) fn as_ident(&self) -> syn::Ident {
36 syn::Ident::new(&format!("cycle_{}", self), proc_macro2::Span::call_site())
37 }
38}
39
40pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
41
42pub(crate) struct FlowStateInner {
43 roots: Option<Vec<HydroRoot>>,
47
48 next_external_port: ExternalPortId,
50
51 next_cycle_id: CycleId,
53
54 next_clock_id: ClockId,
56}
57
58impl FlowStateInner {
59 pub fn next_external_port(&mut self) -> ExternalPortId {
60 self.next_external_port.get_and_increment()
61 }
62
63 pub fn next_cycle_id(&mut self) -> CycleId {
64 self.next_cycle_id.get_and_increment()
65 }
66
67 pub fn next_clock_id(&mut self) -> ClockId {
68 self.next_clock_id.get_and_increment()
69 }
70
71 pub fn push_root(&mut self, root: HydroRoot) {
72 self.roots
73 .as_mut()
74 .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
75 .push(root);
76 }
77
78 pub fn try_push_root(&mut self, root: HydroRoot) {
79 if let Some(roots) = self.roots.as_mut() {
80 roots.push(root);
81 }
82 }
83}
84
85pub struct FlowBuilder<'a> {
86 flow_state: FlowState,
88
89 locations: SlotMap<LocationKey, LocationType>,
91 location_names: SecondaryMap<LocationKey, String>,
93
94 #[cfg_attr(
96 not(feature = "build"),
97 expect(dead_code, reason = "unused without build")
98 )]
99 flow_name: String,
100
101 finalized: bool,
104
105 _phantom: Invariant<'a>,
110}
111
112impl Drop for FlowBuilder<'_> {
113 fn drop(&mut self) {
114 if !self.finalized && !std::thread::panicking() {
115 panic!(
116 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
117 );
118 }
119 }
120}
121
122#[expect(missing_docs, reason = "TODO")]
123impl<'a> FlowBuilder<'a> {
124 #[expect(
126 clippy::new_without_default,
127 reason = "call `new` explicitly, not `default`"
128 )]
129 pub fn new() -> Self {
130 let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
131 if let Ok(bin_path) = std::env::current_exe()
132 && let Some(bin_name) = bin_path.file_stem()
133 {
134 name = format!("{}/{}", name, bin_name.display());
135 }
136 Self::with_name(name)
137 }
138
139 pub fn with_name(name: impl Into<String>) -> Self {
141 Self {
142 flow_state: Rc::new(RefCell::new(FlowStateInner {
143 roots: Some(vec![]),
144 next_external_port: ExternalPortId::default(),
145 next_cycle_id: CycleId::default(),
146 next_clock_id: ClockId::default(),
147 })),
148 locations: SlotMap::with_key(),
149 location_names: SecondaryMap::new(),
150 flow_name: name.into(),
151 finalized: false,
152 _phantom: PhantomData,
153 }
154 }
155
156 pub(crate) fn flow_state(&self) -> &FlowState {
157 &self.flow_state
158 }
159
160 pub fn process<P>(&mut self) -> Process<'a, P> {
161 let key = self.locations.insert(LocationType::Process);
162 self.location_names.insert(key, type_name::<P>().to_owned());
163 Process {
164 key,
165 flow_state: self.flow_state().clone(),
166 _phantom: PhantomData,
167 }
168 }
169
170 pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
171 let key = self.locations.insert(LocationType::Cluster);
172 self.location_names.insert(key, type_name::<C>().to_owned());
173 Cluster {
174 key,
175 flow_state: self.flow_state().clone(),
176 _phantom: PhantomData,
177 }
178 }
179
180 pub fn external<E>(&mut self) -> External<'a, E> {
181 let key = self.locations.insert(LocationType::External);
182 self.location_names.insert(key, type_name::<E>().to_owned());
183 External {
184 key,
185 flow_state: self.flow_state().clone(),
186 _phantom: PhantomData,
187 }
188 }
189}
190
191#[cfg(feature = "build")]
192#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
193#[expect(missing_docs, reason = "TODO")]
194impl<'a> FlowBuilder<'a> {
195 pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
196 self.finalized = true;
197
198 super::built::BuiltFlow {
199 ir: self.flow_state.borrow_mut().roots.take().unwrap(),
200 locations: std::mem::take(&mut self.locations),
201 location_names: std::mem::take(&mut self.location_names),
202 flow_name: std::mem::take(&mut self.flow_name),
203 _phantom: PhantomData,
204 }
205 }
206
207 pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
208 self.finalize().with_default_optimize()
209 }
210
211 pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
212 self.finalize().optimize_with(f)
213 }
214
215 pub fn with_process<P, D: Deploy<'a>>(
216 self,
217 process: &Process<P>,
218 spec: impl IntoProcessSpec<'a, D>,
219 ) -> DeployFlow<'a, D> {
220 self.with_default_optimize().with_process(process, spec)
221 }
222
223 pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
224 self,
225 spec: impl Fn() -> S,
226 ) -> DeployFlow<'a, D> {
227 self.with_default_optimize().with_remaining_processes(spec)
228 }
229
230 pub fn with_external<P, D: Deploy<'a>>(
231 self,
232 process: &External<P>,
233 spec: impl ExternalSpec<'a, D>,
234 ) -> DeployFlow<'a, D> {
235 self.with_default_optimize().with_external(process, spec)
236 }
237
238 pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
239 self,
240 spec: impl Fn() -> S,
241 ) -> DeployFlow<'a, D> {
242 self.with_default_optimize().with_remaining_externals(spec)
243 }
244
245 pub fn with_cluster<C, D: Deploy<'a>>(
246 self,
247 cluster: &Cluster<C>,
248 spec: impl ClusterSpec<'a, D>,
249 ) -> DeployFlow<'a, D> {
250 self.with_default_optimize().with_cluster(cluster, spec)
251 }
252
253 pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
254 self,
255 spec: impl Fn() -> S,
256 ) -> DeployFlow<'a, D> {
257 self.with_default_optimize().with_remaining_clusters(spec)
258 }
259
260 pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
261 self.with_default_optimize::<D>().compile()
262 }
263
264 pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
265 self.with_default_optimize().deploy(env)
266 }
267
268 #[cfg(feature = "sim")]
269 pub fn sim(self) -> SimFlow<'a> {
272 self.finalize().sim()
273 }
274
275 pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
276 FlowBuilder {
277 flow_state: Rc::new(RefCell::new(FlowStateInner {
278 roots: None,
279 next_external_port: ExternalPortId::default(),
280 next_cycle_id: CycleId::default(),
281 next_clock_id: ClockId::default(),
282 })),
283 locations: built.locations.clone(),
284 location_names: built.location_names.clone(),
285 flow_name: built.flow_name.clone(),
286 finalized: false,
287 _phantom: PhantomData,
288 }
289 }
290
291 #[doc(hidden)] pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
293 self.flow_state.borrow_mut().roots = Some(roots);
294 }
295}