// hydro_lang/builder/mod.rs

use std::any::type_name;
use std::cell::RefCell;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::rc::Rc;
use std::thread;

#[cfg(feature = "build")]
use compiled::CompiledFlow;
#[cfg(feature = "build")]
use deploy::{DeployFlow, DeployResult};
use stageleft::*;

#[cfg(feature = "build")]
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy};
use crate::ir::HydroLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::staging_util::Invariant;
18
/// Home of [`built::BuiltFlow`], the value produced by finalizing a
/// `FlowBuilder`.
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
pub mod built;
/// Home of [`compiled::CompiledFlow`], the value produced by the builder's
/// `compile` methods.
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
pub mod compiled;
/// Home of [`deploy::DeployFlow`] and [`deploy::DeployResult`], which the
/// builder's `with_*` and `deploy` methods return.
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
pub mod deploy;
28
29pub struct FlowStateInner {
30    /// Tracks the leaves of the dataflow IR. This is referenced by
31    /// `Stream` and `HfCycle` to build the IR. The inner option will
32    /// be set to `None` when this builder is finalized.
33    pub(crate) leaves: Option<Vec<HydroLeaf>>,
34
35    /// Counter for generating unique external output identifiers.
36    pub(crate) next_external_out: usize,
37
38    /// Counters for generating identifiers for cycles.
39    pub(crate) cycle_counts: HashMap<usize, usize>,
40
41    /// Counters for clock IDs.
42    pub(crate) next_clock_id: usize,
43
44    /// Counter for unique HydroNode IDs.
45    pub(crate) next_node_id: usize,
46}
47
48pub type FlowState = Rc<RefCell<FlowStateInner>>;
49
/// Error text for an attempt to add a leaf to a flow that was already
/// finalized.
// NOTE(review): presumably used as the panic message wherever `leaves` is
// unwrapped to push a new leaf — confirm at the use sites.
pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.";
51
52pub struct FlowBuilder<'a> {
53    flow_state: FlowState,
54    processes: RefCell<Vec<(usize, String)>>,
55    clusters: RefCell<Vec<(usize, String)>>,
56    externals: RefCell<Vec<(usize, String)>>,
57
58    next_location_id: RefCell<usize>,
59
60    /// Tracks whether this flow has been finalized; it is an error to
61    /// drop without finalizing.
62    finalized: bool,
63
64    /// 'a on a FlowBuilder is used to ensure that staged code does not
65    /// capture more data that it is allowed to; 'a is generated at the
66    /// entrypoint of the staged code and we keep it invariant here
67    /// to enforce the appropriate constraints
68    _phantom: Invariant<'a>,
69}
70
71impl Drop for FlowBuilder<'_> {
72    fn drop(&mut self) {
73        if !self.finalized {
74            panic!(
75                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
76            );
77        }
78    }
79}
80
81impl QuotedContext for FlowBuilder<'_> {
82    fn create() -> Self {
83        FlowBuilder::new()
84    }
85}
86
87impl<'a> FlowBuilder<'a> {
88    #[expect(
89        clippy::new_without_default,
90        reason = "call `new` explicitly, not `default`"
91    )]
92    pub fn new() -> FlowBuilder<'a> {
93        FlowBuilder {
94            flow_state: Rc::new(RefCell::new(FlowStateInner {
95                leaves: Some(vec![]),
96                next_external_out: 0,
97                cycle_counts: HashMap::new(),
98                next_clock_id: 0,
99                next_node_id: 0,
100            })),
101            processes: RefCell::new(vec![]),
102            clusters: RefCell::new(vec![]),
103            externals: RefCell::new(vec![]),
104            next_location_id: RefCell::new(0),
105            finalized: false,
106            _phantom: PhantomData,
107        }
108    }
109
110    pub fn flow_state(&self) -> &FlowState {
111        &self.flow_state
112    }
113
114    pub fn process<P>(&self) -> Process<'a, P> {
115        let mut next_location_id = self.next_location_id.borrow_mut();
116        let id = *next_location_id;
117        *next_location_id += 1;
118
119        self.processes
120            .borrow_mut()
121            .push((id, type_name::<P>().to_string()));
122
123        Process {
124            id,
125            flow_state: self.flow_state().clone(),
126            _phantom: PhantomData,
127        }
128    }
129
130    pub fn external_process<P>(&self) -> ExternalProcess<'a, P> {
131        let mut next_location_id = self.next_location_id.borrow_mut();
132        let id = *next_location_id;
133        *next_location_id += 1;
134
135        self.externals
136            .borrow_mut()
137            .push((id, type_name::<P>().to_string()));
138
139        ExternalProcess {
140            id,
141            flow_state: self.flow_state().clone(),
142            _phantom: PhantomData,
143        }
144    }
145
146    pub fn cluster<C>(&self) -> Cluster<'a, C> {
147        let mut next_location_id = self.next_location_id.borrow_mut();
148        let id = *next_location_id;
149        *next_location_id += 1;
150
151        self.clusters
152            .borrow_mut()
153            .push((id, type_name::<C>().to_string()));
154
155        Cluster {
156            id,
157            flow_state: self.flow_state().clone(),
158            _phantom: PhantomData,
159        }
160    }
161}
162
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Consumes the builder and packages what it collected into a
    /// [`built::BuiltFlow`]: the IR leaves taken out of the shared flow
    /// state, plus the recorded process, cluster and external ID/name pairs.
    ///
    /// # Panics
    ///
    /// Panics if the shared flow state no longer holds its leaves.
    pub fn finalize(mut self) -> built::BuiltFlow<'a> {
        // Set the flag before anything below can panic, so the `Drop` check
        // never fires for a builder that did reach `finalize`.
        self.finalized = true;

        // `FlowBuilder` implements `Drop`, so its fields cannot be moved out
        // of `self`; each one is swapped for an empty value instead.
        built::BuiltFlow {
            ir: self
                .flow_state
                .borrow_mut()
                .leaves
                .take()
                .expect("flow state has no leaves left: the flow was already finalized"),
            process_id_name: self.processes.take(),
            cluster_id_name: self.clusters.take(),
            external_id_name: self.externals.take(),
            used: false,
            _phantom: PhantomData,
        }
    }

    /// Finalizes the builder and forwards to
    /// `BuiltFlow::with_default_optimize`.
    pub fn with_default_optimize<D: LocalDeploy<'a>>(self) -> DeployFlow<'a, D> {
        self.finalize().with_default_optimize()
    }

    /// Finalizes the builder and forwards `f` to `BuiltFlow::optimize_with`.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroLeaf])) -> built::BuiltFlow<'a> {
        self.finalize().optimize_with(f)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_process` with the same `process` and `spec`.
    pub fn with_process<P, D: LocalDeploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_process(process, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_processes` with the same `spec` factory.
    pub fn with_remaining_processes<D: LocalDeploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_processes(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_external` with the same `process` and `spec`.
    pub fn with_external<P, D: LocalDeploy<'a>>(
        self,
        process: &ExternalProcess<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_external(process, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_externals` with the same `spec` factory.
    pub fn with_remaining_externals<D: LocalDeploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_externals(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_cluster` with the same `cluster` and `spec`.
    pub fn with_cluster<C, D: LocalDeploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_cluster(cluster, spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::with_remaining_clusters` with the same `spec` factory.
    pub fn with_remaining_clusters<D: LocalDeploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_clusters(spec)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::compile` with `env`.
    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
        self.with_default_optimize::<D>().compile(env)
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::compile_no_network`.
    pub fn compile_no_network<D: LocalDeploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
        self.with_default_optimize::<D>().compile_no_network()
    }

    /// Shorthand for [`Self::with_default_optimize`] followed by
    /// `DeployFlow::deploy` with `env`. Only available for deployments whose
    /// compile environment is `()`.
    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
        self,
        env: &mut D::InstantiateEnv,
    ) -> DeployResult<'a, D> {
        self.with_default_optimize().deploy(env)
    }
}