1use std::cell::UnsafeCell;
2use std::collections::{BTreeMap, HashMap};
3use std::marker::PhantomData;
4
5use dfir_lang::graph::{DfirGraph, eliminate_extra_unions_tees, partition_graph};
6
7use super::compiled::CompiledFlow;
8use super::deploy::{DeployFlow, DeployResult};
9use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy};
10use crate::ir::{HydroLeaf, emit};
11use crate::location::{Cluster, ExternalProcess, Process};
12use crate::staging_util::Invariant;
13
14pub struct BuiltFlow<'a> {
15 pub(super) ir: Vec<HydroLeaf>,
16 pub(super) process_id_name: Vec<(usize, String)>,
17 pub(super) cluster_id_name: Vec<(usize, String)>,
18 pub(super) external_id_name: Vec<(usize, String)>,
19 pub(super) used: bool,
20
21 pub(super) _phantom: Invariant<'a>,
22}
23
24impl Drop for BuiltFlow<'_> {
25 fn drop(&mut self) {
26 if !self.used {
27 panic!(
28 "Dropped BuiltFlow without instantiating, you may have forgotten to call `compile` or `deploy`."
29 );
30 }
31 }
32}
33
34pub(crate) fn build_inner(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, DfirGraph> {
35 emit(ir)
36 .into_iter()
37 .map(|(k, v)| {
38 let (mut flat_graph, _, _) = v.build();
39 eliminate_extra_unions_tees(&mut flat_graph);
40 let partitioned_graph =
41 partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
42 (k, partitioned_graph)
43 })
44 .collect()
45}
46
47impl<'a> BuiltFlow<'a> {
48 pub fn ir(&self) -> &Vec<HydroLeaf> {
49 &self.ir
50 }
51
52 pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroLeaf])) -> Self {
53 self.used = true;
54 f(&mut self.ir);
55 BuiltFlow {
56 ir: std::mem::take(&mut self.ir),
57 process_id_name: std::mem::take(&mut self.process_id_name),
58 cluster_id_name: std::mem::take(&mut self.cluster_id_name),
59 external_id_name: std::mem::take(&mut self.external_id_name),
60 used: false,
61 _phantom: PhantomData,
62 }
63 }
64
65 pub fn with_default_optimize<D: LocalDeploy<'a>>(self) -> DeployFlow<'a, D> {
66 self.optimize_with(crate::rewrites::persist_pullup::persist_pullup)
67 .into_deploy()
68 }
69
70 pub fn into_deploy<D: LocalDeploy<'a>>(mut self) -> DeployFlow<'a, D> {
71 self.used = true;
72 let processes = if D::has_trivial_node() {
73 self.process_id_name
74 .iter()
75 .map(|id| (id.0, D::trivial_process(id.0)))
76 .collect()
77 } else {
78 HashMap::new()
79 };
80
81 let clusters = if D::has_trivial_node() {
82 self.cluster_id_name
83 .iter()
84 .map(|id| (id.0, D::trivial_cluster(id.0)))
85 .collect()
86 } else {
87 HashMap::new()
88 };
89
90 let externals = if D::has_trivial_node() {
91 self.external_id_name
92 .iter()
93 .map(|id| (id.0, D::trivial_external(id.0)))
94 .collect()
95 } else {
96 HashMap::new()
97 };
98
99 DeployFlow {
100 ir: UnsafeCell::new(std::mem::take(&mut self.ir)),
101 processes,
102 process_id_name: std::mem::take(&mut self.process_id_name),
103 clusters,
104 cluster_id_name: std::mem::take(&mut self.cluster_id_name),
105 externals,
106 external_id_name: std::mem::take(&mut self.external_id_name),
107 used: false,
108 _phantom: PhantomData,
109 }
110 }
111
112 pub fn with_process<P, D: LocalDeploy<'a>>(
113 self,
114 process: &Process<P>,
115 spec: impl IntoProcessSpec<'a, D>,
116 ) -> DeployFlow<'a, D> {
117 self.into_deploy().with_process(process, spec)
118 }
119
120 pub fn with_remaining_processes<D: LocalDeploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
121 self,
122 spec: impl Fn() -> S,
123 ) -> DeployFlow<'a, D> {
124 self.into_deploy().with_remaining_processes(spec)
125 }
126
127 pub fn with_external<P, D: LocalDeploy<'a>>(
128 self,
129 process: &ExternalProcess<P>,
130 spec: impl ExternalSpec<'a, D>,
131 ) -> DeployFlow<'a, D> {
132 self.into_deploy().with_external(process, spec)
133 }
134
135 pub fn with_remaining_externals<D: LocalDeploy<'a>, S: ExternalSpec<'a, D> + 'a>(
136 self,
137 spec: impl Fn() -> S,
138 ) -> DeployFlow<'a, D> {
139 self.into_deploy().with_remaining_externals(spec)
140 }
141
142 pub fn with_cluster<C, D: LocalDeploy<'a>>(
143 self,
144 cluster: &Cluster<C>,
145 spec: impl ClusterSpec<'a, D>,
146 ) -> DeployFlow<'a, D> {
147 self.into_deploy().with_cluster(cluster, spec)
148 }
149
150 pub fn with_remaining_clusters<D: LocalDeploy<'a>, S: ClusterSpec<'a, D> + 'a>(
151 self,
152 spec: impl Fn() -> S,
153 ) -> DeployFlow<'a, D> {
154 self.into_deploy().with_remaining_clusters(spec)
155 }
156
157 pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
158 self.into_deploy::<D>().compile(env)
159 }
160
161 pub fn compile_no_network<D: LocalDeploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
162 self.into_deploy::<D>().compile_no_network()
163 }
164
165 pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
166 self,
167 env: &mut D::InstantiateEnv,
168 ) -> DeployResult<'a, D> {
169 self.into_deploy::<D>().deploy(env)
170 }
171}