Skip to main content

hydro_lang/compile/
embedded.rs

1//! "Embedded" deployment backend for Hydro.
2//!
3//! Instead of compiling each location into a standalone binary, this backend generates
4//! a Rust source file containing one function per location. Each function returns a
5//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller.
6//!
7//! This is useful when you want full control over where and how the projected DFIR
8//! code runs (e.g. embedding it into an existing application).
9//!
10//! # Limitations
11//!
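//! Networking is **not** supported. All `Deploy` networking trait methods will panic
//! if called. Only local computations are supported: data either lives in the Hydro
//! program itself or crosses the boundary through embedded inputs (caller-provided
//! streams) and embedded outputs (caller-provided callbacks).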
15
16use std::future::Future;
17use std::io::Error;
18use std::pin::Pin;
19
20use bytes::{Bytes, BytesMut};
21use dfir_lang::diagnostic::Diagnostics;
22use dfir_lang::graph::DfirGraph;
23use futures::{Sink, Stream};
24use proc_macro2::Span;
25use quote::quote;
26use serde::Serialize;
27use serde::de::DeserializeOwned;
28use slotmap::SparseSecondaryMap;
29use stageleft::{QuotedWithContext, q};
30
31use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
32use crate::compile::builder::ExternalPortId;
33use crate::location::dynamic::LocationId;
34use crate::location::member_id::TaglessMemberId;
35use crate::location::{LocationKey, MembershipEvent, NetworkHint};
36
/// Uninhabited marker type that selects the embedded deployment backend.
///
/// Every networking method of its `Deploy` implementation panics; the backend is
/// limited to computation that stays local to a location.
pub enum EmbeddedDeploy {}
41
/// Minimal node representation used by the embedded backend.
///
/// The only state it carries is the function name supplied by the user.
#[derive(Clone)]
pub struct EmbeddedNode {
    /// Name given to the function generated for this location.
    pub fn_name: String,
}
48
49impl Node for EmbeddedNode {
50    type Port = ();
51    type Meta = ();
52    type InstantiateEnv = EmbeddedInstantiateEnv;
53
54    fn next_port(&self) -> Self::Port {}
55
56    fn update_meta(&self, _meta: &Self::Meta) {}
57
58    fn instantiate(
59        &self,
60        _env: &mut Self::InstantiateEnv,
61        _meta: &mut Self::Meta,
62        _graph: DfirGraph,
63        _extra_stmts: &[syn::Stmt],
64        _sidecars: &[syn::Expr],
65    ) {
66        // No-op: embedded mode doesn't instantiate nodes at deploy time.
67    }
68}
69
70impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
71    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
72        panic!("EmbeddedDeploy does not support external ports");
73    }
74
75    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
76    fn as_bytes_bidi(
77        &self,
78        _external_port_id: ExternalPortId,
79    ) -> impl Future<
80        Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
81    > + 'a {
82        async { panic!("EmbeddedDeploy does not support external ports") }
83    }
84
85    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
86    fn as_bincode_bidi<InT, OutT>(
87        &self,
88        _external_port_id: ExternalPortId,
89    ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
90    where
91        InT: Serialize + 'static,
92        OutT: DeserializeOwned + 'static,
93    {
94        async { panic!("EmbeddedDeploy does not support external ports") }
95    }
96
97    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
98    fn as_bincode_sink<T>(
99        &self,
100        _external_port_id: ExternalPortId,
101    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
102    where
103        T: Serialize + 'static,
104    {
105        async { panic!("EmbeddedDeploy does not support external ports") }
106    }
107
108    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109    fn as_bincode_source<T>(
110        &self,
111        _external_port_id: ExternalPortId,
112    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
113    where
114        T: DeserializeOwned + 'static,
115    {
116        async { panic!("EmbeddedDeploy does not support external ports") }
117    }
118}
119
120impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
121    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
122        EmbeddedNode {
123            fn_name: self.into(),
124        }
125    }
126}
127
128impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
129    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
130        EmbeddedNode {
131            fn_name: self.into(),
132        }
133    }
134}
135
136impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
137    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
138        EmbeddedNode {
139            fn_name: self.into(),
140        }
141    }
142}
143
144/// Collected embedded input/output registrations, keyed by location.
145///
146/// During `compile_network`, each `HydroSource::Embedded` and `HydroRoot::EmbeddedOutput`
147/// IR node registers its ident, element type, and location key here.
148/// `generate_embedded` then uses this to add the appropriate parameters
149/// to each generated function.
150#[derive(Default)]
151pub struct EmbeddedInstantiateEnv {
152    /// (ident name, element type) pairs per location key, for inputs.
153    pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
154    /// (ident name, element type) pairs per location key, for outputs.
155    pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
156}
157
158impl<'a> Deploy<'a> for EmbeddedDeploy {
159    type Meta = ();
160    type InstantiateEnv = EmbeddedInstantiateEnv;
161
162    type Process = EmbeddedNode;
163    type Cluster = EmbeddedNode;
164    type External = EmbeddedNode;
165
166    fn o2o_sink_source(
167        _p1: &Self::Process,
168        _p1_port: &(),
169        _p2: &Self::Process,
170        _p2_port: &(),
171    ) -> (syn::Expr, syn::Expr) {
172        panic!("EmbeddedDeploy does not support networking (o2o)")
173    }
174
175    fn o2o_connect(
176        _p1: &Self::Process,
177        _p1_port: &(),
178        _p2: &Self::Process,
179        _p2_port: &(),
180    ) -> Box<dyn FnOnce()> {
181        panic!("EmbeddedDeploy does not support networking (o2o)")
182    }
183
184    fn o2m_sink_source(
185        _p1: &Self::Process,
186        _p1_port: &(),
187        _c2: &Self::Cluster,
188        _c2_port: &(),
189    ) -> (syn::Expr, syn::Expr) {
190        panic!("EmbeddedDeploy does not support networking (o2m)")
191    }
192
193    fn o2m_connect(
194        _p1: &Self::Process,
195        _p1_port: &(),
196        _c2: &Self::Cluster,
197        _c2_port: &(),
198    ) -> Box<dyn FnOnce()> {
199        panic!("EmbeddedDeploy does not support networking (o2m)")
200    }
201
202    fn m2o_sink_source(
203        _c1: &Self::Cluster,
204        _c1_port: &(),
205        _p2: &Self::Process,
206        _p2_port: &(),
207    ) -> (syn::Expr, syn::Expr) {
208        panic!("EmbeddedDeploy does not support networking (m2o)")
209    }
210
211    fn m2o_connect(
212        _c1: &Self::Cluster,
213        _c1_port: &(),
214        _p2: &Self::Process,
215        _p2_port: &(),
216    ) -> Box<dyn FnOnce()> {
217        panic!("EmbeddedDeploy does not support networking (m2o)")
218    }
219
220    fn m2m_sink_source(
221        _c1: &Self::Cluster,
222        _c1_port: &(),
223        _c2: &Self::Cluster,
224        _c2_port: &(),
225    ) -> (syn::Expr, syn::Expr) {
226        panic!("EmbeddedDeploy does not support networking (m2m)")
227    }
228
229    fn m2m_connect(
230        _c1: &Self::Cluster,
231        _c1_port: &(),
232        _c2: &Self::Cluster,
233        _c2_port: &(),
234    ) -> Box<dyn FnOnce()> {
235        panic!("EmbeddedDeploy does not support networking (m2m)")
236    }
237
238    fn e2o_many_source(
239        _extra_stmts: &mut Vec<syn::Stmt>,
240        _p2: &Self::Process,
241        _p2_port: &(),
242        _codec_type: &syn::Type,
243        _shared_handle: String,
244    ) -> syn::Expr {
245        panic!("EmbeddedDeploy does not support networking (e2o)")
246    }
247
248    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
249        panic!("EmbeddedDeploy does not support networking (e2o)")
250    }
251
252    fn e2o_source(
253        _extra_stmts: &mut Vec<syn::Stmt>,
254        _p1: &Self::External,
255        _p1_port: &(),
256        _p2: &Self::Process,
257        _p2_port: &(),
258        _codec_type: &syn::Type,
259        _shared_handle: String,
260    ) -> syn::Expr {
261        panic!("EmbeddedDeploy does not support networking (e2o)")
262    }
263
264    fn e2o_connect(
265        _p1: &Self::External,
266        _p1_port: &(),
267        _p2: &Self::Process,
268        _p2_port: &(),
269        _many: bool,
270        _server_hint: NetworkHint,
271    ) -> Box<dyn FnOnce()> {
272        panic!("EmbeddedDeploy does not support networking (e2o)")
273    }
274
275    fn o2e_sink(
276        _p1: &Self::Process,
277        _p1_port: &(),
278        _p2: &Self::External,
279        _p2_port: &(),
280        _shared_handle: String,
281    ) -> syn::Expr {
282        panic!("EmbeddedDeploy does not support networking (o2e)")
283    }
284
285    #[expect(
286        unreachable_code,
287        reason = "panic before q! which is only for return type"
288    )]
289    fn cluster_ids(
290        _of_cluster: LocationKey,
291    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
292        panic!("EmbeddedDeploy does not support cluster IDs");
293        q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
294    }
295
296    #[expect(
297        unreachable_code,
298        reason = "panic before q! which is only for return type"
299    )]
300    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
301        panic!("EmbeddedDeploy does not support cluster self ID");
302        q!(unreachable!(
303            "EmbeddedDeploy does not support cluster self ID"
304        ))
305    }
306
307    #[expect(
308        unreachable_code,
309        reason = "panic before q! which is only for return type"
310    )]
311    fn cluster_membership_stream(
312        _location_id: &LocationId,
313    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
314    {
315        panic!("EmbeddedDeploy does not support cluster membership streams");
316        q!(unreachable!(
317            "EmbeddedDeploy does not support cluster membership streams"
318        ))
319    }
320
321    fn register_embedded_input(
322        env: &mut Self::InstantiateEnv,
323        location_key: LocationKey,
324        ident: &syn::Ident,
325        element_type: &syn::Type,
326    ) {
327        env.inputs
328            .entry(location_key)
329            .unwrap()
330            .or_default()
331            .push((ident.clone(), element_type.clone()));
332    }
333
334    fn register_embedded_output(
335        env: &mut Self::InstantiateEnv,
336        location_key: LocationKey,
337        ident: &syn::Ident,
338        element_type: &syn::Type,
339    ) {
340        env.outputs
341            .entry(location_key)
342            .unwrap()
343            .or_default()
344            .push((ident.clone(), element_type.clone()));
345    }
346}
347
348impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
349    /// Generates a `syn::File` containing one function per location in the flow.
350    ///
351    /// Each generated function has the signature:
352    /// ```ignore
353    /// pub fn <fn_name>() -> dfir_rs::scheduled::graph::Dfir<'static>
354    /// ```
355    /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`.
356    ///
357    /// The returned `Dfir` can be manually executed by the caller.
358    ///
359    /// # Arguments
360    ///
361    /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft
362    ///   re-exports). Hyphens will be replaced with underscores.
363    ///
364    /// # Usage
365    ///
366    /// Typically called from a `build.rs` in a wrapper crate:
367    /// ```ignore
368    /// // build.rs
369    /// let deploy = flow.with_process(&process, "my_fn".to_string());
370    /// let code = deploy.generate_embedded("my_hydro_crate");
371    /// let out_dir = std::env::var("OUT_DIR").unwrap();
372    /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap();
373    /// ```
374    ///
375    /// Then in `lib.rs`:
376    /// ```ignore
377    /// include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
378    /// ```
379    pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
380        let mut env = EmbeddedInstantiateEnv::default();
381        let compiled = self.compile_internal(&mut env);
382
383        let root = crate::staging_util::get_this_crate();
384        let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
385
386        let mut items: Vec<syn::Item> = Vec::new();
387
388        // Sort location keys for deterministic output.
389        let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
390        location_keys.sort();
391
392        for location_key in location_keys {
393            let graph = &compiled.all_dfir()[location_key];
394
395            // Get the user-provided function name from the node.
396            let fn_name = self
397                .processes
398                .get(location_key)
399                .map(|n| &n.fn_name)
400                .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name))
401                .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name))
402                .expect("location key not found in any node map");
403
404            let fn_ident = syn::Ident::new(fn_name, Span::call_site());
405
406            // Get inputs for this location, sorted by name.
407            let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
408            loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
409
410            // Get outputs for this location, sorted by name.
411            let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
412            loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
413
414            let mut diagnostics = Diagnostics::new();
415            let dfir_tokens = graph
416                .as_code(&quote! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
417                .expect("DFIR code generation failed with diagnostics.");
418
419            // Build the input parameters.
420            let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
421                .iter()
422                .map(|(ident, element_type)| {
423                    quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
424                })
425                .collect();
426
427            let has_outputs = !loc_outputs.is_empty();
428
429            if has_outputs {
430                let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
431
432                // One generic per output field.
433                let output_generic_idents: Vec<syn::Ident> = loc_outputs
434                    .iter()
435                    .enumerate()
436                    .map(|(i, _)| quote::format_ident!("__Out{}", i))
437                    .collect();
438
439                let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
440                    .iter()
441                    .zip(output_generic_idents.iter())
442                    .map(|((ident, _), generic)| {
443                        quote! { pub #ident: #generic }
444                    })
445                    .collect();
446
447                let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
448                    .iter()
449                    .zip(output_generic_idents.iter())
450                    .map(|((_, element_type), generic)| {
451                        quote! { #generic: FnMut(#element_type) }
452                    })
453                    .collect();
454
455                let fn_generics: Vec<proc_macro2::TokenStream> = loc_outputs
456                    .iter()
457                    .zip(output_generic_idents.iter())
458                    .map(|((_, element_type), generic)| {
459                        quote! { #generic: FnMut(#element_type) + 'a }
460                    })
461                    .collect();
462
463                let output_param = quote! {
464                    __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
465                };
466
467                let output_destructure: Vec<proc_macro2::TokenStream> = loc_outputs
468                    .iter()
469                    .map(|(ident, _)| {
470                        quote! { let mut #ident = &mut __outputs.#ident; }
471                    })
472                    .collect();
473
474                let all_params: Vec<proc_macro2::TokenStream> = input_params
475                    .into_iter()
476                    .chain(std::iter::once(output_param))
477                    .collect();
478
479                // Module containing the outputs struct.
480                let output_mod: syn::Item = syn::parse_quote! {
481                    pub mod #fn_ident {
482                        pub struct #output_struct_ident<#(#struct_generics),*> {
483                            #(#struct_fields),*
484                        }
485                    }
486                };
487                items.push(output_mod);
488
489                let func: syn::Item = syn::parse_quote! {
490                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
491                    pub fn #fn_ident<'a, #(#fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
492                        #(#output_destructure)*
493                        #dfir_tokens
494                    }
495                };
496                items.push(func);
497            } else {
498                let func: syn::Item = syn::parse_quote! {
499                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
500                    pub fn #fn_ident<'a>(#(#input_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
501                        #dfir_tokens
502                    }
503                };
504                items.push(func);
505            }
506        }
507
508        syn::parse_quote! {
509            use #orig_crate_name::__staged::__deps::*;
510            use #root::prelude::*;
511            use #root::runtime_support::dfir_rs as __root_dfir_rs;
512            pub use #orig_crate_name::__staged;
513
514            #( #items )*
515        }
516    }
517}