hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26use tokio::sync::RwLock;
27
28use super::deploy_runtime::*;
29use super::trybuild::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
30use crate::compile::deploy_provider::{
31    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::location::NetworkHint;
34use crate::staging_util::get_this_crate;
35
/// Marker type selecting [`hydro_deploy`] as the backend that provisions and launches services.
///
/// This backend is chosen automatically when [`crate::compile::builder::FlowBuilder::deploy`] is
/// called with a `&mut` reference to a [`hydro_deploy::Deployment`] as the deployment context.
/// The enum has no variants, so it is only ever used at the type level.
pub enum HydroDeploy {}
41
42impl<'a> Deploy<'a> for HydroDeploy {
43    type InstantiateEnv = Deployment;
44    type CompileEnv = ();
45    type Process = DeployNode;
46    type Cluster = DeployCluster;
47    type External = DeployExternal;
48    type Meta = HashMap<usize, Vec<u32>>;
49    type GraphId = ();
50    type Port = String;
51    type ExternalRawPort = CustomClientPort;
52
53    fn allocate_process_port(process: &Self::Process) -> Self::Port {
54        process.next_port()
55    }
56
57    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
58        cluster.next_port()
59    }
60
61    fn allocate_external_port(external: &Self::External) -> Self::Port {
62        external.next_port()
63    }
64
65    fn o2o_sink_source(
66        _env: &(),
67        _p1: &Self::Process,
68        p1_port: &Self::Port,
69        _p2: &Self::Process,
70        p2_port: &Self::Port,
71    ) -> (syn::Expr, syn::Expr) {
72        let p1_port = p1_port.as_str();
73        let p2_port = p2_port.as_str();
74        deploy_o2o(
75            RuntimeData::new("__hydro_lang_trybuild_cli"),
76            p1_port,
77            p2_port,
78        )
79    }
80
81    fn o2o_connect(
82        p1: &Self::Process,
83        p1_port: &Self::Port,
84        p2: &Self::Process,
85        p2_port: &Self::Port,
86    ) -> Box<dyn FnOnce()> {
87        let p1 = p1.clone();
88        let p1_port = p1_port.clone();
89        let p2 = p2.clone();
90        let p2_port = p2_port.clone();
91
92        Box::new(move || {
93            let self_underlying_borrow = p1.underlying.borrow();
94            let self_underlying = self_underlying_borrow.as_ref().unwrap();
95            let source_port = self_underlying
96                .try_read()
97                .unwrap()
98                .get_port(p1_port.clone(), self_underlying);
99
100            let other_underlying_borrow = p2.underlying.borrow();
101            let other_underlying = other_underlying_borrow.as_ref().unwrap();
102            let recipient_port = other_underlying
103                .try_read()
104                .unwrap()
105                .get_port(p2_port.clone(), other_underlying);
106
107            source_port.send_to(&recipient_port)
108        })
109    }
110
111    fn o2m_sink_source(
112        _env: &(),
113        _p1: &Self::Process,
114        p1_port: &Self::Port,
115        _c2: &Self::Cluster,
116        c2_port: &Self::Port,
117    ) -> (syn::Expr, syn::Expr) {
118        let p1_port = p1_port.as_str();
119        let c2_port = c2_port.as_str();
120        deploy_o2m(
121            RuntimeData::new("__hydro_lang_trybuild_cli"),
122            p1_port,
123            c2_port,
124        )
125    }
126
127    fn o2m_connect(
128        p1: &Self::Process,
129        p1_port: &Self::Port,
130        c2: &Self::Cluster,
131        c2_port: &Self::Port,
132    ) -> Box<dyn FnOnce()> {
133        let p1 = p1.clone();
134        let p1_port = p1_port.clone();
135        let c2 = c2.clone();
136        let c2_port = c2_port.clone();
137
138        Box::new(move || {
139            let self_underlying_borrow = p1.underlying.borrow();
140            let self_underlying = self_underlying_borrow.as_ref().unwrap();
141            let source_port = self_underlying
142                .try_read()
143                .unwrap()
144                .get_port(p1_port.clone(), self_underlying);
145
146            let recipient_port = DemuxSink {
147                demux: c2
148                    .members
149                    .borrow()
150                    .iter()
151                    .enumerate()
152                    .map(|(id, c)| {
153                        let n = c.underlying.try_read().unwrap();
154                        (
155                            id as u32,
156                            Arc::new(n.get_port(c2_port.clone(), &c.underlying))
157                                as Arc<dyn RustCrateSink + 'static>,
158                        )
159                    })
160                    .collect(),
161            };
162
163            source_port.send_to(&recipient_port)
164        })
165    }
166
167    fn m2o_sink_source(
168        _env: &(),
169        _c1: &Self::Cluster,
170        c1_port: &Self::Port,
171        _p2: &Self::Process,
172        p2_port: &Self::Port,
173    ) -> (syn::Expr, syn::Expr) {
174        let c1_port = c1_port.as_str();
175        let p2_port = p2_port.as_str();
176        deploy_m2o(
177            RuntimeData::new("__hydro_lang_trybuild_cli"),
178            c1_port,
179            p2_port,
180        )
181    }
182
183    fn m2o_connect(
184        c1: &Self::Cluster,
185        c1_port: &Self::Port,
186        p2: &Self::Process,
187        p2_port: &Self::Port,
188    ) -> Box<dyn FnOnce()> {
189        let c1 = c1.clone();
190        let c1_port = c1_port.clone();
191        let p2 = p2.clone();
192        let p2_port = p2_port.clone();
193
194        Box::new(move || {
195            let other_underlying_borrow = p2.underlying.borrow();
196            let other_underlying = other_underlying_borrow.as_ref().unwrap();
197            let recipient_port = other_underlying
198                .try_read()
199                .unwrap()
200                .get_port(p2_port.clone(), other_underlying)
201                .merge();
202
203            for (i, node) in c1.members.borrow().iter().enumerate() {
204                let source_port = node
205                    .underlying
206                    .try_read()
207                    .unwrap()
208                    .get_port(c1_port.clone(), &node.underlying);
209
210                TaggedSource {
211                    source: Arc::new(source_port),
212                    tag: i as u32,
213                }
214                .send_to(&recipient_port);
215            }
216        })
217    }
218
219    fn m2m_sink_source(
220        _env: &(),
221        _c1: &Self::Cluster,
222        c1_port: &Self::Port,
223        _c2: &Self::Cluster,
224        c2_port: &Self::Port,
225    ) -> (syn::Expr, syn::Expr) {
226        let c1_port = c1_port.as_str();
227        let c2_port = c2_port.as_str();
228        deploy_m2m(
229            RuntimeData::new("__hydro_lang_trybuild_cli"),
230            c1_port,
231            c2_port,
232        )
233    }
234
235    fn m2m_connect(
236        c1: &Self::Cluster,
237        c1_port: &Self::Port,
238        c2: &Self::Cluster,
239        c2_port: &Self::Port,
240    ) -> Box<dyn FnOnce()> {
241        let c1 = c1.clone();
242        let c1_port = c1_port.clone();
243        let c2 = c2.clone();
244        let c2_port = c2_port.clone();
245
246        Box::new(move || {
247            for (i, sender) in c1.members.borrow().iter().enumerate() {
248                let source_port = sender
249                    .underlying
250                    .try_read()
251                    .unwrap()
252                    .get_port(c1_port.clone(), &sender.underlying);
253
254                let recipient_port = DemuxSink {
255                    demux: c2
256                        .members
257                        .borrow()
258                        .iter()
259                        .enumerate()
260                        .map(|(id, c)| {
261                            let n = c.underlying.try_read().unwrap();
262                            (
263                                id as u32,
264                                Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
265                                    as Arc<dyn RustCrateSink + 'static>,
266                            )
267                        })
268                        .collect(),
269                };
270
271                TaggedSource {
272                    source: Arc::new(source_port),
273                    tag: i as u32,
274                }
275                .send_to(&recipient_port);
276            }
277        })
278    }
279
280    fn e2o_many_source(
281        _compile_env: &Self::CompileEnv,
282        extra_stmts: &mut Vec<syn::Stmt>,
283        _p2: &Self::Process,
284        p2_port: &Self::Port,
285        codec_type: &syn::Type,
286        shared_handle: String,
287    ) -> syn::Expr {
288        let connect_ident = syn::Ident::new(
289            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
290            Span::call_site(),
291        );
292        let source_ident = syn::Ident::new(
293            &format!("__hydro_deploy_many_{}_source", &shared_handle),
294            Span::call_site(),
295        );
296        let sink_ident = syn::Ident::new(
297            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
298            Span::call_site(),
299        );
300        let membership_ident = syn::Ident::new(
301            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
302            Span::call_site(),
303        );
304
305        let root = get_this_crate();
306
307        extra_stmts.push(syn::parse_quote! {
308            let #connect_ident = __hydro_lang_trybuild_cli
309                .port(#p2_port)
310                .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
311        });
312
313        extra_stmts.push(syn::parse_quote! {
314            let #source_ident = #connect_ident.source;
315        });
316
317        extra_stmts.push(syn::parse_quote! {
318            let #sink_ident = #connect_ident.sink;
319        });
320
321        extra_stmts.push(syn::parse_quote! {
322            let #membership_ident = #connect_ident.membership;
323        });
324
325        parse_quote!(#source_ident)
326    }
327
328    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
329        let sink_ident = syn::Ident::new(
330            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
331            Span::call_site(),
332        );
333        parse_quote!(#sink_ident)
334    }
335
336    fn e2o_source(
337        _compile_env: &Self::CompileEnv,
338        _p1: &Self::External,
339        p1_port: &Self::Port,
340        _p2: &Self::Process,
341        p2_port: &Self::Port,
342    ) -> syn::Expr {
343        let p1_port = p1_port.as_str();
344        let p2_port = p2_port.as_str();
345        deploy_e2o(
346            RuntimeData::new("__hydro_lang_trybuild_cli"),
347            p1_port,
348            p2_port,
349        )
350    }
351
352    fn e2o_connect(
353        p1: &Self::External,
354        p1_port: &Self::Port,
355        p2: &Self::Process,
356        p2_port: &Self::Port,
357        many: bool,
358        server_hint: NetworkHint,
359    ) -> Box<dyn FnOnce()> {
360        let p1 = p1.clone();
361        let p1_port = p1_port.clone();
362        let p2 = p2.clone();
363        let p2_port = p2_port.clone();
364
365        Box::new(move || {
366            let self_underlying_borrow = p1.underlying.borrow();
367            let self_underlying = self_underlying_borrow.as_ref().unwrap();
368            let source_port = if many {
369                self_underlying
370                    .try_read()
371                    .unwrap()
372                    .declare_many_client(self_underlying)
373            } else {
374                self_underlying
375                    .try_read()
376                    .unwrap()
377                    .declare_client(self_underlying)
378            };
379
380            let other_underlying_borrow = p2.underlying.borrow();
381            let other_underlying = other_underlying_borrow.as_ref().unwrap();
382            let recipient_port = other_underlying.try_read().unwrap().get_port_with_hint(
383                p2_port.clone(),
384                match server_hint {
385                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
386                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
387                },
388                other_underlying,
389            );
390
391            source_port.send_to(&recipient_port);
392
393            p1.client_ports
394                .borrow_mut()
395                .insert(p1_port.clone(), source_port);
396        })
397    }
398
399    fn o2e_sink(
400        _compile_env: &Self::CompileEnv,
401        _p1: &Self::Process,
402        p1_port: &Self::Port,
403        _p2: &Self::External,
404        p2_port: &Self::Port,
405    ) -> syn::Expr {
406        let p1_port = p1_port.as_str();
407        let p2_port = p2_port.as_str();
408        deploy_o2e(
409            RuntimeData::new("__hydro_lang_trybuild_cli"),
410            p1_port,
411            p2_port,
412        )
413    }
414
415    fn o2e_connect(
416        p1: &Self::Process,
417        p1_port: &Self::Port,
418        p2: &Self::External,
419        p2_port: &Self::Port,
420    ) -> Box<dyn FnOnce()> {
421        let p1 = p1.clone();
422        let p1_port = p1_port.clone();
423        let p2 = p2.clone();
424        let p2_port = p2_port.clone();
425
426        Box::new(move || {
427            let self_underlying_borrow = p1.underlying.borrow();
428            let self_underlying = self_underlying_borrow.as_ref().unwrap();
429            let source_port = self_underlying
430                .try_read()
431                .unwrap()
432                .get_port(p1_port.clone(), self_underlying);
433
434            let other_underlying_borrow = p2.underlying.borrow();
435            let other_underlying = other_underlying_borrow.as_ref().unwrap();
436            let recipient_port = other_underlying
437                .try_read()
438                .unwrap()
439                .declare_client(other_underlying);
440
441            source_port.send_to(&recipient_port);
442
443            p2.client_ports
444                .borrow_mut()
445                .insert(p2_port.clone(), recipient_port);
446        })
447    }
448
449    fn cluster_ids(
450        _env: &Self::CompileEnv,
451        of_cluster: usize,
452    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
453        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
454    }
455
456    fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
457        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
458    }
459}
460
461#[expect(missing_docs, reason = "TODO")]
462pub trait DeployCrateWrapper {
463    fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
464
465    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
466    async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
467        self.underlying().read().await.stdout()
468    }
469
470    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
471    async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
472        self.underlying().read().await.stderr()
473    }
474
475    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
476    async fn stdout_filter(
477        &self,
478        prefix: impl Into<String>,
479    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
480        self.underlying().read().await.stdout_filter(prefix.into())
481    }
482
483    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
484    async fn stderr_filter(
485        &self,
486        prefix: impl Into<String>,
487    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
488        self.underlying().read().await.stderr_filter(prefix.into())
489    }
490
491    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
492    async fn tracing_results(&self) -> Option<TracingResults> {
493        self.underlying().read().await.tracing_results().cloned()
494    }
495}
496
497#[expect(missing_docs, reason = "TODO")]
498#[derive(Clone)]
499pub struct TrybuildHost {
500    host: Arc<dyn Host>,
501    display_name: Option<String>,
502    rustflags: Option<String>,
503    additional_hydro_features: Vec<String>,
504    features: Vec<String>,
505    tracing: Option<TracingOptions>,
506    build_envs: Vec<(String, String)>,
507    name_hint: Option<String>,
508    cluster_idx: Option<usize>,
509}
510
511impl From<Arc<dyn Host>> for TrybuildHost {
512    fn from(host: Arc<dyn Host>) -> Self {
513        Self {
514            host,
515            display_name: None,
516            rustflags: None,
517            additional_hydro_features: vec![],
518            features: vec![],
519            tracing: None,
520            build_envs: vec![],
521            name_hint: None,
522            cluster_idx: None,
523        }
524    }
525}
526
527impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
528    fn from(host: Arc<H>) -> Self {
529        Self {
530            host,
531            display_name: None,
532            rustflags: None,
533            additional_hydro_features: vec![],
534            features: vec![],
535            tracing: None,
536            build_envs: vec![],
537            name_hint: None,
538            cluster_idx: None,
539        }
540    }
541}
542
543#[expect(missing_docs, reason = "TODO")]
544impl TrybuildHost {
545    pub fn new(host: Arc<dyn Host>) -> Self {
546        Self {
547            host,
548            display_name: None,
549            rustflags: None,
550            additional_hydro_features: vec![],
551            features: vec![],
552            tracing: None,
553            build_envs: vec![],
554            name_hint: None,
555            cluster_idx: None,
556        }
557    }
558
559    pub fn display_name(self, display_name: impl Into<String>) -> Self {
560        if self.display_name.is_some() {
561            panic!("{} already set", name_of!(display_name in Self));
562        }
563
564        Self {
565            display_name: Some(display_name.into()),
566            ..self
567        }
568    }
569
570    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
571        if self.rustflags.is_some() {
572            panic!("{} already set", name_of!(rustflags in Self));
573        }
574
575        Self {
576            rustflags: Some(rustflags.into()),
577            ..self
578        }
579    }
580
581    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
582        Self {
583            additional_hydro_features,
584            ..self
585        }
586    }
587
588    pub fn features(self, features: Vec<String>) -> Self {
589        Self {
590            features: self.features.into_iter().chain(features).collect(),
591            ..self
592        }
593    }
594
595    pub fn tracing(self, tracing: TracingOptions) -> Self {
596        if self.tracing.is_some() {
597            panic!("{} already set", name_of!(tracing in Self));
598        }
599
600        Self {
601            tracing: Some(tracing),
602            ..self
603        }
604    }
605
606    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
607        Self {
608            build_envs: self
609                .build_envs
610                .into_iter()
611                .chain(std::iter::once((key.into(), value.into())))
612                .collect(),
613            ..self
614        }
615    }
616}
617
618impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
619    type ProcessSpec = TrybuildHost;
620    fn into_process_spec(self) -> TrybuildHost {
621        TrybuildHost {
622            host: self,
623            display_name: None,
624            rustflags: None,
625            additional_hydro_features: vec![],
626            features: vec![],
627            tracing: None,
628            build_envs: vec![],
629            name_hint: None,
630            cluster_idx: None,
631        }
632    }
633}
634
635impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
636    type ProcessSpec = TrybuildHost;
637    fn into_process_spec(self) -> TrybuildHost {
638        TrybuildHost {
639            host: self,
640            display_name: None,
641            rustflags: None,
642            additional_hydro_features: vec![],
643            features: vec![],
644            tracing: None,
645            build_envs: vec![],
646            name_hint: None,
647            cluster_idx: None,
648        }
649    }
650}
651
652#[expect(missing_docs, reason = "TODO")]
653#[derive(Clone)]
654pub struct DeployExternal {
655    next_port: Rc<RefCell<usize>>,
656    host: Arc<dyn Host>,
657    underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
658    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
659    allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
660}
661
662impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
663    fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
664        self.allocated_ports.borrow_mut().insert(key, port);
665    }
666
667    fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
668        self.client_ports
669            .borrow()
670            .get(self.allocated_ports.borrow().get(&key).unwrap())
671            .unwrap()
672            .clone()
673    }
674
675    fn as_bytes_bidi(
676        &self,
677        key: usize,
678    ) -> impl Future<
679        Output = (
680            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
681            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
682        ),
683    > + 'a {
684        let port = self.raw_port(key);
685
686        async move {
687            let (source, sink) = port.connect().await.into_source_sink();
688            (
689                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
690                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
691            )
692        }
693    }
694
695    fn as_bincode_bidi<InT, OutT>(
696        &self,
697        key: usize,
698    ) -> impl Future<
699        Output = (
700            Pin<Box<dyn Stream<Item = OutT>>>,
701            Pin<Box<dyn Sink<InT, Error = Error>>>,
702        ),
703    > + 'a
704    where
705        InT: Serialize + 'static,
706        OutT: DeserializeOwned + 'static,
707    {
708        let port = self.raw_port(key);
709        async move {
710            let (source, sink) = port.connect().await.into_source_sink();
711            (
712                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
713                    as Pin<Box<dyn Stream<Item = OutT>>>,
714                Box::pin(
715                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
716                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
717            )
718        }
719    }
720
721    fn as_bincode_sink<T: Serialize + 'static>(
722        &self,
723        key: usize,
724    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
725        let port = self.raw_port(key);
726        async move {
727            let sink = port.connect().await.into_sink();
728            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
729                as Pin<Box<dyn Sink<T, Error = Error>>>
730        }
731    }
732
733    fn as_bincode_source<T: DeserializeOwned + 'static>(
734        &self,
735        key: usize,
736    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
737        let port = self.raw_port(key);
738        async move {
739            let source = port.connect().await.into_source();
740            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
741                as Pin<Box<dyn Stream<Item = T>>>
742        }
743    }
744}
745
746impl Node for DeployExternal {
747    type Port = String;
748    type Meta = HashMap<usize, Vec<u32>>;
749    type InstantiateEnv = Deployment;
750
751    fn next_port(&self) -> Self::Port {
752        let next_port = *self.next_port.borrow();
753        *self.next_port.borrow_mut() += 1;
754
755        format!("port_{}", next_port)
756    }
757
758    fn instantiate(
759        &self,
760        env: &mut Self::InstantiateEnv,
761        _meta: &mut Self::Meta,
762        _graph: DfirGraph,
763        _extra_stmts: Vec<syn::Stmt>,
764    ) {
765        let service = env.CustomService(self.host.clone(), vec![]);
766        *self.underlying.borrow_mut() = Some(service);
767    }
768
769    fn update_meta(&mut self, _meta: &Self::Meta) {}
770}
771
772impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
773    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
774        DeployExternal {
775            next_port: Rc::new(RefCell::new(0)),
776            host: self,
777            underlying: Rc::new(RefCell::new(None)),
778            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
779            client_ports: Rc::new(RefCell::new(HashMap::new())),
780        }
781    }
782}
783
784impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
785    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
786        DeployExternal {
787            next_port: Rc::new(RefCell::new(0)),
788            host: self,
789            underlying: Rc::new(RefCell::new(None)),
790            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
791            client_ports: Rc::new(RefCell::new(HashMap::new())),
792        }
793    }
794}
795
796pub(crate) enum CrateOrTrybuild {
797    Crate(RustCrate),
798    Trybuild(TrybuildHost),
799}
800
801#[expect(missing_docs, reason = "TODO")]
802#[derive(Clone)]
803pub struct DeployNode {
804    id: usize,
805    next_port: Rc<RefCell<usize>>,
806    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
807    underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
808}
809
810impl DeployCrateWrapper for DeployNode {
811    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
812        self.underlying.borrow().as_ref().unwrap().clone()
813    }
814}
815
816impl Node for DeployNode {
817    type Port = String;
818    type Meta = HashMap<usize, Vec<u32>>;
819    type InstantiateEnv = Deployment;
820
821    fn next_port(&self) -> String {
822        let next_port = *self.next_port.borrow();
823        *self.next_port.borrow_mut() += 1;
824
825        format!("port_{}", next_port)
826    }
827
828    fn update_meta(&mut self, meta: &Self::Meta) {
829        let underlying_node = self.underlying.borrow();
830        let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
831        n.update_meta(HydroMeta {
832            clusters: meta.clone(),
833            cluster_id: None,
834            subgraph_id: self.id,
835        });
836    }
837
838    fn instantiate(
839        &self,
840        env: &mut Self::InstantiateEnv,
841        _meta: &mut Self::Meta,
842        graph: DfirGraph,
843        extra_stmts: Vec<syn::Stmt>,
844    ) {
845        let service = match self.service_spec.borrow_mut().take().unwrap() {
846            CrateOrTrybuild::Crate(c) => c,
847            CrateOrTrybuild::Trybuild(trybuild) => {
848                let (bin_name, config) =
849                    create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
850                create_trybuild_service(
851                    trybuild,
852                    &config.project_dir,
853                    &config.target_dir,
854                    &config.features,
855                    &bin_name,
856                )
857            }
858        };
859
860        *self.underlying.borrow_mut() = Some(env.add_service(service));
861    }
862}
863
864#[expect(missing_docs, reason = "TODO")]
865#[derive(Clone)]
866pub struct DeployClusterNode {
867    underlying: Arc<RwLock<RustCrateService>>,
868}
869
870impl DeployCrateWrapper for DeployClusterNode {
871    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
872        self.underlying.clone()
873    }
874}
875#[expect(missing_docs, reason = "TODO")]
876#[derive(Clone)]
877pub struct DeployCluster {
878    id: usize,
879    next_port: Rc<RefCell<usize>>,
880    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
881    members: Rc<RefCell<Vec<DeployClusterNode>>>,
882    name_hint: Option<String>,
883}
884
885impl DeployCluster {
886    #[expect(missing_docs, reason = "TODO")]
887    pub fn members(&self) -> Vec<DeployClusterNode> {
888        self.members.borrow().clone()
889    }
890}
891
892impl Node for DeployCluster {
893    type Port = String;
894    type Meta = HashMap<usize, Vec<u32>>;
895    type InstantiateEnv = Deployment;
896
897    fn next_port(&self) -> String {
898        let next_port = *self.next_port.borrow();
899        *self.next_port.borrow_mut() += 1;
900
901        format!("port_{}", next_port)
902    }
903
904    fn instantiate(
905        &self,
906        env: &mut Self::InstantiateEnv,
907        meta: &mut Self::Meta,
908        graph: DfirGraph,
909        extra_stmts: Vec<syn::Stmt>,
910    ) {
911        let has_trybuild = self
912            .cluster_spec
913            .borrow()
914            .as_ref()
915            .unwrap()
916            .iter()
917            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
918
919        let maybe_trybuild = if has_trybuild {
920            Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
921        } else {
922            None
923        };
924
925        let cluster_nodes = self
926            .cluster_spec
927            .borrow_mut()
928            .take()
929            .unwrap()
930            .into_iter()
931            .map(|spec| {
932                let service = match spec {
933                    CrateOrTrybuild::Crate(c) => c,
934                    CrateOrTrybuild::Trybuild(trybuild) => {
935                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
936                        create_trybuild_service(
937                            trybuild,
938                            &config.project_dir,
939                            &config.target_dir,
940                            &config.features,
941                            bin_name,
942                        )
943                    }
944                };
945
946                env.add_service(service)
947            })
948            .collect::<Vec<_>>();
949        meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
950        *self.members.borrow_mut() = cluster_nodes
951            .into_iter()
952            .map(|n| DeployClusterNode { underlying: n })
953            .collect();
954    }
955
956    fn update_meta(&mut self, meta: &Self::Meta) {
957        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
958            let mut n = node.underlying.try_write().unwrap();
959            n.update_meta(HydroMeta {
960                clusters: meta.clone(),
961                cluster_id: Some(cluster_id as u32),
962                subgraph_id: self.id,
963            });
964        }
965    }
966}
967
968#[expect(missing_docs, reason = "TODO")]
969#[derive(Clone)]
970pub struct DeployProcessSpec(RustCrate);
971
972impl DeployProcessSpec {
973    #[expect(missing_docs, reason = "TODO")]
974    pub fn new(t: RustCrate) -> Self {
975        Self(t)
976    }
977}
978
979impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
980    fn build(self, id: usize, _name_hint: &str) -> DeployNode {
981        DeployNode {
982            id,
983            next_port: Rc::new(RefCell::new(0)),
984            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0)))),
985            underlying: Rc::new(RefCell::new(None)),
986        }
987    }
988}
989
990impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
991    fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
992        self.name_hint = Some(format!("{} (process {id})", name_hint));
993        DeployNode {
994            id,
995            next_port: Rc::new(RefCell::new(0)),
996            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
997            underlying: Rc::new(RefCell::new(None)),
998        }
999    }
1000}
1001
1002#[expect(missing_docs, reason = "TODO")]
1003#[derive(Clone)]
1004pub struct DeployClusterSpec(Vec<RustCrate>);
1005
1006impl DeployClusterSpec {
1007    #[expect(missing_docs, reason = "TODO")]
1008    pub fn new(crates: Vec<RustCrate>) -> Self {
1009        Self(crates)
1010    }
1011}
1012
1013impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1014    fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
1015        DeployCluster {
1016            id,
1017            next_port: Rc::new(RefCell::new(0)),
1018            cluster_spec: Rc::new(RefCell::new(Some(
1019                self.0.into_iter().map(CrateOrTrybuild::Crate).collect(),
1020            ))),
1021            members: Rc::new(RefCell::new(vec![])),
1022            name_hint: None,
1023        }
1024    }
1025}
1026
1027impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1028    fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1029        let name_hint = format!("{} (cluster {id})", name_hint);
1030        DeployCluster {
1031            id,
1032            next_port: Rc::new(RefCell::new(0)),
1033            cluster_spec: Rc::new(RefCell::new(Some(
1034                self.into_iter()
1035                    .enumerate()
1036                    .map(|(idx, b)| {
1037                        let mut b = b.into();
1038                        b.name_hint = Some(name_hint.clone());
1039                        b.cluster_idx = Some(idx);
1040                        CrateOrTrybuild::Trybuild(b)
1041                    })
1042                    .collect(),
1043            ))),
1044            members: Rc::new(RefCell::new(vec![])),
1045            name_hint: Some(name_hint),
1046        }
1047    }
1048}
1049
1050fn create_trybuild_service(
1051    trybuild: TrybuildHost,
1052    dir: &std::path::PathBuf,
1053    target_dir: &std::path::PathBuf,
1054    features: &Option<Vec<String>>,
1055    bin_name: &str,
1056) -> RustCrate {
1057    let mut ret = RustCrate::new(dir, trybuild.host)
1058        .target_dir(target_dir)
1059        .bin(bin_name)
1060        .no_default_features();
1061
1062    if let Some(display_name) = trybuild.display_name {
1063        ret = ret.display_name(display_name);
1064    } else if let Some(name_hint) = trybuild.name_hint {
1065        if let Some(cluster_idx) = trybuild.cluster_idx {
1066            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1067        } else {
1068            ret = ret.display_name(name_hint);
1069        }
1070    }
1071
1072    if let Some(rustflags) = trybuild.rustflags {
1073        ret = ret.rustflags(rustflags);
1074    }
1075
1076    if let Some(tracing) = trybuild.tracing {
1077        ret = ret.tracing(tracing);
1078    }
1079
1080    ret = ret.features(
1081        vec!["hydro___feature_deploy_integration".to_string()]
1082            .into_iter()
1083            .chain(
1084                trybuild
1085                    .additional_hydro_features
1086                    .into_iter()
1087                    .map(|runtime_feature| {
1088                        assert!(
1089                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1090                            "{runtime_feature} is not a valid Hydro runtime feature"
1091                        );
1092                        format!("hydro___feature_{runtime_feature}")
1093                    }),
1094            )
1095            .chain(trybuild.features),
1096    );
1097
1098    for (key, value) in trybuild.build_envs {
1099        ret = ret.build_env(key, value);
1100    }
1101
1102    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1103    ret = ret.config("build.incremental = false");
1104
1105    if let Some(features) = features {
1106        ret = ret.features(features);
1107    }
1108
1109    ret
1110}