hydro_lang/compile/
builder.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6#[cfg(feature = "build")]
7use super::compiled::CompiledFlow;
8#[cfg(feature = "build")]
9use super::deploy::{DeployFlow, DeployResult};
10#[cfg(feature = "build")]
11use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
12use super::ir::HydroRoot;
13use crate::location::{Cluster, External, Process};
14#[cfg(feature = "sim")]
15#[cfg(stageleft_runtime)]
16use crate::sim::flow::SimFlow;
17use crate::staging_util::Invariant;
18
19crate::newtype_counter! {
20    /// Counter for generating unique external output identifiers.
21    pub struct ExternalPortId(usize);
22}
23
24pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
25
26pub(crate) struct FlowStateInner {
27    /// Tracks the roots of the dataflow IR. This is referenced by
28    /// `Stream` and `HfCycle` to build the IR. The inner option will
29    /// be set to `None` when this builder is finalized.
30    pub(crate) roots: Option<Vec<HydroRoot>>,
31
32    /// Counter for generating unique external output identifiers.
33    pub(crate) next_external_port: ExternalPortId,
34
35    /// Counters for generating identifiers for cycles.
36    pub(crate) cycle_counts: usize,
37
38    /// Counters for clock IDs.
39    pub(crate) next_clock_id: usize,
40}
41
42impl FlowStateInner {
43    pub fn next_cycle_id(&mut self) -> usize {
44        let id = self.cycle_counts;
45        self.cycle_counts += 1;
46        id
47    }
48
49    pub fn push_root(&mut self, root: HydroRoot) {
50        self.roots
51            .as_mut()
52            .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
53            .push(root);
54    }
55}
56
57#[expect(missing_docs, reason = "TODO")]
58pub struct FlowBuilder<'a> {
59    flow_state: FlowState,
60    processes: RefCell<Vec<(usize, String)>>,
61    clusters: RefCell<Vec<(usize, String)>>,
62    externals: RefCell<Vec<(usize, String)>>,
63
64    next_location_id: RefCell<usize>,
65
66    /// Tracks whether this flow has been finalized; it is an error to
67    /// drop without finalizing.
68    finalized: bool,
69
70    /// 'a on a FlowBuilder is used to ensure that staged code does not
71    /// capture more data that it is allowed to; 'a is generated at the
72    /// entrypoint of the staged code and we keep it invariant here
73    /// to enforce the appropriate constraints
74    _phantom: Invariant<'a>,
75}
76
77impl Drop for FlowBuilder<'_> {
78    fn drop(&mut self) {
79        if !self.finalized && !std::thread::panicking() {
80            panic!(
81                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
82            );
83        }
84    }
85}
86
87#[expect(missing_docs, reason = "TODO")]
88impl<'a> FlowBuilder<'a> {
89    #[expect(
90        clippy::new_without_default,
91        reason = "call `new` explicitly, not `default`"
92    )]
93    pub fn new() -> FlowBuilder<'a> {
94        FlowBuilder {
95            flow_state: Rc::new(RefCell::new(FlowStateInner {
96                roots: Some(vec![]),
97                next_external_port: ExternalPortId(0),
98                cycle_counts: 0,
99                next_clock_id: 0,
100            })),
101            processes: RefCell::new(vec![]),
102            clusters: RefCell::new(vec![]),
103            externals: RefCell::new(vec![]),
104            next_location_id: RefCell::new(0),
105            finalized: false,
106            _phantom: PhantomData,
107        }
108    }
109
110    pub(crate) fn flow_state(&self) -> &FlowState {
111        &self.flow_state
112    }
113
114    pub fn process<P>(&self) -> Process<'a, P> {
115        let mut next_location_id = self.next_location_id.borrow_mut();
116        let id = *next_location_id;
117        *next_location_id += 1;
118
119        self.processes
120            .borrow_mut()
121            .push((id, type_name::<P>().to_string()));
122
123        Process {
124            id,
125            flow_state: self.flow_state().clone(),
126            _phantom: PhantomData,
127        }
128    }
129
130    pub fn external<P>(&self) -> External<'a, P> {
131        let mut next_location_id = self.next_location_id.borrow_mut();
132        let id = *next_location_id;
133        *next_location_id += 1;
134
135        self.externals
136            .borrow_mut()
137            .push((id, type_name::<P>().to_string()));
138
139        External {
140            id,
141            flow_state: self.flow_state().clone(),
142            _phantom: PhantomData,
143        }
144    }
145
146    pub fn cluster<C>(&self) -> Cluster<'a, C> {
147        let mut next_location_id = self.next_location_id.borrow_mut();
148        let id = *next_location_id;
149        *next_location_id += 1;
150
151        self.clusters
152            .borrow_mut()
153            .push((id, type_name::<C>().to_string()));
154
155        Cluster {
156            id,
157            flow_state: self.flow_state().clone(),
158            _phantom: PhantomData,
159        }
160    }
161}
162
163#[cfg(feature = "build")]
164#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
165#[expect(missing_docs, reason = "TODO")]
166impl<'a> FlowBuilder<'a> {
167    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
168        self.finalized = true;
169
170        super::built::BuiltFlow {
171            ir: self.flow_state.borrow_mut().roots.take().unwrap(),
172            process_id_name: self.processes.replace(vec![]),
173            cluster_id_name: self.clusters.replace(vec![]),
174            external_id_name: self.externals.replace(vec![]),
175            next_location_id: *self.next_location_id.borrow(),
176            _phantom: PhantomData,
177        }
178    }
179
180    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
181        self.finalize().with_default_optimize()
182    }
183
184    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
185        self.finalize().optimize_with(f)
186    }
187
188    pub fn with_process<P, D: Deploy<'a>>(
189        self,
190        process: &Process<P>,
191        spec: impl IntoProcessSpec<'a, D>,
192    ) -> DeployFlow<'a, D> {
193        self.with_default_optimize().with_process(process, spec)
194    }
195
196    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
197        self,
198        spec: impl Fn() -> S,
199    ) -> DeployFlow<'a, D> {
200        self.with_default_optimize().with_remaining_processes(spec)
201    }
202
203    pub fn with_external<P, D: Deploy<'a>>(
204        self,
205        process: &External<P>,
206        spec: impl ExternalSpec<'a, D>,
207    ) -> DeployFlow<'a, D> {
208        self.with_default_optimize().with_external(process, spec)
209    }
210
211    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
212        self,
213        spec: impl Fn() -> S,
214    ) -> DeployFlow<'a, D> {
215        self.with_default_optimize().with_remaining_externals(spec)
216    }
217
218    pub fn with_cluster<C, D: Deploy<'a>>(
219        self,
220        cluster: &Cluster<C>,
221        spec: impl ClusterSpec<'a, D>,
222    ) -> DeployFlow<'a, D> {
223        self.with_default_optimize().with_cluster(cluster, spec)
224    }
225
226    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
227        self,
228        spec: impl Fn() -> S,
229    ) -> DeployFlow<'a, D> {
230        self.with_default_optimize().with_remaining_clusters(spec)
231    }
232
233    pub fn compile<D: Deploy<'a>>(self) -> CompiledFlow<'a> {
234        self.with_default_optimize::<D>().compile()
235    }
236
237    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
238        self.with_default_optimize().deploy(env)
239    }
240
241    #[cfg(feature = "sim")]
242    /// Creates a simulation for this builder, which can be used to run deterministic simulations
243    /// of the Hydro program.
244    pub fn sim(self) -> SimFlow<'a> {
245        self.finalize().sim()
246    }
247
248    pub fn rewritten_ir_builder<'b>(built: &super::built::BuiltFlow) -> RewriteIrFlowBuilder<'b> {
249        RewriteIrFlowBuilder {
250            builder: FlowBuilder {
251                flow_state: Rc::new(RefCell::new(FlowStateInner {
252                    roots: None,
253                    next_external_port: ExternalPortId(0),
254                    cycle_counts: 0,
255                    next_clock_id: 0,
256                })),
257                processes: RefCell::new(built.process_id_name.clone()),
258                clusters: RefCell::new(built.cluster_id_name.clone()),
259                externals: RefCell::new(built.external_id_name.clone()),
260                next_location_id: RefCell::new(built.next_location_id),
261                finalized: false,
262                _phantom: PhantomData,
263            },
264        }
265    }
266}
267
268#[expect(missing_docs, reason = "TODO")]
269pub struct RewriteIrFlowBuilder<'a> {
270    builder: FlowBuilder<'a>,
271}
272
273#[expect(missing_docs, reason = "TODO")]
274impl<'a> RewriteIrFlowBuilder<'a> {
275    pub fn build_with(
276        self,
277        thunk: impl FnOnce(&FlowBuilder<'a>) -> Vec<HydroRoot>,
278    ) -> FlowBuilder<'a> {
279        let roots = thunk(&self.builder);
280        self.builder.flow_state().borrow_mut().roots = Some(roots);
281        self.builder
282    }
283}