hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26
27use super::deploy_runtime::*;
28use crate::compile::builder::ExternalPortId;
29use crate::compile::deploy_provider::{
30    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
31};
32use crate::compile::trybuild::generate::{
33    HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
34};
35use crate::location::dynamic::LocationId;
36use crate::location::member_id::TaglessMemberId;
37use crate::location::{MembershipEvent, NetworkHint};
38use crate::staging_util::get_this_crate;
39
40/// Deployment backend that uses [`hydro_deploy`] for provisioning and launching.
41///
42/// Automatically used when you call [`crate::compile::builder::FlowBuilder::deploy`] and pass in
43/// an `&mut` reference to [`hydro_deploy::Deployment`] as the deployment context.
44pub enum HydroDeploy {}
45
46impl<'a> Deploy<'a> for HydroDeploy {
47    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
48    type InstantiateEnv = Deployment;
49
50    type Process = DeployNode;
51    type Cluster = DeployCluster;
52    type External = DeployExternal;
53
54    fn o2o_sink_source(
55        _p1: &Self::Process,
56        p1_port: &<Self::Process as Node>::Port,
57        _p2: &Self::Process,
58        p2_port: &<Self::Process as Node>::Port,
59    ) -> (syn::Expr, syn::Expr) {
60        let p1_port = p1_port.as_str();
61        let p2_port = p2_port.as_str();
62        deploy_o2o(
63            RuntimeData::new("__hydro_lang_trybuild_cli"),
64            p1_port,
65            p2_port,
66        )
67    }
68
69    fn o2o_connect(
70        p1: &Self::Process,
71        p1_port: &<Self::Process as Node>::Port,
72        p2: &Self::Process,
73        p2_port: &<Self::Process as Node>::Port,
74    ) -> Box<dyn FnOnce()> {
75        let p1 = p1.clone();
76        let p1_port = p1_port.clone();
77        let p2 = p2.clone();
78        let p2_port = p2_port.clone();
79
80        Box::new(move || {
81            let self_underlying_borrow = p1.underlying.borrow();
82            let self_underlying = self_underlying_borrow.as_ref().unwrap();
83            let source_port = self_underlying.get_port(p1_port.clone());
84
85            let other_underlying_borrow = p2.underlying.borrow();
86            let other_underlying = other_underlying_borrow.as_ref().unwrap();
87            let recipient_port = other_underlying.get_port(p2_port.clone());
88
89            source_port.send_to(&recipient_port)
90        })
91    }
92
93    fn o2m_sink_source(
94        _p1: &Self::Process,
95        p1_port: &<Self::Process as Node>::Port,
96        _c2: &Self::Cluster,
97        c2_port: &<Self::Cluster as Node>::Port,
98    ) -> (syn::Expr, syn::Expr) {
99        let p1_port = p1_port.as_str();
100        let c2_port = c2_port.as_str();
101        deploy_o2m(
102            RuntimeData::new("__hydro_lang_trybuild_cli"),
103            p1_port,
104            c2_port,
105        )
106    }
107
108    fn o2m_connect(
109        p1: &Self::Process,
110        p1_port: &<Self::Process as Node>::Port,
111        c2: &Self::Cluster,
112        c2_port: &<Self::Cluster as Node>::Port,
113    ) -> Box<dyn FnOnce()> {
114        let p1 = p1.clone();
115        let p1_port = p1_port.clone();
116        let c2 = c2.clone();
117        let c2_port = c2_port.clone();
118
119        Box::new(move || {
120            let self_underlying_borrow = p1.underlying.borrow();
121            let self_underlying = self_underlying_borrow.as_ref().unwrap();
122            let source_port = self_underlying.get_port(p1_port.clone());
123
124            let recipient_port = DemuxSink {
125                demux: c2
126                    .members
127                    .borrow()
128                    .iter()
129                    .enumerate()
130                    .map(|(id, c)| {
131                        (
132                            id as u32,
133                            Arc::new(c.underlying.get_port(c2_port.clone()))
134                                as Arc<dyn RustCrateSink + 'static>,
135                        )
136                    })
137                    .collect(),
138            };
139
140            source_port.send_to(&recipient_port)
141        })
142    }
143
144    fn m2o_sink_source(
145        _c1: &Self::Cluster,
146        c1_port: &<Self::Cluster as Node>::Port,
147        _p2: &Self::Process,
148        p2_port: &<Self::Process as Node>::Port,
149    ) -> (syn::Expr, syn::Expr) {
150        let c1_port = c1_port.as_str();
151        let p2_port = p2_port.as_str();
152        deploy_m2o(
153            RuntimeData::new("__hydro_lang_trybuild_cli"),
154            c1_port,
155            p2_port,
156        )
157    }
158
159    fn m2o_connect(
160        c1: &Self::Cluster,
161        c1_port: &<Self::Cluster as Node>::Port,
162        p2: &Self::Process,
163        p2_port: &<Self::Process as Node>::Port,
164    ) -> Box<dyn FnOnce()> {
165        let c1 = c1.clone();
166        let c1_port = c1_port.clone();
167        let p2 = p2.clone();
168        let p2_port = p2_port.clone();
169
170        Box::new(move || {
171            let other_underlying_borrow = p2.underlying.borrow();
172            let other_underlying = other_underlying_borrow.as_ref().unwrap();
173            let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
174
175            for (i, node) in c1.members.borrow().iter().enumerate() {
176                let source_port = node.underlying.get_port(c1_port.clone());
177
178                TaggedSource {
179                    source: Arc::new(source_port),
180                    tag: i as u32,
181                }
182                .send_to(&recipient_port);
183            }
184        })
185    }
186
187    fn m2m_sink_source(
188        _c1: &Self::Cluster,
189        c1_port: &<Self::Cluster as Node>::Port,
190        _c2: &Self::Cluster,
191        c2_port: &<Self::Cluster as Node>::Port,
192    ) -> (syn::Expr, syn::Expr) {
193        let c1_port = c1_port.as_str();
194        let c2_port = c2_port.as_str();
195        deploy_m2m(
196            RuntimeData::new("__hydro_lang_trybuild_cli"),
197            c1_port,
198            c2_port,
199        )
200    }
201
202    fn m2m_connect(
203        c1: &Self::Cluster,
204        c1_port: &<Self::Cluster as Node>::Port,
205        c2: &Self::Cluster,
206        c2_port: &<Self::Cluster as Node>::Port,
207    ) -> Box<dyn FnOnce()> {
208        let c1 = c1.clone();
209        let c1_port = c1_port.clone();
210        let c2 = c2.clone();
211        let c2_port = c2_port.clone();
212
213        Box::new(move || {
214            for (i, sender) in c1.members.borrow().iter().enumerate() {
215                let source_port = sender.underlying.get_port(c1_port.clone());
216
217                let recipient_port = DemuxSink {
218                    demux: c2
219                        .members
220                        .borrow()
221                        .iter()
222                        .enumerate()
223                        .map(|(id, c)| {
224                            (
225                                id as u32,
226                                Arc::new(c.underlying.get_port(c2_port.clone()).merge())
227                                    as Arc<dyn RustCrateSink + 'static>,
228                            )
229                        })
230                        .collect(),
231                };
232
233                TaggedSource {
234                    source: Arc::new(source_port),
235                    tag: i as u32,
236                }
237                .send_to(&recipient_port);
238            }
239        })
240    }
241
242    fn e2o_many_source(
243        extra_stmts: &mut Vec<syn::Stmt>,
244        _p2: &Self::Process,
245        p2_port: &<Self::Process as Node>::Port,
246        codec_type: &syn::Type,
247        shared_handle: String,
248    ) -> syn::Expr {
249        let connect_ident = syn::Ident::new(
250            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
251            Span::call_site(),
252        );
253        let source_ident = syn::Ident::new(
254            &format!("__hydro_deploy_many_{}_source", &shared_handle),
255            Span::call_site(),
256        );
257        let sink_ident = syn::Ident::new(
258            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
259            Span::call_site(),
260        );
261        let membership_ident = syn::Ident::new(
262            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
263            Span::call_site(),
264        );
265
266        let root = get_this_crate();
267
268        extra_stmts.push(syn::parse_quote! {
269            let #connect_ident = __hydro_lang_trybuild_cli
270                .port(#p2_port)
271                .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
272        });
273
274        extra_stmts.push(syn::parse_quote! {
275            let #source_ident = #connect_ident.source;
276        });
277
278        extra_stmts.push(syn::parse_quote! {
279            let #sink_ident = #connect_ident.sink;
280        });
281
282        extra_stmts.push(syn::parse_quote! {
283            let #membership_ident = #connect_ident.membership;
284        });
285
286        parse_quote!(#source_ident)
287    }
288
289    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
290        let sink_ident = syn::Ident::new(
291            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
292            Span::call_site(),
293        );
294        parse_quote!(#sink_ident)
295    }
296
297    fn e2o_source(
298        extra_stmts: &mut Vec<syn::Stmt>,
299        _p1: &Self::External,
300        _p1_port: &<Self::External as Node>::Port,
301        _p2: &Self::Process,
302        p2_port: &<Self::Process as Node>::Port,
303        codec_type: &syn::Type,
304        shared_handle: String,
305    ) -> syn::Expr {
306        let connect_ident = syn::Ident::new(
307            &format!("__hydro_deploy_{}_connect", &shared_handle),
308            Span::call_site(),
309        );
310        let source_ident = syn::Ident::new(
311            &format!("__hydro_deploy_{}_source", &shared_handle),
312            Span::call_site(),
313        );
314        let sink_ident = syn::Ident::new(
315            &format!("__hydro_deploy_{}_sink", &shared_handle),
316            Span::call_site(),
317        );
318
319        let root = get_this_crate();
320
321        extra_stmts.push(syn::parse_quote! {
322            let #connect_ident = __hydro_lang_trybuild_cli
323                .port(#p2_port)
324                .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
325        });
326
327        extra_stmts.push(syn::parse_quote! {
328            let #source_ident = #connect_ident.source;
329        });
330
331        extra_stmts.push(syn::parse_quote! {
332            let #sink_ident = #connect_ident.sink;
333        });
334
335        parse_quote!(#source_ident)
336    }
337
338    fn e2o_connect(
339        p1: &Self::External,
340        p1_port: &<Self::External as Node>::Port,
341        p2: &Self::Process,
342        p2_port: &<Self::Process as Node>::Port,
343        _many: bool,
344        server_hint: NetworkHint,
345    ) -> Box<dyn FnOnce()> {
346        let p1 = p1.clone();
347        let p1_port = p1_port.clone();
348        let p2 = p2.clone();
349        let p2_port = p2_port.clone();
350
351        Box::new(move || {
352            let self_underlying_borrow = p1.underlying.borrow();
353            let self_underlying = self_underlying_borrow.as_ref().unwrap();
354            let source_port = self_underlying.declare_many_client();
355
356            let other_underlying_borrow = p2.underlying.borrow();
357            let other_underlying = other_underlying_borrow.as_ref().unwrap();
358            let recipient_port = other_underlying.get_port_with_hint(
359                p2_port.clone(),
360                match server_hint {
361                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
362                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
363                },
364            );
365
366            source_port.send_to(&recipient_port);
367
368            p1.client_ports
369                .borrow_mut()
370                .insert(p1_port.clone(), source_port);
371        })
372    }
373
374    fn o2e_sink(
375        _p1: &Self::Process,
376        _p1_port: &<Self::Process as Node>::Port,
377        _p2: &Self::External,
378        _p2_port: &<Self::External as Node>::Port,
379        shared_handle: String,
380    ) -> syn::Expr {
381        let sink_ident = syn::Ident::new(
382            &format!("__hydro_deploy_{}_sink", &shared_handle),
383            Span::call_site(),
384        );
385        parse_quote!(#sink_ident)
386    }
387
388    fn cluster_ids(
389        of_cluster: usize,
390    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
391        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
392    }
393
394    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
395        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
396    }
397
398    fn cluster_membership_stream(
399        location_id: &LocationId,
400    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
401    {
402        cluster_membership_stream(location_id)
403    }
404}
405
406#[expect(missing_docs, reason = "TODO")]
407pub trait DeployCrateWrapper {
408    fn underlying(&self) -> Arc<RustCrateService>;
409
410    fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
411        self.underlying().stdout()
412    }
413
414    fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
415        self.underlying().stderr()
416    }
417
418    fn stdout_filter(
419        &self,
420        prefix: impl Into<String>,
421    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
422        self.underlying().stdout_filter(prefix.into())
423    }
424
425    fn stderr_filter(
426        &self,
427        prefix: impl Into<String>,
428    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
429        self.underlying().stderr_filter(prefix.into())
430    }
431}
432
433#[expect(missing_docs, reason = "TODO")]
434#[derive(Clone)]
435pub struct TrybuildHost {
436    host: Arc<dyn Host>,
437    display_name: Option<String>,
438    rustflags: Option<String>,
439    profile: Option<String>,
440    additional_hydro_features: Vec<String>,
441    features: Vec<String>,
442    tracing: Option<TracingOptions>,
443    build_envs: Vec<(String, String)>,
444    name_hint: Option<String>,
445    cluster_idx: Option<usize>,
446}
447
448impl From<Arc<dyn Host>> for TrybuildHost {
449    fn from(host: Arc<dyn Host>) -> Self {
450        Self {
451            host,
452            display_name: None,
453            rustflags: None,
454            profile: None,
455            additional_hydro_features: vec![],
456            features: vec![],
457            tracing: None,
458            build_envs: vec![],
459            name_hint: None,
460            cluster_idx: None,
461        }
462    }
463}
464
465impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
466    fn from(host: Arc<H>) -> Self {
467        Self {
468            host,
469            display_name: None,
470            rustflags: None,
471            profile: None,
472            additional_hydro_features: vec![],
473            features: vec![],
474            tracing: None,
475            build_envs: vec![],
476            name_hint: None,
477            cluster_idx: None,
478        }
479    }
480}
481
482#[expect(missing_docs, reason = "TODO")]
483impl TrybuildHost {
484    pub fn new(host: Arc<dyn Host>) -> Self {
485        Self {
486            host,
487            display_name: None,
488            rustflags: None,
489            profile: None,
490            additional_hydro_features: vec![],
491            features: vec![],
492            tracing: None,
493            build_envs: vec![],
494            name_hint: None,
495            cluster_idx: None,
496        }
497    }
498
499    pub fn display_name(self, display_name: impl Into<String>) -> Self {
500        if self.display_name.is_some() {
501            panic!("{} already set", name_of!(display_name in Self));
502        }
503
504        Self {
505            display_name: Some(display_name.into()),
506            ..self
507        }
508    }
509
510    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
511        if self.rustflags.is_some() {
512            panic!("{} already set", name_of!(rustflags in Self));
513        }
514
515        Self {
516            rustflags: Some(rustflags.into()),
517            ..self
518        }
519    }
520
521    pub fn profile(self, profile: impl Into<String>) -> Self {
522        if self.profile.is_some() {
523            panic!("{} already set", name_of!(profile in Self));
524        }
525
526        Self {
527            profile: Some(profile.into()),
528            ..self
529        }
530    }
531
532    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
533        Self {
534            additional_hydro_features,
535            ..self
536        }
537    }
538
539    pub fn features(self, features: Vec<String>) -> Self {
540        Self {
541            features: self.features.into_iter().chain(features).collect(),
542            ..self
543        }
544    }
545
546    pub fn tracing(self, tracing: TracingOptions) -> Self {
547        if self.tracing.is_some() {
548            panic!("{} already set", name_of!(tracing in Self));
549        }
550
551        Self {
552            tracing: Some(tracing),
553            ..self
554        }
555    }
556
557    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
558        Self {
559            build_envs: self
560                .build_envs
561                .into_iter()
562                .chain(std::iter::once((key.into(), value.into())))
563                .collect(),
564            ..self
565        }
566    }
567}
568
569impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
570    type ProcessSpec = TrybuildHost;
571    fn into_process_spec(self) -> TrybuildHost {
572        TrybuildHost {
573            host: self,
574            display_name: None,
575            rustflags: None,
576            profile: None,
577            additional_hydro_features: vec![],
578            features: vec![],
579            tracing: None,
580            build_envs: vec![],
581            name_hint: None,
582            cluster_idx: None,
583        }
584    }
585}
586
587impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
588    type ProcessSpec = TrybuildHost;
589    fn into_process_spec(self) -> TrybuildHost {
590        TrybuildHost {
591            host: self,
592            display_name: None,
593            rustflags: None,
594            profile: None,
595            additional_hydro_features: vec![],
596            features: vec![],
597            tracing: None,
598            build_envs: vec![],
599            name_hint: None,
600            cluster_idx: None,
601        }
602    }
603}
604
605#[expect(missing_docs, reason = "TODO")]
606#[derive(Clone)]
607pub struct DeployExternal {
608    next_port: Rc<RefCell<usize>>,
609    host: Arc<dyn Host>,
610    underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
611    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
612    allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
613}
614
615impl DeployExternal {
616    pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
617        self.client_ports
618            .borrow()
619            .get(
620                self.allocated_ports
621                    .borrow()
622                    .get(&external_port_id)
623                    .unwrap(),
624            )
625            .unwrap()
626            .clone()
627    }
628}
629
630impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
631    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
632        assert!(
633            self.allocated_ports
634                .borrow_mut()
635                .insert(external_port_id, port.clone())
636                .is_none_or(|old| old == port)
637        );
638    }
639
640    fn as_bytes_bidi(
641        &self,
642        external_port_id: ExternalPortId,
643    ) -> impl Future<
644        Output = (
645            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
646            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
647        ),
648    > + 'a {
649        let port = self.raw_port(external_port_id);
650
651        async move {
652            let (source, sink) = port.connect().await.into_source_sink();
653            (
654                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
655                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
656            )
657        }
658    }
659
660    fn as_bincode_bidi<InT, OutT>(
661        &self,
662        external_port_id: ExternalPortId,
663    ) -> impl Future<
664        Output = (
665            Pin<Box<dyn Stream<Item = OutT>>>,
666            Pin<Box<dyn Sink<InT, Error = Error>>>,
667        ),
668    > + 'a
669    where
670        InT: Serialize + 'static,
671        OutT: DeserializeOwned + 'static,
672    {
673        let port = self.raw_port(external_port_id);
674        async move {
675            let (source, sink) = port.connect().await.into_source_sink();
676            (
677                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
678                    as Pin<Box<dyn Stream<Item = OutT>>>,
679                Box::pin(
680                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
681                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
682            )
683        }
684    }
685
686    fn as_bincode_sink<T: Serialize + 'static>(
687        &self,
688        external_port_id: ExternalPortId,
689    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
690        let port = self.raw_port(external_port_id);
691        async move {
692            let sink = port.connect().await.into_sink();
693            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
694                as Pin<Box<dyn Sink<T, Error = Error>>>
695        }
696    }
697
698    fn as_bincode_source<T: DeserializeOwned + 'static>(
699        &self,
700        external_port_id: ExternalPortId,
701    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
702        let port = self.raw_port(external_port_id);
703        async move {
704            let source = port.connect().await.into_source();
705            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
706                as Pin<Box<dyn Stream<Item = T>>>
707        }
708    }
709}
710
711impl Node for DeployExternal {
712    type Port = String;
713    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
714    type InstantiateEnv = Deployment;
715
716    fn next_port(&self) -> Self::Port {
717        let next_port = *self.next_port.borrow();
718        *self.next_port.borrow_mut() += 1;
719
720        format!("port_{}", next_port)
721    }
722
723    fn instantiate(
724        &self,
725        env: &mut Self::InstantiateEnv,
726        _meta: &mut Self::Meta,
727        _graph: DfirGraph,
728        _extra_stmts: Vec<syn::Stmt>,
729    ) {
730        let service = env.CustomService(self.host.clone(), vec![]);
731        *self.underlying.borrow_mut() = Some(service);
732    }
733
734    fn update_meta(&self, _meta: &Self::Meta) {}
735}
736
737impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
738    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
739        DeployExternal {
740            next_port: Rc::new(RefCell::new(0)),
741            host: self,
742            underlying: Rc::new(RefCell::new(None)),
743            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
744            client_ports: Rc::new(RefCell::new(HashMap::new())),
745        }
746    }
747}
748
749impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
750    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
751        DeployExternal {
752            next_port: Rc::new(RefCell::new(0)),
753            host: self,
754            underlying: Rc::new(RefCell::new(None)),
755            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
756            client_ports: Rc::new(RefCell::new(HashMap::new())),
757        }
758    }
759}
760
761pub(crate) enum CrateOrTrybuild {
762    Crate(RustCrate, Arc<dyn Host>),
763    Trybuild(TrybuildHost),
764}
765
766#[expect(missing_docs, reason = "TODO")]
767#[derive(Clone)]
768pub struct DeployNode {
769    next_port: Rc<RefCell<usize>>,
770    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
771    underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
772}
773
774impl DeployCrateWrapper for DeployNode {
775    fn underlying(&self) -> Arc<RustCrateService> {
776        Arc::clone(self.underlying.borrow().as_ref().unwrap())
777    }
778}
779
780impl Node for DeployNode {
781    type Port = String;
782    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
783    type InstantiateEnv = Deployment;
784
785    fn next_port(&self) -> String {
786        let next_port = *self.next_port.borrow();
787        *self.next_port.borrow_mut() += 1;
788
789        format!("port_{}", next_port)
790    }
791
792    fn update_meta(&self, meta: &Self::Meta) {
793        let underlying_node = self.underlying.borrow();
794        underlying_node.as_ref().unwrap().update_meta(HydroMeta {
795            clusters: meta.clone(),
796            cluster_id: None,
797        });
798    }
799
800    fn instantiate(
801        &self,
802        env: &mut Self::InstantiateEnv,
803        _meta: &mut Self::Meta,
804        graph: DfirGraph,
805        extra_stmts: Vec<syn::Stmt>,
806    ) {
807        let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
808            CrateOrTrybuild::Crate(c, host) => (c, host),
809            CrateOrTrybuild::Trybuild(trybuild) => {
810                // Determine linking mode based on host target type
811                let linking_mode = if !cfg!(target_os = "windows")
812                    && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
813                {
814                    // When compiling for local, prefer dynamic linking to reduce binary size
815                    // Windows is currently not supported due to https://github.com/bevyengine/bevy/pull/2016
816                    LinkingMode::Dynamic
817                } else {
818                    LinkingMode::Static
819                };
820                let (bin_name, config) = create_graph_trybuild(
821                    graph,
822                    extra_stmts,
823                    &trybuild.name_hint,
824                    false,
825                    linking_mode,
826                );
827                let host = trybuild.host.clone();
828                (
829                    create_trybuild_service(
830                        trybuild,
831                        &config.project_dir,
832                        &config.target_dir,
833                        &config.features,
834                        &bin_name,
835                        &config.linking_mode,
836                    ),
837                    host,
838                )
839            }
840        };
841
842        *self.underlying.borrow_mut() = Some(env.add_service(service, host));
843    }
844}
845
846#[expect(missing_docs, reason = "TODO")]
847#[derive(Clone)]
848pub struct DeployClusterNode {
849    underlying: Arc<RustCrateService>,
850}
851
852impl DeployCrateWrapper for DeployClusterNode {
853    fn underlying(&self) -> Arc<RustCrateService> {
854        self.underlying.clone()
855    }
856}
857#[expect(missing_docs, reason = "TODO")]
858#[derive(Clone)]
859pub struct DeployCluster {
860    id: usize,
861    next_port: Rc<RefCell<usize>>,
862    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
863    members: Rc<RefCell<Vec<DeployClusterNode>>>,
864    name_hint: Option<String>,
865}
866
867impl DeployCluster {
868    #[expect(missing_docs, reason = "TODO")]
869    pub fn members(&self) -> Vec<DeployClusterNode> {
870        self.members.borrow().clone()
871    }
872}
873
874impl Node for DeployCluster {
875    type Port = String;
876    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
877    type InstantiateEnv = Deployment;
878
879    fn next_port(&self) -> String {
880        let next_port = *self.next_port.borrow();
881        *self.next_port.borrow_mut() += 1;
882
883        format!("port_{}", next_port)
884    }
885
886    fn instantiate(
887        &self,
888        env: &mut Self::InstantiateEnv,
889        meta: &mut Self::Meta,
890        graph: DfirGraph,
891        extra_stmts: Vec<syn::Stmt>,
892    ) {
893        let has_trybuild = self
894            .cluster_spec
895            .borrow()
896            .as_ref()
897            .unwrap()
898            .iter()
899            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
900
901        // For clusters, use static linking if ANY host is non-local (conservative approach)
902        let linking_mode = if !cfg!(target_os = "windows")
903            && self
904                .cluster_spec
905                .borrow()
906                .as_ref()
907                .unwrap()
908                .iter()
909                .all(|spec| match spec {
910                    CrateOrTrybuild::Crate(_, _) => true, // crates handle their own linking
911                    CrateOrTrybuild::Trybuild(t) => {
912                        t.host.target_type() == hydro_deploy::HostTargetType::Local
913                    }
914                }) {
915            // See comment above for Windows exception
916            LinkingMode::Dynamic
917        } else {
918            LinkingMode::Static
919        };
920
921        let maybe_trybuild = if has_trybuild {
922            Some(create_graph_trybuild(
923                graph,
924                extra_stmts,
925                &self.name_hint,
926                false,
927                linking_mode,
928            ))
929        } else {
930            None
931        };
932
933        let cluster_nodes = self
934            .cluster_spec
935            .borrow_mut()
936            .take()
937            .unwrap()
938            .into_iter()
939            .map(|spec| {
940                let (service, host) = match spec {
941                    CrateOrTrybuild::Crate(c, host) => (c, host),
942                    CrateOrTrybuild::Trybuild(trybuild) => {
943                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
944                        let host = trybuild.host.clone();
945                        (
946                            create_trybuild_service(
947                                trybuild,
948                                &config.project_dir,
949                                &config.target_dir,
950                                &config.features,
951                                bin_name,
952                                &config.linking_mode,
953                            ),
954                            host,
955                        )
956                    }
957                };
958
959                env.add_service(service, host)
960            })
961            .collect::<Vec<_>>();
962        meta.insert(
963            self.id,
964            (0..(cluster_nodes.len() as u32))
965                .map(TaglessMemberId::from_raw_id)
966                .collect(),
967        );
968        *self.members.borrow_mut() = cluster_nodes
969            .into_iter()
970            .map(|n| DeployClusterNode { underlying: n })
971            .collect();
972    }
973
974    fn update_meta(&self, meta: &Self::Meta) {
975        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
976            node.underlying.update_meta(HydroMeta {
977                clusters: meta.clone(),
978                cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
979            });
980        }
981    }
982}
983
984#[expect(missing_docs, reason = "TODO")]
985#[derive(Clone)]
986pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
987
988impl DeployProcessSpec {
989    #[expect(missing_docs, reason = "TODO")]
990    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
991        Self(t, host)
992    }
993}
994
995impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
996    fn build(self, _id: usize, _name_hint: &str) -> DeployNode {
997        DeployNode {
998            next_port: Rc::new(RefCell::new(0)),
999            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1000            underlying: Rc::new(RefCell::new(None)),
1001        }
1002    }
1003}
1004
1005impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1006    fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
1007        self.name_hint = Some(format!("{} (process {id})", name_hint));
1008        DeployNode {
1009            next_port: Rc::new(RefCell::new(0)),
1010            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1011            underlying: Rc::new(RefCell::new(None)),
1012        }
1013    }
1014}
1015
1016#[expect(missing_docs, reason = "TODO")]
1017#[derive(Clone)]
1018pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1019
1020impl DeployClusterSpec {
1021    #[expect(missing_docs, reason = "TODO")]
1022    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1023        Self(crates)
1024    }
1025}
1026
1027impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1028    fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
1029        DeployCluster {
1030            id,
1031            next_port: Rc::new(RefCell::new(0)),
1032            cluster_spec: Rc::new(RefCell::new(Some(
1033                self.0
1034                    .into_iter()
1035                    .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1036                    .collect(),
1037            ))),
1038            members: Rc::new(RefCell::new(vec![])),
1039            name_hint: None,
1040        }
1041    }
1042}
1043
1044impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1045    fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1046        let name_hint = format!("{} (cluster {id})", name_hint);
1047        DeployCluster {
1048            id,
1049            next_port: Rc::new(RefCell::new(0)),
1050            cluster_spec: Rc::new(RefCell::new(Some(
1051                self.into_iter()
1052                    .enumerate()
1053                    .map(|(idx, b)| {
1054                        let mut b = b.into();
1055                        b.name_hint = Some(name_hint.clone());
1056                        b.cluster_idx = Some(idx);
1057                        CrateOrTrybuild::Trybuild(b)
1058                    })
1059                    .collect(),
1060            ))),
1061            members: Rc::new(RefCell::new(vec![])),
1062            name_hint: Some(name_hint),
1063        }
1064    }
1065}
1066
1067fn create_trybuild_service(
1068    trybuild: TrybuildHost,
1069    dir: &std::path::Path,
1070    target_dir: &std::path::PathBuf,
1071    features: &Option<Vec<String>>,
1072    bin_name: &str,
1073    linking_mode: &LinkingMode,
1074) -> RustCrate {
1075    // For dynamic linking, use the dylib-examples crate; for static, use the base crate
1076    let crate_dir = match linking_mode {
1077        LinkingMode::Dynamic => dir.join("dylib-examples"),
1078        LinkingMode::Static => dir.to_path_buf(),
1079    };
1080
1081    let mut ret = RustCrate::new(&crate_dir)
1082        .target_dir(target_dir)
1083        .example(bin_name)
1084        .no_default_features();
1085
1086    ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1087
1088    if let Some(display_name) = trybuild.display_name {
1089        ret = ret.display_name(display_name);
1090    } else if let Some(name_hint) = trybuild.name_hint {
1091        if let Some(cluster_idx) = trybuild.cluster_idx {
1092            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1093        } else {
1094            ret = ret.display_name(name_hint);
1095        }
1096    }
1097
1098    if let Some(rustflags) = trybuild.rustflags {
1099        ret = ret.rustflags(rustflags);
1100    }
1101
1102    if let Some(profile) = trybuild.profile {
1103        ret = ret.profile(profile);
1104    }
1105
1106    if let Some(tracing) = trybuild.tracing {
1107        ret = ret.tracing(tracing);
1108    }
1109
1110    ret = ret.features(
1111        vec!["hydro___feature_deploy_integration".to_string()]
1112            .into_iter()
1113            .chain(
1114                trybuild
1115                    .additional_hydro_features
1116                    .into_iter()
1117                    .map(|runtime_feature| {
1118                        assert!(
1119                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1120                            "{runtime_feature} is not a valid Hydro runtime feature"
1121                        );
1122                        format!("hydro___feature_{runtime_feature}")
1123                    }),
1124            )
1125            .chain(trybuild.features),
1126    );
1127
1128    for (key, value) in trybuild.build_envs {
1129        ret = ret.build_env(key, value);
1130    }
1131
1132    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1133    ret = ret.config("build.incremental = false");
1134
1135    if let Some(features) = features {
1136        ret = ret.features(features);
1137    }
1138
1139    ret
1140}