hydro_lang/compile/
deploy.rs

1use std::collections::{BTreeMap, HashMap};
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::QuotedWithContext;
12
13use super::built::build_inner;
14use super::compiled::CompiledFlow;
15use super::deploy_provider::{
16    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
17};
18use super::ir::HydroRoot;
19use crate::live_collections::stream::{Ordering, Retries};
20use crate::location::dynamic::LocationId;
21use crate::location::external_process::{
22    ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
23};
24use crate::location::{Cluster, External, Location, Process};
25use crate::staging_util::Invariant;
26
27pub struct DeployFlow<'a, D>
28where
29    D: Deploy<'a>,
30{
31    pub(super) ir: Vec<HydroRoot>,
32
33    /// Deployed instances of each process in the flow
34    pub(super) processes: HashMap<usize, D::Process>,
35
36    /// Lists all the processes that were created in the flow, same ID as `processes`
37    /// but with the type name of the tag.
38    pub(super) process_id_name: Vec<(usize, String)>,
39
40    pub(super) externals: HashMap<usize, D::External>,
41    pub(super) external_id_name: Vec<(usize, String)>,
42
43    pub(super) clusters: HashMap<usize, D::Cluster>,
44    pub(super) cluster_id_name: Vec<(usize, String)>,
45
46    pub(super) _phantom: Invariant<'a, D>,
47}
48
49impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
50    pub fn ir(&self) -> &Vec<HydroRoot> {
51        &self.ir
52    }
53
54    pub fn with_process_id_name(
55        mut self,
56        process_id: usize,
57        process_name: String,
58        spec: impl IntoProcessSpec<'a, D>,
59    ) -> Self {
60        self.processes.insert(
61            process_id,
62            spec.into_process_spec().build(process_id, &process_name),
63        );
64        self
65    }
66
67    pub fn with_process<P>(self, process: &Process<P>, spec: impl IntoProcessSpec<'a, D>) -> Self {
68        self.with_process_id_name(process.id, std::any::type_name::<P>().to_string(), spec)
69    }
70
71    pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
72        mut self,
73        spec: impl Fn() -> S,
74    ) -> Self {
75        for (id, name) in &self.process_id_name {
76            self.processes
77                .insert(*id, spec().into_process_spec().build(*id, name));
78        }
79
80        self
81    }
82
83    pub fn with_external<P>(
84        mut self,
85        process: &External<P>,
86        spec: impl ExternalSpec<'a, D>,
87    ) -> Self {
88        let tag_name = std::any::type_name::<P>().to_string();
89        self.externals
90            .insert(process.id, spec.build(process.id, &tag_name));
91        self
92    }
93
94    pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
95        mut self,
96        spec: impl Fn() -> S,
97    ) -> Self {
98        for (id, name) in &self.external_id_name {
99            self.externals.insert(*id, spec().build(*id, name));
100        }
101
102        self
103    }
104
105    pub fn with_cluster_id_name(
106        mut self,
107        cluster_id: usize,
108        cluster_name: String,
109        spec: impl ClusterSpec<'a, D>,
110    ) -> Self {
111        self.clusters
112            .insert(cluster_id, spec.build(cluster_id, &cluster_name));
113        self
114    }
115
116    pub fn with_cluster<C>(self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
117        self.with_cluster_id_name(cluster.id, std::any::type_name::<C>().to_string(), spec)
118    }
119
120    pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
121        mut self,
122        spec: impl Fn() -> S,
123    ) -> Self {
124        for (id, name) in &self.cluster_id_name {
125            self.clusters.insert(*id, spec().build(*id, name));
126        }
127
128        self
129    }
130
131    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) without networking.
132    /// Useful for generating Mermaid diagrams of the DFIR.
133    ///
134    /// (This returned DFIR will not compile due to the networking missing).
135    pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
136        // NOTE: `build_inner` does not actually mutate the IR, but `&mut` is required
137        // only because the shared traversal logic requires it
138        CompiledFlow {
139            dfir: build_inner::<D>(&mut self.ir),
140            extra_stmts: BTreeMap::new(),
141            _phantom: PhantomData,
142        }
143    }
144}
145
146impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
147    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) including networking.
148    ///
149    /// (This does not compile the DFIR itself, instead use [`Self::deploy`] to compile & deploy the DFIR).
150    pub fn compile(&mut self) -> CompiledFlow<'a> {
151        let mut seen_tees: HashMap<_, _> = HashMap::new();
152        let mut extra_stmts = BTreeMap::new();
153        self.ir.iter_mut().for_each(|leaf| {
154            leaf.compile_network::<D>(
155                &mut extra_stmts,
156                &mut seen_tees,
157                &self.processes,
158                &self.clusters,
159                &self.externals,
160            );
161        });
162
163        CompiledFlow {
164            dfir: build_inner::<D>(&mut self.ir),
165            extra_stmts,
166            _phantom: PhantomData,
167        }
168    }
169
170    /// Creates the variables for cluster IDs and adds them into `extra_stmts`.
171    fn cluster_id_stmts(&self, extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>) {
172        let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
173        all_clusters_sorted.sort();
174
175        for &c_id in all_clusters_sorted {
176            let self_id_ident = syn::Ident::new(
177                &format!("__hydro_lang_cluster_self_id_{}", c_id),
178                Span::call_site(),
179            );
180            let self_id_expr = D::cluster_self_id().splice_untyped();
181            extra_stmts
182                .entry(c_id)
183                .or_default()
184                .push(syn::parse_quote! {
185                    let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
186                });
187
188            for other_location in self.processes.keys().chain(self.clusters.keys()) {
189                let other_id_ident = syn::Ident::new(
190                    &format!("__hydro_lang_cluster_ids_{}", c_id),
191                    Span::call_site(),
192                );
193                let other_id_expr = D::cluster_ids(c_id).splice_untyped();
194                extra_stmts
195                    .entry(*other_location)
196                    .or_default()
197                    .push(syn::parse_quote! {
198                        let #other_id_ident = #other_id_expr;
199                    });
200            }
201        }
202    }
203
204    /// Compiles and deploys the flow.
205    ///
206    /// Rough outline of steps:
207    /// * Compiles the Hydro into DFIR.
208    /// * Instantiates nodes as configured.
209    /// * Compiles the corresponding DFIR into binaries for nodes as needed.
210    /// * Connects up networking as needed.
211    #[must_use]
212    pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
213        let CompiledFlow {
214            dfir,
215            mut extra_stmts,
216            _phantom,
217        } = self.compile();
218
219        let mut compiled = dfir;
220        self.cluster_id_stmts(&mut extra_stmts);
221        let mut meta = D::Meta::default();
222
223        let (mut processes, mut clusters, mut externals) = (
224            std::mem::take(&mut self.processes)
225                .into_iter()
226                .filter_map(|(node_id, node)| {
227                    if let Some(ir) = compiled.remove(&node_id) {
228                        node.instantiate(
229                            env,
230                            &mut meta,
231                            ir,
232                            extra_stmts.remove(&node_id).unwrap_or_default(),
233                        );
234                        Some((node_id, node))
235                    } else {
236                        None
237                    }
238                })
239                .collect::<HashMap<_, _>>(),
240            std::mem::take(&mut self.clusters)
241                .into_iter()
242                .filter_map(|(cluster_id, cluster)| {
243                    if let Some(ir) = compiled.remove(&cluster_id) {
244                        cluster.instantiate(
245                            env,
246                            &mut meta,
247                            ir,
248                            extra_stmts.remove(&cluster_id).unwrap_or_default(),
249                        );
250                        Some((cluster_id, cluster))
251                    } else {
252                        None
253                    }
254                })
255                .collect::<HashMap<_, _>>(),
256            std::mem::take(&mut self.externals)
257                .into_iter()
258                .map(|(external_id, external)| {
259                    external.instantiate(
260                        env,
261                        &mut meta,
262                        Default::default(),
263                        extra_stmts.remove(&external_id).unwrap_or_default(),
264                    );
265                    (external_id, external)
266                })
267                .collect::<HashMap<_, _>>(),
268        );
269
270        for node in processes.values_mut() {
271            node.update_meta(&meta);
272        }
273
274        for cluster in clusters.values_mut() {
275            cluster.update_meta(&meta);
276        }
277
278        for external in externals.values_mut() {
279            external.update_meta(&meta);
280        }
281
282        let mut seen_tees_connect = HashMap::new();
283        self.ir.iter_mut().for_each(|leaf| {
284            leaf.connect_network(&mut seen_tees_connect);
285        });
286
287        DeployResult {
288            processes,
289            clusters,
290            externals,
291            cluster_id_name: std::mem::take(&mut self.cluster_id_name)
292                .into_iter()
293                .collect(),
294            process_id_name: std::mem::take(&mut self.process_id_name)
295                .into_iter()
296                .collect(),
297        }
298    }
299}
300
301pub struct DeployResult<'a, D: Deploy<'a>> {
302    processes: HashMap<usize, D::Process>,
303    clusters: HashMap<usize, D::Cluster>,
304    externals: HashMap<usize, D::External>,
305    cluster_id_name: HashMap<usize, String>,
306    process_id_name: HashMap<usize, String>,
307}
308
309impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
310    pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
311        let id = match p.id() {
312            LocationId::Process(id) => id,
313            _ => panic!("Process ID expected"),
314        };
315
316        self.processes.get(&id).unwrap()
317    }
318
319    pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
320        let id = match c.id() {
321            LocationId::Cluster(id) => id,
322            _ => panic!("Cluster ID expected"),
323        };
324
325        self.clusters.get(&id).unwrap()
326    }
327
328    pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, String, &D::Cluster)> {
329        self.clusters.iter().map(|(&id, c)| {
330            (
331                LocationId::Cluster(id),
332                self.cluster_id_name.get(&id).unwrap().clone(),
333                c,
334            )
335        })
336    }
337
338    pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, String, &D::Process)> {
339        self.processes.iter().map(|(&id, p)| {
340            (
341                LocationId::Process(id),
342                self.process_id_name.get(&id).unwrap().clone(),
343                p,
344            )
345        })
346    }
347
348    pub fn get_external<P>(&self, p: &External<P>) -> &D::External {
349        self.externals.get(&p.id).unwrap()
350    }
351
352    #[deprecated(note = "use `connect` instead")]
353    pub async fn connect_bytes<M>(
354        &self,
355        port: ExternalBytesPort<M>,
356    ) -> (
357        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
358        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
359    ) {
360        self.connect(port).await
361    }
362
363    #[deprecated(note = "use `connect` instead")]
364    pub async fn connect_sink_bytes<M>(
365        &self,
366        port: ExternalBytesPort<M>,
367    ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
368        self.connect(port).await.1
369    }
370
371    pub async fn connect_bincode<
372        InT: Serialize + 'static,
373        OutT: DeserializeOwned + 'static,
374        Many,
375    >(
376        &self,
377        port: ExternalBincodeBidi<InT, OutT, Many>,
378    ) -> (
379        Pin<Box<dyn Stream<Item = OutT>>>,
380        Pin<Box<dyn Sink<InT, Error = Error>>>,
381    ) {
382        self.externals
383            .get(&port.process_id)
384            .unwrap()
385            .as_bincode_bidi(port.port_id)
386            .await
387    }
388
389    #[deprecated(note = "use `connect` instead")]
390    pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
391        &self,
392        port: ExternalBincodeSink<T, Many>,
393    ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
394        self.connect(port).await
395    }
396
397    #[deprecated(note = "use `connect` instead")]
398    pub async fn connect_source_bytes(
399        &self,
400        port: ExternalBytesPort,
401    ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
402        self.connect(port).await.0
403    }
404
405    #[deprecated(note = "use `connect` instead")]
406    pub async fn connect_source_bincode<
407        T: Serialize + DeserializeOwned + 'static,
408        O: Ordering,
409        R: Retries,
410    >(
411        &self,
412        port: ExternalBincodeStream<T, O, R>,
413    ) -> Pin<Box<dyn Stream<Item = T>>> {
414        self.connect(port).await
415    }
416
417    pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
418        &'b self,
419        port: P,
420    ) -> <P as ConnectableAsync<&'b Self>>::Output {
421        port.connect(self).await
422    }
423}
424
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Get the raw port handle.
    ///
    /// # Panics
    /// Panics if no external node is stored for `port.process_id`.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let external = self.externals.get(&port.process_id).unwrap();
        external.raw_port(port.port_id)
    }
}
440
/// A value that can be asynchronously connected, given a context of type `Ctx`.
pub trait ConnectableAsync<Ctx> {
    /// What a successful connection yields.
    type Output;

    /// Consumes `self` and connects it using `ctx`, resolving to [`Self::Output`].
    fn connect(self, ctx: Ctx) -> impl Future<Output = Self::Output>;
}
446
447impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
448    type Output = (
449        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
450        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
451    );
452
453    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
454        ctx.externals
455            .get(&self.process_id)
456            .unwrap()
457            .as_bytes_bidi(self.port_id)
458            .await
459    }
460}
461
462impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
463    ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
464{
465    type Output = Pin<Box<dyn Stream<Item = T>>>;
466
467    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
468        ctx.externals
469            .get(&self.process_id)
470            .unwrap()
471            .as_bincode_source(self.port_id)
472            .await
473    }
474}
475
476impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
477    for ExternalBincodeSink<T, Many>
478{
479    type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
480
481    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
482        ctx.externals
483            .get(&self.process_id)
484            .unwrap()
485            .as_bincode_sink(self.port_id)
486            .await
487    }
488}