hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26use tokio::sync::RwLock;
27
28use super::deploy_runtime::*;
29use crate::compile::deploy_provider::{
30    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
31};
32use crate::compile::trybuild::generate::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
33use crate::location::NetworkHint;
34use crate::staging_util::get_this_crate;
35
36/// Deployment backend that uses [`hydro_deploy`] for provisioning and launching.
37///
38/// Automatically used when you call [`crate::compile::builder::FlowBuilder::deploy`] and pass in
39/// an `&mut` reference to [`hydro_deploy::Deployment`] as the deployment context.
40pub enum HydroDeploy {}
41
42impl<'a> Deploy<'a> for HydroDeploy {
43    type InstantiateEnv = Deployment;
44    type CompileEnv = ();
45    type Process = DeployNode;
46    type Cluster = DeployCluster;
47    type External = DeployExternal;
48    type Meta = HashMap<usize, Vec<u32>>;
49    type GraphId = ();
50    type Port = String;
51    type ExternalRawPort = CustomClientPort;
52
53    fn allocate_process_port(process: &Self::Process) -> Self::Port {
54        process.next_port()
55    }
56
57    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
58        cluster.next_port()
59    }
60
61    fn allocate_external_port(external: &Self::External) -> Self::Port {
62        external.next_port()
63    }
64
65    fn o2o_sink_source(
66        _env: &(),
67        _p1: &Self::Process,
68        p1_port: &Self::Port,
69        _p2: &Self::Process,
70        p2_port: &Self::Port,
71    ) -> (syn::Expr, syn::Expr) {
72        let p1_port = p1_port.as_str();
73        let p2_port = p2_port.as_str();
74        deploy_o2o(
75            RuntimeData::new("__hydro_lang_trybuild_cli"),
76            p1_port,
77            p2_port,
78        )
79    }
80
81    fn o2o_connect(
82        p1: &Self::Process,
83        p1_port: &Self::Port,
84        p2: &Self::Process,
85        p2_port: &Self::Port,
86    ) -> Box<dyn FnOnce()> {
87        let p1 = p1.clone();
88        let p1_port = p1_port.clone();
89        let p2 = p2.clone();
90        let p2_port = p2_port.clone();
91
92        Box::new(move || {
93            let self_underlying_borrow = p1.underlying.borrow();
94            let self_underlying = self_underlying_borrow.as_ref().unwrap();
95            let source_port = self_underlying
96                .try_read()
97                .unwrap()
98                .get_port(p1_port.clone(), self_underlying);
99
100            let other_underlying_borrow = p2.underlying.borrow();
101            let other_underlying = other_underlying_borrow.as_ref().unwrap();
102            let recipient_port = other_underlying
103                .try_read()
104                .unwrap()
105                .get_port(p2_port.clone(), other_underlying);
106
107            source_port.send_to(&recipient_port)
108        })
109    }
110
111    fn o2m_sink_source(
112        _env: &(),
113        _p1: &Self::Process,
114        p1_port: &Self::Port,
115        _c2: &Self::Cluster,
116        c2_port: &Self::Port,
117    ) -> (syn::Expr, syn::Expr) {
118        let p1_port = p1_port.as_str();
119        let c2_port = c2_port.as_str();
120        deploy_o2m(
121            RuntimeData::new("__hydro_lang_trybuild_cli"),
122            p1_port,
123            c2_port,
124        )
125    }
126
127    fn o2m_connect(
128        p1: &Self::Process,
129        p1_port: &Self::Port,
130        c2: &Self::Cluster,
131        c2_port: &Self::Port,
132    ) -> Box<dyn FnOnce()> {
133        let p1 = p1.clone();
134        let p1_port = p1_port.clone();
135        let c2 = c2.clone();
136        let c2_port = c2_port.clone();
137
138        Box::new(move || {
139            let self_underlying_borrow = p1.underlying.borrow();
140            let self_underlying = self_underlying_borrow.as_ref().unwrap();
141            let source_port = self_underlying
142                .try_read()
143                .unwrap()
144                .get_port(p1_port.clone(), self_underlying);
145
146            let recipient_port = DemuxSink {
147                demux: c2
148                    .members
149                    .borrow()
150                    .iter()
151                    .enumerate()
152                    .map(|(id, c)| {
153                        let n = c.underlying.try_read().unwrap();
154                        (
155                            id as u32,
156                            Arc::new(n.get_port(c2_port.clone(), &c.underlying))
157                                as Arc<dyn RustCrateSink + 'static>,
158                        )
159                    })
160                    .collect(),
161            };
162
163            source_port.send_to(&recipient_port)
164        })
165    }
166
167    fn m2o_sink_source(
168        _env: &(),
169        _c1: &Self::Cluster,
170        c1_port: &Self::Port,
171        _p2: &Self::Process,
172        p2_port: &Self::Port,
173    ) -> (syn::Expr, syn::Expr) {
174        let c1_port = c1_port.as_str();
175        let p2_port = p2_port.as_str();
176        deploy_m2o(
177            RuntimeData::new("__hydro_lang_trybuild_cli"),
178            c1_port,
179            p2_port,
180        )
181    }
182
183    fn m2o_connect(
184        c1: &Self::Cluster,
185        c1_port: &Self::Port,
186        p2: &Self::Process,
187        p2_port: &Self::Port,
188    ) -> Box<dyn FnOnce()> {
189        let c1 = c1.clone();
190        let c1_port = c1_port.clone();
191        let p2 = p2.clone();
192        let p2_port = p2_port.clone();
193
194        Box::new(move || {
195            let other_underlying_borrow = p2.underlying.borrow();
196            let other_underlying = other_underlying_borrow.as_ref().unwrap();
197            let recipient_port = other_underlying
198                .try_read()
199                .unwrap()
200                .get_port(p2_port.clone(), other_underlying)
201                .merge();
202
203            for (i, node) in c1.members.borrow().iter().enumerate() {
204                let source_port = node
205                    .underlying
206                    .try_read()
207                    .unwrap()
208                    .get_port(c1_port.clone(), &node.underlying);
209
210                TaggedSource {
211                    source: Arc::new(source_port),
212                    tag: i as u32,
213                }
214                .send_to(&recipient_port);
215            }
216        })
217    }
218
219    fn m2m_sink_source(
220        _env: &(),
221        _c1: &Self::Cluster,
222        c1_port: &Self::Port,
223        _c2: &Self::Cluster,
224        c2_port: &Self::Port,
225    ) -> (syn::Expr, syn::Expr) {
226        let c1_port = c1_port.as_str();
227        let c2_port = c2_port.as_str();
228        deploy_m2m(
229            RuntimeData::new("__hydro_lang_trybuild_cli"),
230            c1_port,
231            c2_port,
232        )
233    }
234
235    fn m2m_connect(
236        c1: &Self::Cluster,
237        c1_port: &Self::Port,
238        c2: &Self::Cluster,
239        c2_port: &Self::Port,
240    ) -> Box<dyn FnOnce()> {
241        let c1 = c1.clone();
242        let c1_port = c1_port.clone();
243        let c2 = c2.clone();
244        let c2_port = c2_port.clone();
245
246        Box::new(move || {
247            for (i, sender) in c1.members.borrow().iter().enumerate() {
248                let source_port = sender
249                    .underlying
250                    .try_read()
251                    .unwrap()
252                    .get_port(c1_port.clone(), &sender.underlying);
253
254                let recipient_port = DemuxSink {
255                    demux: c2
256                        .members
257                        .borrow()
258                        .iter()
259                        .enumerate()
260                        .map(|(id, c)| {
261                            let n = c.underlying.try_read().unwrap();
262                            (
263                                id as u32,
264                                Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
265                                    as Arc<dyn RustCrateSink + 'static>,
266                            )
267                        })
268                        .collect(),
269                };
270
271                TaggedSource {
272                    source: Arc::new(source_port),
273                    tag: i as u32,
274                }
275                .send_to(&recipient_port);
276            }
277        })
278    }
279
280    fn e2o_many_source(
281        _compile_env: &Self::CompileEnv,
282        extra_stmts: &mut Vec<syn::Stmt>,
283        _p2: &Self::Process,
284        p2_port: &Self::Port,
285        codec_type: &syn::Type,
286        shared_handle: String,
287    ) -> syn::Expr {
288        let connect_ident = syn::Ident::new(
289            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
290            Span::call_site(),
291        );
292        let source_ident = syn::Ident::new(
293            &format!("__hydro_deploy_many_{}_source", &shared_handle),
294            Span::call_site(),
295        );
296        let sink_ident = syn::Ident::new(
297            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
298            Span::call_site(),
299        );
300        let membership_ident = syn::Ident::new(
301            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
302            Span::call_site(),
303        );
304
305        let root = get_this_crate();
306
307        extra_stmts.push(syn::parse_quote! {
308            let #connect_ident = __hydro_lang_trybuild_cli
309                .port(#p2_port)
310                .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
311        });
312
313        extra_stmts.push(syn::parse_quote! {
314            let #source_ident = #connect_ident.source;
315        });
316
317        extra_stmts.push(syn::parse_quote! {
318            let #sink_ident = #connect_ident.sink;
319        });
320
321        extra_stmts.push(syn::parse_quote! {
322            let #membership_ident = #connect_ident.membership;
323        });
324
325        parse_quote!(#source_ident)
326    }
327
328    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
329        let sink_ident = syn::Ident::new(
330            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
331            Span::call_site(),
332        );
333        parse_quote!(#sink_ident)
334    }
335
336    fn e2o_source(
337        _compile_env: &Self::CompileEnv,
338        extra_stmts: &mut Vec<syn::Stmt>,
339        _p1: &Self::External,
340        _p1_port: &Self::Port,
341        _p2: &Self::Process,
342        p2_port: &Self::Port,
343        codec_type: &syn::Type,
344        shared_handle: String,
345    ) -> syn::Expr {
346        let connect_ident = syn::Ident::new(
347            &format!("__hydro_deploy_{}_connect", &shared_handle),
348            Span::call_site(),
349        );
350        let source_ident = syn::Ident::new(
351            &format!("__hydro_deploy_{}_source", &shared_handle),
352            Span::call_site(),
353        );
354        let sink_ident = syn::Ident::new(
355            &format!("__hydro_deploy_{}_sink", &shared_handle),
356            Span::call_site(),
357        );
358
359        let root = get_this_crate();
360
361        extra_stmts.push(syn::parse_quote! {
362            let #connect_ident = __hydro_lang_trybuild_cli
363                .port(#p2_port)
364                .connect::<#root::runtime_support::dfir_rs::util::deploy::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
365        });
366
367        extra_stmts.push(syn::parse_quote! {
368            let #source_ident = #connect_ident.source;
369        });
370
371        extra_stmts.push(syn::parse_quote! {
372            let #sink_ident = #connect_ident.sink;
373        });
374
375        parse_quote!(#source_ident)
376    }
377
378    fn e2o_connect(
379        p1: &Self::External,
380        p1_port: &Self::Port,
381        p2: &Self::Process,
382        p2_port: &Self::Port,
383        _many: bool,
384        server_hint: NetworkHint,
385    ) -> Box<dyn FnOnce()> {
386        let p1 = p1.clone();
387        let p1_port = p1_port.clone();
388        let p2 = p2.clone();
389        let p2_port = p2_port.clone();
390
391        Box::new(move || {
392            let self_underlying_borrow = p1.underlying.borrow();
393            let self_underlying = self_underlying_borrow.as_ref().unwrap();
394            let source_port = self_underlying
395                .try_read()
396                .unwrap()
397                .declare_many_client(self_underlying);
398
399            let other_underlying_borrow = p2.underlying.borrow();
400            let other_underlying = other_underlying_borrow.as_ref().unwrap();
401            let recipient_port = other_underlying.try_read().unwrap().get_port_with_hint(
402                p2_port.clone(),
403                match server_hint {
404                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
405                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
406                },
407                other_underlying,
408            );
409
410            source_port.send_to(&recipient_port);
411
412            p1.client_ports
413                .borrow_mut()
414                .insert(p1_port.clone(), source_port);
415        })
416    }
417
418    fn o2e_sink(
419        _compile_env: &Self::CompileEnv,
420        _p1: &Self::Process,
421        _p1_port: &Self::Port,
422        _p2: &Self::External,
423        _p2_port: &Self::Port,
424        shared_handle: String,
425    ) -> syn::Expr {
426        let sink_ident = syn::Ident::new(
427            &format!("__hydro_deploy_{}_sink", &shared_handle),
428            Span::call_site(),
429        );
430        parse_quote!(#sink_ident)
431    }
432
433    fn cluster_ids(
434        _env: &Self::CompileEnv,
435        of_cluster: usize,
436    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
437        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
438    }
439
440    fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
441        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
442    }
443}
444
445#[expect(missing_docs, reason = "TODO")]
446pub trait DeployCrateWrapper {
447    fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
448
449    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
450    async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
451        self.underlying().read().await.stdout()
452    }
453
454    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
455    async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
456        self.underlying().read().await.stderr()
457    }
458
459    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
460    async fn stdout_filter(
461        &self,
462        prefix: impl Into<String>,
463    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
464        self.underlying().read().await.stdout_filter(prefix.into())
465    }
466
467    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
468    async fn stderr_filter(
469        &self,
470        prefix: impl Into<String>,
471    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
472        self.underlying().read().await.stderr_filter(prefix.into())
473    }
474
475    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
476    async fn tracing_results(&self) -> Option<TracingResults> {
477        self.underlying().read().await.tracing_results().cloned()
478    }
479}
480
481#[expect(missing_docs, reason = "TODO")]
482#[derive(Clone)]
483pub struct TrybuildHost {
484    host: Arc<dyn Host>,
485    display_name: Option<String>,
486    rustflags: Option<String>,
487    additional_hydro_features: Vec<String>,
488    features: Vec<String>,
489    tracing: Option<TracingOptions>,
490    build_envs: Vec<(String, String)>,
491    name_hint: Option<String>,
492    cluster_idx: Option<usize>,
493}
494
495impl From<Arc<dyn Host>> for TrybuildHost {
496    fn from(host: Arc<dyn Host>) -> Self {
497        Self {
498            host,
499            display_name: None,
500            rustflags: None,
501            additional_hydro_features: vec![],
502            features: vec![],
503            tracing: None,
504            build_envs: vec![],
505            name_hint: None,
506            cluster_idx: None,
507        }
508    }
509}
510
511impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
512    fn from(host: Arc<H>) -> Self {
513        Self {
514            host,
515            display_name: None,
516            rustflags: None,
517            additional_hydro_features: vec![],
518            features: vec![],
519            tracing: None,
520            build_envs: vec![],
521            name_hint: None,
522            cluster_idx: None,
523        }
524    }
525}
526
527#[expect(missing_docs, reason = "TODO")]
528impl TrybuildHost {
529    pub fn new(host: Arc<dyn Host>) -> Self {
530        Self {
531            host,
532            display_name: None,
533            rustflags: None,
534            additional_hydro_features: vec![],
535            features: vec![],
536            tracing: None,
537            build_envs: vec![],
538            name_hint: None,
539            cluster_idx: None,
540        }
541    }
542
543    pub fn display_name(self, display_name: impl Into<String>) -> Self {
544        if self.display_name.is_some() {
545            panic!("{} already set", name_of!(display_name in Self));
546        }
547
548        Self {
549            display_name: Some(display_name.into()),
550            ..self
551        }
552    }
553
554    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
555        if self.rustflags.is_some() {
556            panic!("{} already set", name_of!(rustflags in Self));
557        }
558
559        Self {
560            rustflags: Some(rustflags.into()),
561            ..self
562        }
563    }
564
565    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
566        Self {
567            additional_hydro_features,
568            ..self
569        }
570    }
571
572    pub fn features(self, features: Vec<String>) -> Self {
573        Self {
574            features: self.features.into_iter().chain(features).collect(),
575            ..self
576        }
577    }
578
579    pub fn tracing(self, tracing: TracingOptions) -> Self {
580        if self.tracing.is_some() {
581            panic!("{} already set", name_of!(tracing in Self));
582        }
583
584        Self {
585            tracing: Some(tracing),
586            ..self
587        }
588    }
589
590    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
591        Self {
592            build_envs: self
593                .build_envs
594                .into_iter()
595                .chain(std::iter::once((key.into(), value.into())))
596                .collect(),
597            ..self
598        }
599    }
600}
601
602impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
603    type ProcessSpec = TrybuildHost;
604    fn into_process_spec(self) -> TrybuildHost {
605        TrybuildHost {
606            host: self,
607            display_name: None,
608            rustflags: None,
609            additional_hydro_features: vec![],
610            features: vec![],
611            tracing: None,
612            build_envs: vec![],
613            name_hint: None,
614            cluster_idx: None,
615        }
616    }
617}
618
619impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
620    type ProcessSpec = TrybuildHost;
621    fn into_process_spec(self) -> TrybuildHost {
622        TrybuildHost {
623            host: self,
624            display_name: None,
625            rustflags: None,
626            additional_hydro_features: vec![],
627            features: vec![],
628            tracing: None,
629            build_envs: vec![],
630            name_hint: None,
631            cluster_idx: None,
632        }
633    }
634}
635
636#[expect(missing_docs, reason = "TODO")]
637#[derive(Clone)]
638pub struct DeployExternal {
639    next_port: Rc<RefCell<usize>>,
640    host: Arc<dyn Host>,
641    underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
642    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
643    allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
644}
645
646impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
647    fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
648        assert!(
649            self.allocated_ports
650                .borrow_mut()
651                .insert(key, port.clone())
652                .is_none_or(|old| old == port)
653        );
654    }
655
656    fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
657        self.client_ports
658            .borrow()
659            .get(self.allocated_ports.borrow().get(&key).unwrap())
660            .unwrap()
661            .clone()
662    }
663
664    fn as_bytes_bidi(
665        &self,
666        key: usize,
667    ) -> impl Future<
668        Output = (
669            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
670            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
671        ),
672    > + 'a {
673        let port = self.raw_port(key);
674
675        async move {
676            let (source, sink) = port.connect().await.into_source_sink();
677            (
678                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
679                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
680            )
681        }
682    }
683
684    fn as_bincode_bidi<InT, OutT>(
685        &self,
686        key: usize,
687    ) -> impl Future<
688        Output = (
689            Pin<Box<dyn Stream<Item = OutT>>>,
690            Pin<Box<dyn Sink<InT, Error = Error>>>,
691        ),
692    > + 'a
693    where
694        InT: Serialize + 'static,
695        OutT: DeserializeOwned + 'static,
696    {
697        let port = self.raw_port(key);
698        async move {
699            let (source, sink) = port.connect().await.into_source_sink();
700            (
701                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
702                    as Pin<Box<dyn Stream<Item = OutT>>>,
703                Box::pin(
704                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
705                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
706            )
707        }
708    }
709
710    fn as_bincode_sink<T: Serialize + 'static>(
711        &self,
712        key: usize,
713    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
714        let port = self.raw_port(key);
715        async move {
716            let sink = port.connect().await.into_sink();
717            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
718                as Pin<Box<dyn Sink<T, Error = Error>>>
719        }
720    }
721
722    fn as_bincode_source<T: DeserializeOwned + 'static>(
723        &self,
724        key: usize,
725    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
726        let port = self.raw_port(key);
727        async move {
728            let source = port.connect().await.into_source();
729            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
730                as Pin<Box<dyn Stream<Item = T>>>
731        }
732    }
733}
734
735impl Node for DeployExternal {
736    type Port = String;
737    type Meta = HashMap<usize, Vec<u32>>;
738    type InstantiateEnv = Deployment;
739
740    fn next_port(&self) -> Self::Port {
741        let next_port = *self.next_port.borrow();
742        *self.next_port.borrow_mut() += 1;
743
744        format!("port_{}", next_port)
745    }
746
747    fn instantiate(
748        &self,
749        env: &mut Self::InstantiateEnv,
750        _meta: &mut Self::Meta,
751        _graph: DfirGraph,
752        _extra_stmts: Vec<syn::Stmt>,
753    ) {
754        let service = env.CustomService(self.host.clone(), vec![]);
755        *self.underlying.borrow_mut() = Some(service);
756    }
757
758    fn update_meta(&mut self, _meta: &Self::Meta) {}
759}
760
761impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
762    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
763        DeployExternal {
764            next_port: Rc::new(RefCell::new(0)),
765            host: self,
766            underlying: Rc::new(RefCell::new(None)),
767            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
768            client_ports: Rc::new(RefCell::new(HashMap::new())),
769        }
770    }
771}
772
773impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
774    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
775        DeployExternal {
776            next_port: Rc::new(RefCell::new(0)),
777            host: self,
778            underlying: Rc::new(RefCell::new(None)),
779            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
780            client_ports: Rc::new(RefCell::new(HashMap::new())),
781        }
782    }
783}
784
785pub(crate) enum CrateOrTrybuild {
786    Crate(RustCrate),
787    Trybuild(TrybuildHost),
788}
789
790#[expect(missing_docs, reason = "TODO")]
791#[derive(Clone)]
792pub struct DeployNode {
793    id: usize,
794    next_port: Rc<RefCell<usize>>,
795    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
796    underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
797}
798
799impl DeployCrateWrapper for DeployNode {
800    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
801        self.underlying.borrow().as_ref().unwrap().clone()
802    }
803}
804
805impl Node for DeployNode {
806    type Port = String;
807    type Meta = HashMap<usize, Vec<u32>>;
808    type InstantiateEnv = Deployment;
809
810    fn next_port(&self) -> String {
811        let next_port = *self.next_port.borrow();
812        *self.next_port.borrow_mut() += 1;
813
814        format!("port_{}", next_port)
815    }
816
817    fn update_meta(&mut self, meta: &Self::Meta) {
818        let underlying_node = self.underlying.borrow();
819        let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
820        n.update_meta(HydroMeta {
821            clusters: meta.clone(),
822            cluster_id: None,
823            subgraph_id: self.id,
824        });
825    }
826
827    fn instantiate(
828        &self,
829        env: &mut Self::InstantiateEnv,
830        _meta: &mut Self::Meta,
831        graph: DfirGraph,
832        extra_stmts: Vec<syn::Stmt>,
833    ) {
834        let service = match self.service_spec.borrow_mut().take().unwrap() {
835            CrateOrTrybuild::Crate(c) => c,
836            CrateOrTrybuild::Trybuild(trybuild) => {
837                let (bin_name, config) =
838                    create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
839                create_trybuild_service(
840                    trybuild,
841                    &config.project_dir,
842                    &config.target_dir,
843                    &config.features,
844                    &bin_name,
845                )
846            }
847        };
848
849        *self.underlying.borrow_mut() = Some(env.add_service(service));
850    }
851}
852
853#[expect(missing_docs, reason = "TODO")]
854#[derive(Clone)]
855pub struct DeployClusterNode {
856    underlying: Arc<RwLock<RustCrateService>>,
857}
858
859impl DeployCrateWrapper for DeployClusterNode {
860    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
861        self.underlying.clone()
862    }
863}
864#[expect(missing_docs, reason = "TODO")]
865#[derive(Clone)]
866pub struct DeployCluster {
867    id: usize,
868    next_port: Rc<RefCell<usize>>,
869    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
870    members: Rc<RefCell<Vec<DeployClusterNode>>>,
871    name_hint: Option<String>,
872}
873
874impl DeployCluster {
875    #[expect(missing_docs, reason = "TODO")]
876    pub fn members(&self) -> Vec<DeployClusterNode> {
877        self.members.borrow().clone()
878    }
879}
880
881impl Node for DeployCluster {
882    type Port = String;
883    type Meta = HashMap<usize, Vec<u32>>;
884    type InstantiateEnv = Deployment;
885
886    fn next_port(&self) -> String {
887        let next_port = *self.next_port.borrow();
888        *self.next_port.borrow_mut() += 1;
889
890        format!("port_{}", next_port)
891    }
892
893    fn instantiate(
894        &self,
895        env: &mut Self::InstantiateEnv,
896        meta: &mut Self::Meta,
897        graph: DfirGraph,
898        extra_stmts: Vec<syn::Stmt>,
899    ) {
900        let has_trybuild = self
901            .cluster_spec
902            .borrow()
903            .as_ref()
904            .unwrap()
905            .iter()
906            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
907
908        let maybe_trybuild = if has_trybuild {
909            Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
910        } else {
911            None
912        };
913
914        let cluster_nodes = self
915            .cluster_spec
916            .borrow_mut()
917            .take()
918            .unwrap()
919            .into_iter()
920            .map(|spec| {
921                let service = match spec {
922                    CrateOrTrybuild::Crate(c) => c,
923                    CrateOrTrybuild::Trybuild(trybuild) => {
924                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
925                        create_trybuild_service(
926                            trybuild,
927                            &config.project_dir,
928                            &config.target_dir,
929                            &config.features,
930                            bin_name,
931                        )
932                    }
933                };
934
935                env.add_service(service)
936            })
937            .collect::<Vec<_>>();
938        meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
939        *self.members.borrow_mut() = cluster_nodes
940            .into_iter()
941            .map(|n| DeployClusterNode { underlying: n })
942            .collect();
943    }
944
945    fn update_meta(&mut self, meta: &Self::Meta) {
946        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
947            let mut n = node.underlying.try_write().unwrap();
948            n.update_meta(HydroMeta {
949                clusters: meta.clone(),
950                cluster_id: Some(cluster_id as u32),
951                subgraph_id: self.id,
952            });
953        }
954    }
955}
956
957#[expect(missing_docs, reason = "TODO")]
958#[derive(Clone)]
959pub struct DeployProcessSpec(RustCrate);
960
961impl DeployProcessSpec {
962    #[expect(missing_docs, reason = "TODO")]
963    pub fn new(t: RustCrate) -> Self {
964        Self(t)
965    }
966}
967
968impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
969    fn build(self, id: usize, _name_hint: &str) -> DeployNode {
970        DeployNode {
971            id,
972            next_port: Rc::new(RefCell::new(0)),
973            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0)))),
974            underlying: Rc::new(RefCell::new(None)),
975        }
976    }
977}
978
979impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
980    fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
981        self.name_hint = Some(format!("{} (process {id})", name_hint));
982        DeployNode {
983            id,
984            next_port: Rc::new(RefCell::new(0)),
985            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
986            underlying: Rc::new(RefCell::new(None)),
987        }
988    }
989}
990
991#[expect(missing_docs, reason = "TODO")]
992#[derive(Clone)]
993pub struct DeployClusterSpec(Vec<RustCrate>);
994
995impl DeployClusterSpec {
996    #[expect(missing_docs, reason = "TODO")]
997    pub fn new(crates: Vec<RustCrate>) -> Self {
998        Self(crates)
999    }
1000}
1001
1002impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1003    fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
1004        DeployCluster {
1005            id,
1006            next_port: Rc::new(RefCell::new(0)),
1007            cluster_spec: Rc::new(RefCell::new(Some(
1008                self.0.into_iter().map(CrateOrTrybuild::Crate).collect(),
1009            ))),
1010            members: Rc::new(RefCell::new(vec![])),
1011            name_hint: None,
1012        }
1013    }
1014}
1015
1016impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1017    fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1018        let name_hint = format!("{} (cluster {id})", name_hint);
1019        DeployCluster {
1020            id,
1021            next_port: Rc::new(RefCell::new(0)),
1022            cluster_spec: Rc::new(RefCell::new(Some(
1023                self.into_iter()
1024                    .enumerate()
1025                    .map(|(idx, b)| {
1026                        let mut b = b.into();
1027                        b.name_hint = Some(name_hint.clone());
1028                        b.cluster_idx = Some(idx);
1029                        CrateOrTrybuild::Trybuild(b)
1030                    })
1031                    .collect(),
1032            ))),
1033            members: Rc::new(RefCell::new(vec![])),
1034            name_hint: Some(name_hint),
1035        }
1036    }
1037}
1038
1039fn create_trybuild_service(
1040    trybuild: TrybuildHost,
1041    dir: &std::path::PathBuf,
1042    target_dir: &std::path::PathBuf,
1043    features: &Option<Vec<String>>,
1044    bin_name: &str,
1045) -> RustCrate {
1046    let mut ret = RustCrate::new(dir, trybuild.host)
1047        .target_dir(target_dir)
1048        .example(bin_name)
1049        .no_default_features();
1050
1051    if let Some(display_name) = trybuild.display_name {
1052        ret = ret.display_name(display_name);
1053    } else if let Some(name_hint) = trybuild.name_hint {
1054        if let Some(cluster_idx) = trybuild.cluster_idx {
1055            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1056        } else {
1057            ret = ret.display_name(name_hint);
1058        }
1059    }
1060
1061    if let Some(rustflags) = trybuild.rustflags {
1062        ret = ret.rustflags(rustflags);
1063    }
1064
1065    if let Some(tracing) = trybuild.tracing {
1066        ret = ret.tracing(tracing);
1067    }
1068
1069    ret = ret.features(
1070        vec!["hydro___feature_deploy_integration".to_string()]
1071            .into_iter()
1072            .chain(
1073                trybuild
1074                    .additional_hydro_features
1075                    .into_iter()
1076                    .map(|runtime_feature| {
1077                        assert!(
1078                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1079                            "{runtime_feature} is not a valid Hydro runtime feature"
1080                        );
1081                        format!("hydro___feature_{runtime_feature}")
1082                    }),
1083            )
1084            .chain(trybuild.features),
1085    );
1086
1087    for (key, value) in trybuild.build_envs {
1088        ret = ret.build_env(key, value);
1089    }
1090
1091    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1092    ret = ret.config("build.incremental = false");
1093
1094    if let Some(features) = features {
1095        ret = ret.features(features);
1096    }
1097
1098    ret
1099}