1use std::future::Future;
26use std::io::Error;
27use std::pin::Pin;
28
29use bytes::{Bytes, BytesMut};
30use dfir_lang::diagnostic::Diagnostics;
31use dfir_lang::graph::DfirGraph;
32use futures::{Sink, Stream};
33use proc_macro2::Span;
34use quote::quote;
35use serde::Serialize;
36use serde::de::DeserializeOwned;
37use slotmap::SparseSecondaryMap;
38use stageleft::{QuotedWithContext, q};
39
40use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
41use crate::compile::builder::ExternalPortId;
42use crate::location::dynamic::LocationId;
43use crate::location::member_id::TaglessMemberId;
44use crate::location::{LocationKey, MembershipEvent, NetworkHint};
45
46pub enum EmbeddedDeploy {}
50
51#[derive(Clone)]
53pub struct EmbeddedNode {
54 pub fn_name: String,
56 pub location_key: LocationKey,
58}
59
60impl Node for EmbeddedNode {
61 type Port = ();
62 type Meta = ();
63 type InstantiateEnv = EmbeddedInstantiateEnv;
64
65 fn next_port(&self) -> Self::Port {}
66
67 fn update_meta(&self, _meta: &Self::Meta) {}
68
69 fn instantiate(
70 &self,
71 _env: &mut Self::InstantiateEnv,
72 _meta: &mut Self::Meta,
73 _graph: DfirGraph,
74 _extra_stmts: &[syn::Stmt],
75 _sidecars: &[syn::Expr],
76 ) {
77 }
79}
80
81impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
82 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
83 panic!("EmbeddedDeploy does not support external ports");
84 }
85
86 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
87 fn as_bytes_bidi(
88 &self,
89 _external_port_id: ExternalPortId,
90 ) -> impl Future<
91 Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
92 > + 'a {
93 async { panic!("EmbeddedDeploy does not support external ports") }
94 }
95
96 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97 fn as_bincode_bidi<InT, OutT>(
98 &self,
99 _external_port_id: ExternalPortId,
100 ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
101 where
102 InT: Serialize + 'static,
103 OutT: DeserializeOwned + 'static,
104 {
105 async { panic!("EmbeddedDeploy does not support external ports") }
106 }
107
108 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109 fn as_bincode_sink<T>(
110 &self,
111 _external_port_id: ExternalPortId,
112 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
113 where
114 T: Serialize + 'static,
115 {
116 async { panic!("EmbeddedDeploy does not support external ports") }
117 }
118
119 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
120 fn as_bincode_source<T>(
121 &self,
122 _external_port_id: ExternalPortId,
123 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
124 where
125 T: DeserializeOwned + 'static,
126 {
127 async { panic!("EmbeddedDeploy does not support external ports") }
128 }
129}
130
131impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
132 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
133 EmbeddedNode {
134 fn_name: self.into(),
135 location_key,
136 }
137 }
138}
139
140impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
141 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
142 EmbeddedNode {
143 fn_name: self.into(),
144 location_key,
145 }
146 }
147}
148
149impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
150 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
151 EmbeddedNode {
152 fn_name: self.into(),
153 location_key,
154 }
155 }
156}
157
158#[derive(Default)]
165pub struct EmbeddedInstantiateEnv {
166 pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
168 pub singleton_inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
170 pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
172 pub network_outputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
175 pub network_inputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
178 pub membership_streams: SparseSecondaryMap<LocationKey, Vec<LocationKey>>,
181}
182
183impl<'a> Deploy<'a> for EmbeddedDeploy {
184 type Meta = ();
185 type InstantiateEnv = EmbeddedInstantiateEnv;
186
187 type Process = EmbeddedNode;
188 type Cluster = EmbeddedNode;
189 type External = EmbeddedNode;
190
191 fn o2o_sink_source(
192 env: &mut Self::InstantiateEnv,
193 p1: &Self::Process,
194 _p1_port: &(),
195 p2: &Self::Process,
196 _p2_port: &(),
197 name: Option<&str>,
198 _networking_info: &crate::networking::NetworkingInfo,
199 ) -> (syn::Expr, syn::Expr) {
200 let name = name.expect(
201 "EmbeddedDeploy o2o networking requires a channel name. Use `TCP.name(\"my_channel\")` to provide one.",
202 );
203
204 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
205 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
206
207 env.network_outputs
208 .entry(p1.location_key)
209 .unwrap()
210 .or_default()
211 .push((name.to_owned(), false));
212 env.network_inputs
213 .entry(p2.location_key)
214 .unwrap()
215 .or_default()
216 .push((name.to_owned(), false));
217
218 (
219 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
220 syn::parse_quote!(#source_ident),
221 )
222 }
223
224 fn o2o_connect(
225 _p1: &Self::Process,
226 _p1_port: &(),
227 _p2: &Self::Process,
228 _p2_port: &(),
229 ) -> Box<dyn FnOnce()> {
230 Box::new(|| {})
231 }
232
233 fn o2m_sink_source(
234 env: &mut Self::InstantiateEnv,
235 p1: &Self::Process,
236 _p1_port: &(),
237 c2: &Self::Cluster,
238 _c2_port: &(),
239 name: Option<&str>,
240 _networking_info: &crate::networking::NetworkingInfo,
241 ) -> (syn::Expr, syn::Expr) {
242 let name = name.expect("EmbeddedDeploy o2m networking requires a channel name.");
243 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
244 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
245 env.network_outputs
246 .entry(p1.location_key)
247 .unwrap()
248 .or_default()
249 .push((name.to_owned(), true));
250 env.network_inputs
251 .entry(c2.location_key)
252 .unwrap()
253 .or_default()
254 .push((name.to_owned(), false));
255 (
256 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
257 syn::parse_quote!(#source_ident),
258 )
259 }
260
261 fn o2m_connect(
262 _p1: &Self::Process,
263 _p1_port: &(),
264 _c2: &Self::Cluster,
265 _c2_port: &(),
266 ) -> Box<dyn FnOnce()> {
267 Box::new(|| {})
268 }
269
270 fn m2o_sink_source(
271 env: &mut Self::InstantiateEnv,
272 c1: &Self::Cluster,
273 _c1_port: &(),
274 p2: &Self::Process,
275 _p2_port: &(),
276 name: Option<&str>,
277 _networking_info: &crate::networking::NetworkingInfo,
278 ) -> (syn::Expr, syn::Expr) {
279 let name = name.expect("EmbeddedDeploy m2o networking requires a channel name.");
280 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
281 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
282 env.network_outputs
283 .entry(c1.location_key)
284 .unwrap()
285 .or_default()
286 .push((name.to_owned(), false));
287 env.network_inputs
288 .entry(p2.location_key)
289 .unwrap()
290 .or_default()
291 .push((name.to_owned(), true));
292 (
293 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
294 syn::parse_quote!(#source_ident),
295 )
296 }
297
298 fn m2o_connect(
299 _c1: &Self::Cluster,
300 _c1_port: &(),
301 _p2: &Self::Process,
302 _p2_port: &(),
303 ) -> Box<dyn FnOnce()> {
304 Box::new(|| {})
305 }
306
307 fn m2m_sink_source(
308 env: &mut Self::InstantiateEnv,
309 c1: &Self::Cluster,
310 _c1_port: &(),
311 c2: &Self::Cluster,
312 _c2_port: &(),
313 name: Option<&str>,
314 _networking_info: &crate::networking::NetworkingInfo,
315 ) -> (syn::Expr, syn::Expr) {
316 let name = name.expect("EmbeddedDeploy m2m networking requires a channel name.");
317 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
318 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
319 env.network_outputs
320 .entry(c1.location_key)
321 .unwrap()
322 .or_default()
323 .push((name.to_owned(), true));
324 env.network_inputs
325 .entry(c2.location_key)
326 .unwrap()
327 .or_default()
328 .push((name.to_owned(), true));
329 (
330 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
331 syn::parse_quote!(#source_ident),
332 )
333 }
334
335 fn m2m_connect(
336 _c1: &Self::Cluster,
337 _c1_port: &(),
338 _c2: &Self::Cluster,
339 _c2_port: &(),
340 ) -> Box<dyn FnOnce()> {
341 Box::new(|| {})
342 }
343
344 fn e2o_many_source(
345 _extra_stmts: &mut Vec<syn::Stmt>,
346 _p2: &Self::Process,
347 _p2_port: &(),
348 _codec_type: &syn::Type,
349 _shared_handle: String,
350 ) -> syn::Expr {
351 panic!("EmbeddedDeploy does not support networking (e2o)")
352 }
353
354 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
355 panic!("EmbeddedDeploy does not support networking (e2o)")
356 }
357
358 fn e2o_source(
359 _extra_stmts: &mut Vec<syn::Stmt>,
360 _p1: &Self::External,
361 _p1_port: &(),
362 _p2: &Self::Process,
363 _p2_port: &(),
364 _codec_type: &syn::Type,
365 _shared_handle: String,
366 ) -> syn::Expr {
367 panic!("EmbeddedDeploy does not support networking (e2o)")
368 }
369
370 fn e2o_connect(
371 _p1: &Self::External,
372 _p1_port: &(),
373 _p2: &Self::Process,
374 _p2_port: &(),
375 _many: bool,
376 _server_hint: NetworkHint,
377 ) -> Box<dyn FnOnce()> {
378 panic!("EmbeddedDeploy does not support networking (e2o)")
379 }
380
381 fn o2e_sink(
382 _p1: &Self::Process,
383 _p1_port: &(),
384 _p2: &Self::External,
385 _p2_port: &(),
386 _shared_handle: String,
387 ) -> syn::Expr {
388 panic!("EmbeddedDeploy does not support networking (o2e)")
389 }
390
391 #[expect(
392 unreachable_code,
393 reason = "panic before q! which is only for return type"
394 )]
395 fn cluster_ids(
396 _of_cluster: LocationKey,
397 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
398 panic!("EmbeddedDeploy does not support cluster IDs");
399 q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
400 }
401
402 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
403 super::embedded_runtime::embedded_cluster_self_id()
404 }
405
406 fn cluster_membership_stream(
407 env: &mut Self::InstantiateEnv,
408 at_location: &LocationId,
409 location_id: &LocationId,
410 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
411 {
412 let at_key = match at_location {
413 LocationId::Process(key) | LocationId::Cluster(key) => *key,
414 _ => panic!("cluster_membership_stream must be called from a process or cluster"),
415 };
416 let cluster_key = match location_id {
417 LocationId::Cluster(key) => *key,
418 _ => panic!("cluster_membership_stream target must be a cluster"),
419 };
420 let vec = env.membership_streams.entry(at_key).unwrap().or_default();
421 let idx = if let Some(pos) = vec.iter().position(|k| *k == cluster_key) {
422 pos
423 } else {
424 vec.push(cluster_key);
425 vec.len() - 1
426 };
427
428 super::embedded_runtime::embedded_cluster_membership_stream(idx)
429 }
430
431 fn register_embedded_stream_input(
432 env: &mut Self::InstantiateEnv,
433 location_key: LocationKey,
434 ident: &syn::Ident,
435 element_type: &syn::Type,
436 ) {
437 env.inputs
438 .entry(location_key)
439 .unwrap()
440 .or_default()
441 .push((ident.clone(), element_type.clone()));
442 }
443
444 fn register_embedded_singleton_input(
445 env: &mut Self::InstantiateEnv,
446 location_key: LocationKey,
447 ident: &syn::Ident,
448 element_type: &syn::Type,
449 ) {
450 env.singleton_inputs
451 .entry(location_key)
452 .unwrap()
453 .or_default()
454 .push((ident.clone(), element_type.clone()));
455 }
456
457 fn register_embedded_output(
458 env: &mut Self::InstantiateEnv,
459 location_key: LocationKey,
460 ident: &syn::Ident,
461 element_type: &syn::Type,
462 ) {
463 env.outputs
464 .entry(location_key)
465 .unwrap()
466 .or_default()
467 .push((ident.clone(), element_type.clone()));
468 }
469}
470
471impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
472 pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
503 let mut env = EmbeddedInstantiateEnv::default();
504 let compiled = self.compile_internal(&mut env);
505
506 let root = crate::staging_util::get_this_crate();
507 let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
508
509 let mut items: Vec<syn::Item> = Vec::new();
510
511 let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
513 location_keys.sort();
514
515 let fn_names: SparseSecondaryMap<LocationKey, &str> = location_keys
517 .iter()
518 .map(|&k| {
519 let name = self
520 .processes
521 .get(k)
522 .map(|n| n.fn_name.as_str())
523 .or_else(|| self.clusters.get(k).map(|n| n.fn_name.as_str()))
524 .or_else(|| self.externals.get(k).map(|n| n.fn_name.as_str()))
525 .expect("location key not found in any node map");
526 (k, name)
527 })
528 .collect();
529
530 for location_key in location_keys {
531 let graph = &compiled.all_dfir()[location_key];
532
533 let fn_name = fn_names[location_key];
535 let fn_ident = syn::Ident::new(fn_name, Span::call_site());
536
537 let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
539 loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
540
541 let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
543 loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
544
545 let mut diagnostics = Diagnostics::new();
546 let dfir_tokens = graph
547 .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
548 .expect("DFIR code generation failed with diagnostics.");
549
550 let mut mod_items: Vec<proc_macro2::TokenStream> = Vec::new();
552 let mut extra_fn_generics: Vec<proc_macro2::TokenStream> = Vec::new();
553 let mut cluster_params: Vec<proc_macro2::TokenStream> = Vec::new();
554 let mut output_params: Vec<proc_macro2::TokenStream> = Vec::new();
555 let mut net_out_params: Vec<proc_macro2::TokenStream> = Vec::new();
556 let mut net_in_params: Vec<proc_macro2::TokenStream> = Vec::new();
557 let mut extra_destructure: Vec<proc_macro2::TokenStream> = Vec::new();
558
559 if self.clusters.contains_key(location_key) {
561 cluster_params.push(quote! {
562 __cluster_self_id: &'a #root::location::member_id::TaglessMemberId
563 });
564 let self_id_ident = syn::Ident::new(
566 &format!("__hydro_lang_cluster_self_id_{}", location_key),
567 Span::call_site(),
568 );
569 extra_destructure.push(quote! {
570 let #self_id_ident = __cluster_self_id;
571 });
572 }
573
574 if let Some(loc_memberships) = env.membership_streams.get(location_key) {
576 let membership_struct_ident =
577 syn::Ident::new("EmbeddedMembershipStreams", Span::call_site());
578
579 let mem_generic_idents: Vec<syn::Ident> = loc_memberships
580 .iter()
581 .enumerate()
582 .map(|(i, _)| quote::format_ident!("__Mem{}", i))
583 .collect();
584
585 let mem_field_names: Vec<syn::Ident> = loc_memberships
586 .iter()
587 .map(|k| {
588 let cluster_fn_name = fn_names[*k];
589 syn::Ident::new(cluster_fn_name, Span::call_site())
590 })
591 .collect();
592
593 let struct_fields: Vec<proc_macro2::TokenStream> = mem_field_names
594 .iter()
595 .zip(mem_generic_idents.iter())
596 .map(|(field, generic)| {
597 quote! { pub #field: #generic }
598 })
599 .collect();
600
601 let struct_generics: Vec<proc_macro2::TokenStream> = mem_generic_idents
602 .iter()
603 .map(|generic| {
604 quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin }
605 })
606 .collect();
607
608 for generic in &mem_generic_idents {
609 extra_fn_generics.push(
610 quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin + 'a },
611 );
612 }
613
614 cluster_params.push(quote! {
615 __membership: #fn_ident::#membership_struct_ident<#(#mem_generic_idents),*>
616 });
617
618 for (i, field) in mem_field_names.iter().enumerate() {
619 let var_ident =
620 syn::Ident::new(&format!("__membership_{}", i), Span::call_site());
621 extra_destructure.push(quote! {
622 let #var_ident = __membership.#field;
623 });
624 }
625
626 mod_items.push(quote! {
627 pub struct #membership_struct_ident<#(#struct_generics),*> {
628 #(#struct_fields),*
629 }
630 });
631 }
632
633 let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
635 .iter()
636 .map(|(ident, element_type)| {
637 quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
638 })
639 .collect();
640
641 let mut loc_singleton_inputs = env
643 .singleton_inputs
644 .get(location_key)
645 .cloned()
646 .unwrap_or_default();
647 loc_singleton_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
648
649 let singleton_input_params: Vec<proc_macro2::TokenStream> = loc_singleton_inputs
650 .iter()
651 .map(|(ident, element_type)| {
652 quote! { #ident: #element_type }
653 })
654 .collect();
655
656 if !loc_outputs.is_empty() {
658 let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
659
660 let output_generic_idents: Vec<syn::Ident> = loc_outputs
661 .iter()
662 .enumerate()
663 .map(|(i, _)| quote::format_ident!("__Out{}", i))
664 .collect();
665
666 let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
667 .iter()
668 .zip(output_generic_idents.iter())
669 .map(|((ident, _), generic)| {
670 quote! { pub #ident: #generic }
671 })
672 .collect();
673
674 let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
675 .iter()
676 .zip(output_generic_idents.iter())
677 .map(|((_, element_type), generic)| {
678 quote! { #generic: FnMut(#element_type) }
679 })
680 .collect();
681
682 for ((_, element_type), generic) in
683 loc_outputs.iter().zip(output_generic_idents.iter())
684 {
685 extra_fn_generics.push(quote! { #generic: FnMut(#element_type) + 'a });
686 }
687
688 output_params.push(quote! {
689 __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
690 });
691
692 for (ident, _) in &loc_outputs {
693 extra_destructure.push(quote! { let mut #ident = &mut __outputs.#ident; });
694 }
695
696 mod_items.push(quote! {
697 pub struct #output_struct_ident<#(#struct_generics),*> {
698 #(#struct_fields),*
699 }
700 });
701 }
702
703 if let Some(mut loc_net_outputs) = env.network_outputs.remove(location_key) {
705 loc_net_outputs.sort();
706
707 let net_out_struct_ident = syn::Ident::new("EmbeddedNetworkOut", Span::call_site());
708
709 let net_out_generic_idents: Vec<syn::Ident> = loc_net_outputs
710 .iter()
711 .enumerate()
712 .map(|(i, _)| quote::format_ident!("__NetOut{}", i))
713 .collect();
714
715 let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_outputs
716 .iter()
717 .zip(net_out_generic_idents.iter())
718 .map(|((name, _), generic)| {
719 let field_ident = syn::Ident::new(name, Span::call_site());
720 quote! { pub #field_ident: #generic }
721 })
722 .collect();
723
724 let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_outputs
725 .iter()
726 .zip(net_out_generic_idents.iter())
727 .map(|((_, is_tagged), generic)| {
728 if *is_tagged {
729 quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) }
730 } else {
731 quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) }
732 }
733 })
734 .collect();
735
736 for ((_, is_tagged), generic) in
737 loc_net_outputs.iter().zip(net_out_generic_idents.iter())
738 {
739 if *is_tagged {
740 extra_fn_generics.push(
741 quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) + 'a },
742 );
743 } else {
744 extra_fn_generics.push(
745 quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) + 'a },
746 );
747 }
748 }
749
750 net_out_params.push(quote! {
751 __network_out: &'a mut #fn_ident::#net_out_struct_ident<#(#net_out_generic_idents),*>
752 });
753
754 for (name, _) in &loc_net_outputs {
755 let field_ident = syn::Ident::new(name, Span::call_site());
756 let var_ident =
757 syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
758 extra_destructure
759 .push(quote! { let mut #var_ident = &mut __network_out.#field_ident; });
760 }
761
762 mod_items.push(quote! {
763 pub struct #net_out_struct_ident<#(#struct_generics),*> {
764 #(#struct_fields),*
765 }
766 });
767 }
768
769 if let Some(mut loc_net_inputs) = env.network_inputs.remove(location_key) {
771 loc_net_inputs.sort();
772
773 let net_in_struct_ident = syn::Ident::new("EmbeddedNetworkIn", Span::call_site());
774
775 let net_in_generic_idents: Vec<syn::Ident> = loc_net_inputs
776 .iter()
777 .enumerate()
778 .map(|(i, _)| quote::format_ident!("__NetIn{}", i))
779 .collect();
780
781 let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_inputs
782 .iter()
783 .zip(net_in_generic_idents.iter())
784 .map(|((name, _), generic)| {
785 let field_ident = syn::Ident::new(name, Span::call_site());
786 quote! { pub #field_ident: #generic }
787 })
788 .collect();
789
790 let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_inputs
791 .iter()
792 .zip(net_in_generic_idents.iter())
793 .map(|((_, is_tagged), generic)| {
794 if *is_tagged {
795 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin }
796 } else {
797 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin }
798 }
799 })
800 .collect();
801
802 for ((_, is_tagged), generic) in
803 loc_net_inputs.iter().zip(net_in_generic_idents.iter())
804 {
805 if *is_tagged {
806 extra_fn_generics.push(
807 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin + 'a },
808 );
809 } else {
810 extra_fn_generics.push(
811 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin + 'a },
812 );
813 }
814 }
815
816 net_in_params.push(quote! {
817 __network_in: #fn_ident::#net_in_struct_ident<#(#net_in_generic_idents),*>
818 });
819
820 for (name, _) in &loc_net_inputs {
821 let field_ident = syn::Ident::new(name, Span::call_site());
822 let var_ident =
823 syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
824 extra_destructure.push(quote! { let #var_ident = __network_in.#field_ident; });
825 }
826
827 mod_items.push(quote! {
828 pub struct #net_in_struct_ident<#(#struct_generics),*> {
829 #(#struct_fields),*
830 }
831 });
832 }
833
834 if !mod_items.is_empty() {
836 let output_mod: syn::Item = syn::parse_quote! {
837 pub mod #fn_ident {
838 use super::*;
839 #(#mod_items)*
840 }
841 };
842 items.push(output_mod);
843 }
844
845 let all_params: Vec<proc_macro2::TokenStream> = cluster_params
847 .into_iter()
848 .chain(singleton_input_params)
849 .chain(input_params)
850 .chain(output_params)
851 .chain(net_in_params)
852 .chain(net_out_params)
853 .collect();
854
855 let func = if !extra_fn_generics.is_empty() {
856 syn::parse_quote! {
857 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
858 pub fn #fn_ident<'a, #(#extra_fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
859 #(#extra_destructure)*
860 #dfir_tokens
861 }
862 }
863 } else {
864 syn::parse_quote! {
865 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
866 pub fn #fn_ident<'a>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
867 #dfir_tokens
868 }
869 }
870 };
871
872 items.push(func);
873 }
874
875 syn::parse_quote! {
876 use #orig_crate_name::__staged::__deps::*;
877 use #root::prelude::*;
878 use #root::runtime_support::dfir_rs as __root_dfir_rs;
879 pub use #orig_crate_name::__staged;
880
881 #( #items )*
882 }
883 }
884}