// hydro_lang/compile/deploy.rs

1use std::cell::UnsafeCell;
2use std::collections::{BTreeMap, HashMap};
3use std::io::Error;
4use std::marker::PhantomData;
5use std::pin::Pin;
6
7use bytes::{Bytes, BytesMut};
8use futures::{Sink, Stream};
9use proc_macro2::Span;
10use serde::Serialize;
11use serde::de::DeserializeOwned;
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23    ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, Process};
26use crate::staging_util::Invariant;
27
28pub struct DeployFlow<'a, D>
29where
30    D: Deploy<'a>,
31{
32    // We need to grab an `&mut` reference to the IR in `preview_compile` even though
33    // that function does not modify the IR. Using an `UnsafeCell` allows us to do this
34    // while still being able to lend out immutable references to the IR.
35    pub(super) ir: UnsafeCell<Vec<HydroRoot>>,
36
37    /// Deployed instances of each process in the flow
38    pub(super) processes: HashMap<usize, D::Process>,
39
40    /// Lists all the processes that were created in the flow, same ID as `processes`
41    /// but with the type name of the tag.
42    pub(super) process_id_name: Vec<(usize, String)>,
43
44    pub(super) externals: HashMap<usize, D::External>,
45    pub(super) external_id_name: Vec<(usize, String)>,
46
47    pub(super) clusters: HashMap<usize, D::Cluster>,
48    pub(super) cluster_id_name: Vec<(usize, String)>,
49
50    pub(super) _phantom: Invariant<'a, D>,
51}
52
53impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
54    pub fn ir(&self) -> &Vec<HydroRoot> {
55        unsafe {
56            // SAFETY: even when we grab this as mutable in `preview_compile`, we do not modify it
57            &*self.ir.get()
58        }
59    }
60
61    pub fn with_process_id_name(
62        mut self,
63        process_id: usize,
64        process_name: String,
65        spec: impl IntoProcessSpec<'a, D>,
66    ) -> Self {
67        self.processes.insert(
68            process_id,
69            spec.into_process_spec().build(process_id, &process_name),
70        );
71        self
72    }
73
74    pub fn with_process<P>(self, process: &Process<P>, spec: impl IntoProcessSpec<'a, D>) -> Self {
75        self.with_process_id_name(process.id, std::any::type_name::<P>().to_string(), spec)
76    }
77
78    pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
79        mut self,
80        spec: impl Fn() -> S,
81    ) -> Self {
82        for (id, name) in &self.process_id_name {
83            self.processes
84                .insert(*id, spec().into_process_spec().build(*id, name));
85        }
86
87        self
88    }
89
90    pub fn with_external<P>(
91        mut self,
92        process: &External<P>,
93        spec: impl ExternalSpec<'a, D>,
94    ) -> Self {
95        let tag_name = std::any::type_name::<P>().to_string();
96        self.externals
97            .insert(process.id, spec.build(process.id, &tag_name));
98        self
99    }
100
101    pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
102        mut self,
103        spec: impl Fn() -> S,
104    ) -> Self {
105        for (id, name) in &self.external_id_name {
106            self.externals.insert(*id, spec().build(*id, name));
107        }
108
109        self
110    }
111
112    pub fn with_cluster_id_name(
113        mut self,
114        cluster_id: usize,
115        cluster_name: String,
116        spec: impl ClusterSpec<'a, D>,
117    ) -> Self {
118        self.clusters
119            .insert(cluster_id, spec.build(cluster_id, &cluster_name));
120        self
121    }
122
123    pub fn with_cluster<C>(self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
124        self.with_cluster_id_name(cluster.id, std::any::type_name::<C>().to_string(), spec)
125    }
126
127    pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
128        mut self,
129        spec: impl Fn() -> S,
130    ) -> Self {
131        for (id, name) in &self.cluster_id_name {
132            self.clusters.insert(*id, spec().build(*id, name));
133        }
134
135        self
136    }
137
138    /// Compiles the flow into DFIR using placeholders for the network.
139    /// Useful for generating Mermaid diagrams of the DFIR.
140    pub fn preview_compile(&self) -> CompiledFlow<'a, ()> {
141        CompiledFlow {
142            dfir: build_inner(unsafe {
143                // SAFETY: `build_inner` does not mutate the IR, &mut is required
144                // only because the shared traversal logic requires it
145                &mut *self.ir.get()
146            }),
147            _phantom: PhantomData,
148        }
149    }
150
151    pub fn compile_no_network(mut self) -> CompiledFlow<'a, D::GraphId> {
152        CompiledFlow {
153            dfir: build_inner(self.ir.get_mut()),
154            _phantom: PhantomData,
155        }
156    }
157}
158
159impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
160    pub fn compile(mut self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
161        let mut seen_tees: HashMap<_, _> = HashMap::new();
162        let mut extra_stmts = BTreeMap::new();
163        self.ir.get_mut().iter_mut().for_each(|leaf| {
164            leaf.compile_network::<D>(
165                env,
166                &mut extra_stmts,
167                &mut seen_tees,
168                &self.processes,
169                &self.clusters,
170                &self.externals,
171            );
172        });
173
174        CompiledFlow {
175            dfir: build_inner(self.ir.get_mut()),
176            _phantom: PhantomData,
177        }
178    }
179
180    fn cluster_id_stmts(
181        &self,
182        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
183        env: &<D as Deploy<'a>>::CompileEnv,
184    ) {
185        let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
186        all_clusters_sorted.sort();
187
188        for &c_id in all_clusters_sorted {
189            let self_id_ident = syn::Ident::new(
190                &format!("__hydro_lang_cluster_self_id_{}", c_id),
191                Span::call_site(),
192            );
193            let self_id_expr = D::cluster_self_id(env).splice_untyped();
194            extra_stmts
195                .entry(c_id)
196                .or_default()
197                .push(syn::parse_quote! {
198                    let #self_id_ident = #self_id_expr;
199                });
200
201            for other_location in self.processes.keys().chain(self.clusters.keys()) {
202                let other_id_ident = syn::Ident::new(
203                    &format!("__hydro_lang_cluster_ids_{}", c_id),
204                    Span::call_site(),
205                );
206                let other_id_expr = D::cluster_ids(env, c_id).splice_untyped();
207                extra_stmts
208                    .entry(*other_location)
209                    .or_default()
210                    .push(syn::parse_quote! {
211                        let #other_id_ident = #other_id_expr;
212                    });
213            }
214        }
215    }
216}
217
218impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> {
219    #[must_use]
220    pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
221        let mut seen_tees_instantiate: HashMap<_, _> = HashMap::new();
222        let mut extra_stmts = BTreeMap::new();
223        self.ir.get_mut().iter_mut().for_each(|leaf| {
224            leaf.compile_network::<D>(
225                &(),
226                &mut extra_stmts,
227                &mut seen_tees_instantiate,
228                &self.processes,
229                &self.clusters,
230                &self.externals,
231            );
232        });
233
234        let mut compiled = build_inner(self.ir.get_mut());
235        self.cluster_id_stmts(&mut extra_stmts, &());
236        let mut meta = D::Meta::default();
237
238        let (mut processes, mut clusters, mut externals) = (
239            std::mem::take(&mut self.processes)
240                .into_iter()
241                .filter_map(|(node_id, node)| {
242                    if let Some(ir) = compiled.remove(&node_id) {
243                        node.instantiate(
244                            env,
245                            &mut meta,
246                            ir,
247                            extra_stmts.remove(&node_id).unwrap_or_default(),
248                        );
249                        Some((node_id, node))
250                    } else {
251                        None
252                    }
253                })
254                .collect::<HashMap<_, _>>(),
255            std::mem::take(&mut self.clusters)
256                .into_iter()
257                .filter_map(|(cluster_id, cluster)| {
258                    if let Some(ir) = compiled.remove(&cluster_id) {
259                        cluster.instantiate(
260                            env,
261                            &mut meta,
262                            ir,
263                            extra_stmts.remove(&cluster_id).unwrap_or_default(),
264                        );
265                        Some((cluster_id, cluster))
266                    } else {
267                        None
268                    }
269                })
270                .collect::<HashMap<_, _>>(),
271            std::mem::take(&mut self.externals)
272                .into_iter()
273                .map(|(external_id, external)| {
274                    external.instantiate(
275                        env,
276                        &mut meta,
277                        Default::default(),
278                        extra_stmts.remove(&external_id).unwrap_or_default(),
279                    );
280                    (external_id, external)
281                })
282                .collect::<HashMap<_, _>>(),
283        );
284
285        for node in processes.values_mut() {
286            node.update_meta(&meta);
287        }
288
289        for cluster in clusters.values_mut() {
290            cluster.update_meta(&meta);
291        }
292
293        for external in externals.values_mut() {
294            external.update_meta(&meta);
295        }
296
297        let mut seen_tees_connect = HashMap::new();
298        self.ir.get_mut().iter_mut().for_each(|leaf| {
299            leaf.connect_network(&mut seen_tees_connect);
300        });
301
302        DeployResult {
303            processes,
304            clusters,
305            externals,
306            cluster_id_name: std::mem::take(&mut self.cluster_id_name)
307                .into_iter()
308                .collect(),
309            process_id_name: std::mem::take(&mut self.process_id_name)
310                .into_iter()
311                .collect(),
312        }
313    }
314}
315
316pub struct DeployResult<'a, D: Deploy<'a>> {
317    processes: HashMap<usize, D::Process>,
318    clusters: HashMap<usize, D::Cluster>,
319    externals: HashMap<usize, D::External>,
320    cluster_id_name: HashMap<usize, String>,
321    process_id_name: HashMap<usize, String>,
322}
323
324impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
325    pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
326        let id = match p.id() {
327            LocationId::Process(id) => id,
328            _ => panic!("Process ID expected"),
329        };
330
331        self.processes.get(&id).unwrap()
332    }
333
334    pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
335        let id = match c.id() {
336            LocationId::Cluster(id) => id,
337            _ => panic!("Cluster ID expected"),
338        };
339
340        self.clusters.get(&id).unwrap()
341    }
342
343    pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, String, &D::Cluster)> {
344        self.clusters.iter().map(|(&id, c)| {
345            (
346                LocationId::Cluster(id),
347                self.cluster_id_name.get(&id).unwrap().clone(),
348                c,
349            )
350        })
351    }
352
353    pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, String, &D::Process)> {
354        self.processes.iter().map(|(&id, p)| {
355            (
356                LocationId::Process(id),
357                self.process_id_name.get(&id).unwrap().clone(),
358                p,
359            )
360        })
361    }
362
363    pub fn get_external<P>(&self, p: &External<P>) -> &D::External {
364        self.externals.get(&p.id).unwrap()
365    }
366
367    pub fn raw_port<M>(&self, port: ExternalBytesPort<M>) -> D::ExternalRawPort {
368        self.externals
369            .get(&port.process_id)
370            .unwrap()
371            .raw_port(port.port_id)
372    }
373
374    #[deprecated(note = "use `connect` instead")]
375    pub async fn connect_bytes<M>(
376        &self,
377        port: ExternalBytesPort<M>,
378    ) -> (
379        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
380        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
381    ) {
382        self.connect(port).await
383    }
384
385    #[deprecated(note = "use `connect` instead")]
386    pub async fn connect_sink_bytes<M>(
387        &self,
388        port: ExternalBytesPort<M>,
389    ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
390        self.connect(port).await.1
391    }
392
393    pub async fn connect_bincode<
394        InT: Serialize + 'static,
395        OutT: DeserializeOwned + 'static,
396        Many,
397    >(
398        &self,
399        port: ExternalBincodeBidi<InT, OutT, Many>,
400    ) -> (
401        Pin<Box<dyn Stream<Item = OutT>>>,
402        Pin<Box<dyn Sink<InT, Error = Error>>>,
403    ) {
404        self.externals
405            .get(&port.process_id)
406            .unwrap()
407            .as_bincode_bidi(port.port_id)
408            .await
409    }
410
411    #[deprecated(note = "use `connect` instead")]
412    pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
413        &self,
414        port: ExternalBincodeSink<T, Many>,
415    ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
416        self.connect(port).await
417    }
418
419    #[deprecated(note = "use `connect` instead")]
420    pub async fn connect_source_bytes(
421        &self,
422        port: ExternalBytesPort,
423    ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
424        self.connect(port).await.0
425    }
426
427    #[deprecated(note = "use `connect` instead")]
428    pub async fn connect_source_bincode<
429        T: Serialize + DeserializeOwned + 'static,
430        O: Ordering,
431        R: Retries,
432    >(
433        &self,
434        port: ExternalBincodeStream<T, O, R>,
435    ) -> Pin<Box<dyn Stream<Item = T>>> {
436        self.connect(port).await
437    }
438
439    pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
440        &'b self,
441        port: P,
442    ) -> <P as ConnectableAsync<&'b Self>>::Output {
443        port.connect(self).await
444    }
445}
446
/// A value, such as an external port, that can be asynchronously turned into a live
/// connection given a context of type `Ctx`.
pub trait ConnectableAsync<Ctx> {
    /// What the connection future resolves to.
    type Output;

    /// Consumes `self` and returns a future that connects it using `ctx`.
    fn connect(self, ctx: Ctx) -> impl std::future::Future<Output = Self::Output>;
}

453impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
454    type Output = (
455        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
456        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
457    );
458
459    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
460        ctx.externals
461            .get(&self.process_id)
462            .unwrap()
463            .as_bytes_bidi(self.port_id)
464            .await
465    }
466}
467
468impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
469    ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
470{
471    type Output = Pin<Box<dyn Stream<Item = T>>>;
472
473    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
474        ctx.externals
475            .get(&self.process_id)
476            .unwrap()
477            .as_bincode_source(self.port_id)
478            .await
479    }
480}
481
482impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
483    for ExternalBincodeSink<T, Many>
484{
485    type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
486
487    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
488        ctx.externals
489            .get(&self.process_id)
490            .unwrap()
491            .as_bincode_sink(self.port_id)
492            .await
493    }
494}