hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26use tokio::sync::RwLock;
27
28use super::deploy_runtime::*;
29use crate::compile::deploy_provider::{
30    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
31};
32use crate::compile::trybuild::generate::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
33use crate::location::dynamic::LocationId;
34use crate::location::member_id::TaglessMemberId;
35use crate::location::{MembershipEvent, NetworkHint};
36use crate::staging_util::get_this_crate;
37
38/// Deployment backend that uses [`hydro_deploy`] for provisioning and launching.
39///
40/// Automatically used when you call [`crate::compile::builder::FlowBuilder::deploy`] and pass in
41/// an `&mut` reference to [`hydro_deploy::Deployment`] as the deployment context.
42pub enum HydroDeploy {}
43
44impl<'a> Deploy<'a> for HydroDeploy {
45    type InstantiateEnv = Deployment;
46    type Process = DeployNode;
47    type Cluster = DeployCluster;
48    type External = DeployExternal;
49    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
50    type GraphId = ();
51    type Port = String;
52    type ExternalRawPort = CustomClientPort;
53
54    fn allocate_process_port(process: &Self::Process) -> Self::Port {
55        process.next_port()
56    }
57
58    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
59        cluster.next_port()
60    }
61
62    fn allocate_external_port(external: &Self::External) -> Self::Port {
63        external.next_port()
64    }
65
66    fn o2o_sink_source(
67        _p1: &Self::Process,
68        p1_port: &Self::Port,
69        _p2: &Self::Process,
70        p2_port: &Self::Port,
71    ) -> (syn::Expr, syn::Expr) {
72        let p1_port = p1_port.as_str();
73        let p2_port = p2_port.as_str();
74        deploy_o2o(
75            RuntimeData::new("__hydro_lang_trybuild_cli"),
76            p1_port,
77            p2_port,
78        )
79    }
80
81    fn o2o_connect(
82        p1: &Self::Process,
83        p1_port: &Self::Port,
84        p2: &Self::Process,
85        p2_port: &Self::Port,
86    ) -> Box<dyn FnOnce()> {
87        let p1 = p1.clone();
88        let p1_port = p1_port.clone();
89        let p2 = p2.clone();
90        let p2_port = p2_port.clone();
91
92        Box::new(move || {
93            let self_underlying_borrow = p1.underlying.borrow();
94            let self_underlying = self_underlying_borrow.as_ref().unwrap();
95            let source_port = self_underlying
96                .try_read()
97                .unwrap()
98                .get_port(p1_port.clone(), self_underlying);
99
100            let other_underlying_borrow = p2.underlying.borrow();
101            let other_underlying = other_underlying_borrow.as_ref().unwrap();
102            let recipient_port = other_underlying
103                .try_read()
104                .unwrap()
105                .get_port(p2_port.clone(), other_underlying);
106
107            source_port.send_to(&recipient_port)
108        })
109    }
110
111    fn o2m_sink_source(
112        _p1: &Self::Process,
113        p1_port: &Self::Port,
114        _c2: &Self::Cluster,
115        c2_port: &Self::Port,
116    ) -> (syn::Expr, syn::Expr) {
117        let p1_port = p1_port.as_str();
118        let c2_port = c2_port.as_str();
119        deploy_o2m(
120            RuntimeData::new("__hydro_lang_trybuild_cli"),
121            p1_port,
122            c2_port,
123        )
124    }
125
126    fn o2m_connect(
127        p1: &Self::Process,
128        p1_port: &Self::Port,
129        c2: &Self::Cluster,
130        c2_port: &Self::Port,
131    ) -> Box<dyn FnOnce()> {
132        let p1 = p1.clone();
133        let p1_port = p1_port.clone();
134        let c2 = c2.clone();
135        let c2_port = c2_port.clone();
136
137        Box::new(move || {
138            let self_underlying_borrow = p1.underlying.borrow();
139            let self_underlying = self_underlying_borrow.as_ref().unwrap();
140            let source_port = self_underlying
141                .try_read()
142                .unwrap()
143                .get_port(p1_port.clone(), self_underlying);
144
145            let recipient_port = DemuxSink {
146                demux: c2
147                    .members
148                    .borrow()
149                    .iter()
150                    .enumerate()
151                    .map(|(id, c)| {
152                        let n = c.underlying.try_read().unwrap();
153                        (
154                            id as u32,
155                            Arc::new(n.get_port(c2_port.clone(), &c.underlying))
156                                as Arc<dyn RustCrateSink + 'static>,
157                        )
158                    })
159                    .collect(),
160            };
161
162            source_port.send_to(&recipient_port)
163        })
164    }
165
166    fn m2o_sink_source(
167        _c1: &Self::Cluster,
168        c1_port: &Self::Port,
169        _p2: &Self::Process,
170        p2_port: &Self::Port,
171    ) -> (syn::Expr, syn::Expr) {
172        let c1_port = c1_port.as_str();
173        let p2_port = p2_port.as_str();
174        deploy_m2o(
175            RuntimeData::new("__hydro_lang_trybuild_cli"),
176            c1_port,
177            p2_port,
178        )
179    }
180
181    fn m2o_connect(
182        c1: &Self::Cluster,
183        c1_port: &Self::Port,
184        p2: &Self::Process,
185        p2_port: &Self::Port,
186    ) -> Box<dyn FnOnce()> {
187        let c1 = c1.clone();
188        let c1_port = c1_port.clone();
189        let p2 = p2.clone();
190        let p2_port = p2_port.clone();
191
192        Box::new(move || {
193            let other_underlying_borrow = p2.underlying.borrow();
194            let other_underlying = other_underlying_borrow.as_ref().unwrap();
195            let recipient_port = other_underlying
196                .try_read()
197                .unwrap()
198                .get_port(p2_port.clone(), other_underlying)
199                .merge();
200
201            for (i, node) in c1.members.borrow().iter().enumerate() {
202                let source_port = node
203                    .underlying
204                    .try_read()
205                    .unwrap()
206                    .get_port(c1_port.clone(), &node.underlying);
207
208                TaggedSource {
209                    source: Arc::new(source_port),
210                    tag: i as u32,
211                }
212                .send_to(&recipient_port);
213            }
214        })
215    }
216
217    fn m2m_sink_source(
218        _c1: &Self::Cluster,
219        c1_port: &Self::Port,
220        _c2: &Self::Cluster,
221        c2_port: &Self::Port,
222    ) -> (syn::Expr, syn::Expr) {
223        let c1_port = c1_port.as_str();
224        let c2_port = c2_port.as_str();
225        deploy_m2m(
226            RuntimeData::new("__hydro_lang_trybuild_cli"),
227            c1_port,
228            c2_port,
229        )
230    }
231
232    fn m2m_connect(
233        c1: &Self::Cluster,
234        c1_port: &Self::Port,
235        c2: &Self::Cluster,
236        c2_port: &Self::Port,
237    ) -> Box<dyn FnOnce()> {
238        let c1 = c1.clone();
239        let c1_port = c1_port.clone();
240        let c2 = c2.clone();
241        let c2_port = c2_port.clone();
242
243        Box::new(move || {
244            for (i, sender) in c1.members.borrow().iter().enumerate() {
245                let source_port = sender
246                    .underlying
247                    .try_read()
248                    .unwrap()
249                    .get_port(c1_port.clone(), &sender.underlying);
250
251                let recipient_port = DemuxSink {
252                    demux: c2
253                        .members
254                        .borrow()
255                        .iter()
256                        .enumerate()
257                        .map(|(id, c)| {
258                            let n = c.underlying.try_read().unwrap();
259                            (
260                                id as u32,
261                                Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
262                                    as Arc<dyn RustCrateSink + 'static>,
263                            )
264                        })
265                        .collect(),
266                };
267
268                TaggedSource {
269                    source: Arc::new(source_port),
270                    tag: i as u32,
271                }
272                .send_to(&recipient_port);
273            }
274        })
275    }
276
277    fn e2o_many_source(
278        extra_stmts: &mut Vec<syn::Stmt>,
279        _p2: &Self::Process,
280        p2_port: &Self::Port,
281        codec_type: &syn::Type,
282        shared_handle: String,
283    ) -> syn::Expr {
284        let connect_ident = syn::Ident::new(
285            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
286            Span::call_site(),
287        );
288        let source_ident = syn::Ident::new(
289            &format!("__hydro_deploy_many_{}_source", &shared_handle),
290            Span::call_site(),
291        );
292        let sink_ident = syn::Ident::new(
293            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
294            Span::call_site(),
295        );
296        let membership_ident = syn::Ident::new(
297            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
298            Span::call_site(),
299        );
300
301        let root = get_this_crate();
302
303        extra_stmts.push(syn::parse_quote! {
304            let #connect_ident = __hydro_lang_trybuild_cli
305                .port(#p2_port)
306                .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
307        });
308
309        extra_stmts.push(syn::parse_quote! {
310            let #source_ident = #connect_ident.source;
311        });
312
313        extra_stmts.push(syn::parse_quote! {
314            let #sink_ident = #connect_ident.sink;
315        });
316
317        extra_stmts.push(syn::parse_quote! {
318            let #membership_ident = #connect_ident.membership;
319        });
320
321        parse_quote!(#source_ident)
322    }
323
324    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
325        let sink_ident = syn::Ident::new(
326            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
327            Span::call_site(),
328        );
329        parse_quote!(#sink_ident)
330    }
331
332    fn e2o_source(
333        extra_stmts: &mut Vec<syn::Stmt>,
334        _p1: &Self::External,
335        _p1_port: &Self::Port,
336        _p2: &Self::Process,
337        p2_port: &Self::Port,
338        codec_type: &syn::Type,
339        shared_handle: String,
340    ) -> syn::Expr {
341        let connect_ident = syn::Ident::new(
342            &format!("__hydro_deploy_{}_connect", &shared_handle),
343            Span::call_site(),
344        );
345        let source_ident = syn::Ident::new(
346            &format!("__hydro_deploy_{}_source", &shared_handle),
347            Span::call_site(),
348        );
349        let sink_ident = syn::Ident::new(
350            &format!("__hydro_deploy_{}_sink", &shared_handle),
351            Span::call_site(),
352        );
353
354        let root = get_this_crate();
355
356        extra_stmts.push(syn::parse_quote! {
357            let #connect_ident = __hydro_lang_trybuild_cli
358                .port(#p2_port)
359                .connect::<#root::runtime_support::dfir_rs::util::deploy::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
360        });
361
362        extra_stmts.push(syn::parse_quote! {
363            let #source_ident = #connect_ident.source;
364        });
365
366        extra_stmts.push(syn::parse_quote! {
367            let #sink_ident = #connect_ident.sink;
368        });
369
370        parse_quote!(#source_ident)
371    }
372
373    fn e2o_connect(
374        p1: &Self::External,
375        p1_port: &Self::Port,
376        p2: &Self::Process,
377        p2_port: &Self::Port,
378        _many: bool,
379        server_hint: NetworkHint,
380    ) -> Box<dyn FnOnce()> {
381        let p1 = p1.clone();
382        let p1_port = p1_port.clone();
383        let p2 = p2.clone();
384        let p2_port = p2_port.clone();
385
386        Box::new(move || {
387            let self_underlying_borrow = p1.underlying.borrow();
388            let self_underlying = self_underlying_borrow.as_ref().unwrap();
389            let source_port = self_underlying
390                .try_read()
391                .unwrap()
392                .declare_many_client(self_underlying);
393
394            let other_underlying_borrow = p2.underlying.borrow();
395            let other_underlying = other_underlying_borrow.as_ref().unwrap();
396            let recipient_port = other_underlying.try_read().unwrap().get_port_with_hint(
397                p2_port.clone(),
398                match server_hint {
399                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
400                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
401                },
402                other_underlying,
403            );
404
405            source_port.send_to(&recipient_port);
406
407            p1.client_ports
408                .borrow_mut()
409                .insert(p1_port.clone(), source_port);
410        })
411    }
412
413    fn o2e_sink(
414        _p1: &Self::Process,
415        _p1_port: &Self::Port,
416        _p2: &Self::External,
417        _p2_port: &Self::Port,
418        shared_handle: String,
419    ) -> syn::Expr {
420        let sink_ident = syn::Ident::new(
421            &format!("__hydro_deploy_{}_sink", &shared_handle),
422            Span::call_site(),
423        );
424        parse_quote!(#sink_ident)
425    }
426
427    fn cluster_ids(
428        of_cluster: usize,
429    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
430        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
431    }
432
433    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
434        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
435    }
436
437    fn cluster_membership_stream(
438        location_id: &LocationId,
439    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
440    {
441        cluster_membership_stream(location_id)
442    }
443}
444
445#[expect(missing_docs, reason = "TODO")]
446pub trait DeployCrateWrapper {
447    fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
448
449    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
450    async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
451        self.underlying().read().await.stdout()
452    }
453
454    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
455    async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
456        self.underlying().read().await.stderr()
457    }
458
459    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
460    async fn stdout_filter(
461        &self,
462        prefix: impl Into<String>,
463    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
464        self.underlying().read().await.stdout_filter(prefix.into())
465    }
466
467    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
468    async fn stderr_filter(
469        &self,
470        prefix: impl Into<String>,
471    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
472        self.underlying().read().await.stderr_filter(prefix.into())
473    }
474
475    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
476    async fn tracing_results(&self) -> Option<TracingResults> {
477        self.underlying().read().await.tracing_results().cloned()
478    }
479}
480
481#[expect(missing_docs, reason = "TODO")]
482#[derive(Clone)]
483pub struct TrybuildHost {
484    host: Arc<dyn Host>,
485    display_name: Option<String>,
486    rustflags: Option<String>,
487    additional_hydro_features: Vec<String>,
488    features: Vec<String>,
489    tracing: Option<TracingOptions>,
490    build_envs: Vec<(String, String)>,
491    name_hint: Option<String>,
492    cluster_idx: Option<usize>,
493}
494
495impl From<Arc<dyn Host>> for TrybuildHost {
496    fn from(host: Arc<dyn Host>) -> Self {
497        Self {
498            host,
499            display_name: None,
500            rustflags: None,
501            additional_hydro_features: vec![],
502            features: vec![],
503            tracing: None,
504            build_envs: vec![],
505            name_hint: None,
506            cluster_idx: None,
507        }
508    }
509}
510
511impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
512    fn from(host: Arc<H>) -> Self {
513        Self {
514            host,
515            display_name: None,
516            rustflags: None,
517            additional_hydro_features: vec![],
518            features: vec![],
519            tracing: None,
520            build_envs: vec![],
521            name_hint: None,
522            cluster_idx: None,
523        }
524    }
525}
526
527#[expect(missing_docs, reason = "TODO")]
528impl TrybuildHost {
529    pub fn new(host: Arc<dyn Host>) -> Self {
530        Self {
531            host,
532            display_name: None,
533            rustflags: None,
534            additional_hydro_features: vec![],
535            features: vec![],
536            tracing: None,
537            build_envs: vec![],
538            name_hint: None,
539            cluster_idx: None,
540        }
541    }
542
543    pub fn display_name(self, display_name: impl Into<String>) -> Self {
544        if self.display_name.is_some() {
545            panic!("{} already set", name_of!(display_name in Self));
546        }
547
548        Self {
549            display_name: Some(display_name.into()),
550            ..self
551        }
552    }
553
554    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
555        if self.rustflags.is_some() {
556            panic!("{} already set", name_of!(rustflags in Self));
557        }
558
559        Self {
560            rustflags: Some(rustflags.into()),
561            ..self
562        }
563    }
564
565    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
566        Self {
567            additional_hydro_features,
568            ..self
569        }
570    }
571
572    pub fn features(self, features: Vec<String>) -> Self {
573        Self {
574            features: self.features.into_iter().chain(features).collect(),
575            ..self
576        }
577    }
578
579    pub fn tracing(self, tracing: TracingOptions) -> Self {
580        if self.tracing.is_some() {
581            panic!("{} already set", name_of!(tracing in Self));
582        }
583
584        Self {
585            tracing: Some(tracing),
586            ..self
587        }
588    }
589
590    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
591        Self {
592            build_envs: self
593                .build_envs
594                .into_iter()
595                .chain(std::iter::once((key.into(), value.into())))
596                .collect(),
597            ..self
598        }
599    }
600}
601
602impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
603    type ProcessSpec = TrybuildHost;
604    fn into_process_spec(self) -> TrybuildHost {
605        TrybuildHost {
606            host: self,
607            display_name: None,
608            rustflags: None,
609            additional_hydro_features: vec![],
610            features: vec![],
611            tracing: None,
612            build_envs: vec![],
613            name_hint: None,
614            cluster_idx: None,
615        }
616    }
617}
618
619impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
620    type ProcessSpec = TrybuildHost;
621    fn into_process_spec(self) -> TrybuildHost {
622        TrybuildHost {
623            host: self,
624            display_name: None,
625            rustflags: None,
626            additional_hydro_features: vec![],
627            features: vec![],
628            tracing: None,
629            build_envs: vec![],
630            name_hint: None,
631            cluster_idx: None,
632        }
633    }
634}
635
636#[expect(missing_docs, reason = "TODO")]
637#[derive(Clone)]
638pub struct DeployExternal {
639    next_port: Rc<RefCell<usize>>,
640    host: Arc<dyn Host>,
641    underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
642    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
643    allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
644}
645
646impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
647    fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
648        assert!(
649            self.allocated_ports
650                .borrow_mut()
651                .insert(key, port.clone())
652                .is_none_or(|old| old == port)
653        );
654    }
655
656    fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
657        self.client_ports
658            .borrow()
659            .get(self.allocated_ports.borrow().get(&key).unwrap())
660            .unwrap()
661            .clone()
662    }
663
664    fn as_bytes_bidi(
665        &self,
666        key: usize,
667    ) -> impl Future<
668        Output = (
669            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
670            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
671        ),
672    > + 'a {
673        let port = self.raw_port(key);
674
675        async move {
676            let (source, sink) = port.connect().await.into_source_sink();
677            (
678                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
679                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
680            )
681        }
682    }
683
684    fn as_bincode_bidi<InT, OutT>(
685        &self,
686        key: usize,
687    ) -> impl Future<
688        Output = (
689            Pin<Box<dyn Stream<Item = OutT>>>,
690            Pin<Box<dyn Sink<InT, Error = Error>>>,
691        ),
692    > + 'a
693    where
694        InT: Serialize + 'static,
695        OutT: DeserializeOwned + 'static,
696    {
697        let port = self.raw_port(key);
698        async move {
699            let (source, sink) = port.connect().await.into_source_sink();
700            (
701                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
702                    as Pin<Box<dyn Stream<Item = OutT>>>,
703                Box::pin(
704                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
705                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
706            )
707        }
708    }
709
710    fn as_bincode_sink<T: Serialize + 'static>(
711        &self,
712        key: usize,
713    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
714        let port = self.raw_port(key);
715        async move {
716            let sink = port.connect().await.into_sink();
717            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
718                as Pin<Box<dyn Sink<T, Error = Error>>>
719        }
720    }
721
722    fn as_bincode_source<T: DeserializeOwned + 'static>(
723        &self,
724        key: usize,
725    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
726        let port = self.raw_port(key);
727        async move {
728            let source = port.connect().await.into_source();
729            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
730                as Pin<Box<dyn Stream<Item = T>>>
731        }
732    }
733}
734
735impl Node for DeployExternal {
736    type Port = String;
737    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
738    type InstantiateEnv = Deployment;
739
740    fn next_port(&self) -> Self::Port {
741        let next_port = *self.next_port.borrow();
742        *self.next_port.borrow_mut() += 1;
743
744        format!("port_{}", next_port)
745    }
746
747    fn instantiate(
748        &self,
749        env: &mut Self::InstantiateEnv,
750        _meta: &mut Self::Meta,
751        _graph: DfirGraph,
752        _extra_stmts: Vec<syn::Stmt>,
753    ) {
754        let service = env.CustomService(self.host.clone(), vec![]);
755        *self.underlying.borrow_mut() = Some(service);
756    }
757
758    fn update_meta(&mut self, _meta: &Self::Meta) {}
759}
760
761impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
762    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
763        DeployExternal {
764            next_port: Rc::new(RefCell::new(0)),
765            host: self,
766            underlying: Rc::new(RefCell::new(None)),
767            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
768            client_ports: Rc::new(RefCell::new(HashMap::new())),
769        }
770    }
771}
772
773impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
774    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
775        DeployExternal {
776            next_port: Rc::new(RefCell::new(0)),
777            host: self,
778            underlying: Rc::new(RefCell::new(None)),
779            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
780            client_ports: Rc::new(RefCell::new(HashMap::new())),
781        }
782    }
783}
784
785pub(crate) enum CrateOrTrybuild {
786    Crate(RustCrate, Arc<dyn Host>),
787    Trybuild(TrybuildHost),
788}
789
790#[expect(missing_docs, reason = "TODO")]
791#[derive(Clone)]
792pub struct DeployNode {
793    id: usize,
794    next_port: Rc<RefCell<usize>>,
795    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
796    underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
797}
798
799impl DeployCrateWrapper for DeployNode {
800    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
801        self.underlying.borrow().as_ref().unwrap().clone()
802    }
803}
804
805impl Node for DeployNode {
806    type Port = String;
807    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
808    type InstantiateEnv = Deployment;
809
810    fn next_port(&self) -> String {
811        let next_port = *self.next_port.borrow();
812        *self.next_port.borrow_mut() += 1;
813
814        format!("port_{}", next_port)
815    }
816
817    fn update_meta(&mut self, meta: &Self::Meta) {
818        let underlying_node = self.underlying.borrow();
819        let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
820        n.update_meta(HydroMeta {
821            clusters: meta.clone(),
822            cluster_id: None,
823            subgraph_id: self.id,
824        });
825    }
826
827    fn instantiate(
828        &self,
829        env: &mut Self::InstantiateEnv,
830        _meta: &mut Self::Meta,
831        graph: DfirGraph,
832        extra_stmts: Vec<syn::Stmt>,
833    ) {
834        let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
835            CrateOrTrybuild::Crate(c, host) => (c, host),
836            CrateOrTrybuild::Trybuild(trybuild) => {
837                let (bin_name, config) =
838                    create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
839                let host = trybuild.host.clone();
840                (
841                    create_trybuild_service(
842                        trybuild,
843                        &config.project_dir,
844                        &config.target_dir,
845                        &config.features,
846                        &bin_name,
847                    ),
848                    host,
849                )
850            }
851        };
852
853        *self.underlying.borrow_mut() = Some(env.add_service(service, host));
854    }
855}
856
857#[expect(missing_docs, reason = "TODO")]
858#[derive(Clone)]
859pub struct DeployClusterNode {
860    underlying: Arc<RwLock<RustCrateService>>,
861}
862
863impl DeployCrateWrapper for DeployClusterNode {
864    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
865        self.underlying.clone()
866    }
867}
868#[expect(missing_docs, reason = "TODO")]
869#[derive(Clone)]
870pub struct DeployCluster {
871    id: usize,
872    next_port: Rc<RefCell<usize>>,
873    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
874    members: Rc<RefCell<Vec<DeployClusterNode>>>,
875    name_hint: Option<String>,
876}
877
878impl DeployCluster {
879    #[expect(missing_docs, reason = "TODO")]
880    pub fn members(&self) -> Vec<DeployClusterNode> {
881        self.members.borrow().clone()
882    }
883}
884
885impl Node for DeployCluster {
886    type Port = String;
887    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
888    type InstantiateEnv = Deployment;
889
890    fn next_port(&self) -> String {
891        let next_port = *self.next_port.borrow();
892        *self.next_port.borrow_mut() += 1;
893
894        format!("port_{}", next_port)
895    }
896
897    fn instantiate(
898        &self,
899        env: &mut Self::InstantiateEnv,
900        meta: &mut Self::Meta,
901        graph: DfirGraph,
902        extra_stmts: Vec<syn::Stmt>,
903    ) {
904        let has_trybuild = self
905            .cluster_spec
906            .borrow()
907            .as_ref()
908            .unwrap()
909            .iter()
910            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
911
912        let maybe_trybuild = if has_trybuild {
913            Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
914        } else {
915            None
916        };
917
918        let cluster_nodes = self
919            .cluster_spec
920            .borrow_mut()
921            .take()
922            .unwrap()
923            .into_iter()
924            .map(|spec| {
925                let (service, host) = match spec {
926                    CrateOrTrybuild::Crate(c, host) => (c, host),
927                    CrateOrTrybuild::Trybuild(trybuild) => {
928                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
929                        let host = trybuild.host.clone();
930                        (
931                            create_trybuild_service(
932                                trybuild,
933                                &config.project_dir,
934                                &config.target_dir,
935                                &config.features,
936                                bin_name,
937                            ),
938                            host,
939                        )
940                    }
941                };
942
943                env.add_service(service, host)
944            })
945            .collect::<Vec<_>>();
946        meta.insert(
947            self.id,
948            (0..(cluster_nodes.len() as u32))
949                .map(TaglessMemberId::from_raw_id)
950                .collect(),
951        );
952        *self.members.borrow_mut() = cluster_nodes
953            .into_iter()
954            .map(|n| DeployClusterNode { underlying: n })
955            .collect();
956    }
957
958    fn update_meta(&mut self, meta: &Self::Meta) {
959        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
960            let mut n = node.underlying.try_write().unwrap();
961            n.update_meta(HydroMeta {
962                clusters: meta.clone(),
963                cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
964                subgraph_id: self.id,
965            });
966        }
967    }
968}
969
970#[expect(missing_docs, reason = "TODO")]
971#[derive(Clone)]
972pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
973
974impl DeployProcessSpec {
975    #[expect(missing_docs, reason = "TODO")]
976    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
977        Self(t, host)
978    }
979}
980
981impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
982    fn build(self, id: usize, _name_hint: &str) -> DeployNode {
983        DeployNode {
984            id,
985            next_port: Rc::new(RefCell::new(0)),
986            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
987            underlying: Rc::new(RefCell::new(None)),
988        }
989    }
990}
991
992impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
993    fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
994        self.name_hint = Some(format!("{} (process {id})", name_hint));
995        DeployNode {
996            id,
997            next_port: Rc::new(RefCell::new(0)),
998            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
999            underlying: Rc::new(RefCell::new(None)),
1000        }
1001    }
1002}
1003
1004#[expect(missing_docs, reason = "TODO")]
1005#[derive(Clone)]
1006pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1007
1008impl DeployClusterSpec {
1009    #[expect(missing_docs, reason = "TODO")]
1010    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1011        Self(crates)
1012    }
1013}
1014
1015impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1016    fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
1017        DeployCluster {
1018            id,
1019            next_port: Rc::new(RefCell::new(0)),
1020            cluster_spec: Rc::new(RefCell::new(Some(
1021                self.0
1022                    .into_iter()
1023                    .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1024                    .collect(),
1025            ))),
1026            members: Rc::new(RefCell::new(vec![])),
1027            name_hint: None,
1028        }
1029    }
1030}
1031
1032impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1033    fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1034        let name_hint = format!("{} (cluster {id})", name_hint);
1035        DeployCluster {
1036            id,
1037            next_port: Rc::new(RefCell::new(0)),
1038            cluster_spec: Rc::new(RefCell::new(Some(
1039                self.into_iter()
1040                    .enumerate()
1041                    .map(|(idx, b)| {
1042                        let mut b = b.into();
1043                        b.name_hint = Some(name_hint.clone());
1044                        b.cluster_idx = Some(idx);
1045                        CrateOrTrybuild::Trybuild(b)
1046                    })
1047                    .collect(),
1048            ))),
1049            members: Rc::new(RefCell::new(vec![])),
1050            name_hint: Some(name_hint),
1051        }
1052    }
1053}
1054
1055fn create_trybuild_service(
1056    trybuild: TrybuildHost,
1057    dir: &std::path::PathBuf,
1058    target_dir: &std::path::PathBuf,
1059    features: &Option<Vec<String>>,
1060    bin_name: &str,
1061) -> RustCrate {
1062    let mut ret = RustCrate::new(dir)
1063        .target_dir(target_dir)
1064        .example(bin_name)
1065        .no_default_features();
1066
1067    if let Some(display_name) = trybuild.display_name {
1068        ret = ret.display_name(display_name);
1069    } else if let Some(name_hint) = trybuild.name_hint {
1070        if let Some(cluster_idx) = trybuild.cluster_idx {
1071            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1072        } else {
1073            ret = ret.display_name(name_hint);
1074        }
1075    }
1076
1077    if let Some(rustflags) = trybuild.rustflags {
1078        ret = ret.rustflags(rustflags);
1079    }
1080
1081    if let Some(tracing) = trybuild.tracing {
1082        ret = ret.tracing(tracing);
1083    }
1084
1085    ret = ret.features(
1086        vec!["hydro___feature_deploy_integration".to_string()]
1087            .into_iter()
1088            .chain(
1089                trybuild
1090                    .additional_hydro_features
1091                    .into_iter()
1092                    .map(|runtime_feature| {
1093                        assert!(
1094                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1095                            "{runtime_feature} is not a valid Hydro runtime feature"
1096                        );
1097                        format!("hydro___feature_{runtime_feature}")
1098                    }),
1099            )
1100            .chain(trybuild.features),
1101    );
1102
1103    for (key, value) in trybuild.build_envs {
1104        ret = ret.build_env(key, value);
1105    }
1106
1107    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1108    ret = ret.config("build.incremental = false");
1109
1110    if let Some(features) = features {
1111        ret = ret.features(features);
1112    }
1113
1114    ret
1115}