hydro_lang/builder/
mod.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6#[cfg(feature = "build")]
7use compiled::CompiledFlow;
8#[cfg(feature = "build")]
9use deploy::{DeployFlow, DeployResult};
10use stageleft::*;
11
12#[cfg(feature = "build")]
13use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use crate::ir::HydroLeaf;
15use crate::location::{Cluster, ExternalProcess, Process};
16use crate::staging_util::Invariant;
17
18#[cfg(feature = "build")]
19#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
20pub mod built;
21#[cfg(feature = "build")]
22#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
23pub mod compiled;
24#[cfg(feature = "build")]
25#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
26pub mod deploy;
27
28pub struct FlowStateInner {
29    /// Tracks the leaves of the dataflow IR. This is referenced by
30    /// `Stream` and `HfCycle` to build the IR. The inner option will
31    /// be set to `None` when this builder is finalized.
32    pub(crate) leaves: Option<Vec<HydroLeaf>>,
33
34    /// Counter for generating unique external output identifiers.
35    pub(crate) next_external_out: usize,
36
37    /// Counters for generating identifiers for cycles.
38    pub(crate) cycle_counts: usize,
39
40    /// Counters for clock IDs.
41    pub(crate) next_clock_id: usize,
42
43    /// Counter for unique HydroNode IDs.
44    pub(crate) next_node_id: usize,
45}
46
47impl FlowStateInner {
48    pub fn next_cycle_id(&mut self) -> usize {
49        let id = self.cycle_counts;
50        self.cycle_counts += 1;
51        id
52    }
53}
54
55pub type FlowState = Rc<RefCell<FlowStateInner>>;
56
57pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.";
58
59pub struct FlowBuilder<'a> {
60    flow_state: FlowState,
61    processes: RefCell<Vec<(usize, String)>>,
62    clusters: RefCell<Vec<(usize, String)>>,
63    externals: RefCell<Vec<(usize, String)>>,
64
65    next_location_id: RefCell<usize>,
66
67    /// Tracks whether this flow has been finalized; it is an error to
68    /// drop without finalizing.
69    finalized: bool,
70
71    /// 'a on a FlowBuilder is used to ensure that staged code does not
72    /// capture more data that it is allowed to; 'a is generated at the
73    /// entrypoint of the staged code and we keep it invariant here
74    /// to enforce the appropriate constraints
75    _phantom: Invariant<'a>,
76}
77
78impl Drop for FlowBuilder<'_> {
79    fn drop(&mut self) {
80        if !self.finalized {
81            panic!(
82                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
83            );
84        }
85    }
86}
87
88impl QuotedContext for FlowBuilder<'_> {
89    fn create() -> Self {
90        FlowBuilder::new()
91    }
92}
93
94impl<'a> FlowBuilder<'a> {
95    #[expect(
96        clippy::new_without_default,
97        reason = "call `new` explicitly, not `default`"
98    )]
99    pub fn new() -> FlowBuilder<'a> {
100        FlowBuilder {
101            flow_state: Rc::new(RefCell::new(FlowStateInner {
102                leaves: Some(vec![]),
103                next_external_out: 0,
104                cycle_counts: 0,
105                next_clock_id: 0,
106                next_node_id: 0,
107            })),
108            processes: RefCell::new(vec![]),
109            clusters: RefCell::new(vec![]),
110            externals: RefCell::new(vec![]),
111            next_location_id: RefCell::new(0),
112            finalized: false,
113            _phantom: PhantomData,
114        }
115    }
116
117    pub fn flow_state(&self) -> &FlowState {
118        &self.flow_state
119    }
120
121    pub fn process<P>(&self) -> Process<'a, P> {
122        let mut next_location_id = self.next_location_id.borrow_mut();
123        let id = *next_location_id;
124        *next_location_id += 1;
125
126        self.processes
127            .borrow_mut()
128            .push((id, type_name::<P>().to_string()));
129
130        Process {
131            id,
132            flow_state: self.flow_state().clone(),
133            _phantom: PhantomData,
134        }
135    }
136
137    pub fn external_process<P>(&self) -> ExternalProcess<'a, P> {
138        let mut next_location_id = self.next_location_id.borrow_mut();
139        let id = *next_location_id;
140        *next_location_id += 1;
141
142        self.externals
143            .borrow_mut()
144            .push((id, type_name::<P>().to_string()));
145
146        ExternalProcess {
147            id,
148            flow_state: self.flow_state().clone(),
149            _phantom: PhantomData,
150        }
151    }
152
153    pub fn cluster<C>(&self) -> Cluster<'a, C> {
154        let mut next_location_id = self.next_location_id.borrow_mut();
155        let id = *next_location_id;
156        *next_location_id += 1;
157
158        self.clusters
159            .borrow_mut()
160            .push((id, type_name::<C>().to_string()));
161
162        Cluster {
163            id,
164            flow_state: self.flow_state().clone(),
165            _phantom: PhantomData,
166        }
167    }
168}
169
170#[cfg(feature = "build")]
171#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
172impl<'a> FlowBuilder<'a> {
173    pub fn finalize(mut self) -> built::BuiltFlow<'a> {
174        self.finalized = true;
175
176        built::BuiltFlow {
177            ir: self.flow_state.borrow_mut().leaves.take().unwrap(),
178            process_id_name: self.processes.replace(vec![]),
179            cluster_id_name: self.clusters.replace(vec![]),
180            external_id_name: self.externals.replace(vec![]),
181            _phantom: PhantomData,
182        }
183    }
184
185    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
186        self.finalize().with_default_optimize()
187    }
188
189    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroLeaf])) -> built::BuiltFlow<'a> {
190        self.finalize().optimize_with(f)
191    }
192
193    pub fn with_process<P, D: Deploy<'a>>(
194        self,
195        process: &Process<P>,
196        spec: impl IntoProcessSpec<'a, D>,
197    ) -> DeployFlow<'a, D> {
198        self.with_default_optimize().with_process(process, spec)
199    }
200
201    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
202        self,
203        spec: impl Fn() -> S,
204    ) -> DeployFlow<'a, D> {
205        self.with_default_optimize().with_remaining_processes(spec)
206    }
207
208    pub fn with_external<P, D: Deploy<'a>>(
209        self,
210        process: &ExternalProcess<P>,
211        spec: impl ExternalSpec<'a, D>,
212    ) -> DeployFlow<'a, D> {
213        self.with_default_optimize().with_external(process, spec)
214    }
215
216    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
217        self,
218        spec: impl Fn() -> S,
219    ) -> DeployFlow<'a, D> {
220        self.with_default_optimize().with_remaining_externals(spec)
221    }
222
223    pub fn with_cluster<C, D: Deploy<'a>>(
224        self,
225        cluster: &Cluster<C>,
226        spec: impl ClusterSpec<'a, D>,
227    ) -> DeployFlow<'a, D> {
228        self.with_default_optimize().with_cluster(cluster, spec)
229    }
230
231    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
232        self,
233        spec: impl Fn() -> S,
234    ) -> DeployFlow<'a, D> {
235        self.with_default_optimize().with_remaining_clusters(spec)
236    }
237
238    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
239        self.with_default_optimize::<D>().compile(env)
240    }
241
242    pub fn compile_no_network<D: Deploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
243        self.with_default_optimize::<D>().compile_no_network()
244    }
245
246    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
247        self,
248        env: &mut D::InstantiateEnv,
249    ) -> DeployResult<'a, D> {
250        self.with_default_optimize().deploy(env)
251    }
252}