Skip to main content

hydro_lang/compile/
embedded.rs

1//! "Embedded" deployment backend for Hydro.
2//!
3//! Instead of compiling each location into a standalone binary, this backend generates
4//! a Rust source file containing one function per location. Each function returns a
5//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller.
6//!
7//! This is useful when you want full control over where and how the projected DFIR
8//! code runs (e.g. embedding it into an existing application).
9//!
10//! # Networking
11//!
12//! Process-to-process (o2o) networking is supported. When a location has network
13//! sends or receives, the generated function takes additional `network_out` and
14//! `network_in` parameters whose types are generated structs with one field per
15//! network port (named after the channel). Network channels must be named via
16//! `.name()` on the networking config.
17//!
18//! - Sinks (`EmbeddedNetworkOut`): one `FnMut(Bytes)` field per outgoing channel.
19//! - Sources (`EmbeddedNetworkIn`): one `Stream<Item = Result<BytesMut, io::Error>>`
20//!   field per incoming channel.
21//!
22//! The caller is responsible for wiring these together (e.g. via in-memory channels,
23//! sockets, etc.). Cluster networking and external ports are not supported.
24
25use std::future::Future;
26use std::io::Error;
27use std::pin::Pin;
28
29use bytes::{Bytes, BytesMut};
30use dfir_lang::diagnostic::Diagnostics;
31use dfir_lang::graph::DfirGraph;
32use futures::{Sink, Stream};
33use proc_macro2::Span;
34use quote::quote;
35use serde::Serialize;
36use serde::de::DeserializeOwned;
37use slotmap::SparseSecondaryMap;
38use stageleft::{QuotedWithContext, q};
39
40use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
41use crate::compile::builder::ExternalPortId;
42use crate::location::dynamic::LocationId;
43use crate::location::member_id::TaglessMemberId;
44use crate::location::{LocationKey, MembershipEvent, NetworkHint};
45
46/// Marker type for the embedded deployment backend.
47///
48/// All networking methods panic — this backend only supports pure local computation.
49pub enum EmbeddedDeploy {}
50
51/// A trivial node type for embedded deployment. Stores a user-provided function name.
52#[derive(Clone)]
53pub struct EmbeddedNode {
54    /// The function name to use in the generated code for this location.
55    pub fn_name: String,
56    /// The location key for this node, used to register network ports.
57    pub location_key: LocationKey,
58}
59
60impl Node for EmbeddedNode {
61    type Port = ();
62    type Meta = ();
63    type InstantiateEnv = EmbeddedInstantiateEnv;
64
65    fn next_port(&self) -> Self::Port {}
66
67    fn update_meta(&self, _meta: &Self::Meta) {}
68
69    fn instantiate(
70        &self,
71        _env: &mut Self::InstantiateEnv,
72        _meta: &mut Self::Meta,
73        _graph: DfirGraph,
74        _extra_stmts: &[syn::Stmt],
75        _sidecars: &[syn::Expr],
76    ) {
77        // No-op: embedded mode doesn't instantiate nodes at deploy time.
78    }
79}
80
81impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
82    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
83        panic!("EmbeddedDeploy does not support external ports");
84    }
85
86    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
87    fn as_bytes_bidi(
88        &self,
89        _external_port_id: ExternalPortId,
90    ) -> impl Future<
91        Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
92    > + 'a {
93        async { panic!("EmbeddedDeploy does not support external ports") }
94    }
95
96    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97    fn as_bincode_bidi<InT, OutT>(
98        &self,
99        _external_port_id: ExternalPortId,
100    ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
101    where
102        InT: Serialize + 'static,
103        OutT: DeserializeOwned + 'static,
104    {
105        async { panic!("EmbeddedDeploy does not support external ports") }
106    }
107
108    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109    fn as_bincode_sink<T>(
110        &self,
111        _external_port_id: ExternalPortId,
112    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
113    where
114        T: Serialize + 'static,
115    {
116        async { panic!("EmbeddedDeploy does not support external ports") }
117    }
118
119    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
120    fn as_bincode_source<T>(
121        &self,
122        _external_port_id: ExternalPortId,
123    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
124    where
125        T: DeserializeOwned + 'static,
126    {
127        async { panic!("EmbeddedDeploy does not support external ports") }
128    }
129}
130
131impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
132    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
133        EmbeddedNode {
134            fn_name: self.into(),
135            location_key,
136        }
137    }
138}
139
140impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
141    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
142        EmbeddedNode {
143            fn_name: self.into(),
144            location_key,
145        }
146    }
147}
148
149impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
150    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
151        EmbeddedNode {
152            fn_name: self.into(),
153            location_key,
154        }
155    }
156}
157
158/// Collected embedded input/output registrations, keyed by location.
159///
160/// During `compile_network`, each `HydroSource::Embedded` and `HydroRoot::EmbeddedOutput`
161/// IR node registers its ident, element type, and location key here.
162/// `generate_embedded` then uses this to add the appropriate parameters
163/// to each generated function.
164#[derive(Default)]
165pub struct EmbeddedInstantiateEnv {
166    /// (ident name, element type) pairs per location key, for inputs.
167    pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
168    /// (ident name, element type) pairs per location key, for singleton inputs.
169    pub singleton_inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
170    /// (ident name, element type) pairs per location key, for outputs.
171    pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
172    /// Network output port names per location key (sender side of channels).
173    /// Each entry is (port_name, is_tagged) where is_tagged means the type is (TaglessMemberId, Bytes).
174    pub network_outputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
175    /// Network input port names per location key (receiver side of channels).
176    /// Each entry is (port_name, is_tagged) where is_tagged means the type is Result<(TaglessMemberId, BytesMut), Error>.
177    pub network_inputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
178    /// Cluster membership streams needed per location key.
179    /// Maps location_key -> vec of cluster LocationKeys whose membership is needed.
180    pub membership_streams: SparseSecondaryMap<LocationKey, Vec<LocationKey>>,
181}
182
183impl<'a> Deploy<'a> for EmbeddedDeploy {
184    type Meta = ();
185    type InstantiateEnv = EmbeddedInstantiateEnv;
186
187    type Process = EmbeddedNode;
188    type Cluster = EmbeddedNode;
189    type External = EmbeddedNode;
190
191    fn o2o_sink_source(
192        env: &mut Self::InstantiateEnv,
193        p1: &Self::Process,
194        _p1_port: &(),
195        p2: &Self::Process,
196        _p2_port: &(),
197        name: Option<&str>,
198        _networking_info: &crate::networking::NetworkingInfo,
199    ) -> (syn::Expr, syn::Expr) {
200        let name = name.expect(
201            "EmbeddedDeploy o2o networking requires a channel name. Use `TCP.name(\"my_channel\")` to provide one.",
202        );
203
204        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
205        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
206
207        env.network_outputs
208            .entry(p1.location_key)
209            .unwrap()
210            .or_default()
211            .push((name.to_owned(), false));
212        env.network_inputs
213            .entry(p2.location_key)
214            .unwrap()
215            .or_default()
216            .push((name.to_owned(), false));
217
218        (
219            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
220            syn::parse_quote!(#source_ident),
221        )
222    }
223
224    fn o2o_connect(
225        _p1: &Self::Process,
226        _p1_port: &(),
227        _p2: &Self::Process,
228        _p2_port: &(),
229    ) -> Box<dyn FnOnce()> {
230        Box::new(|| {})
231    }
232
233    fn o2m_sink_source(
234        env: &mut Self::InstantiateEnv,
235        p1: &Self::Process,
236        _p1_port: &(),
237        c2: &Self::Cluster,
238        _c2_port: &(),
239        name: Option<&str>,
240        _networking_info: &crate::networking::NetworkingInfo,
241    ) -> (syn::Expr, syn::Expr) {
242        let name = name.expect("EmbeddedDeploy o2m networking requires a channel name.");
243        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
244        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
245        env.network_outputs
246            .entry(p1.location_key)
247            .unwrap()
248            .or_default()
249            .push((name.to_owned(), true));
250        env.network_inputs
251            .entry(c2.location_key)
252            .unwrap()
253            .or_default()
254            .push((name.to_owned(), false));
255        (
256            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
257            syn::parse_quote!(#source_ident),
258        )
259    }
260
261    fn o2m_connect(
262        _p1: &Self::Process,
263        _p1_port: &(),
264        _c2: &Self::Cluster,
265        _c2_port: &(),
266    ) -> Box<dyn FnOnce()> {
267        Box::new(|| {})
268    }
269
270    fn m2o_sink_source(
271        env: &mut Self::InstantiateEnv,
272        c1: &Self::Cluster,
273        _c1_port: &(),
274        p2: &Self::Process,
275        _p2_port: &(),
276        name: Option<&str>,
277        _networking_info: &crate::networking::NetworkingInfo,
278    ) -> (syn::Expr, syn::Expr) {
279        let name = name.expect("EmbeddedDeploy m2o networking requires a channel name.");
280        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
281        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
282        env.network_outputs
283            .entry(c1.location_key)
284            .unwrap()
285            .or_default()
286            .push((name.to_owned(), false));
287        env.network_inputs
288            .entry(p2.location_key)
289            .unwrap()
290            .or_default()
291            .push((name.to_owned(), true));
292        (
293            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
294            syn::parse_quote!(#source_ident),
295        )
296    }
297
298    fn m2o_connect(
299        _c1: &Self::Cluster,
300        _c1_port: &(),
301        _p2: &Self::Process,
302        _p2_port: &(),
303    ) -> Box<dyn FnOnce()> {
304        Box::new(|| {})
305    }
306
307    fn m2m_sink_source(
308        env: &mut Self::InstantiateEnv,
309        c1: &Self::Cluster,
310        _c1_port: &(),
311        c2: &Self::Cluster,
312        _c2_port: &(),
313        name: Option<&str>,
314        _networking_info: &crate::networking::NetworkingInfo,
315    ) -> (syn::Expr, syn::Expr) {
316        let name = name.expect("EmbeddedDeploy m2m networking requires a channel name.");
317        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
318        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
319        env.network_outputs
320            .entry(c1.location_key)
321            .unwrap()
322            .or_default()
323            .push((name.to_owned(), true));
324        env.network_inputs
325            .entry(c2.location_key)
326            .unwrap()
327            .or_default()
328            .push((name.to_owned(), true));
329        (
330            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
331            syn::parse_quote!(#source_ident),
332        )
333    }
334
335    fn m2m_connect(
336        _c1: &Self::Cluster,
337        _c1_port: &(),
338        _c2: &Self::Cluster,
339        _c2_port: &(),
340    ) -> Box<dyn FnOnce()> {
341        Box::new(|| {})
342    }
343
344    fn e2o_many_source(
345        _extra_stmts: &mut Vec<syn::Stmt>,
346        _p2: &Self::Process,
347        _p2_port: &(),
348        _codec_type: &syn::Type,
349        _shared_handle: String,
350    ) -> syn::Expr {
351        panic!("EmbeddedDeploy does not support networking (e2o)")
352    }
353
354    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
355        panic!("EmbeddedDeploy does not support networking (e2o)")
356    }
357
358    fn e2o_source(
359        _extra_stmts: &mut Vec<syn::Stmt>,
360        _p1: &Self::External,
361        _p1_port: &(),
362        _p2: &Self::Process,
363        _p2_port: &(),
364        _codec_type: &syn::Type,
365        _shared_handle: String,
366    ) -> syn::Expr {
367        panic!("EmbeddedDeploy does not support networking (e2o)")
368    }
369
370    fn e2o_connect(
371        _p1: &Self::External,
372        _p1_port: &(),
373        _p2: &Self::Process,
374        _p2_port: &(),
375        _many: bool,
376        _server_hint: NetworkHint,
377    ) -> Box<dyn FnOnce()> {
378        panic!("EmbeddedDeploy does not support networking (e2o)")
379    }
380
381    fn o2e_sink(
382        _p1: &Self::Process,
383        _p1_port: &(),
384        _p2: &Self::External,
385        _p2_port: &(),
386        _shared_handle: String,
387    ) -> syn::Expr {
388        panic!("EmbeddedDeploy does not support networking (o2e)")
389    }
390
391    #[expect(
392        unreachable_code,
393        reason = "panic before q! which is only for return type"
394    )]
395    fn cluster_ids(
396        _of_cluster: LocationKey,
397    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
398        panic!("EmbeddedDeploy does not support cluster IDs");
399        q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
400    }
401
402    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
403        super::embedded_runtime::embedded_cluster_self_id()
404    }
405
406    fn cluster_membership_stream(
407        env: &mut Self::InstantiateEnv,
408        at_location: &LocationId,
409        location_id: &LocationId,
410    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
411    {
412        let at_key = match at_location {
413            LocationId::Process(key) | LocationId::Cluster(key) => *key,
414            _ => panic!("cluster_membership_stream must be called from a process or cluster"),
415        };
416        let cluster_key = match location_id {
417            LocationId::Cluster(key) => *key,
418            _ => panic!("cluster_membership_stream target must be a cluster"),
419        };
420        let vec = env.membership_streams.entry(at_key).unwrap().or_default();
421        let idx = if let Some(pos) = vec.iter().position(|k| *k == cluster_key) {
422            pos
423        } else {
424            vec.push(cluster_key);
425            vec.len() - 1
426        };
427
428        super::embedded_runtime::embedded_cluster_membership_stream(idx)
429    }
430
431    fn register_embedded_stream_input(
432        env: &mut Self::InstantiateEnv,
433        location_key: LocationKey,
434        ident: &syn::Ident,
435        element_type: &syn::Type,
436    ) {
437        env.inputs
438            .entry(location_key)
439            .unwrap()
440            .or_default()
441            .push((ident.clone(), element_type.clone()));
442    }
443
444    fn register_embedded_singleton_input(
445        env: &mut Self::InstantiateEnv,
446        location_key: LocationKey,
447        ident: &syn::Ident,
448        element_type: &syn::Type,
449    ) {
450        env.singleton_inputs
451            .entry(location_key)
452            .unwrap()
453            .or_default()
454            .push((ident.clone(), element_type.clone()));
455    }
456
457    fn register_embedded_output(
458        env: &mut Self::InstantiateEnv,
459        location_key: LocationKey,
460        ident: &syn::Ident,
461        element_type: &syn::Type,
462    ) {
463        env.outputs
464            .entry(location_key)
465            .unwrap()
466            .or_default()
467            .push((ident.clone(), element_type.clone()));
468    }
469}
470
471impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
472    /// Generates a `syn::File` containing one function per location in the flow.
473    ///
474    /// Each generated function has the signature:
475    /// ```ignore
476    /// pub fn <fn_name>() -> dfir_rs::scheduled::graph::Dfir<'static>
477    /// ```
478    /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`.
479    ///
480    /// The returned `Dfir` can be manually executed by the caller.
481    ///
482    /// # Arguments
483    ///
484    /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft
485    ///   re-exports). Hyphens will be replaced with underscores.
486    ///
487    /// # Usage
488    ///
489    /// Typically called from a `build.rs` in a wrapper crate:
490    /// ```ignore
491    /// // build.rs
492    /// let deploy = flow.with_process(&process, "my_fn".to_string());
493    /// let code = deploy.generate_embedded("my_hydro_crate");
494    /// let out_dir = std::env::var("OUT_DIR").unwrap();
495    /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap();
496    /// ```
497    ///
498    /// Then in `lib.rs`:
499    /// ```ignore
500    /// include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
501    /// ```
502    pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
503        let mut env = EmbeddedInstantiateEnv::default();
504        let compiled = self.compile_internal(&mut env);
505
506        let root = crate::staging_util::get_this_crate();
507        let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
508
509        let mut items: Vec<syn::Item> = Vec::new();
510
511        // Sort location keys for deterministic output.
512        let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
513        location_keys.sort();
514
515        // Build a map from location key to fn_name for lookups.
516        let fn_names: SparseSecondaryMap<LocationKey, &str> = location_keys
517            .iter()
518            .map(|&k| {
519                let name = self
520                    .processes
521                    .get(k)
522                    .map(|n| n.fn_name.as_str())
523                    .or_else(|| self.clusters.get(k).map(|n| n.fn_name.as_str()))
524                    .or_else(|| self.externals.get(k).map(|n| n.fn_name.as_str()))
525                    .expect("location key not found in any node map");
526                (k, name)
527            })
528            .collect();
529
530        for location_key in location_keys {
531            let graph = &compiled.all_dfir()[location_key];
532
533            // Get the user-provided function name from the node.
534            let fn_name = fn_names[location_key];
535            let fn_ident = syn::Ident::new(fn_name, Span::call_site());
536
537            // Get inputs for this location, sorted by name.
538            let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
539            loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
540
541            // Get outputs for this location, sorted by name.
542            let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
543            loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
544
545            let mut diagnostics = Diagnostics::new();
546            let dfir_tokens = graph
547                .as_code(&quote! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
548                .expect("DFIR code generation failed with diagnostics.");
549
550            // --- Build module items (cluster info, output struct, network structs) ---
551            let mut mod_items: Vec<proc_macro2::TokenStream> = Vec::new();
552            let mut extra_fn_generics: Vec<proc_macro2::TokenStream> = Vec::new();
553            let mut cluster_params: Vec<proc_macro2::TokenStream> = Vec::new();
554            let mut output_params: Vec<proc_macro2::TokenStream> = Vec::new();
555            let mut net_out_params: Vec<proc_macro2::TokenStream> = Vec::new();
556            let mut net_in_params: Vec<proc_macro2::TokenStream> = Vec::new();
557            let mut extra_destructure: Vec<proc_macro2::TokenStream> = Vec::new();
558
559            // For cluster locations, add self_id parameter.
560            if self.clusters.contains_key(location_key) {
561                cluster_params.push(quote! {
562                    __cluster_self_id: &'a #root::location::member_id::TaglessMemberId
563                });
564                // Alias to the name the generated DFIR code expects.
565                let self_id_ident = syn::Ident::new(
566                    &format!("__hydro_lang_cluster_self_id_{}", location_key),
567                    Span::call_site(),
568                );
569                extra_destructure.push(quote! {
570                    let #self_id_ident = __cluster_self_id;
571                });
572            }
573
574            // For any location that needs cluster membership streams, add parameters.
575            if let Some(loc_memberships) = env.membership_streams.get(location_key) {
576                let membership_struct_ident =
577                    syn::Ident::new("EmbeddedMembershipStreams", Span::call_site());
578
579                let mem_generic_idents: Vec<syn::Ident> = loc_memberships
580                    .iter()
581                    .enumerate()
582                    .map(|(i, _)| quote::format_ident!("__Mem{}", i))
583                    .collect();
584
585                let mem_field_names: Vec<syn::Ident> = loc_memberships
586                    .iter()
587                    .map(|k| {
588                        let cluster_fn_name = fn_names[*k];
589                        syn::Ident::new(cluster_fn_name, Span::call_site())
590                    })
591                    .collect();
592
593                let struct_fields: Vec<proc_macro2::TokenStream> = mem_field_names
594                    .iter()
595                    .zip(mem_generic_idents.iter())
596                    .map(|(field, generic)| {
597                        quote! { pub #field: #generic }
598                    })
599                    .collect();
600
601                let struct_generics: Vec<proc_macro2::TokenStream> = mem_generic_idents
602                    .iter()
603                    .map(|generic| {
604                        quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin }
605                    })
606                    .collect();
607
608                for generic in &mem_generic_idents {
609                    extra_fn_generics.push(
610                        quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin + 'a },
611                    );
612                }
613
614                cluster_params.push(quote! {
615                    __membership: #fn_ident::#membership_struct_ident<#(#mem_generic_idents),*>
616                });
617
618                for (i, field) in mem_field_names.iter().enumerate() {
619                    let var_ident =
620                        syn::Ident::new(&format!("__membership_{}", i), Span::call_site());
621                    extra_destructure.push(quote! {
622                        let #var_ident = __membership.#field;
623                    });
624                }
625
626                mod_items.push(quote! {
627                    pub struct #membership_struct_ident<#(#struct_generics),*> {
628                        #(#struct_fields),*
629                    }
630                });
631            }
632
633            // Embedded inputs (Stream sources).
634            let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
635                .iter()
636                .map(|(ident, element_type)| {
637                    quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
638                })
639                .collect();
640
641            // Embedded singleton inputs (plain value parameters).
642            let mut loc_singleton_inputs = env
643                .singleton_inputs
644                .get(location_key)
645                .cloned()
646                .unwrap_or_default();
647            loc_singleton_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
648
649            let singleton_input_params: Vec<proc_macro2::TokenStream> = loc_singleton_inputs
650                .iter()
651                .map(|(ident, element_type)| {
652                    quote! { #ident: #element_type }
653                })
654                .collect();
655
656            // Embedded outputs (FnMut callbacks).
657            if !loc_outputs.is_empty() {
658                let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
659
660                let output_generic_idents: Vec<syn::Ident> = loc_outputs
661                    .iter()
662                    .enumerate()
663                    .map(|(i, _)| quote::format_ident!("__Out{}", i))
664                    .collect();
665
666                let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
667                    .iter()
668                    .zip(output_generic_idents.iter())
669                    .map(|((ident, _), generic)| {
670                        quote! { pub #ident: #generic }
671                    })
672                    .collect();
673
674                let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
675                    .iter()
676                    .zip(output_generic_idents.iter())
677                    .map(|((_, element_type), generic)| {
678                        quote! { #generic: FnMut(#element_type) }
679                    })
680                    .collect();
681
682                for ((_, element_type), generic) in
683                    loc_outputs.iter().zip(output_generic_idents.iter())
684                {
685                    extra_fn_generics.push(quote! { #generic: FnMut(#element_type) + 'a });
686                }
687
688                output_params.push(quote! {
689                    __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
690                });
691
692                for (ident, _) in &loc_outputs {
693                    extra_destructure.push(quote! { let mut #ident = &mut __outputs.#ident; });
694                }
695
696                mod_items.push(quote! {
697                    pub struct #output_struct_ident<#(#struct_generics),*> {
698                        #(#struct_fields),*
699                    }
700                });
701            }
702
703            // Network outputs (FnMut sinks).
704            if let Some(mut loc_net_outputs) = env.network_outputs.remove(location_key) {
705                loc_net_outputs.sort();
706
707                let net_out_struct_ident = syn::Ident::new("EmbeddedNetworkOut", Span::call_site());
708
709                let net_out_generic_idents: Vec<syn::Ident> = loc_net_outputs
710                    .iter()
711                    .enumerate()
712                    .map(|(i, _)| quote::format_ident!("__NetOut{}", i))
713                    .collect();
714
715                let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_outputs
716                    .iter()
717                    .zip(net_out_generic_idents.iter())
718                    .map(|((name, _), generic)| {
719                        let field_ident = syn::Ident::new(name, Span::call_site());
720                        quote! { pub #field_ident: #generic }
721                    })
722                    .collect();
723
724                let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_outputs
725                    .iter()
726                    .zip(net_out_generic_idents.iter())
727                    .map(|((_, is_tagged), generic)| {
728                        if *is_tagged {
729                            quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) }
730                        } else {
731                            quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) }
732                        }
733                    })
734                    .collect();
735
736                for ((_, is_tagged), generic) in
737                    loc_net_outputs.iter().zip(net_out_generic_idents.iter())
738                {
739                    if *is_tagged {
740                        extra_fn_generics.push(
741                            quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) + 'a },
742                        );
743                    } else {
744                        extra_fn_generics.push(
745                            quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) + 'a },
746                        );
747                    }
748                }
749
750                net_out_params.push(quote! {
751                    __network_out: &'a mut #fn_ident::#net_out_struct_ident<#(#net_out_generic_idents),*>
752                });
753
754                for (name, _) in &loc_net_outputs {
755                    let field_ident = syn::Ident::new(name, Span::call_site());
756                    let var_ident =
757                        syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
758                    extra_destructure
759                        .push(quote! { let mut #var_ident = &mut __network_out.#field_ident; });
760                }
761
762                mod_items.push(quote! {
763                    pub struct #net_out_struct_ident<#(#struct_generics),*> {
764                        #(#struct_fields),*
765                    }
766                });
767            }
768
769            // Network inputs (Stream sources).
770            if let Some(mut loc_net_inputs) = env.network_inputs.remove(location_key) {
771                loc_net_inputs.sort();
772
773                let net_in_struct_ident = syn::Ident::new("EmbeddedNetworkIn", Span::call_site());
774
775                let net_in_generic_idents: Vec<syn::Ident> = loc_net_inputs
776                    .iter()
777                    .enumerate()
778                    .map(|(i, _)| quote::format_ident!("__NetIn{}", i))
779                    .collect();
780
781                let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_inputs
782                    .iter()
783                    .zip(net_in_generic_idents.iter())
784                    .map(|((name, _), generic)| {
785                        let field_ident = syn::Ident::new(name, Span::call_site());
786                        quote! { pub #field_ident: #generic }
787                    })
788                    .collect();
789
790                let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_inputs
791                    .iter()
792                    .zip(net_in_generic_idents.iter())
793                    .map(|((_, is_tagged), generic)| {
794                        if *is_tagged {
795                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin }
796                        } else {
797                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin }
798                        }
799                    })
800                    .collect();
801
802                for ((_, is_tagged), generic) in
803                    loc_net_inputs.iter().zip(net_in_generic_idents.iter())
804                {
805                    if *is_tagged {
806                        extra_fn_generics.push(
807                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin + 'a },
808                        );
809                    } else {
810                        extra_fn_generics.push(
811                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin + 'a },
812                        );
813                    }
814                }
815
816                net_in_params.push(quote! {
817                    __network_in: #fn_ident::#net_in_struct_ident<#(#net_in_generic_idents),*>
818                });
819
820                for (name, _) in &loc_net_inputs {
821                    let field_ident = syn::Ident::new(name, Span::call_site());
822                    let var_ident =
823                        syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
824                    extra_destructure.push(quote! { let #var_ident = __network_in.#field_ident; });
825                }
826
827                mod_items.push(quote! {
828                    pub struct #net_in_struct_ident<#(#struct_generics),*> {
829                        #(#struct_fields),*
830                    }
831                });
832            }
833
834            // Emit the module if there are any structs.
835            if !mod_items.is_empty() {
836                let output_mod: syn::Item = syn::parse_quote! {
837                    pub mod #fn_ident {
838                        use super::*;
839                        #(#mod_items)*
840                    }
841                };
842                items.push(output_mod);
843            }
844
845            // Build the function.
846            let all_params: Vec<proc_macro2::TokenStream> = cluster_params
847                .into_iter()
848                .chain(singleton_input_params)
849                .chain(input_params)
850                .chain(output_params)
851                .chain(net_in_params)
852                .chain(net_out_params)
853                .collect();
854
855            let func = if !extra_fn_generics.is_empty() {
856                syn::parse_quote! {
857                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
858                    pub fn #fn_ident<'a, #(#extra_fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
859                        #(#extra_destructure)*
860                        #dfir_tokens
861                    }
862                }
863            } else {
864                syn::parse_quote! {
865                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
866                    pub fn #fn_ident<'a>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
867                        #dfir_tokens
868                    }
869                }
870            };
871
872            items.push(func);
873        }
874
875        syn::parse_quote! {
876            use #orig_crate_name::__staged::__deps::*;
877            use #root::prelude::*;
878            use #root::runtime_support::dfir_rs as __root_dfir_rs;
879            pub use #orig_crate_name::__staged;
880
881            #( #items )*
882        }
883    }
884}