1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16
17pub enum Sidecar {
20 Simple {
22 location_key: LocationKey,
23 future_expr: Box<syn::Expr>,
24 },
25 Bidi {
29 location_key: LocationKey,
30 sidecar_id: SidecarId,
31 sidecar_closure: Box<syn::Expr>,
32 },
33}
34#[cfg(feature = "sim")]
35#[cfg(stageleft_runtime)]
36use crate::sim::flow::SimFlow;
37use crate::staging_util::Invariant;
38
39#[stageleft::export(ExternalPortId, CycleId, ClockId, SidecarId, StmtId, HandoffId)]
40crate::newtype_counter! {
41 pub struct ExternalPortId(usize);
43
44 pub struct CycleId(usize);
46
47 pub struct ClockId(usize);
49
50 pub struct SidecarId(usize);
52
53 pub struct StmtId(usize);
55
56 pub struct HandoffId(usize);
58}
59
60impl CycleId {
61 #[cfg(feature = "build")]
62 pub(crate) fn as_ident(&self) -> syn::Ident {
63 syn::Ident::new(&format!("cycle_{}", self), proc_macro2::Span::call_site())
64 }
65}
66
67impl SidecarId {
68 pub fn idents(&self) -> (syn::Ident, syn::Ident) {
70 let span = proc_macro2::Span::call_site();
71 (
72 syn::Ident::new(&format!("__hydro_sidecar_{}_stream", self), span),
73 syn::Ident::new(&format!("__hydro_sidecar_{}_sink", self), span),
74 )
75 }
76}
77
78pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
79
80pub(crate) struct FlowStateInner {
81 roots: Option<Vec<HydroRoot>>,
85
86 next_external_port: ExternalPortId,
88
89 next_cycle_id: CycleId,
91
92 next_clock_id: ClockId,
94
95 next_sidecar_id: SidecarId,
97
98 pub sidecars: Vec<Sidecar>,
101}
102
103impl FlowStateInner {
104 pub fn next_external_port(&mut self) -> ExternalPortId {
105 self.next_external_port.get_and_increment()
106 }
107
108 pub fn next_cycle_id(&mut self) -> CycleId {
109 self.next_cycle_id.get_and_increment()
110 }
111
112 pub fn next_clock_id(&mut self) -> ClockId {
113 self.next_clock_id.get_and_increment()
114 }
115
116 pub fn next_sidecar_id(&mut self) -> SidecarId {
117 self.next_sidecar_id.get_and_increment()
118 }
119
120 pub fn push_root(&mut self, root: HydroRoot) {
121 self.roots
122 .as_mut()
123 .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
124 .push(root);
125 }
126
127 pub fn try_push_root(&mut self, root: HydroRoot) {
128 if let Some(roots) = self.roots.as_mut() {
129 roots.push(root);
130 }
131 }
132}
133
134pub struct FlowBuilder<'a> {
135 flow_state: FlowState,
137
138 locations: SlotMap<LocationKey, LocationType>,
140 location_names: SecondaryMap<LocationKey, String>,
142
143 #[cfg_attr(
145 not(feature = "build"),
146 expect(dead_code, reason = "unused without build")
147 )]
148 flow_name: String,
149
150 finalized: bool,
153
154 _phantom: Invariant<'a>,
159}
160
161impl Drop for FlowBuilder<'_> {
162 fn drop(&mut self) {
163 if !self.finalized && !std::thread::panicking() {
164 panic!(
165 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
166 );
167 }
168 }
169}
170
171#[expect(missing_docs, reason = "TODO")]
172impl<'a> FlowBuilder<'a> {
173 #[expect(
175 clippy::new_without_default,
176 reason = "call `new` explicitly, not `default`"
177 )]
178 pub fn new() -> Self {
179 let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
180 if let Ok(bin_path) = std::env::current_exe()
181 && let Some(bin_name) = bin_path.file_stem()
182 {
183 name = format!("{}/{}", name, bin_name.display());
184 }
185 Self::with_name(name)
186 }
187
188 pub fn with_name(name: impl Into<String>) -> Self {
190 Self {
191 flow_state: Rc::new(RefCell::new(FlowStateInner {
192 roots: Some(vec![]),
193 next_external_port: ExternalPortId::default(),
194 next_cycle_id: CycleId::default(),
195 next_clock_id: ClockId::default(),
196 next_sidecar_id: SidecarId::default(),
197 sidecars: Vec::new(),
198 })),
199 locations: SlotMap::with_key(),
200 location_names: SecondaryMap::new(),
201 flow_name: name.into(),
202 finalized: false,
203 _phantom: PhantomData,
204 }
205 }
206
207 pub(crate) fn flow_state(&self) -> &FlowState {
208 &self.flow_state
209 }
210
211 pub fn process<P>(&mut self) -> Process<'a, P> {
212 let key = self.locations.insert(LocationType::Process);
213 self.location_names.insert(key, type_name::<P>().to_owned());
214 Process {
215 key,
216 flow_state: self.flow_state().clone(),
217 _phantom: PhantomData,
218 }
219 }
220
221 pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
222 let key = self.locations.insert(LocationType::Cluster);
223 self.location_names.insert(key, type_name::<C>().to_owned());
224 Cluster {
225 key,
226 flow_state: self.flow_state().clone(),
227 _phantom: PhantomData,
228 }
229 }
230
231 pub fn external<E>(&mut self) -> External<'a, E> {
232 let key = self.locations.insert(LocationType::External);
233 self.location_names.insert(key, type_name::<E>().to_owned());
234 External {
235 key,
236 flow_state: self.flow_state().clone(),
237 _phantom: PhantomData,
238 }
239 }
240}
241
242#[cfg(feature = "build")]
243#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
244#[expect(missing_docs, reason = "TODO")]
245impl<'a> FlowBuilder<'a> {
246 pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
247 self.finalized = true;
248
249 let mut flow_state = self.flow_state.borrow_mut();
250 let mut ir = flow_state.roots.take().unwrap();
251 let sidecars = std::mem::take(&mut flow_state.sidecars);
252 drop(flow_state);
253
254 super::ir::unify_atomic_ticks(&mut ir);
255
256 super::built::BuiltFlow {
257 ir,
258 locations: std::mem::take(&mut self.locations),
259 location_names: std::mem::take(&mut self.location_names),
260 sidecars,
261 flow_name: std::mem::take(&mut self.flow_name),
262 _phantom: PhantomData,
263 }
264 }
265
266 pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
267 self.finalize().with_default_optimize()
268 }
269
270 pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
271 self.finalize().optimize_with(f)
272 }
273
274 pub fn with_process<P, D: Deploy<'a>>(
275 self,
276 process: &Process<P>,
277 spec: impl IntoProcessSpec<'a, D>,
278 ) -> DeployFlow<'a, D> {
279 self.with_default_optimize().with_process(process, spec)
280 }
281
282 pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
283 self,
284 spec: impl Fn() -> S,
285 ) -> DeployFlow<'a, D> {
286 self.with_default_optimize().with_remaining_processes(spec)
287 }
288
289 pub fn with_external<P, D: Deploy<'a>>(
290 self,
291 process: &External<P>,
292 spec: impl ExternalSpec<'a, D>,
293 ) -> DeployFlow<'a, D> {
294 self.with_default_optimize().with_external(process, spec)
295 }
296
297 pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
298 self,
299 spec: impl Fn() -> S,
300 ) -> DeployFlow<'a, D> {
301 self.with_default_optimize().with_remaining_externals(spec)
302 }
303
304 pub fn with_cluster<C, D: Deploy<'a>>(
305 self,
306 cluster: &Cluster<C>,
307 spec: impl ClusterSpec<'a, D>,
308 ) -> DeployFlow<'a, D> {
309 self.with_default_optimize().with_cluster(cluster, spec)
310 }
311
312 pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
313 self,
314 spec: impl Fn() -> S,
315 ) -> DeployFlow<'a, D> {
316 self.with_default_optimize().with_remaining_clusters(spec)
317 }
318
319 pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
320 self.with_default_optimize::<D>().compile()
321 }
322
323 pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
324 self.with_default_optimize().deploy(env)
325 }
326
327 #[cfg(feature = "sim")]
328 pub fn sim(self) -> SimFlow<'a> {
331 self.finalize().sim()
332 }
333
334 pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
335 FlowBuilder {
336 flow_state: Rc::new(RefCell::new(FlowStateInner {
337 roots: None,
338 next_external_port: ExternalPortId::default(),
339 next_cycle_id: CycleId::default(),
340 next_clock_id: ClockId::default(),
341 next_sidecar_id: SidecarId::default(),
342 sidecars: Vec::new(),
343 })),
344 locations: built.locations.clone(),
345 location_names: built.location_names.clone(),
346 flow_name: built.flow_name.clone(),
347 finalized: false,
348 _phantom: PhantomData,
349 }
350 }
351
352 #[doc(hidden)] pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
354 self.flow_state.borrow_mut().roots = Some(roots);
355 }
356
357 #[doc(hidden)] pub fn next_clock_id(&mut self) -> ClockId {
359 self.flow_state.borrow_mut().next_clock_id()
360 }
361
362 #[doc(hidden)] pub fn next_cycle_id(&mut self) -> CycleId {
364 self.flow_state.borrow_mut().next_cycle_id()
365 }
366}