Skip to main content

hydro_lang/compile/
embedded.rs

1//! "Embedded" deployment backend for Hydro.
2//!
3//! Instead of compiling each location into a standalone binary, this backend generates
4//! a Rust source file containing one function per location. Each function returns a
5//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller.
6//!
7//! This is useful when you want full control over where and how the projected DFIR
8//! code runs (e.g. embedding it into an existing application).
9//!
10//! # Networking
11//!
12//! Process-to-process (o2o) networking is supported. When a location has network
13//! sends or receives, the generated function takes additional `network_out` and
14//! `network_in` parameters whose types are generated structs with one field per
15//! network port (named after the channel). Network channels must be named via
16//! `.name()` on the networking config.
17//!
18//! - Sinks (`EmbeddedNetworkOut`): one `FnMut(Bytes)` field per outgoing channel.
19//! - Sources (`EmbeddedNetworkIn`): one `Stream<Item = Result<BytesMut, io::Error>>`
20//!   field per incoming channel.
21//!
22//! The caller is responsible for wiring these together (e.g. via in-memory channels,
23//! sockets, etc.). Cluster networking and external ports are not supported.
24
25use std::future::Future;
26use std::io::Error;
27use std::pin::Pin;
28
29use bytes::{Bytes, BytesMut};
30use dfir_lang::diagnostic::Diagnostics;
31use dfir_lang::graph::DfirGraph;
32use futures::{Sink, Stream};
33use proc_macro2::Span;
34use quote::quote;
35use serde::Serialize;
36use serde::de::DeserializeOwned;
37use slotmap::SparseSecondaryMap;
38use stageleft::{QuotedWithContext, q};
39
40use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
41use crate::compile::builder::ExternalPortId;
42use crate::location::dynamic::LocationId;
43use crate::location::member_id::TaglessMemberId;
44use crate::location::{LocationKey, MembershipEvent, NetworkHint};
45
46/// Marker type for the embedded deployment backend.
47///
48/// All networking methods panic — this backend only supports pure local computation.
49pub enum EmbeddedDeploy {}
50
51/// A trivial node type for embedded deployment. Stores a user-provided function name.
52#[derive(Clone)]
53pub struct EmbeddedNode {
54    /// The function name to use in the generated code for this location.
55    pub fn_name: String,
56    /// The location key for this node, used to register network ports.
57    pub location_key: LocationKey,
58}
59
60impl Node for EmbeddedNode {
61    type Port = ();
62    type Meta = ();
63    type InstantiateEnv = EmbeddedInstantiateEnv;
64
65    fn next_port(&self) -> Self::Port {}
66
67    fn update_meta(&self, _meta: &Self::Meta) {}
68
69    fn instantiate(
70        &self,
71        _env: &mut Self::InstantiateEnv,
72        _meta: &mut Self::Meta,
73        _graph: DfirGraph,
74        _extra_stmts: &[syn::Stmt],
75        _sidecars: &[syn::Expr],
76    ) {
77        // No-op: embedded mode doesn't instantiate nodes at deploy time.
78    }
79}
80
81impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
82    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
83        panic!("EmbeddedDeploy does not support external ports");
84    }
85
86    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
87    fn as_bytes_bidi(
88        &self,
89        _external_port_id: ExternalPortId,
90    ) -> impl Future<
91        Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
92    > + 'a {
93        async { panic!("EmbeddedDeploy does not support external ports") }
94    }
95
96    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97    fn as_bincode_bidi<InT, OutT>(
98        &self,
99        _external_port_id: ExternalPortId,
100    ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
101    where
102        InT: Serialize + 'static,
103        OutT: DeserializeOwned + 'static,
104    {
105        async { panic!("EmbeddedDeploy does not support external ports") }
106    }
107
108    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109    fn as_bincode_sink<T>(
110        &self,
111        _external_port_id: ExternalPortId,
112    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
113    where
114        T: Serialize + 'static,
115    {
116        async { panic!("EmbeddedDeploy does not support external ports") }
117    }
118
119    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
120    fn as_bincode_source<T>(
121        &self,
122        _external_port_id: ExternalPortId,
123    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
124    where
125        T: DeserializeOwned + 'static,
126    {
127        async { panic!("EmbeddedDeploy does not support external ports") }
128    }
129}
130
131impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
132    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
133        EmbeddedNode {
134            fn_name: self.into(),
135            location_key,
136        }
137    }
138}
139
140impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
141    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
142        EmbeddedNode {
143            fn_name: self.into(),
144            location_key,
145        }
146    }
147}
148
149impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
150    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
151        EmbeddedNode {
152            fn_name: self.into(),
153            location_key,
154        }
155    }
156}
157
158/// Collected embedded input/output registrations, keyed by location.
159///
160/// During `compile_network`, each `HydroSource::Embedded` and `HydroRoot::EmbeddedOutput`
161/// IR node registers its ident, element type, and location key here.
162/// `generate_embedded` then uses this to add the appropriate parameters
163/// to each generated function.
164#[derive(Default)]
165pub struct EmbeddedInstantiateEnv {
166    /// (ident name, element type) pairs per location key, for inputs.
167    pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
168    /// (ident name, element type) pairs per location key, for outputs.
169    pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
170    /// Network output port names per location key (sender side of channels).
171    /// Each entry is (port_name, is_tagged) where is_tagged means the type is (TaglessMemberId, Bytes).
172    pub network_outputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
173    /// Network input port names per location key (receiver side of channels).
174    /// Each entry is (port_name, is_tagged) where is_tagged means the type is Result<(TaglessMemberId, BytesMut), Error>.
175    pub network_inputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
176    /// Cluster membership streams needed per location key.
177    /// Maps location_key -> vec of cluster LocationKeys whose membership is needed.
178    pub membership_streams: SparseSecondaryMap<LocationKey, Vec<LocationKey>>,
179}
180
181impl<'a> Deploy<'a> for EmbeddedDeploy {
182    type Meta = ();
183    type InstantiateEnv = EmbeddedInstantiateEnv;
184
185    type Process = EmbeddedNode;
186    type Cluster = EmbeddedNode;
187    type External = EmbeddedNode;
188
189    fn o2o_sink_source(
190        env: &mut Self::InstantiateEnv,
191        p1: &Self::Process,
192        _p1_port: &(),
193        p2: &Self::Process,
194        _p2_port: &(),
195        name: Option<&str>,
196        _networking_info: &crate::networking::NetworkingInfo,
197    ) -> (syn::Expr, syn::Expr) {
198        let name = name.expect(
199            "EmbeddedDeploy o2o networking requires a channel name. Use `TCP.name(\"my_channel\")` to provide one.",
200        );
201
202        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
203        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
204
205        env.network_outputs
206            .entry(p1.location_key)
207            .unwrap()
208            .or_default()
209            .push((name.to_owned(), false));
210        env.network_inputs
211            .entry(p2.location_key)
212            .unwrap()
213            .or_default()
214            .push((name.to_owned(), false));
215
216        (
217            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
218            syn::parse_quote!(#source_ident),
219        )
220    }
221
222    fn o2o_connect(
223        _p1: &Self::Process,
224        _p1_port: &(),
225        _p2: &Self::Process,
226        _p2_port: &(),
227    ) -> Box<dyn FnOnce()> {
228        Box::new(|| {})
229    }
230
231    fn o2m_sink_source(
232        env: &mut Self::InstantiateEnv,
233        p1: &Self::Process,
234        _p1_port: &(),
235        c2: &Self::Cluster,
236        _c2_port: &(),
237        name: Option<&str>,
238        _networking_info: &crate::networking::NetworkingInfo,
239    ) -> (syn::Expr, syn::Expr) {
240        let name = name.expect("EmbeddedDeploy o2m networking requires a channel name.");
241        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
242        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
243        env.network_outputs
244            .entry(p1.location_key)
245            .unwrap()
246            .or_default()
247            .push((name.to_owned(), true));
248        env.network_inputs
249            .entry(c2.location_key)
250            .unwrap()
251            .or_default()
252            .push((name.to_owned(), false));
253        (
254            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
255            syn::parse_quote!(#source_ident),
256        )
257    }
258
259    fn o2m_connect(
260        _p1: &Self::Process,
261        _p1_port: &(),
262        _c2: &Self::Cluster,
263        _c2_port: &(),
264    ) -> Box<dyn FnOnce()> {
265        Box::new(|| {})
266    }
267
268    fn m2o_sink_source(
269        env: &mut Self::InstantiateEnv,
270        c1: &Self::Cluster,
271        _c1_port: &(),
272        p2: &Self::Process,
273        _p2_port: &(),
274        name: Option<&str>,
275        _networking_info: &crate::networking::NetworkingInfo,
276    ) -> (syn::Expr, syn::Expr) {
277        let name = name.expect("EmbeddedDeploy m2o networking requires a channel name.");
278        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
279        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
280        env.network_outputs
281            .entry(c1.location_key)
282            .unwrap()
283            .or_default()
284            .push((name.to_owned(), false));
285        env.network_inputs
286            .entry(p2.location_key)
287            .unwrap()
288            .or_default()
289            .push((name.to_owned(), true));
290        (
291            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
292            syn::parse_quote!(#source_ident),
293        )
294    }
295
296    fn m2o_connect(
297        _c1: &Self::Cluster,
298        _c1_port: &(),
299        _p2: &Self::Process,
300        _p2_port: &(),
301    ) -> Box<dyn FnOnce()> {
302        Box::new(|| {})
303    }
304
305    fn m2m_sink_source(
306        env: &mut Self::InstantiateEnv,
307        c1: &Self::Cluster,
308        _c1_port: &(),
309        c2: &Self::Cluster,
310        _c2_port: &(),
311        name: Option<&str>,
312        _networking_info: &crate::networking::NetworkingInfo,
313    ) -> (syn::Expr, syn::Expr) {
314        let name = name.expect("EmbeddedDeploy m2m networking requires a channel name.");
315        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
316        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
317        env.network_outputs
318            .entry(c1.location_key)
319            .unwrap()
320            .or_default()
321            .push((name.to_owned(), true));
322        env.network_inputs
323            .entry(c2.location_key)
324            .unwrap()
325            .or_default()
326            .push((name.to_owned(), true));
327        (
328            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
329            syn::parse_quote!(#source_ident),
330        )
331    }
332
333    fn m2m_connect(
334        _c1: &Self::Cluster,
335        _c1_port: &(),
336        _c2: &Self::Cluster,
337        _c2_port: &(),
338    ) -> Box<dyn FnOnce()> {
339        Box::new(|| {})
340    }
341
342    fn e2o_many_source(
343        _extra_stmts: &mut Vec<syn::Stmt>,
344        _p2: &Self::Process,
345        _p2_port: &(),
346        _codec_type: &syn::Type,
347        _shared_handle: String,
348    ) -> syn::Expr {
349        panic!("EmbeddedDeploy does not support networking (e2o)")
350    }
351
352    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
353        panic!("EmbeddedDeploy does not support networking (e2o)")
354    }
355
356    fn e2o_source(
357        _extra_stmts: &mut Vec<syn::Stmt>,
358        _p1: &Self::External,
359        _p1_port: &(),
360        _p2: &Self::Process,
361        _p2_port: &(),
362        _codec_type: &syn::Type,
363        _shared_handle: String,
364    ) -> syn::Expr {
365        panic!("EmbeddedDeploy does not support networking (e2o)")
366    }
367
368    fn e2o_connect(
369        _p1: &Self::External,
370        _p1_port: &(),
371        _p2: &Self::Process,
372        _p2_port: &(),
373        _many: bool,
374        _server_hint: NetworkHint,
375    ) -> Box<dyn FnOnce()> {
376        panic!("EmbeddedDeploy does not support networking (e2o)")
377    }
378
379    fn o2e_sink(
380        _p1: &Self::Process,
381        _p1_port: &(),
382        _p2: &Self::External,
383        _p2_port: &(),
384        _shared_handle: String,
385    ) -> syn::Expr {
386        panic!("EmbeddedDeploy does not support networking (o2e)")
387    }
388
389    #[expect(
390        unreachable_code,
391        reason = "panic before q! which is only for return type"
392    )]
393    fn cluster_ids(
394        _of_cluster: LocationKey,
395    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
396        panic!("EmbeddedDeploy does not support cluster IDs");
397        q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
398    }
399
400    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
401        super::embedded_runtime::embedded_cluster_self_id()
402    }
403
404    fn cluster_membership_stream(
405        env: &mut Self::InstantiateEnv,
406        at_location: &LocationId,
407        location_id: &LocationId,
408    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
409    {
410        let at_key = match at_location {
411            LocationId::Process(key) | LocationId::Cluster(key) => *key,
412            _ => panic!("cluster_membership_stream must be called from a process or cluster"),
413        };
414        let cluster_key = match location_id {
415            LocationId::Cluster(key) => *key,
416            _ => panic!("cluster_membership_stream target must be a cluster"),
417        };
418        let vec = env.membership_streams.entry(at_key).unwrap().or_default();
419        let idx = if let Some(pos) = vec.iter().position(|k| *k == cluster_key) {
420            pos
421        } else {
422            vec.push(cluster_key);
423            vec.len() - 1
424        };
425
426        super::embedded_runtime::embedded_cluster_membership_stream(idx)
427    }
428
429    fn register_embedded_input(
430        env: &mut Self::InstantiateEnv,
431        location_key: LocationKey,
432        ident: &syn::Ident,
433        element_type: &syn::Type,
434    ) {
435        env.inputs
436            .entry(location_key)
437            .unwrap()
438            .or_default()
439            .push((ident.clone(), element_type.clone()));
440    }
441
442    fn register_embedded_output(
443        env: &mut Self::InstantiateEnv,
444        location_key: LocationKey,
445        ident: &syn::Ident,
446        element_type: &syn::Type,
447    ) {
448        env.outputs
449            .entry(location_key)
450            .unwrap()
451            .or_default()
452            .push((ident.clone(), element_type.clone()));
453    }
454}
455
456impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
457    /// Generates a `syn::File` containing one function per location in the flow.
458    ///
459    /// Each generated function has the signature:
460    /// ```ignore
461    /// pub fn <fn_name>() -> dfir_rs::scheduled::graph::Dfir<'static>
462    /// ```
463    /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`.
464    ///
465    /// The returned `Dfir` can be manually executed by the caller.
466    ///
467    /// # Arguments
468    ///
469    /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft
470    ///   re-exports). Hyphens will be replaced with underscores.
471    ///
472    /// # Usage
473    ///
474    /// Typically called from a `build.rs` in a wrapper crate:
475    /// ```ignore
476    /// // build.rs
477    /// let deploy = flow.with_process(&process, "my_fn".to_string());
478    /// let code = deploy.generate_embedded("my_hydro_crate");
479    /// let out_dir = std::env::var("OUT_DIR").unwrap();
480    /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap();
481    /// ```
482    ///
483    /// Then in `lib.rs`:
484    /// ```ignore
485    /// include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
486    /// ```
487    pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
488        let mut env = EmbeddedInstantiateEnv::default();
489        let compiled = self.compile_internal(&mut env);
490
491        let root = crate::staging_util::get_this_crate();
492        let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
493
494        let mut items: Vec<syn::Item> = Vec::new();
495
496        // Sort location keys for deterministic output.
497        let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
498        location_keys.sort();
499
500        // Build a map from location key to fn_name for lookups.
501        let fn_names: SparseSecondaryMap<LocationKey, &str> = location_keys
502            .iter()
503            .map(|&k| {
504                let name = self
505                    .processes
506                    .get(k)
507                    .map(|n| n.fn_name.as_str())
508                    .or_else(|| self.clusters.get(k).map(|n| n.fn_name.as_str()))
509                    .or_else(|| self.externals.get(k).map(|n| n.fn_name.as_str()))
510                    .expect("location key not found in any node map");
511                (k, name)
512            })
513            .collect();
514
515        for location_key in location_keys {
516            let graph = &compiled.all_dfir()[location_key];
517
518            // Get the user-provided function name from the node.
519            let fn_name = fn_names[location_key];
520            let fn_ident = syn::Ident::new(fn_name, Span::call_site());
521
522            // Get inputs for this location, sorted by name.
523            let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
524            loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
525
526            // Get outputs for this location, sorted by name.
527            let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
528            loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
529
530            let mut diagnostics = Diagnostics::new();
531            let dfir_tokens = graph
532                .as_code(&quote! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
533                .expect("DFIR code generation failed with diagnostics.");
534
535            // --- Build module items (cluster info, output struct, network structs) ---
536            let mut mod_items: Vec<proc_macro2::TokenStream> = Vec::new();
537            let mut extra_fn_generics: Vec<proc_macro2::TokenStream> = Vec::new();
538            let mut cluster_params: Vec<proc_macro2::TokenStream> = Vec::new();
539            let mut output_params: Vec<proc_macro2::TokenStream> = Vec::new();
540            let mut net_out_params: Vec<proc_macro2::TokenStream> = Vec::new();
541            let mut net_in_params: Vec<proc_macro2::TokenStream> = Vec::new();
542            let mut extra_destructure: Vec<proc_macro2::TokenStream> = Vec::new();
543
544            // For cluster locations, add self_id parameter.
545            if self.clusters.contains_key(location_key) {
546                cluster_params.push(quote! {
547                    __cluster_self_id: &'a #root::location::member_id::TaglessMemberId
548                });
549                // Alias to the name the generated DFIR code expects.
550                let self_id_ident = syn::Ident::new(
551                    &format!("__hydro_lang_cluster_self_id_{}", location_key),
552                    Span::call_site(),
553                );
554                extra_destructure.push(quote! {
555                    let #self_id_ident = __cluster_self_id;
556                });
557            }
558
559            // For any location that needs cluster membership streams, add parameters.
560            if let Some(loc_memberships) = env.membership_streams.get(location_key) {
561                let membership_struct_ident =
562                    syn::Ident::new("EmbeddedMembershipStreams", Span::call_site());
563
564                let mem_generic_idents: Vec<syn::Ident> = loc_memberships
565                    .iter()
566                    .enumerate()
567                    .map(|(i, _)| quote::format_ident!("__Mem{}", i))
568                    .collect();
569
570                let mem_field_names: Vec<syn::Ident> = loc_memberships
571                    .iter()
572                    .map(|k| {
573                        let cluster_fn_name = fn_names[*k];
574                        syn::Ident::new(cluster_fn_name, Span::call_site())
575                    })
576                    .collect();
577
578                let struct_fields: Vec<proc_macro2::TokenStream> = mem_field_names
579                    .iter()
580                    .zip(mem_generic_idents.iter())
581                    .map(|(field, generic)| {
582                        quote! { pub #field: #generic }
583                    })
584                    .collect();
585
586                let struct_generics: Vec<proc_macro2::TokenStream> = mem_generic_idents
587                    .iter()
588                    .map(|generic| {
589                        quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin }
590                    })
591                    .collect();
592
593                for generic in &mem_generic_idents {
594                    extra_fn_generics.push(
595                        quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin + 'a },
596                    );
597                }
598
599                cluster_params.push(quote! {
600                    __membership: #fn_ident::#membership_struct_ident<#(#mem_generic_idents),*>
601                });
602
603                for (i, field) in mem_field_names.iter().enumerate() {
604                    let var_ident =
605                        syn::Ident::new(&format!("__membership_{}", i), Span::call_site());
606                    extra_destructure.push(quote! {
607                        let #var_ident = __membership.#field;
608                    });
609                }
610
611                mod_items.push(quote! {
612                    pub struct #membership_struct_ident<#(#struct_generics),*> {
613                        #(#struct_fields),*
614                    }
615                });
616            }
617
618            // Embedded inputs (Stream sources).
619            let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
620                .iter()
621                .map(|(ident, element_type)| {
622                    quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
623                })
624                .collect();
625
626            // Embedded outputs (FnMut callbacks).
627            if !loc_outputs.is_empty() {
628                let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
629
630                let output_generic_idents: Vec<syn::Ident> = loc_outputs
631                    .iter()
632                    .enumerate()
633                    .map(|(i, _)| quote::format_ident!("__Out{}", i))
634                    .collect();
635
636                let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
637                    .iter()
638                    .zip(output_generic_idents.iter())
639                    .map(|((ident, _), generic)| {
640                        quote! { pub #ident: #generic }
641                    })
642                    .collect();
643
644                let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
645                    .iter()
646                    .zip(output_generic_idents.iter())
647                    .map(|((_, element_type), generic)| {
648                        quote! { #generic: FnMut(#element_type) }
649                    })
650                    .collect();
651
652                for ((_, element_type), generic) in
653                    loc_outputs.iter().zip(output_generic_idents.iter())
654                {
655                    extra_fn_generics.push(quote! { #generic: FnMut(#element_type) + 'a });
656                }
657
658                output_params.push(quote! {
659                    __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
660                });
661
662                for (ident, _) in &loc_outputs {
663                    extra_destructure.push(quote! { let mut #ident = &mut __outputs.#ident; });
664                }
665
666                mod_items.push(quote! {
667                    pub struct #output_struct_ident<#(#struct_generics),*> {
668                        #(#struct_fields),*
669                    }
670                });
671            }
672
673            // Network outputs (FnMut sinks).
674            if let Some(mut loc_net_outputs) = env.network_outputs.remove(location_key) {
675                loc_net_outputs.sort();
676
677                let net_out_struct_ident = syn::Ident::new("EmbeddedNetworkOut", Span::call_site());
678
679                let net_out_generic_idents: Vec<syn::Ident> = loc_net_outputs
680                    .iter()
681                    .enumerate()
682                    .map(|(i, _)| quote::format_ident!("__NetOut{}", i))
683                    .collect();
684
685                let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_outputs
686                    .iter()
687                    .zip(net_out_generic_idents.iter())
688                    .map(|((name, _), generic)| {
689                        let field_ident = syn::Ident::new(name, Span::call_site());
690                        quote! { pub #field_ident: #generic }
691                    })
692                    .collect();
693
694                let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_outputs
695                    .iter()
696                    .zip(net_out_generic_idents.iter())
697                    .map(|((_, is_tagged), generic)| {
698                        if *is_tagged {
699                            quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) }
700                        } else {
701                            quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) }
702                        }
703                    })
704                    .collect();
705
706                for ((_, is_tagged), generic) in
707                    loc_net_outputs.iter().zip(net_out_generic_idents.iter())
708                {
709                    if *is_tagged {
710                        extra_fn_generics.push(
711                            quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) + 'a },
712                        );
713                    } else {
714                        extra_fn_generics.push(
715                            quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) + 'a },
716                        );
717                    }
718                }
719
720                net_out_params.push(quote! {
721                    __network_out: &'a mut #fn_ident::#net_out_struct_ident<#(#net_out_generic_idents),*>
722                });
723
724                for (name, _) in &loc_net_outputs {
725                    let field_ident = syn::Ident::new(name, Span::call_site());
726                    let var_ident =
727                        syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
728                    extra_destructure
729                        .push(quote! { let mut #var_ident = &mut __network_out.#field_ident; });
730                }
731
732                mod_items.push(quote! {
733                    pub struct #net_out_struct_ident<#(#struct_generics),*> {
734                        #(#struct_fields),*
735                    }
736                });
737            }
738
739            // Network inputs (Stream sources).
740            if let Some(mut loc_net_inputs) = env.network_inputs.remove(location_key) {
741                loc_net_inputs.sort();
742
743                let net_in_struct_ident = syn::Ident::new("EmbeddedNetworkIn", Span::call_site());
744
745                let net_in_generic_idents: Vec<syn::Ident> = loc_net_inputs
746                    .iter()
747                    .enumerate()
748                    .map(|(i, _)| quote::format_ident!("__NetIn{}", i))
749                    .collect();
750
751                let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_inputs
752                    .iter()
753                    .zip(net_in_generic_idents.iter())
754                    .map(|((name, _), generic)| {
755                        let field_ident = syn::Ident::new(name, Span::call_site());
756                        quote! { pub #field_ident: #generic }
757                    })
758                    .collect();
759
760                let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_inputs
761                    .iter()
762                    .zip(net_in_generic_idents.iter())
763                    .map(|((_, is_tagged), generic)| {
764                        if *is_tagged {
765                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin }
766                        } else {
767                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin }
768                        }
769                    })
770                    .collect();
771
772                for ((_, is_tagged), generic) in
773                    loc_net_inputs.iter().zip(net_in_generic_idents.iter())
774                {
775                    if *is_tagged {
776                        extra_fn_generics.push(
777                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin + 'a },
778                        );
779                    } else {
780                        extra_fn_generics.push(
781                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin + 'a },
782                        );
783                    }
784                }
785
786                net_in_params.push(quote! {
787                    __network_in: #fn_ident::#net_in_struct_ident<#(#net_in_generic_idents),*>
788                });
789
790                for (name, _) in &loc_net_inputs {
791                    let field_ident = syn::Ident::new(name, Span::call_site());
792                    let var_ident =
793                        syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
794                    extra_destructure.push(quote! { let #var_ident = __network_in.#field_ident; });
795                }
796
797                mod_items.push(quote! {
798                    pub struct #net_in_struct_ident<#(#struct_generics),*> {
799                        #(#struct_fields),*
800                    }
801                });
802            }
803
804            // Emit the module if there are any structs.
805            if !mod_items.is_empty() {
806                let output_mod: syn::Item = syn::parse_quote! {
807                    pub mod #fn_ident {
808                        use super::*;
809                        #(#mod_items)*
810                    }
811                };
812                items.push(output_mod);
813            }
814
815            // Build the function.
816            let all_params: Vec<proc_macro2::TokenStream> = cluster_params
817                .into_iter()
818                .chain(input_params)
819                .chain(output_params)
820                .chain(net_in_params)
821                .chain(net_out_params)
822                .collect();
823
824            let func = if !extra_fn_generics.is_empty() {
825                syn::parse_quote! {
826                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
827                    pub fn #fn_ident<'a, #(#extra_fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
828                        #(#extra_destructure)*
829                        #dfir_tokens
830                    }
831                }
832            } else {
833                syn::parse_quote! {
834                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
835                    pub fn #fn_ident<'a>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
836                        #dfir_tokens
837                    }
838                }
839            };
840
841            items.push(func);
842        }
843
844        syn::parse_quote! {
845            use #orig_crate_name::__staged::__deps::*;
846            use #root::prelude::*;
847            use #root::runtime_support::dfir_rs as __root_dfir_rs;
848            pub use #orig_crate_name::__staged;
849
850            #( #items )*
851        }
852    }
853}