hydro_lang/builder/
built.rs

1use std::cell::UnsafeCell;
2use std::collections::{BTreeMap, HashMap};
3use std::marker::PhantomData;
4
5use dfir_lang::graph::{DfirGraph, eliminate_extra_unions_tees, partition_graph};
6
7use super::compiled::CompiledFlow;
8use super::deploy::{DeployFlow, DeployResult};
9use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy};
10use crate::ir::{HydroLeaf, emit};
11use crate::location::{Cluster, ExternalProcess, Process};
12use crate::staging_util::Invariant;
13
14pub struct BuiltFlow<'a> {
15    pub(super) ir: Vec<HydroLeaf>,
16    pub(super) process_id_name: Vec<(usize, String)>,
17    pub(super) cluster_id_name: Vec<(usize, String)>,
18    pub(super) external_id_name: Vec<(usize, String)>,
19    pub(super) used: bool,
20
21    pub(super) _phantom: Invariant<'a>,
22}
23
24impl Drop for BuiltFlow<'_> {
25    fn drop(&mut self) {
26        if !self.used {
27            panic!(
28                "Dropped BuiltFlow without instantiating, you may have forgotten to call `compile` or `deploy`."
29            );
30        }
31    }
32}
33
34pub(crate) fn build_inner(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, DfirGraph> {
35    emit(ir)
36        .into_iter()
37        .map(|(k, v)| {
38            let (mut flat_graph, _, _) = v.build();
39            eliminate_extra_unions_tees(&mut flat_graph);
40            let partitioned_graph =
41                partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
42            (k, partitioned_graph)
43        })
44        .collect()
45}
46
47impl<'a> BuiltFlow<'a> {
48    pub fn ir(&self) -> &Vec<HydroLeaf> {
49        &self.ir
50    }
51
52    pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroLeaf])) -> Self {
53        self.used = true;
54        f(&mut self.ir);
55        BuiltFlow {
56            ir: std::mem::take(&mut self.ir),
57            process_id_name: std::mem::take(&mut self.process_id_name),
58            cluster_id_name: std::mem::take(&mut self.cluster_id_name),
59            external_id_name: std::mem::take(&mut self.external_id_name),
60            used: false,
61            _phantom: PhantomData,
62        }
63    }
64
65    pub fn with_default_optimize<D: LocalDeploy<'a>>(self) -> DeployFlow<'a, D> {
66        self.optimize_with(crate::rewrites::persist_pullup::persist_pullup)
67            .into_deploy()
68    }
69
70    pub fn into_deploy<D: LocalDeploy<'a>>(mut self) -> DeployFlow<'a, D> {
71        self.used = true;
72        let processes = if D::has_trivial_node() {
73            self.process_id_name
74                .iter()
75                .map(|id| (id.0, D::trivial_process(id.0)))
76                .collect()
77        } else {
78            HashMap::new()
79        };
80
81        let clusters = if D::has_trivial_node() {
82            self.cluster_id_name
83                .iter()
84                .map(|id| (id.0, D::trivial_cluster(id.0)))
85                .collect()
86        } else {
87            HashMap::new()
88        };
89
90        let externals = if D::has_trivial_node() {
91            self.external_id_name
92                .iter()
93                .map(|id| (id.0, D::trivial_external(id.0)))
94                .collect()
95        } else {
96            HashMap::new()
97        };
98
99        DeployFlow {
100            ir: UnsafeCell::new(std::mem::take(&mut self.ir)),
101            processes,
102            process_id_name: std::mem::take(&mut self.process_id_name),
103            clusters,
104            cluster_id_name: std::mem::take(&mut self.cluster_id_name),
105            externals,
106            external_id_name: std::mem::take(&mut self.external_id_name),
107            used: false,
108            _phantom: PhantomData,
109        }
110    }
111
112    pub fn with_process<P, D: LocalDeploy<'a>>(
113        self,
114        process: &Process<P>,
115        spec: impl IntoProcessSpec<'a, D>,
116    ) -> DeployFlow<'a, D> {
117        self.into_deploy().with_process(process, spec)
118    }
119
120    pub fn with_remaining_processes<D: LocalDeploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
121        self,
122        spec: impl Fn() -> S,
123    ) -> DeployFlow<'a, D> {
124        self.into_deploy().with_remaining_processes(spec)
125    }
126
127    pub fn with_external<P, D: LocalDeploy<'a>>(
128        self,
129        process: &ExternalProcess<P>,
130        spec: impl ExternalSpec<'a, D>,
131    ) -> DeployFlow<'a, D> {
132        self.into_deploy().with_external(process, spec)
133    }
134
135    pub fn with_remaining_externals<D: LocalDeploy<'a>, S: ExternalSpec<'a, D> + 'a>(
136        self,
137        spec: impl Fn() -> S,
138    ) -> DeployFlow<'a, D> {
139        self.into_deploy().with_remaining_externals(spec)
140    }
141
142    pub fn with_cluster<C, D: LocalDeploy<'a>>(
143        self,
144        cluster: &Cluster<C>,
145        spec: impl ClusterSpec<'a, D>,
146    ) -> DeployFlow<'a, D> {
147        self.into_deploy().with_cluster(cluster, spec)
148    }
149
150    pub fn with_remaining_clusters<D: LocalDeploy<'a>, S: ClusterSpec<'a, D> + 'a>(
151        self,
152        spec: impl Fn() -> S,
153    ) -> DeployFlow<'a, D> {
154        self.into_deploy().with_remaining_clusters(spec)
155    }
156
157    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
158        self.into_deploy::<D>().compile(env)
159    }
160
161    pub fn compile_no_network<D: LocalDeploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
162        self.into_deploy::<D>().compile_no_network()
163    }
164
165    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
166        self,
167        env: &mut D::InstantiateEnv,
168    ) -> DeployResult<'a, D> {
169        self.into_deploy::<D>().deploy(env)
170    }
171}