Skip to main content

hydro_lang/compile/
built.rs

1use std::marker::PhantomData;
2
3use dfir_lang::graph::{
4    DfirGraph, FlatGraphBuilderOutput, eliminate_extra_unions_tees, partition_graph,
5};
6use slotmap::{SecondaryMap, SlotMap};
7
8use super::compiled::CompiledFlow;
9use super::deploy::{DeployFlow, DeployResult};
10use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
11use super::ir::{HydroRoot, emit};
12use crate::location::{Cluster, External, LocationKey, LocationType, Process};
13#[cfg(stageleft_runtime)]
14#[cfg(feature = "sim")]
15use crate::sim::{flow::SimFlow, graph::SimNode};
16use crate::staging_util::Invariant;
17#[cfg(stageleft_runtime)]
18#[cfg(feature = "viz")]
19use crate::viz::api::GraphApi;
20
21pub struct BuiltFlow<'a> {
22    pub(super) ir: Vec<HydroRoot>,
23    pub(super) locations: SlotMap<LocationKey, LocationType>,
24    pub(super) location_names: SecondaryMap<LocationKey, String>,
25
26    /// Compile-time sidecar directives extracted from the flow state.
27    pub(super) sidecars: Vec<super::builder::Sidecar>,
28
29    /// Application name used in telemetry.
30    pub(super) flow_name: String,
31
32    pub(super) _phantom: Invariant<'a>,
33}
34
35pub(crate) fn build_inner(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, DfirGraph> {
36    emit(ir)
37        .into_iter()
38        .map(|(k, v)| {
39            let FlatGraphBuilderOutput { mut flat_graph, .. } =
40                v.build().expect("Failed to build DFIR flat graph.");
41            eliminate_extra_unions_tees(&mut flat_graph);
42            let partitioned_graph =
43                partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
44            (k, partitioned_graph)
45        })
46        .collect()
47}
48
49impl<'a> BuiltFlow<'a> {
50    /// Returns all [`HydroRoot`]s in the IR.
51    pub fn ir(&self) -> &[HydroRoot] {
52        &self.ir
53    }
54
55    /// Serialize the IR as JSON.
56    #[cfg(feature = "runtime_support")]
57    pub fn ir_json(&self) -> Result<String, serde_json::Error> {
58        super::ir::serialize_dedup_shared(|| serde_json::to_string_pretty(&self.ir))
59    }
60
61    /// Returns all raw location ID -> location name mappings.
62    pub fn location_names(&self) -> &SecondaryMap<LocationKey, String> {
63        &self.location_names
64    }
65
66    /// Get a GraphApi instance for this built flow
67    #[cfg(stageleft_runtime)]
68    #[cfg(feature = "viz")]
69    pub fn graph_api(&self) -> GraphApi<'_> {
70        GraphApi::new(&self.ir, self.location_names())
71    }
72
73    /// Render graph to string in the given format.
74    #[cfg(feature = "viz")]
75    pub fn render_graph(
76        &self,
77        format: crate::viz::config::GraphType,
78        use_short_labels: bool,
79        show_metadata: bool,
80    ) -> String {
81        self.graph_api()
82            .render(format, use_short_labels, show_metadata)
83    }
84
85    /// Write graph to file.
86    #[cfg(feature = "viz")]
87    pub fn write_graph_to_file(
88        &self,
89        format: crate::viz::config::GraphType,
90        filename: &str,
91        use_short_labels: bool,
92        show_metadata: bool,
93    ) -> Result<(), Box<dyn std::error::Error>> {
94        self.graph_api()
95            .write_to_file(format, filename, use_short_labels, show_metadata)
96    }
97
98    /// Generate graph based on CLI config. Returns Some(path) if written.
99    #[cfg(feature = "viz")]
100    pub fn generate_graph(
101        &self,
102        config: &crate::viz::config::GraphConfig,
103    ) -> Result<Option<String>, Box<dyn std::error::Error>> {
104        self.graph_api().generate_graph(config)
105    }
106
107    pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroRoot])) -> Self {
108        f(&mut self.ir);
109        self
110    }
111
112    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
113        self.into_deploy()
114    }
115
116    #[cfg(feature = "sim")]
117    /// Creates a simulation for this builder, which can be used to run deterministic simulations
118    /// of the Hydro program.
119    pub fn sim(self) -> SimFlow<'a> {
120        use std::cell::RefCell;
121        use std::rc::Rc;
122
123        use slotmap::SparseSecondaryMap;
124
125        use crate::sim::graph::SimNodePort;
126
127        let shared_port_counter = Rc::new(RefCell::new(SimNodePort::default()));
128
129        let mut processes = SparseSecondaryMap::new();
130        let mut clusters = SparseSecondaryMap::new();
131        let externals = SparseSecondaryMap::new();
132
133        for (key, loc) in self.locations.iter() {
134            match loc {
135                LocationType::Process => {
136                    processes.insert(
137                        key,
138                        SimNode {
139                            shared_port_counter: shared_port_counter.clone(),
140                        },
141                    );
142                }
143                LocationType::Cluster => {
144                    clusters.insert(
145                        key,
146                        SimNode {
147                            shared_port_counter: shared_port_counter.clone(),
148                        },
149                    );
150                }
151                LocationType::External => {
152                    panic!("Sim cannot have externals");
153                }
154            }
155        }
156
157        SimFlow {
158            ir: self.ir,
159            processes,
160            clusters,
161            externals,
162            cluster_max_sizes: SparseSecondaryMap::new(),
163            externals_port_registry: Default::default(),
164            test_safety_only: false,
165            skip_consistency_assertions: false,
166            unit_test_fuzz_iterations: 8192,
167            _phantom: PhantomData,
168        }
169    }
170
171    pub fn into_deploy<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
172        let (processes, clusters, externals) = Default::default();
173        DeployFlow {
174            ir: self.ir,
175            locations: self.locations,
176            location_names: self.location_names,
177            processes,
178            clusters,
179            externals,
180            sidecars: self.sidecars,
181            flow_name: self.flow_name,
182            _phantom: PhantomData,
183        }
184    }
185
186    pub fn with_process<P, D: Deploy<'a>>(
187        self,
188        process: &Process<P>,
189        spec: impl IntoProcessSpec<'a, D>,
190    ) -> DeployFlow<'a, D> {
191        self.into_deploy().with_process(process, spec)
192    }
193
194    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
195        self,
196        spec: impl Fn() -> S,
197    ) -> DeployFlow<'a, D> {
198        self.into_deploy().with_remaining_processes(spec)
199    }
200
201    pub fn with_external<P, D: Deploy<'a>>(
202        self,
203        process: &External<P>,
204        spec: impl ExternalSpec<'a, D>,
205    ) -> DeployFlow<'a, D> {
206        self.into_deploy().with_external(process, spec)
207    }
208
209    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
210        self,
211        spec: impl Fn() -> S,
212    ) -> DeployFlow<'a, D> {
213        self.into_deploy().with_remaining_externals(spec)
214    }
215
216    pub fn with_cluster<C, D: Deploy<'a>>(
217        self,
218        cluster: &Cluster<C>,
219        spec: impl ClusterSpec<'a, D>,
220    ) -> DeployFlow<'a, D> {
221        self.into_deploy().with_cluster(cluster, spec)
222    }
223
224    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
225        self,
226        spec: impl Fn() -> S,
227    ) -> DeployFlow<'a, D> {
228        self.into_deploy().with_remaining_clusters(spec)
229    }
230
231    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
232        self.into_deploy::<D>().compile()
233    }
234
235    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
236        self.into_deploy::<D>().deploy(env)
237    }
238}