hydro_lang/builder/
mod.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6#[cfg(feature = "build")]
7use compiled::CompiledFlow;
8#[cfg(feature = "build")]
9use deploy::{DeployFlow, DeployResult};
10use stageleft::*;
11
12#[cfg(feature = "build")]
13use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use crate::ir::HydroLeaf;
15use crate::location::{Cluster, External, Process};
16use crate::staging_util::Invariant;
17
18#[cfg(feature = "build")]
19#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
20pub mod built;
21#[cfg(feature = "build")]
22#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
23pub mod compiled;
24#[cfg(feature = "build")]
25#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
26pub mod deploy;
27
28pub struct FlowStateInner {
29    /// Tracks the leaves of the dataflow IR. This is referenced by
30    /// `Stream` and `HfCycle` to build the IR. The inner option will
31    /// be set to `None` when this builder is finalized.
32    pub(crate) leaves: Option<Vec<HydroLeaf>>,
33
34    /// Counter for generating unique external output identifiers.
35    pub(crate) next_external_out: usize,
36
37    /// Counters for generating identifiers for cycles.
38    pub(crate) cycle_counts: usize,
39
40    /// Counters for clock IDs.
41    pub(crate) next_clock_id: usize,
42
43    /// Counter for unique HydroNode IDs.
44    pub(crate) next_node_id: usize,
45}
46
47impl FlowStateInner {
48    pub fn next_cycle_id(&mut self) -> usize {
49        let id = self.cycle_counts;
50        self.cycle_counts += 1;
51        id
52    }
53}
54
55pub type FlowState = Rc<RefCell<FlowStateInner>>;
56
/// Diagnostic text for the case where a leaf is added to a flow that has
/// already been finalized.
pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.";
58
59pub struct RewriteIrFlowBuilder<'a> {
60    builder: FlowBuilder<'a>,
61}
62
63impl<'a> RewriteIrFlowBuilder<'a> {
64    pub fn build_with(
65        self,
66        thunk: impl FnOnce(&FlowBuilder<'a>) -> Vec<HydroLeaf>,
67    ) -> FlowBuilder<'a> {
68        let leaves = thunk(&self.builder);
69        self.builder.flow_state().borrow_mut().leaves = Some(leaves);
70        self.builder
71    }
72}
73
74pub struct FlowBuilder<'a> {
75    flow_state: FlowState,
76    processes: RefCell<Vec<(usize, String)>>,
77    clusters: RefCell<Vec<(usize, String)>>,
78    externals: RefCell<Vec<(usize, String)>>,
79
80    next_location_id: RefCell<usize>,
81
82    /// Tracks whether this flow has been finalized; it is an error to
83    /// drop without finalizing.
84    finalized: bool,
85
86    /// 'a on a FlowBuilder is used to ensure that staged code does not
87    /// capture more data that it is allowed to; 'a is generated at the
88    /// entrypoint of the staged code and we keep it invariant here
89    /// to enforce the appropriate constraints
90    _phantom: Invariant<'a>,
91}
92
93impl Drop for FlowBuilder<'_> {
94    fn drop(&mut self) {
95        if !self.finalized {
96            panic!(
97                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
98            );
99        }
100    }
101}
102
103impl QuotedContext for FlowBuilder<'_> {
104    fn create() -> Self {
105        FlowBuilder::new()
106    }
107}
108
109impl<'a> FlowBuilder<'a> {
110    #[expect(
111        clippy::new_without_default,
112        reason = "call `new` explicitly, not `default`"
113    )]
114    pub fn new() -> FlowBuilder<'a> {
115        FlowBuilder {
116            flow_state: Rc::new(RefCell::new(FlowStateInner {
117                leaves: Some(vec![]),
118                next_external_out: 0,
119                cycle_counts: 0,
120                next_clock_id: 0,
121                next_node_id: 0,
122            })),
123            processes: RefCell::new(vec![]),
124            clusters: RefCell::new(vec![]),
125            externals: RefCell::new(vec![]),
126            next_location_id: RefCell::new(0),
127            finalized: false,
128            _phantom: PhantomData,
129        }
130    }
131
132    pub fn rewritten_ir_builder<'b>(&self) -> RewriteIrFlowBuilder<'b> {
133        let processes = self.processes.borrow().clone();
134        let clusters = self.clusters.borrow().clone();
135        let externals = self.externals.borrow().clone();
136        let next_location_id = *self.next_location_id.borrow();
137        RewriteIrFlowBuilder {
138            builder: FlowBuilder {
139                flow_state: Rc::new(RefCell::new(FlowStateInner {
140                    leaves: None,
141                    next_external_out: 0,
142                    cycle_counts: 0,
143                    next_clock_id: 0,
144                    next_node_id: 0,
145                })),
146                processes: RefCell::new(processes),
147                clusters: RefCell::new(clusters),
148                externals: RefCell::new(externals),
149                next_location_id: RefCell::new(next_location_id),
150                finalized: false,
151                _phantom: PhantomData,
152            },
153        }
154    }
155
156    pub fn flow_state(&self) -> &FlowState {
157        &self.flow_state
158    }
159
160    pub fn process<P>(&self) -> Process<'a, P> {
161        let mut next_location_id = self.next_location_id.borrow_mut();
162        let id = *next_location_id;
163        *next_location_id += 1;
164
165        self.processes
166            .borrow_mut()
167            .push((id, type_name::<P>().to_string()));
168
169        Process {
170            id,
171            flow_state: self.flow_state().clone(),
172            _phantom: PhantomData,
173        }
174    }
175
176    pub fn external<P>(&self) -> External<'a, P> {
177        let mut next_location_id = self.next_location_id.borrow_mut();
178        let id = *next_location_id;
179        *next_location_id += 1;
180
181        self.externals
182            .borrow_mut()
183            .push((id, type_name::<P>().to_string()));
184
185        External {
186            id,
187            flow_state: self.flow_state().clone(),
188            _phantom: PhantomData,
189        }
190    }
191
192    pub fn cluster<C>(&self) -> Cluster<'a, C> {
193        let mut next_location_id = self.next_location_id.borrow_mut();
194        let id = *next_location_id;
195        *next_location_id += 1;
196
197        self.clusters
198            .borrow_mut()
199            .push((id, type_name::<C>().to_string()));
200
201        Cluster {
202            id,
203            flow_state: self.flow_state().clone(),
204            _phantom: PhantomData,
205        }
206    }
207}
208
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Marks the builder as finalized and converts it into a
    /// [`built::BuiltFlow`].
    ///
    /// The leaves are moved out of the flow state, leaving `None` behind,
    /// and the process, cluster, and external registries are moved into the
    /// result.
    ///
    /// # Panics
    ///
    /// Panics if the flow state no longer holds any leaves (`leaves` is
    /// `None`).
    pub fn finalize(mut self) -> built::BuiltFlow<'a> {
        // Set the flag first so that the `Drop` check is already satisfied
        // if extracting the leaves below panics.
        self.finalized = true;

        let ir = self.flow_state.borrow_mut().leaves.take().unwrap();
        built::BuiltFlow {
            ir,
            process_id_name: self.processes.take(),
            cluster_id_name: self.clusters.take(),
            external_id_name: self.externals.take(),
            _phantom: PhantomData,
        }
    }

    /// Finalizes the builder and forwards to `with_default_optimize` on the
    /// resulting built flow.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        let built = self.finalize();
        built.with_default_optimize()
    }

    /// Finalizes the builder and forwards `f` to `optimize_with` on the
    /// resulting built flow.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroLeaf])) -> built::BuiltFlow<'a> {
        let built = self.finalize();
        built.optimize_with(f)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `with_process(process, spec)` on the resulting deploy flow.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize::<D>();
        flow.with_process(process, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `with_remaining_processes(spec)` on the resulting deploy flow.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize::<D>();
        flow.with_remaining_processes(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `with_external(process, spec)` on the resulting deploy flow.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize::<D>();
        flow.with_external(process, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `with_remaining_externals(spec)` on the resulting deploy flow.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize::<D>();
        flow.with_remaining_externals(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `with_cluster(cluster, spec)` on the resulting deploy flow.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize::<D>();
        flow.with_cluster(cluster, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `with_remaining_clusters(spec)` on the resulting deploy flow.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        let flow = self.with_default_optimize::<D>();
        flow.with_remaining_clusters(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `compile(env)` on the resulting deploy flow.
    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
        let flow = self.with_default_optimize::<D>();
        flow.compile(env)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `compile_no_network()` on the resulting deploy flow.
    pub fn compile_no_network<D: Deploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
        let flow = self.with_default_optimize::<D>();
        flow.compile_no_network()
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `deploy(env)` on the resulting deploy flow. Only available for
    /// deployment targets whose `CompileEnv` is `()`.
    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
        self,
        env: &mut D::InstantiateEnv,
    ) -> DeployResult<'a, D> {
        let flow = self.with_default_optimize::<D>();
        flow.deploy(env)
    }
}
289        self.with_default_optimize().deploy(env)
290    }
291}