hydro_lang/compile/
builder.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6#[cfg(feature = "build")]
7use super::compiled::CompiledFlow;
8#[cfg(feature = "build")]
9use super::deploy::{DeployFlow, DeployResult};
10#[cfg(feature = "build")]
11use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
12use super::ir::HydroRoot;
13use crate::location::{Cluster, External, Process};
14use crate::staging_util::Invariant;
15
/// Shared, interior-mutable handle to the [`FlowStateInner`] of a flow under construction.
///
/// `FlowBuilder` owns one and hands a clone of it to every `Process`,
/// `External`, and `Cluster` it creates.
pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
17
/// Mutable bookkeeping for a flow under construction, shared through [`FlowState`].
pub(crate) struct FlowStateInner {
    /// Tracks the roots of the dataflow IR. This is referenced by
    /// `Stream` and `HfCycle` to build the IR. The inner option will
    /// be set to `None` when this builder is finalized.
    ///
    /// It also starts out as `None` in builders created by
    /// `FlowBuilder::rewritten_ir_builder`, until
    /// `RewriteIrFlowBuilder::build_with` installs the rewritten roots.
    pub(crate) roots: Option<Vec<HydroRoot>>,

    /// Counter for generating unique external output identifiers.
    pub(crate) next_external_out: usize,

    /// Counter for generating cycle identifiers; holds the next id that
    /// `next_cycle_id` will return.
    pub(crate) cycle_counts: usize,

