hydro_lang/compile/
builder.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6#[cfg(feature = "build")]
7use super::compiled::CompiledFlow;
8#[cfg(feature = "build")]
9use super::deploy::{DeployFlow, DeployResult};
10#[cfg(feature = "build")]
11use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
12use super::ir::HydroRoot;
13use crate::location::{Cluster, External, Process};
14#[cfg(feature = "sim")]
15#[cfg(stageleft_runtime)]
16use crate::sim::flow::SimFlow;
17use crate::staging_util::Invariant;
18
/// Shared, interior-mutable handle to the [`FlowStateInner`] of a flow under
/// construction. [`FlowBuilder`] clones this handle into every process,
/// cluster, and external location it creates.
pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
20
/// Mutable bookkeeping for a flow under construction, shared through
/// [`FlowState`].
pub(crate) struct FlowStateInner {
    /// Tracks the roots of the dataflow IR. This is referenced by
    /// `Stream` and `HfCycle` to build the IR. The inner option will
    /// be set to `None` when this builder is finalized. It also starts
    /// out as `None` in a builder created by
    /// `FlowBuilder::rewritten_ir_builder`, until
    /// `RewriteIrFlowBuilder::build_with` installs the rewritten roots.
    pub(crate) roots: Option<Vec<HydroRoot>>,

    /// Counter for generating unique external output identifiers.
    pub(crate) next_external_out: usize,

    /// Counter for generating cycle identifiers; read and advanced by
    /// [`FlowStateInner::next_cycle_id`].
    pub(crate) cycle_counts: usize,

