1use std::future::Future;
26use std::io::Error;
27use std::pin::Pin;
28
29use bytes::{Bytes, BytesMut};
30use dfir_lang::diagnostic::Diagnostics;
31use dfir_lang::graph::DfirGraph;
32use futures::{Sink, Stream};
33use proc_macro2::Span;
34use quote::quote;
35use serde::Serialize;
36use serde::de::DeserializeOwned;
37use slotmap::SparseSecondaryMap;
38use stageleft::{QuotedWithContext, q};
39
40use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
41use crate::compile::builder::ExternalPortId;
42use crate::location::dynamic::LocationId;
43use crate::location::member_id::TaglessMemberId;
44use crate::location::{LocationKey, MembershipEvent, NetworkHint};
45
/// Marker type that selects the embedded deployment backend.
///
/// The enum has no variants, so it can never be instantiated; it exists only
/// as the type parameter for the `Deploy`, `RegisterPort` and `*Spec` trait
/// implementations in this file.
pub enum EmbeddedDeploy {}
50
51#[derive(Clone)]
53pub struct EmbeddedNode {
54 pub fn_name: String,
56 pub location_key: LocationKey,
58}
59
60impl Node for EmbeddedNode {
61 type Port = ();
62 type Meta = ();
63 type InstantiateEnv = EmbeddedInstantiateEnv;
64
65 fn next_port(&self) -> Self::Port {}
66
67 fn update_meta(&self, _meta: &Self::Meta) {}
68
69 fn instantiate(
70 &self,
71 _env: &mut Self::InstantiateEnv,
72 _meta: &mut Self::Meta,
73 _graph: DfirGraph,
74 _extra_stmts: &[syn::Stmt],
75 _sidecars: &[syn::Expr],
76 ) {
77 }
79}
80
81impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
82 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
83 panic!("EmbeddedDeploy does not support external ports");
84 }
85
86 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
87 fn as_bytes_bidi(
88 &self,
89 _external_port_id: ExternalPortId,
90 ) -> impl Future<
91 Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
92 > + 'a {
93 async { panic!("EmbeddedDeploy does not support external ports") }
94 }
95
96 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97 fn as_bincode_bidi<InT, OutT>(
98 &self,
99 _external_port_id: ExternalPortId,
100 ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
101 where
102 InT: Serialize + 'static,
103 OutT: DeserializeOwned + 'static,
104 {
105 async { panic!("EmbeddedDeploy does not support external ports") }
106 }
107
108 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109 fn as_bincode_sink<T>(
110 &self,
111 _external_port_id: ExternalPortId,
112 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
113 where
114 T: Serialize + 'static,
115 {
116 async { panic!("EmbeddedDeploy does not support external ports") }
117 }
118
119 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
120 fn as_bincode_source<T>(
121 &self,
122 _external_port_id: ExternalPortId,
123 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
124 where
125 T: DeserializeOwned + 'static,
126 {
127 async { panic!("EmbeddedDeploy does not support external ports") }
128 }
129}
130
131impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
132 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
133 EmbeddedNode {
134 fn_name: self.into(),
135 location_key,
136 }
137 }
138}
139
140impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
141 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
142 EmbeddedNode {
143 fn_name: self.into(),
144 location_key,
145 }
146 }
147}
148
149impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
150 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
151 EmbeddedNode {
152 fn_name: self.into(),
153 location_key,
154 }
155 }
156}
157
158#[derive(Default)]
165pub struct EmbeddedInstantiateEnv {
166 pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
168 pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
170 pub network_outputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
173 pub network_inputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
176 pub membership_streams: SparseSecondaryMap<LocationKey, Vec<LocationKey>>,
179}
180
181impl<'a> Deploy<'a> for EmbeddedDeploy {
182 type Meta = ();
183 type InstantiateEnv = EmbeddedInstantiateEnv;
184
185 type Process = EmbeddedNode;
186 type Cluster = EmbeddedNode;
187 type External = EmbeddedNode;
188
189 fn o2o_sink_source(
190 env: &mut Self::InstantiateEnv,
191 p1: &Self::Process,
192 _p1_port: &(),
193 p2: &Self::Process,
194 _p2_port: &(),
195 name: Option<&str>,
196 _networking_info: &crate::networking::NetworkingInfo,
197 ) -> (syn::Expr, syn::Expr) {
198 let name = name.expect(
199 "EmbeddedDeploy o2o networking requires a channel name. Use `TCP.name(\"my_channel\")` to provide one.",
200 );
201
202 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
203 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
204
205 env.network_outputs
206 .entry(p1.location_key)
207 .unwrap()
208 .or_default()
209 .push((name.to_owned(), false));
210 env.network_inputs
211 .entry(p2.location_key)
212 .unwrap()
213 .or_default()
214 .push((name.to_owned(), false));
215
216 (
217 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
218 syn::parse_quote!(#source_ident),
219 )
220 }
221
222 fn o2o_connect(
223 _p1: &Self::Process,
224 _p1_port: &(),
225 _p2: &Self::Process,
226 _p2_port: &(),
227 ) -> Box<dyn FnOnce()> {
228 Box::new(|| {})
229 }
230
231 fn o2m_sink_source(
232 env: &mut Self::InstantiateEnv,
233 p1: &Self::Process,
234 _p1_port: &(),
235 c2: &Self::Cluster,
236 _c2_port: &(),
237 name: Option<&str>,
238 _networking_info: &crate::networking::NetworkingInfo,
239 ) -> (syn::Expr, syn::Expr) {
240 let name = name.expect("EmbeddedDeploy o2m networking requires a channel name.");
241 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
242 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
243 env.network_outputs
244 .entry(p1.location_key)
245 .unwrap()
246 .or_default()
247 .push((name.to_owned(), true));
248 env.network_inputs
249 .entry(c2.location_key)
250 .unwrap()
251 .or_default()
252 .push((name.to_owned(), false));
253 (
254 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
255 syn::parse_quote!(#source_ident),
256 )
257 }
258
259 fn o2m_connect(
260 _p1: &Self::Process,
261 _p1_port: &(),
262 _c2: &Self::Cluster,
263 _c2_port: &(),
264 ) -> Box<dyn FnOnce()> {
265 Box::new(|| {})
266 }
267
268 fn m2o_sink_source(
269 env: &mut Self::InstantiateEnv,
270 c1: &Self::Cluster,
271 _c1_port: &(),
272 p2: &Self::Process,
273 _p2_port: &(),
274 name: Option<&str>,
275 _networking_info: &crate::networking::NetworkingInfo,
276 ) -> (syn::Expr, syn::Expr) {
277 let name = name.expect("EmbeddedDeploy m2o networking requires a channel name.");
278 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
279 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
280 env.network_outputs
281 .entry(c1.location_key)
282 .unwrap()
283 .or_default()
284 .push((name.to_owned(), false));
285 env.network_inputs
286 .entry(p2.location_key)
287 .unwrap()
288 .or_default()
289 .push((name.to_owned(), true));
290 (
291 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
292 syn::parse_quote!(#source_ident),
293 )
294 }
295
296 fn m2o_connect(
297 _c1: &Self::Cluster,
298 _c1_port: &(),
299 _p2: &Self::Process,
300 _p2_port: &(),
301 ) -> Box<dyn FnOnce()> {
302 Box::new(|| {})
303 }
304
305 fn m2m_sink_source(
306 env: &mut Self::InstantiateEnv,
307 c1: &Self::Cluster,
308 _c1_port: &(),
309 c2: &Self::Cluster,
310 _c2_port: &(),
311 name: Option<&str>,
312 _networking_info: &crate::networking::NetworkingInfo,
313 ) -> (syn::Expr, syn::Expr) {
314 let name = name.expect("EmbeddedDeploy m2m networking requires a channel name.");
315 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
316 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
317 env.network_outputs
318 .entry(c1.location_key)
319 .unwrap()
320 .or_default()
321 .push((name.to_owned(), true));
322 env.network_inputs
323 .entry(c2.location_key)
324 .unwrap()
325 .or_default()
326 .push((name.to_owned(), true));
327 (
328 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
329 syn::parse_quote!(#source_ident),
330 )
331 }
332
333 fn m2m_connect(
334 _c1: &Self::Cluster,
335 _c1_port: &(),
336 _c2: &Self::Cluster,
337 _c2_port: &(),
338 ) -> Box<dyn FnOnce()> {
339 Box::new(|| {})
340 }
341
342 fn e2o_many_source(
343 _extra_stmts: &mut Vec<syn::Stmt>,
344 _p2: &Self::Process,
345 _p2_port: &(),
346 _codec_type: &syn::Type,
347 _shared_handle: String,
348 ) -> syn::Expr {
349 panic!("EmbeddedDeploy does not support networking (e2o)")
350 }
351
352 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
353 panic!("EmbeddedDeploy does not support networking (e2o)")
354 }
355
356 fn e2o_source(
357 _extra_stmts: &mut Vec<syn::Stmt>,
358 _p1: &Self::External,
359 _p1_port: &(),
360 _p2: &Self::Process,
361 _p2_port: &(),
362 _codec_type: &syn::Type,
363 _shared_handle: String,
364 ) -> syn::Expr {
365 panic!("EmbeddedDeploy does not support networking (e2o)")
366 }
367
368 fn e2o_connect(
369 _p1: &Self::External,
370 _p1_port: &(),
371 _p2: &Self::Process,
372 _p2_port: &(),
373 _many: bool,
374 _server_hint: NetworkHint,
375 ) -> Box<dyn FnOnce()> {
376 panic!("EmbeddedDeploy does not support networking (e2o)")
377 }
378
379 fn o2e_sink(
380 _p1: &Self::Process,
381 _p1_port: &(),
382 _p2: &Self::External,
383 _p2_port: &(),
384 _shared_handle: String,
385 ) -> syn::Expr {
386 panic!("EmbeddedDeploy does not support networking (o2e)")
387 }
388
389 #[expect(
390 unreachable_code,
391 reason = "panic before q! which is only for return type"
392 )]
393 fn cluster_ids(
394 _of_cluster: LocationKey,
395 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
396 panic!("EmbeddedDeploy does not support cluster IDs");
397 q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
398 }
399
400 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
401 super::embedded_runtime::embedded_cluster_self_id()
402 }
403
404 fn cluster_membership_stream(
405 env: &mut Self::InstantiateEnv,
406 at_location: &LocationId,
407 location_id: &LocationId,
408 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
409 {
410 let at_key = match at_location {
411 LocationId::Process(key) | LocationId::Cluster(key) => *key,
412 _ => panic!("cluster_membership_stream must be called from a process or cluster"),
413 };
414 let cluster_key = match location_id {
415 LocationId::Cluster(key) => *key,
416 _ => panic!("cluster_membership_stream target must be a cluster"),
417 };
418 let vec = env.membership_streams.entry(at_key).unwrap().or_default();
419 let idx = if let Some(pos) = vec.iter().position(|k| *k == cluster_key) {
420 pos
421 } else {
422 vec.push(cluster_key);
423 vec.len() - 1
424 };
425
426 super::embedded_runtime::embedded_cluster_membership_stream(idx)
427 }
428
429 fn register_embedded_input(
430 env: &mut Self::InstantiateEnv,
431 location_key: LocationKey,
432 ident: &syn::Ident,
433 element_type: &syn::Type,
434 ) {
435 env.inputs
436 .entry(location_key)
437 .unwrap()
438 .or_default()
439 .push((ident.clone(), element_type.clone()));
440 }
441
442 fn register_embedded_output(
443 env: &mut Self::InstantiateEnv,
444 location_key: LocationKey,
445 ident: &syn::Ident,
446 element_type: &syn::Type,
447 ) {
448 env.outputs
449 .entry(location_key)
450 .unwrap()
451 .or_default()
452 .push((ident.clone(), element_type.clone()));
453 }
454}
455
456impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
457 pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
488 let mut env = EmbeddedInstantiateEnv::default();
489 let compiled = self.compile_internal(&mut env);
490
491 let root = crate::staging_util::get_this_crate();
492 let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
493
494 let mut items: Vec<syn::Item> = Vec::new();
495
496 let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
498 location_keys.sort();
499
500 let fn_names: SparseSecondaryMap<LocationKey, &str> = location_keys
502 .iter()
503 .map(|&k| {
504 let name = self
505 .processes
506 .get(k)
507 .map(|n| n.fn_name.as_str())
508 .or_else(|| self.clusters.get(k).map(|n| n.fn_name.as_str()))
509 .or_else(|| self.externals.get(k).map(|n| n.fn_name.as_str()))
510 .expect("location key not found in any node map");
511 (k, name)
512 })
513 .collect();
514
515 for location_key in location_keys {
516 let graph = &compiled.all_dfir()[location_key];
517
518 let fn_name = fn_names[location_key];
520 let fn_ident = syn::Ident::new(fn_name, Span::call_site());
521
522 let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
524 loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
525
526 let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
528 loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
529
530 let mut diagnostics = Diagnostics::new();
531 let dfir_tokens = graph
532 .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
533 .expect("DFIR code generation failed with diagnostics.");
534
535 let mut mod_items: Vec<proc_macro2::TokenStream> = Vec::new();
537 let mut extra_fn_generics: Vec<proc_macro2::TokenStream> = Vec::new();
538 let mut cluster_params: Vec<proc_macro2::TokenStream> = Vec::new();
539 let mut output_params: Vec<proc_macro2::TokenStream> = Vec::new();
540 let mut net_out_params: Vec<proc_macro2::TokenStream> = Vec::new();
541 let mut net_in_params: Vec<proc_macro2::TokenStream> = Vec::new();
542 let mut extra_destructure: Vec<proc_macro2::TokenStream> = Vec::new();
543
544 if self.clusters.contains_key(location_key) {
546 cluster_params.push(quote! {
547 __cluster_self_id: &'a #root::location::member_id::TaglessMemberId
548 });
549 let self_id_ident = syn::Ident::new(
551 &format!("__hydro_lang_cluster_self_id_{}", location_key),
552 Span::call_site(),
553 );
554 extra_destructure.push(quote! {
555 let #self_id_ident = __cluster_self_id;
556 });
557 }
558
559 if let Some(loc_memberships) = env.membership_streams.get(location_key) {
561 let membership_struct_ident =
562 syn::Ident::new("EmbeddedMembershipStreams", Span::call_site());
563
564 let mem_generic_idents: Vec<syn::Ident> = loc_memberships
565 .iter()
566 .enumerate()
567 .map(|(i, _)| quote::format_ident!("__Mem{}", i))
568 .collect();
569
570 let mem_field_names: Vec<syn::Ident> = loc_memberships
571 .iter()
572 .map(|k| {
573 let cluster_fn_name = fn_names[*k];
574 syn::Ident::new(cluster_fn_name, Span::call_site())
575 })
576 .collect();
577
578 let struct_fields: Vec<proc_macro2::TokenStream> = mem_field_names
579 .iter()
580 .zip(mem_generic_idents.iter())
581 .map(|(field, generic)| {
582 quote! { pub #field: #generic }
583 })
584 .collect();
585
586 let struct_generics: Vec<proc_macro2::TokenStream> = mem_generic_idents
587 .iter()
588 .map(|generic| {
589 quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin }
590 })
591 .collect();
592
593 for generic in &mem_generic_idents {
594 extra_fn_generics.push(
595 quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin + 'a },
596 );
597 }
598
599 cluster_params.push(quote! {
600 __membership: #fn_ident::#membership_struct_ident<#(#mem_generic_idents),*>
601 });
602
603 for (i, field) in mem_field_names.iter().enumerate() {
604 let var_ident =
605 syn::Ident::new(&format!("__membership_{}", i), Span::call_site());
606 extra_destructure.push(quote! {
607 let #var_ident = __membership.#field;
608 });
609 }
610
611 mod_items.push(quote! {
612 pub struct #membership_struct_ident<#(#struct_generics),*> {
613 #(#struct_fields),*
614 }
615 });
616 }
617
618 let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
620 .iter()
621 .map(|(ident, element_type)| {
622 quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
623 })
624 .collect();
625
626 if !loc_outputs.is_empty() {
628 let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
629
630 let output_generic_idents: Vec<syn::Ident> = loc_outputs
631 .iter()
632 .enumerate()
633 .map(|(i, _)| quote::format_ident!("__Out{}", i))
634 .collect();
635
636 let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
637 .iter()
638 .zip(output_generic_idents.iter())
639 .map(|((ident, _), generic)| {
640 quote! { pub #ident: #generic }
641 })
642 .collect();
643
644 let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
645 .iter()
646 .zip(output_generic_idents.iter())
647 .map(|((_, element_type), generic)| {
648 quote! { #generic: FnMut(#element_type) }
649 })
650 .collect();
651
652 for ((_, element_type), generic) in
653 loc_outputs.iter().zip(output_generic_idents.iter())
654 {
655 extra_fn_generics.push(quote! { #generic: FnMut(#element_type) + 'a });
656 }
657
658 output_params.push(quote! {
659 __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
660 });
661
662 for (ident, _) in &loc_outputs {
663 extra_destructure.push(quote! { let mut #ident = &mut __outputs.#ident; });
664 }
665
666 mod_items.push(quote! {
667 pub struct #output_struct_ident<#(#struct_generics),*> {
668 #(#struct_fields),*
669 }
670 });
671 }
672
673 if let Some(mut loc_net_outputs) = env.network_outputs.remove(location_key) {
675 loc_net_outputs.sort();
676
677 let net_out_struct_ident = syn::Ident::new("EmbeddedNetworkOut", Span::call_site());
678
679 let net_out_generic_idents: Vec<syn::Ident> = loc_net_outputs
680 .iter()
681 .enumerate()
682 .map(|(i, _)| quote::format_ident!("__NetOut{}", i))
683 .collect();
684
685 let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_outputs
686 .iter()
687 .zip(net_out_generic_idents.iter())
688 .map(|((name, _), generic)| {
689 let field_ident = syn::Ident::new(name, Span::call_site());
690 quote! { pub #field_ident: #generic }
691 })
692 .collect();
693
694 let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_outputs
695 .iter()
696 .zip(net_out_generic_idents.iter())
697 .map(|((_, is_tagged), generic)| {
698 if *is_tagged {
699 quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) }
700 } else {
701 quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) }
702 }
703 })
704 .collect();
705
706 for ((_, is_tagged), generic) in
707 loc_net_outputs.iter().zip(net_out_generic_idents.iter())
708 {
709 if *is_tagged {
710 extra_fn_generics.push(
711 quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) + 'a },
712 );
713 } else {
714 extra_fn_generics.push(
715 quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) + 'a },
716 );
717 }
718 }
719
720 net_out_params.push(quote! {
721 __network_out: &'a mut #fn_ident::#net_out_struct_ident<#(#net_out_generic_idents),*>
722 });
723
724 for (name, _) in &loc_net_outputs {
725 let field_ident = syn::Ident::new(name, Span::call_site());
726 let var_ident =
727 syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
728 extra_destructure
729 .push(quote! { let mut #var_ident = &mut __network_out.#field_ident; });
730 }
731
732 mod_items.push(quote! {
733 pub struct #net_out_struct_ident<#(#struct_generics),*> {
734 #(#struct_fields),*
735 }
736 });
737 }
738
739 if let Some(mut loc_net_inputs) = env.network_inputs.remove(location_key) {
741 loc_net_inputs.sort();
742
743 let net_in_struct_ident = syn::Ident::new("EmbeddedNetworkIn", Span::call_site());
744
745 let net_in_generic_idents: Vec<syn::Ident> = loc_net_inputs
746 .iter()
747 .enumerate()
748 .map(|(i, _)| quote::format_ident!("__NetIn{}", i))
749 .collect();
750
751 let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_inputs
752 .iter()
753 .zip(net_in_generic_idents.iter())
754 .map(|((name, _), generic)| {
755 let field_ident = syn::Ident::new(name, Span::call_site());
756 quote! { pub #field_ident: #generic }
757 })
758 .collect();
759
760 let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_inputs
761 .iter()
762 .zip(net_in_generic_idents.iter())
763 .map(|((_, is_tagged), generic)| {
764 if *is_tagged {
765 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin }
766 } else {
767 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin }
768 }
769 })
770 .collect();
771
772 for ((_, is_tagged), generic) in
773 loc_net_inputs.iter().zip(net_in_generic_idents.iter())
774 {
775 if *is_tagged {
776 extra_fn_generics.push(
777 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin + 'a },
778 );
779 } else {
780 extra_fn_generics.push(
781 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin + 'a },
782 );
783 }
784 }
785
786 net_in_params.push(quote! {
787 __network_in: #fn_ident::#net_in_struct_ident<#(#net_in_generic_idents),*>
788 });
789
790 for (name, _) in &loc_net_inputs {
791 let field_ident = syn::Ident::new(name, Span::call_site());
792 let var_ident =
793 syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
794 extra_destructure.push(quote! { let #var_ident = __network_in.#field_ident; });
795 }
796
797 mod_items.push(quote! {
798 pub struct #net_in_struct_ident<#(#struct_generics),*> {
799 #(#struct_fields),*
800 }
801 });
802 }
803
804 if !mod_items.is_empty() {
806 let output_mod: syn::Item = syn::parse_quote! {
807 pub mod #fn_ident {
808 use super::*;
809 #(#mod_items)*
810 }
811 };
812 items.push(output_mod);
813 }
814
815 let all_params: Vec<proc_macro2::TokenStream> = cluster_params
817 .into_iter()
818 .chain(input_params)
819 .chain(output_params)
820 .chain(net_in_params)
821 .chain(net_out_params)
822 .collect();
823
824 let func = if !extra_fn_generics.is_empty() {
825 syn::parse_quote! {
826 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
827 pub fn #fn_ident<'a, #(#extra_fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
828 #(#extra_destructure)*
829 #dfir_tokens
830 }
831 }
832 } else {
833 syn::parse_quote! {
834 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
835 pub fn #fn_ident<'a>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
836 #dfir_tokens
837 }
838 }
839 };
840
841 items.push(func);
842 }
843
844 syn::parse_quote! {
845 use #orig_crate_name::__staged::__deps::*;
846 use #root::prelude::*;
847 use #root::runtime_support::dfir_rs as __root_dfir_rs;
848 pub use #orig_crate_name::__staged;
849
850 #( #items )*
851 }
852 }
853}