1use std::future::Future;
17use std::io::Error;
18use std::pin::Pin;
19
20use bytes::{Bytes, BytesMut};
21use dfir_lang::diagnostic::Diagnostics;
22use dfir_lang::graph::DfirGraph;
23use futures::{Sink, Stream};
24use proc_macro2::Span;
25use quote::quote;
26use serde::Serialize;
27use serde::de::DeserializeOwned;
28use slotmap::SparseSecondaryMap;
29use stageleft::{QuotedWithContext, q};
30
31use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
32use crate::compile::builder::ExternalPortId;
33use crate::location::dynamic::LocationId;
34use crate::location::member_id::TaglessMemberId;
35use crate::location::{LocationKey, MembershipEvent, NetworkHint};
36
37pub enum EmbeddedDeploy {}
41
42#[derive(Clone)]
44pub struct EmbeddedNode {
45 pub fn_name: String,
47}
48
49impl Node for EmbeddedNode {
50 type Port = ();
51 type Meta = ();
52 type InstantiateEnv = EmbeddedInstantiateEnv;
53
54 fn next_port(&self) -> Self::Port {}
55
56 fn update_meta(&self, _meta: &Self::Meta) {}
57
58 fn instantiate(
59 &self,
60 _env: &mut Self::InstantiateEnv,
61 _meta: &mut Self::Meta,
62 _graph: DfirGraph,
63 _extra_stmts: &[syn::Stmt],
64 _sidecars: &[syn::Expr],
65 ) {
66 }
68}
69
70impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
71 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
72 panic!("EmbeddedDeploy does not support external ports");
73 }
74
75 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
76 fn as_bytes_bidi(
77 &self,
78 _external_port_id: ExternalPortId,
79 ) -> impl Future<
80 Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
81 > + 'a {
82 async { panic!("EmbeddedDeploy does not support external ports") }
83 }
84
85 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
86 fn as_bincode_bidi<InT, OutT>(
87 &self,
88 _external_port_id: ExternalPortId,
89 ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
90 where
91 InT: Serialize + 'static,
92 OutT: DeserializeOwned + 'static,
93 {
94 async { panic!("EmbeddedDeploy does not support external ports") }
95 }
96
97 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
98 fn as_bincode_sink<T>(
99 &self,
100 _external_port_id: ExternalPortId,
101 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
102 where
103 T: Serialize + 'static,
104 {
105 async { panic!("EmbeddedDeploy does not support external ports") }
106 }
107
108 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109 fn as_bincode_source<T>(
110 &self,
111 _external_port_id: ExternalPortId,
112 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
113 where
114 T: DeserializeOwned + 'static,
115 {
116 async { panic!("EmbeddedDeploy does not support external ports") }
117 }
118}
119
120impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
121 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
122 EmbeddedNode {
123 fn_name: self.into(),
124 }
125 }
126}
127
128impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
129 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
130 EmbeddedNode {
131 fn_name: self.into(),
132 }
133 }
134}
135
136impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
137 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
138 EmbeddedNode {
139 fn_name: self.into(),
140 }
141 }
142}
143
144#[derive(Default)]
151pub struct EmbeddedInstantiateEnv {
152 pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
154 pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
156}
157
158impl<'a> Deploy<'a> for EmbeddedDeploy {
159 type Meta = ();
160 type InstantiateEnv = EmbeddedInstantiateEnv;
161
162 type Process = EmbeddedNode;
163 type Cluster = EmbeddedNode;
164 type External = EmbeddedNode;
165
166 fn o2o_sink_source(
167 _p1: &Self::Process,
168 _p1_port: &(),
169 _p2: &Self::Process,
170 _p2_port: &(),
171 ) -> (syn::Expr, syn::Expr) {
172 panic!("EmbeddedDeploy does not support networking (o2o)")
173 }
174
175 fn o2o_connect(
176 _p1: &Self::Process,
177 _p1_port: &(),
178 _p2: &Self::Process,
179 _p2_port: &(),
180 ) -> Box<dyn FnOnce()> {
181 panic!("EmbeddedDeploy does not support networking (o2o)")
182 }
183
184 fn o2m_sink_source(
185 _p1: &Self::Process,
186 _p1_port: &(),
187 _c2: &Self::Cluster,
188 _c2_port: &(),
189 ) -> (syn::Expr, syn::Expr) {
190 panic!("EmbeddedDeploy does not support networking (o2m)")
191 }
192
193 fn o2m_connect(
194 _p1: &Self::Process,
195 _p1_port: &(),
196 _c2: &Self::Cluster,
197 _c2_port: &(),
198 ) -> Box<dyn FnOnce()> {
199 panic!("EmbeddedDeploy does not support networking (o2m)")
200 }
201
202 fn m2o_sink_source(
203 _c1: &Self::Cluster,
204 _c1_port: &(),
205 _p2: &Self::Process,
206 _p2_port: &(),
207 ) -> (syn::Expr, syn::Expr) {
208 panic!("EmbeddedDeploy does not support networking (m2o)")
209 }
210
211 fn m2o_connect(
212 _c1: &Self::Cluster,
213 _c1_port: &(),
214 _p2: &Self::Process,
215 _p2_port: &(),
216 ) -> Box<dyn FnOnce()> {
217 panic!("EmbeddedDeploy does not support networking (m2o)")
218 }
219
220 fn m2m_sink_source(
221 _c1: &Self::Cluster,
222 _c1_port: &(),
223 _c2: &Self::Cluster,
224 _c2_port: &(),
225 ) -> (syn::Expr, syn::Expr) {
226 panic!("EmbeddedDeploy does not support networking (m2m)")
227 }
228
229 fn m2m_connect(
230 _c1: &Self::Cluster,
231 _c1_port: &(),
232 _c2: &Self::Cluster,
233 _c2_port: &(),
234 ) -> Box<dyn FnOnce()> {
235 panic!("EmbeddedDeploy does not support networking (m2m)")
236 }
237
238 fn e2o_many_source(
239 _extra_stmts: &mut Vec<syn::Stmt>,
240 _p2: &Self::Process,
241 _p2_port: &(),
242 _codec_type: &syn::Type,
243 _shared_handle: String,
244 ) -> syn::Expr {
245 panic!("EmbeddedDeploy does not support networking (e2o)")
246 }
247
248 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
249 panic!("EmbeddedDeploy does not support networking (e2o)")
250 }
251
252 fn e2o_source(
253 _extra_stmts: &mut Vec<syn::Stmt>,
254 _p1: &Self::External,
255 _p1_port: &(),
256 _p2: &Self::Process,
257 _p2_port: &(),
258 _codec_type: &syn::Type,
259 _shared_handle: String,
260 ) -> syn::Expr {
261 panic!("EmbeddedDeploy does not support networking (e2o)")
262 }
263
264 fn e2o_connect(
265 _p1: &Self::External,
266 _p1_port: &(),
267 _p2: &Self::Process,
268 _p2_port: &(),
269 _many: bool,
270 _server_hint: NetworkHint,
271 ) -> Box<dyn FnOnce()> {
272 panic!("EmbeddedDeploy does not support networking (e2o)")
273 }
274
275 fn o2e_sink(
276 _p1: &Self::Process,
277 _p1_port: &(),
278 _p2: &Self::External,
279 _p2_port: &(),
280 _shared_handle: String,
281 ) -> syn::Expr {
282 panic!("EmbeddedDeploy does not support networking (o2e)")
283 }
284
285 #[expect(
286 unreachable_code,
287 reason = "panic before q! which is only for return type"
288 )]
289 fn cluster_ids(
290 _of_cluster: LocationKey,
291 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
292 panic!("EmbeddedDeploy does not support cluster IDs");
293 q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
294 }
295
296 #[expect(
297 unreachable_code,
298 reason = "panic before q! which is only for return type"
299 )]
300 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
301 panic!("EmbeddedDeploy does not support cluster self ID");
302 q!(unreachable!(
303 "EmbeddedDeploy does not support cluster self ID"
304 ))
305 }
306
307 #[expect(
308 unreachable_code,
309 reason = "panic before q! which is only for return type"
310 )]
311 fn cluster_membership_stream(
312 _location_id: &LocationId,
313 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
314 {
315 panic!("EmbeddedDeploy does not support cluster membership streams");
316 q!(unreachable!(
317 "EmbeddedDeploy does not support cluster membership streams"
318 ))
319 }
320
321 fn register_embedded_input(
322 env: &mut Self::InstantiateEnv,
323 location_key: LocationKey,
324 ident: &syn::Ident,
325 element_type: &syn::Type,
326 ) {
327 env.inputs
328 .entry(location_key)
329 .unwrap()
330 .or_default()
331 .push((ident.clone(), element_type.clone()));
332 }
333
334 fn register_embedded_output(
335 env: &mut Self::InstantiateEnv,
336 location_key: LocationKey,
337 ident: &syn::Ident,
338 element_type: &syn::Type,
339 ) {
340 env.outputs
341 .entry(location_key)
342 .unwrap()
343 .or_default()
344 .push((ident.clone(), element_type.clone()));
345 }
346}
347
348impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
349 pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
380 let mut env = EmbeddedInstantiateEnv::default();
381 let compiled = self.compile_internal(&mut env);
382
383 let root = crate::staging_util::get_this_crate();
384 let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
385
386 let mut items: Vec<syn::Item> = Vec::new();
387
388 let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
390 location_keys.sort();
391
392 for location_key in location_keys {
393 let graph = &compiled.all_dfir()[location_key];
394
395 let fn_name = self
397 .processes
398 .get(location_key)
399 .map(|n| &n.fn_name)
400 .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name))
401 .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name))
402 .expect("location key not found in any node map");
403
404 let fn_ident = syn::Ident::new(fn_name, Span::call_site());
405
406 let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
408 loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
409
410 let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
412 loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
413
414 let mut diagnostics = Diagnostics::new();
415 let dfir_tokens = graph
416 .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
417 .expect("DFIR code generation failed with diagnostics.");
418
419 let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
421 .iter()
422 .map(|(ident, element_type)| {
423 quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
424 })
425 .collect();
426
427 let has_outputs = !loc_outputs.is_empty();
428
429 if has_outputs {
430 let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
431
432 let output_generic_idents: Vec<syn::Ident> = loc_outputs
434 .iter()
435 .enumerate()
436 .map(|(i, _)| quote::format_ident!("__Out{}", i))
437 .collect();
438
439 let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
440 .iter()
441 .zip(output_generic_idents.iter())
442 .map(|((ident, _), generic)| {
443 quote! { pub #ident: #generic }
444 })
445 .collect();
446
447 let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
448 .iter()
449 .zip(output_generic_idents.iter())
450 .map(|((_, element_type), generic)| {
451 quote! { #generic: FnMut(#element_type) }
452 })
453 .collect();
454
455 let fn_generics: Vec<proc_macro2::TokenStream> = loc_outputs
456 .iter()
457 .zip(output_generic_idents.iter())
458 .map(|((_, element_type), generic)| {
459 quote! { #generic: FnMut(#element_type) + 'a }
460 })
461 .collect();
462
463 let output_param = quote! {
464 __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
465 };
466
467 let output_destructure: Vec<proc_macro2::TokenStream> = loc_outputs
468 .iter()
469 .map(|(ident, _)| {
470 quote! { let mut #ident = &mut __outputs.#ident; }
471 })
472 .collect();
473
474 let all_params: Vec<proc_macro2::TokenStream> = input_params
475 .into_iter()
476 .chain(std::iter::once(output_param))
477 .collect();
478
479 let output_mod: syn::Item = syn::parse_quote! {
481 pub mod #fn_ident {
482 pub struct #output_struct_ident<#(#struct_generics),*> {
483 #(#struct_fields),*
484 }
485 }
486 };
487 items.push(output_mod);
488
489 let func: syn::Item = syn::parse_quote! {
490 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
491 pub fn #fn_ident<'a, #(#fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
492 #(#output_destructure)*
493 #dfir_tokens
494 }
495 };
496 items.push(func);
497 } else {
498 let func: syn::Item = syn::parse_quote! {
499 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
500 pub fn #fn_ident<'a>(#(#input_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
501 #dfir_tokens
502 }
503 };
504 items.push(func);
505 }
506 }
507
508 syn::parse_quote! {
509 use #orig_crate_name::__staged::__deps::*;
510 use #root::prelude::*;
511 use #root::runtime_support::dfir_rs as __root_dfir_rs;
512 pub use #orig_crate_name::__staged;
513
514 #( #items )*
515 }
516 }
517}