hydro_lang/compile/
built.rs

1use std::collections::{BTreeMap, HashMap};
2use std::marker::PhantomData;
3
4use dfir_lang::graph::{DfirGraph, eliminate_extra_unions_tees, partition_graph};
5
6use super::compiled::CompiledFlow;
7use super::deploy::{DeployFlow, DeployResult};
8use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
9use super::ir::{HydroRoot, emit};
10use crate::location::{Cluster, External, Process};
11#[cfg(stageleft_runtime)]
12#[cfg(feature = "sim")]
13use crate::sim::{flow::SimFlow, graph::SimNode};
14use crate::staging_util::Invariant;
15#[cfg(stageleft_runtime)]
16#[cfg(feature = "viz")]
17use crate::viz::api::GraphApi;
18
19pub struct BuiltFlow<'a> {
20    pub(super) ir: Vec<HydroRoot>,
21    pub(super) process_id_name: Vec<(usize, String)>,
22    pub(super) cluster_id_name: Vec<(usize, String)>,
23    pub(super) external_id_name: Vec<(usize, String)>,
24    pub(super) next_location_id: usize,
25
26    pub(super) _phantom: Invariant<'a>,
27}
28
29pub(crate) fn build_inner<'a, D: Deploy<'a>>(
30    ir: &mut Vec<HydroRoot>,
31) -> BTreeMap<usize, DfirGraph> {
32    emit::<D>(ir)
33        .into_iter()
34        .map(|(k, v)| {
35            let (mut flat_graph, _, _) = v.build();
36            eliminate_extra_unions_tees(&mut flat_graph);
37            let partitioned_graph =
38                partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
39            (k, partitioned_graph)
40        })
41        .collect()
42}
43
44impl<'a> BuiltFlow<'a> {
45    pub fn ir(&self) -> &Vec<HydroRoot> {
46        &self.ir
47    }
48
49    pub fn process_id_name(&self) -> &Vec<(usize, String)> {
50        &self.process_id_name
51    }
52
53    pub fn cluster_id_name(&self) -> &Vec<(usize, String)> {
54        &self.cluster_id_name
55    }
56
57    pub fn external_id_name(&self) -> &Vec<(usize, String)> {
58        &self.external_id_name
59    }
60
61    /// Get a GraphApi instance for this built flow
62    #[cfg(stageleft_runtime)]
63    #[cfg(feature = "viz")]
64    pub fn graph_api(&self) -> GraphApi<'_> {
65        GraphApi::new(
66            &self.ir,
67            &self.process_id_name,
68            &self.cluster_id_name,
69            &self.external_id_name,
70        )
71    }
72
73    // String generation methods
74    #[cfg(feature = "viz")]
75    pub fn mermaid_string(
76        &self,
77        show_metadata: bool,
78        show_location_groups: bool,
79        use_short_labels: bool,
80    ) -> String {
81        self.graph_api()
82            .mermaid_to_string(show_metadata, show_location_groups, use_short_labels)
83    }
84
85    #[cfg(feature = "viz")]
86    pub fn dot_string(
87        &self,
88        show_metadata: bool,
89        show_location_groups: bool,
90        use_short_labels: bool,
91    ) -> String {
92        self.graph_api()
93            .dot_to_string(show_metadata, show_location_groups, use_short_labels)
94    }
95
96    #[cfg(feature = "viz")]
97    pub fn hydroscope_string(
98        &self,
99        show_metadata: bool,
100        show_location_groups: bool,
101        use_short_labels: bool,
102    ) -> String {
103        self.graph_api()
104            .hydroscope_to_string(show_metadata, show_location_groups, use_short_labels)
105    }
106
107    // File generation methods
108    #[cfg(feature = "viz")]
109    pub fn mermaid_to_file(
110        &self,
111        filename: &str,
112        show_metadata: bool,
113        show_location_groups: bool,
114        use_short_labels: bool,
115    ) -> Result<(), Box<dyn std::error::Error>> {
116        self.graph_api().mermaid_to_file(
117            filename,
118            show_metadata,
119            show_location_groups,
120            use_short_labels,
121        )
122    }
123
124    #[cfg(feature = "viz")]
125    pub fn dot_to_file(
126        &self,
127        filename: &str,
128        show_metadata: bool,
129        show_location_groups: bool,
130        use_short_labels: bool,
131    ) -> Result<(), Box<dyn std::error::Error>> {
132        self.graph_api().dot_to_file(
133            filename,
134            show_metadata,
135            show_location_groups,
136            use_short_labels,
137        )
138    }
139
140    #[cfg(feature = "viz")]
141    pub fn hydroscope_to_file(
142        &self,
143        filename: &str,
144        show_metadata: bool,
145        show_location_groups: bool,
146        use_short_labels: bool,
147    ) -> Result<(), Box<dyn std::error::Error>> {
148        self.graph_api().hydroscope_to_file(
149            filename,
150            show_metadata,
151            show_location_groups,
152            use_short_labels,
153        )
154    }
155
156    // Browser generation methods
157    #[cfg(feature = "viz")]
158    pub fn mermaid_to_browser(
159        &self,
160        show_metadata: bool,
161        show_location_groups: bool,
162        use_short_labels: bool,
163        message_handler: Option<&dyn Fn(&str)>,
164    ) -> Result<(), Box<dyn std::error::Error>> {
165        self.graph_api().mermaid_to_browser(
166            show_metadata,
167            show_location_groups,
168            use_short_labels,
169            message_handler,
170        )
171    }
172
173    #[cfg(feature = "viz")]
174    pub fn dot_to_browser(
175        &self,
176        show_metadata: bool,
177        show_location_groups: bool,
178        use_short_labels: bool,
179        message_handler: Option<&dyn Fn(&str)>,
180    ) -> Result<(), Box<dyn std::error::Error>> {
181        self.graph_api().dot_to_browser(
182            show_metadata,
183            show_location_groups,
184            use_short_labels,
185            message_handler,
186        )
187    }
188
189    #[cfg(feature = "viz")]
190    pub fn hydroscope_to_browser(
191        &self,
192        show_metadata: bool,
193        show_location_groups: bool,
194        use_short_labels: bool,
195        message_handler: Option<&dyn Fn(&str)>,
196    ) -> Result<(), Box<dyn std::error::Error>> {
197        self.graph_api().hydroscope_to_browser(
198            show_metadata,
199            show_location_groups,
200            use_short_labels,
201            message_handler,
202        )
203    }
204
205    pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroRoot])) -> Self {
206        f(&mut self.ir);
207        BuiltFlow {
208            ir: std::mem::take(&mut self.ir),
209            process_id_name: std::mem::take(&mut self.process_id_name),
210            cluster_id_name: std::mem::take(&mut self.cluster_id_name),
211            external_id_name: std::mem::take(&mut self.external_id_name),
212            next_location_id: self.next_location_id,
213            _phantom: PhantomData,
214        }
215    }
216
217    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
218        self.into_deploy()
219    }
220
221    #[cfg(feature = "sim")]
222    /// Creates a simulation for this builder, which can be used to run deterministic simulations
223    /// of the Hydro program.
224    pub fn sim(mut self) -> SimFlow<'a> {
225        use std::cell::RefCell;
226        use std::rc::Rc;
227
228        use crate::sim::graph::{SimExternal, SimExternalPortRegistry, SimNodePort};
229
230        let shared_port_counter = Rc::new(RefCell::new(SimNodePort::default()));
231        let processes = self
232            .process_id_name
233            .iter()
234            .map(|id| {
235                (
236                    id.0,
237                    SimNode {
238                        shared_port_counter: shared_port_counter.clone(),
239                    },
240                )
241            })
242            .collect();
243
244        let clusters = self
245            .cluster_id_name
246            .iter()
247            .map(|id| {
248                (
249                    id.0,
250                    SimNode {
251                        shared_port_counter: shared_port_counter.clone(),
252                    },
253                )
254            })
255            .collect();
256
257        let externals_port_registry = Rc::new(RefCell::new(SimExternalPortRegistry::default()));
258        let externals = self
259            .external_id_name
260            .iter()
261            .map(|id| {
262                (
263                    id.0,
264                    SimExternal {
265                        shared_inner: externals_port_registry.clone(),
266                    },
267                )
268            })
269            .collect();
270
271        SimFlow {
272            ir: std::mem::take(&mut self.ir),
273            processes,
274            clusters,
275            cluster_max_sizes: HashMap::new(),
276            externals,
277            externals_port_registry,
278            _process_id_name: self.process_id_name,
279            _external_id_name: self.external_id_name,
280            _cluster_id_name: self.cluster_id_name,
281            _phantom: PhantomData,
282        }
283    }
284
285    pub fn into_deploy<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
286        let (processes, clusters, externals) = Default::default();
287        DeployFlow {
288            ir: self.ir,
289            processes,
290            process_id_name: self.process_id_name,
291            clusters,
292            cluster_id_name: self.cluster_id_name,
293            externals,
294            external_id_name: self.external_id_name,
295            _phantom: PhantomData,
296        }
297    }
298
299    pub fn with_process<P, D: Deploy<'a>>(
300        self,
301        process: &Process<P>,
302        spec: impl IntoProcessSpec<'a, D>,
303    ) -> DeployFlow<'a, D> {
304        self.into_deploy().with_process(process, spec)
305    }
306
307    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
308        self,
309        spec: impl Fn() -> S,
310    ) -> DeployFlow<'a, D> {
311        self.into_deploy().with_remaining_processes(spec)
312    }
313
314    pub fn with_external<P, D: Deploy<'a>>(
315        self,
316        process: &External<P>,
317        spec: impl ExternalSpec<'a, D>,
318    ) -> DeployFlow<'a, D> {
319        self.into_deploy().with_external(process, spec)
320    }
321
322    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
323        self,
324        spec: impl Fn() -> S,
325    ) -> DeployFlow<'a, D> {
326        self.into_deploy().with_remaining_externals(spec)
327    }
328
329    pub fn with_cluster<C, D: Deploy<'a>>(
330        self,
331        cluster: &Cluster<C>,
332        spec: impl ClusterSpec<'a, D>,
333    ) -> DeployFlow<'a, D> {
334        self.into_deploy().with_cluster(cluster, spec)
335    }
336
337    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
338        self,
339        spec: impl Fn() -> S,
340    ) -> DeployFlow<'a, D> {
341        self.into_deploy().with_remaining_clusters(spec)
342    }
343
344    pub fn compile<D: Deploy<'a>>(self) -> CompiledFlow<'a> {
345        self.into_deploy::<D>().compile()
346    }
347
348    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
349        self.into_deploy::<D>().deploy(env)
350    }
351
352    #[cfg(feature = "viz")]
353    pub fn generate_all_files(
354        &self,
355        prefix: &str,
356        show_metadata: bool,
357        show_location_groups: bool,
358        use_short_labels: bool,
359    ) -> Result<(), Box<dyn std::error::Error>> {
360        self.graph_api().generate_all_files(
361            prefix,
362            show_metadata,
363            show_location_groups,
364            use_short_labels,
365        )
366    }
367
368    #[cfg(feature = "viz")]
369    pub fn generate_graph_with_config(
370        &self,
371        config: &crate::viz::config::GraphConfig,
372        message_handler: Option<&dyn Fn(&str)>,
373    ) -> Result<(), Box<dyn std::error::Error>> {
374        self.graph_api()
375            .generate_graph_with_config(config, message_handler)
376    }
377
378    #[cfg(feature = "viz")]
379    pub fn generate_all_files_with_config(
380        &self,
381        config: &crate::viz::config::GraphConfig,
382        prefix: &str,
383    ) -> Result<(), Box<dyn std::error::Error>> {
384        self.graph_api()
385            .generate_all_files_with_config(config, prefix)
386    }
387}