    /// Counter for generating clock IDs.
    pub(crate) next_clock_id: usize,
}
36
37impl FlowStateInner {
38    pub fn next_cycle_id(&mut self) -> usize {
39        let id = self.cycle_counts;
40        self.cycle_counts += 1;
41        id
42    }
43
44    pub fn push_root(&mut self, root: HydroRoot) {
45        self.roots
46            .as_mut()
47            .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
48            .push(root);
49    }
50}
51
52#[expect(missing_docs, reason = "TODO")]
53pub struct FlowBuilder<'a> {
54    flow_state: FlowState,
55    processes: RefCell<Vec<(usize, String)>>,
56    clusters: RefCell<Vec<(usize, String)>>,
57    externals: RefCell<Vec<(usize, String)>>,
58
59    next_location_id: RefCell<usize>,
60
61    /// Tracks whether this flow has been finalized; it is an error to
62    /// drop without finalizing.
63    finalized: bool,
64
65    /// 'a on a FlowBuilder is used to ensure that staged code does not
66    /// capture more data that it is allowed to; 'a is generated at the
67    /// entrypoint of the staged code and we keep it invariant here
68    /// to enforce the appropriate constraints
69    _phantom: Invariant<'a>,
70}
71
72impl Drop for FlowBuilder<'_> {
73    fn drop(&mut self) {
74        if !self.finalized && !std::thread::panicking() {
75            panic!(
76                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
77            );
78        }
79    }
80}
81
82#[expect(missing_docs, reason = "TODO")]
83impl<'a> FlowBuilder<'a> {
84    #[expect(
85        clippy::new_without_default,
86        reason = "call `new` explicitly, not `default`"
87    )]
88    pub fn new() -> FlowBuilder<'a> {
89        FlowBuilder {
90            flow_state: Rc::new(RefCell::new(FlowStateInner {
91                roots: Some(vec![]),
92                next_external_out: 0,
93                cycle_counts: 0,
94                next_clock_id: 0,
95            })),
96            processes: RefCell::new(vec![]),
97            clusters: RefCell::new(vec![]),
98            externals: RefCell::new(vec![]),
99            next_location_id: RefCell::new(0),
100            finalized: false,
101            _phantom: PhantomData,
102        }
103    }
104
105    pub fn rewritten_ir_builder<'b>(&self) -> RewriteIrFlowBuilder<'b> {
106        let processes = self.processes.borrow().clone();
107        let clusters = self.clusters.borrow().clone();
108        let externals = self.externals.borrow().clone();
109        let next_location_id = *self.next_location_id.borrow();
110        RewriteIrFlowBuilder {
111            builder: FlowBuilder {
112                flow_state: Rc::new(RefCell::new(FlowStateInner {
113                    roots: None,
114                    next_external_out: 0,
115                    cycle_counts: 0,
116                    next_clock_id: 0,
117                })),
118                processes: RefCell::new(processes),
119                clusters: RefCell::new(clusters),
120                externals: RefCell::new(externals),
121                next_location_id: RefCell::new(next_location_id),
122                finalized: false,
123                _phantom: PhantomData,
124            },
125        }
126    }
127
128    pub(crate) fn flow_state(&self) -> &FlowState {
129        &self.flow_state
130    }
131
132    pub fn process<P>(&self) -> Process<'a, P> {
133        let mut next_location_id = self.next_location_id.borrow_mut();
134        let id = *next_location_id;
135        *next_location_id += 1;
136
137        self.processes
138            .borrow_mut()
139            .push((id, type_name::<P>().to_string()));
140
141        Process {
142            id,
143            flow_state: self.flow_state().clone(),
144            _phantom: PhantomData,
145        }
146    }
147
148    pub fn external<P>(&self) -> External<'a, P> {
149        let mut next_location_id = self.next_location_id.borrow_mut();
150        let id = *next_location_id;
151        *next_location_id += 1;
152
153        self.externals
154            .borrow_mut()
155            .push((id, type_name::<P>().to_string()));
156
157        External {
158            id,
159            flow_state: self.flow_state().clone(),
160            _phantom: PhantomData,
161        }
162    }
163
164    pub fn cluster<C>(&self) -> Cluster<'a, C> {
165        let mut next_location_id = self.next_location_id.borrow_mut();
166        let id = *next_location_id;
167        *next_location_id += 1;
168
169        self.clusters
170            .borrow_mut()
171            .push((id, type_name::<C>().to_string()));
172
173        Cluster {
174            id,
175            flow_state: self.flow_state().clone(),
176            _phantom: PhantomData,
177        }
178    }
179}
180
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Consumes the builder and returns the accumulated IR roots together
    /// with the registered process, cluster, and external `(ID, type name)`
    /// lists as a `BuiltFlow`.
    ///
    /// The roots are taken out of the shared flow state, leaving `None`
    /// behind, so pushing further roots through that state panics afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the shared flow state no longer holds any roots.
    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
        // Mark the builder first so that its `Drop` check passes when `self`
        // goes out of scope at the end of this function.
        self.finalized = true;

        let ir = self
            .flow_state
            .borrow_mut()
            .roots
            .take()
            .expect("FlowBuilder has no IR roots to finalize; the flow state was already finalized");

        // `FlowBuilder` implements `Drop`, so its fields cannot be moved out;
        // `take` swaps each list for an empty one instead.
        super::built::BuiltFlow {
            ir,
            process_id_name: self.processes.take(),
            cluster_id_name: self.clusters.take(),
            external_id_name: self.externals.take(),
            _phantom: PhantomData,
        }
    }

    /// Shorthand for `self.finalize().with_default_optimize()`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        let built = self.finalize();
        built.with_default_optimize()
    }

    /// Shorthand for `self.finalize().optimize_with(f)`; `f` is given
    /// mutable access to the IR roots.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
        let built = self.finalize();
        built.optimize_with(f)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_process(process, spec)`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize();
        flow.with_process(process, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_processes(spec)`.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize();
        flow.with_remaining_processes(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_external(process, spec)`.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize();
        flow.with_external(process, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_externals(spec)`.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize();
        flow.with_remaining_externals(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_cluster(cluster, spec)`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize();
        flow.with_cluster(cluster, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_clusters(spec)`.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize();
        flow.with_remaining_clusters(spec)
    }

    /// Shorthand for `self.with_default_optimize::<D>().compile(env)`.
    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
        let flow = self.with_default_optimize::<D>();
        flow.compile(env)
    }

    /// Shorthand for `self.with_default_optimize::<D>().compile_no_network()`.
    pub fn compile_no_network<D: Deploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
        let flow = self.with_default_optimize::<D>();
        flow.compile_no_network()
    }

    /// Shorthand for `self.with_default_optimize().deploy(env)`; only
    /// available for deployment targets whose `CompileEnv` is `()`.
    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
        self,
        env: &mut D::InstantiateEnv,
    ) -> DeployResult<'a, D> {
        let flow = self.with_default_optimize();
        flow.deploy(env)
    }

    // NOTE(review): the `SimFlow` import at the top of this file is gated on
    // both `feature = "sim"` and `cfg(stageleft_runtime)`, while this method
    // is gated on the feature only. Confirm that this impl is never compiled
    // with `sim` enabled but without `stageleft_runtime`.
    #[cfg(feature = "sim")]
    /// Creates a simulation for this builder, which can be used to run deterministic simulations
    /// of the Hydro program.
    pub fn sim(self) -> SimFlow<'a> {
        let built = self.finalize();
        built.sim()
    }
}
272
273#[expect(missing_docs, reason = "TODO")]
274pub struct RewriteIrFlowBuilder<'a> {
275    builder: FlowBuilder<'a>,
276}
277
278#[expect(missing_docs, reason = "TODO")]
279impl<'a> RewriteIrFlowBuilder<'a> {
280    pub fn build_with(
281        self,
282        thunk: impl FnOnce(&FlowBuilder<'a>) -> Vec<HydroRoot>,
283    ) -> FlowBuilder<'a> {
284        let roots = thunk(&self.builder);
285        self.builder.flow_state().borrow_mut().roots = Some(roots);
286        self.builder
287    }
288}