    /// Counter for generating clock IDs.
    pub(crate) next_clock_id: usize,
}
33
34impl FlowStateInner {
35    pub fn next_cycle_id(&mut self) -> usize {
36        let id = self.cycle_counts;
37        self.cycle_counts += 1;
38        id
39    }
40
41    pub fn push_root(&mut self, root: HydroRoot) {
42        self.roots
43            .as_mut()
44            .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
45            .push(root);
46    }
47}
48
49#[expect(missing_docs, reason = "TODO")]
50pub struct FlowBuilder<'a> {
51    flow_state: FlowState,
52    processes: RefCell<Vec<(usize, String)>>,
53    clusters: RefCell<Vec<(usize, String)>>,
54    externals: RefCell<Vec<(usize, String)>>,
55
56    next_location_id: RefCell<usize>,
57
58    /// Tracks whether this flow has been finalized; it is an error to
59    /// drop without finalizing.
60    finalized: bool,
61
62    /// 'a on a FlowBuilder is used to ensure that staged code does not
63    /// capture more data that it is allowed to; 'a is generated at the
64    /// entrypoint of the staged code and we keep it invariant here
65    /// to enforce the appropriate constraints
66    _phantom: Invariant<'a>,
67}
68
69impl Drop for FlowBuilder<'_> {
70    fn drop(&mut self) {
71        if !self.finalized {
72            panic!(
73                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
74            );
75        }
76    }
77}
78
79#[expect(missing_docs, reason = "TODO")]
80impl<'a> FlowBuilder<'a> {
81    #[expect(
82        clippy::new_without_default,
83        reason = "call `new` explicitly, not `default`"
84    )]
85    pub fn new() -> FlowBuilder<'a> {
86        FlowBuilder {
87            flow_state: Rc::new(RefCell::new(FlowStateInner {
88                roots: Some(vec![]),
89                next_external_out: 0,
90                cycle_counts: 0,
91                next_clock_id: 0,
92            })),
93            processes: RefCell::new(vec![]),
94            clusters: RefCell::new(vec![]),
95            externals: RefCell::new(vec![]),
96            next_location_id: RefCell::new(0),
97            finalized: false,
98            _phantom: PhantomData,
99        }
100    }
101
102    pub fn rewritten_ir_builder<'b>(&self) -> RewriteIrFlowBuilder<'b> {
103        let processes = self.processes.borrow().clone();
104        let clusters = self.clusters.borrow().clone();
105        let externals = self.externals.borrow().clone();
106        let next_location_id = *self.next_location_id.borrow();
107        RewriteIrFlowBuilder {
108            builder: FlowBuilder {
109                flow_state: Rc::new(RefCell::new(FlowStateInner {
110                    roots: None,
111                    next_external_out: 0,
112                    cycle_counts: 0,
113                    next_clock_id: 0,
114                })),
115                processes: RefCell::new(processes),
116                clusters: RefCell::new(clusters),
117                externals: RefCell::new(externals),
118                next_location_id: RefCell::new(next_location_id),
119                finalized: false,
120                _phantom: PhantomData,
121            },
122        }
123    }
124
125    pub(crate) fn flow_state(&self) -> &FlowState {
126        &self.flow_state
127    }
128
129    pub fn process<P>(&self) -> Process<'a, P> {
130        let mut next_location_id = self.next_location_id.borrow_mut();
131        let id = *next_location_id;
132        *next_location_id += 1;
133
134        self.processes
135            .borrow_mut()
136            .push((id, type_name::<P>().to_string()));
137
138        Process {
139            id,
140            flow_state: self.flow_state().clone(),
141            _phantom: PhantomData,
142        }
143    }
144
145    pub fn external<P>(&self) -> External<'a, P> {
146        let mut next_location_id = self.next_location_id.borrow_mut();
147        let id = *next_location_id;
148        *next_location_id += 1;
149
150        self.externals
151            .borrow_mut()
152            .push((id, type_name::<P>().to_string()));
153
154        External {
155            id,
156            flow_state: self.flow_state().clone(),
157            _phantom: PhantomData,
158        }
159    }
160
161    pub fn cluster<C>(&self) -> Cluster<'a, C> {
162        let mut next_location_id = self.next_location_id.borrow_mut();
163        let id = *next_location_id;
164        *next_location_id += 1;
165
166        self.clusters
167            .borrow_mut()
168            .push((id, type_name::<C>().to_string()));
169
170        Cluster {
171            id,
172            flow_state: self.flow_state().clone(),
173            _phantom: PhantomData,
174        }
175    }
176}
177
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Finalizes the builder, moving its IR roots and location registrations
    /// into a [`BuiltFlow`](super::built::BuiltFlow).
    ///
    /// Afterwards the shared flow state holds no roots (`None`), so a later
    /// `push_root` on that state panics.
    ///
    /// # Panics
    ///
    /// Panics if the flow state holds no roots.
    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
        // Mark first: `self` is dropped when this function returns, and the
        // `Drop` impl panics on builders that were never finalized.
        self.finalized = true;

        let ir = self
            .flow_state
            .borrow_mut()
            .roots
            .take()
            .expect("FlowBuilder has no IR roots to finalize");

        // `FlowBuilder` implements `Drop`, so fields cannot be moved out of
        // `self`; take the vectors out of their cells instead.
        super::built::BuiltFlow {
            ir,
            process_id_name: self.processes.take(),
            cluster_id_name: self.clusters.take(),
            external_id_name: self.externals.take(),
            _phantom: PhantomData,
        }
    }

    /// Finalizes the builder and forwards to
    /// `BuiltFlow::with_default_optimize`, yielding a [`DeployFlow`] for the
    /// deployment backend `D`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        self.finalize().with_default_optimize()
    }

    /// Finalizes the builder and forwards `f`, which receives the IR roots as
    /// a mutable slice, to `BuiltFlow::optimize_with`.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
        self.finalize().optimize_with(f)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_process(process, spec)`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_process(process, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_processes(spec)`.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_processes(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_external(process, spec)`.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_external(process, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_externals(spec)`.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_externals(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_cluster(cluster, spec)`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_cluster(cluster, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_clusters(spec)`.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_clusters(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] (for backend `D`)
    /// followed by `DeployFlow::compile(env)`.
    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
        self.with_default_optimize::<D>().compile(env)
    }

    /// Shorthand for [`Self::with_default_optimize`] (for backend `D`)
    /// followed by `DeployFlow::compile_no_network()`.
    pub fn compile_no_network<D: Deploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
        self.with_default_optimize::<D>().compile_no_network()
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::deploy(env)`.
    ///
    /// Only available for backends whose `CompileEnv` is `()`.
    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
        self,
        env: &mut D::InstantiateEnv,
    ) -> DeployResult<'a, D> {
        self.with_default_optimize().deploy(env)
    }
}
262
263#[expect(missing_docs, reason = "TODO")]
264pub struct RewriteIrFlowBuilder<'a> {
265    builder: FlowBuilder<'a>,
266}
267
268#[expect(missing_docs, reason = "TODO")]
269impl<'a> RewriteIrFlowBuilder<'a> {
270    pub fn build_with(
271        self,
272        thunk: impl FnOnce(&FlowBuilder<'a>) -> Vec<HydroRoot>,
273    ) -> FlowBuilder<'a> {
274        let roots = thunk(&self.builder);
275        self.builder.flow_state().borrow_mut().roots = Some(roots);
276        self.builder
277    }
278}