hydro_lang/deploy/
deploy_graph.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::io::Error;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use bytes::{Bytes, BytesMut};
10use dfir_lang::graph::DfirGraph;
11use futures::{Sink, SinkExt, Stream, StreamExt};
12use hydro_deploy::custom_service::CustomClientPort;
13use hydro_deploy::rust_crate::RustCrateService;
14use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
15use hydro_deploy::rust_crate::tracing_options::TracingOptions;
16use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
17use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
18use nameof::name_of;
19use proc_macro2::Span;
20use serde::Serialize;
21use serde::de::DeserializeOwned;
22use stageleft::{QuotedWithContext, RuntimeData};
23use syn::parse_quote;
24use tokio::sync::RwLock;
25
26use super::trybuild::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
27use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort};
28use crate::NetworkHint;
29use crate::deploy_runtime::*;
30use crate::staging_util::get_this_crate;
31
/// Marker type selecting the `hydro_deploy`-backed implementation of [`Deploy`].
///
/// It carries no data; all behavior lives in its `Deploy` implementation.
pub struct HydroDeploy {}
33
/// [`Deploy`] implementation for [`HydroDeploy`].
///
/// Instantiation happens against a `hydro_deploy` [`Deployment`], ports are
/// identified by `String` names, and every generated expression refers to the
/// runtime value named `__hydro_lang_trybuild_cli`.
///
/// The `*_connect` methods do no work when called: each returns a boxed closure
/// that performs the wiring when it is invoked. Those closures panic if the
/// participating node's `underlying` service is still `None` or if a
/// `try_read` on a service lock fails.
impl<'a> Deploy<'a> for HydroDeploy {
    type InstantiateEnv = Deployment;
    type CompileEnv = ();
    type Process = DeployNode;
    type Cluster = DeployCluster;
    type External = DeployExternal;
    type Meta = HashMap<usize, Vec<u32>>;
    type GraphId = ();
    type Port = String;
    type ExternalRawPort = CustomClientPort;

    /// Returns the next port name from `process`'s own counter.
    fn allocate_process_port(process: &Self::Process) -> Self::Port {
        process.next_port()
    }

    /// Returns the next port name from `cluster`'s own counter.
    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
        cluster.next_port()
    }

    /// Returns the next port name from `external`'s own counter.
    fn allocate_external_port(external: &Self::External) -> Self::Port {
        external.next_port()
    }

    /// Produces the expression pair for a process-to-process channel by
    /// delegating to `deploy_o2o` with the two port names.
    ///
    /// The environment and the process handles are unused.
    // NOTE(review): the pair is presumably (sink, source), going by the method name — confirm against `deploy_o2o`.
    fn o2o_sink_source(
        _env: &(),
        _p1: &Self::Process,
        p1_port: &Self::Port,
        _p2: &Self::Process,
        p2_port: &Self::Port,
    ) -> (syn::Expr, syn::Expr) {
        let p1_port = p1_port.as_str();
        let p2_port = p2_port.as_str();
        deploy_o2o(
            RuntimeData::new("__hydro_lang_trybuild_cli"),
            p1_port,
            p2_port,
        )
    }

    /// Returns a closure that looks up `p1_port` on `p1`'s service and `p2_port`
    /// on `p2`'s service, then calls `send_to` from the former to the latter.
    ///
    /// The handles and port names are cloned up front so the closure owns them.
    fn o2o_connect(
        p1: &Self::Process,
        p1_port: &Self::Port,
        p2: &Self::Process,
        p2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        let p1 = p1.clone();
        let p1_port = p1_port.clone();
        let p2 = p2.clone();
        let p2_port = p2_port.clone();

        Box::new(move || {
            let self_underlying_borrow = p1.underlying.borrow();
            let self_underlying = self_underlying_borrow.as_ref().unwrap();
            // The read guard is a temporary and is released at the end of this statement.
            let source_port = self_underlying
                .try_read()
                .unwrap()
                .get_port(p1_port.clone(), self_underlying);

            let other_underlying_borrow = p2.underlying.borrow();
            let other_underlying = other_underlying_borrow.as_ref().unwrap();
            let recipient_port = other_underlying
                .try_read()
                .unwrap()
                .get_port(p2_port.clone(), other_underlying);

            source_port.send_to(&recipient_port)
        })
    }

    /// Produces the expression pair for a process-to-cluster channel by
    /// delegating to `deploy_o2m` with the two port names.
    ///
    /// The environment, process and cluster handles are unused.
    fn o2m_sink_source(
        _env: &(),
        _p1: &Self::Process,
        p1_port: &Self::Port,
        _c2: &Self::Cluster,
        c2_port: &Self::Port,
    ) -> (syn::Expr, syn::Expr) {
        let p1_port = p1_port.as_str();
        let c2_port = c2_port.as_str();
        deploy_o2m(
            RuntimeData::new("__hydro_lang_trybuild_cli"),
            p1_port,
            c2_port,
        )
    }

    /// Returns a closure that sends `p1`'s `p1_port` into a [`DemuxSink`]
    /// containing the `c2_port` port of every current member of `c2`.
    ///
    /// Each member is keyed in the demux by its position in `c2.members`.
    fn o2m_connect(
        p1: &Self::Process,
        p1_port: &Self::Port,
        c2: &Self::Cluster,
        c2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        let p1 = p1.clone();
        let p1_port = p1_port.clone();
        let c2 = c2.clone();
        let c2_port = c2_port.clone();

        Box::new(move || {
            let self_underlying_borrow = p1.underlying.borrow();
            let self_underlying = self_underlying_borrow.as_ref().unwrap();
            let source_port = self_underlying
                .try_read()
                .unwrap()
                .get_port(p1_port.clone(), self_underlying);

            let recipient_port = DemuxSink {
                demux: c2
                    .members
                    .borrow()
                    .iter()
                    .enumerate()
                    .map(|(id, c)| {
                        let n = c.underlying.try_read().unwrap();
                        (
                            // Demux key: the member's index within the cluster.
                            id as u32,
                            Arc::new(n.get_port(c2_port.clone(), &c.underlying))
                                as Arc<dyn RustCrateSink + 'static>,
                        )
                    })
                    .collect(),
            };

            source_port.send_to(&recipient_port)
        })
    }

    /// Produces the expression pair for a cluster-to-process channel by
    /// delegating to `deploy_m2o` with the two port names.
    ///
    /// The environment, cluster and process handles are unused.
    fn m2o_sink_source(
        _env: &(),
        _c1: &Self::Cluster,
        c1_port: &Self::Port,
        _p2: &Self::Process,
        p2_port: &Self::Port,
    ) -> (syn::Expr, syn::Expr) {
        let c1_port = c1_port.as_str();
        let p2_port = p2_port.as_str();
        deploy_m2o(
            RuntimeData::new("__hydro_lang_trybuild_cli"),
            c1_port,
            p2_port,
        )
    }

    /// Returns a closure that connects the `c1_port` port of every member of
    /// `c1` to the merged `p2_port` port of `p2`.
    ///
    /// Each member's port is wrapped in a [`TaggedSource`] whose tag is the
    /// member's position in `c1.members`.
    fn m2o_connect(
        c1: &Self::Cluster,
        c1_port: &Self::Port,
        p2: &Self::Process,
        p2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        let c1 = c1.clone();
        let c1_port = c1_port.clone();
        let p2 = p2.clone();
        let p2_port = p2_port.clone();

        Box::new(move || {
            let other_underlying_borrow = p2.underlying.borrow();
            let other_underlying = other_underlying_borrow.as_ref().unwrap();
            // `.merge()` is applied once; every tagged sender below targets this same value.
            let recipient_port = other_underlying
                .try_read()
                .unwrap()
                .get_port(p2_port.clone(), other_underlying)
                .merge();

            for (i, node) in c1.members.borrow().iter().enumerate() {
                let source_port = node
                    .underlying
                    .try_read()
                    .unwrap()
                    .get_port(c1_port.clone(), &node.underlying);

                TaggedSource {
                    source: Arc::new(source_port),
                    tag: i as u32,
                }
                .send_to(&recipient_port);
            }
        })
    }

    /// Produces the expression pair for a cluster-to-cluster channel by
    /// delegating to `deploy_m2m` with the two port names.
    ///
    /// The environment and cluster handles are unused.
    fn m2m_sink_source(
        _env: &(),
        _c1: &Self::Cluster,
        c1_port: &Self::Port,
        _c2: &Self::Cluster,
        c2_port: &Self::Port,
    ) -> (syn::Expr, syn::Expr) {
        let c1_port = c1_port.as_str();
        let c2_port = c2_port.as_str();
        deploy_m2m(
            RuntimeData::new("__hydro_lang_trybuild_cli"),
            c1_port,
            c2_port,
        )
    }

    /// Returns a closure that connects every member of `c1` to every member
    /// of `c2`.
    ///
    /// For each sender, a fresh [`DemuxSink`] is built over the merged
    /// `c2_port` ports of all `c2` members (keyed by member index), and the
    /// sender's `c1_port` port is sent into it as a [`TaggedSource`] tagged
    /// with the sender's index.
    fn m2m_connect(
        c1: &Self::Cluster,
        c1_port: &Self::Port,
        c2: &Self::Cluster,
        c2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        let c1 = c1.clone();
        let c1_port = c1_port.clone();
        let c2 = c2.clone();
        let c2_port = c2_port.clone();

        Box::new(move || {
            for (i, sender) in c1.members.borrow().iter().enumerate() {
                let source_port = sender
                    .underlying
                    .try_read()
                    .unwrap()
                    .get_port(c1_port.clone(), &sender.underlying);

                // Rebuilt for every sender: one merged sink per recipient member.
                let recipient_port = DemuxSink {
                    demux: c2
                        .members
                        .borrow()
                        .iter()
                        .enumerate()
                        .map(|(id, c)| {
                            let n = c.underlying.try_read().unwrap();
                            (
                                id as u32,
                                Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
                                    as Arc<dyn RustCrateSink + 'static>,
                            )
                        })
                        .collect(),
                };

                TaggedSource {
                    source: Arc::new(source_port),
                    tag: i as u32,
                }
                .send_to(&recipient_port);
            }
        })
    }

    /// Appends four `let` statements to `extra_stmts` and returns an
    /// expression naming the source binding.
    ///
    /// The statements bind, in order:
    /// - `__hydro_deploy_many_{shared_handle}_connect`: the result of
    ///   connecting port `p2_port` as a `ConnectedMultiConnection` with
    ///   `codec_type` as its third type argument;
    /// - `..._source`, `..._sink`, `..._membership`: the corresponding fields
    ///   moved out of the connect binding.
    ///
    /// Only the source identifier is returned here; the sink binding is
    /// referenced by [`Self::e2o_many_sink`], which rebuilds the same name
    /// from `shared_handle`.
    fn e2o_many_source(
        _compile_env: &Self::CompileEnv,
        extra_stmts: &mut Vec<syn::Stmt>,
        _p2: &Self::Process,
        p2_port: &Self::Port,
        codec_type: &syn::Type,
        shared_handle: String,
    ) -> syn::Expr {
        let connect_ident = syn::Ident::new(
            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
            Span::call_site(),
        );
        let source_ident = syn::Ident::new(
            &format!("__hydro_deploy_many_{}_source", &shared_handle),
            Span::call_site(),
        );
        let sink_ident = syn::Ident::new(
            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
            Span::call_site(),
        );
        let membership_ident = syn::Ident::new(
            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
            Span::call_site(),
        );

        // Path to this crate, used to name the runtime-support type in generated code.
        let root = get_this_crate();

        extra_stmts.push(syn::parse_quote! {
            let #connect_ident = __hydro_lang_trybuild_cli
                .port(#p2_port)
                .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
        });

        extra_stmts.push(syn::parse_quote! {
            let #source_ident = #connect_ident.source;
        });

        extra_stmts.push(syn::parse_quote! {
            let #sink_ident = #connect_ident.sink;
        });

        extra_stmts.push(syn::parse_quote! {
            let #membership_ident = #connect_ident.membership;
        });

        parse_quote!(#source_ident)
    }

    /// Returns an expression naming `__hydro_deploy_many_{shared_handle}_sink`,
    /// the sink binding that [`Self::e2o_many_source`] emits for the same
    /// `shared_handle`.
    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
        let sink_ident = syn::Ident::new(
            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
            Span::call_site(),
        );
        parse_quote!(#sink_ident)
    }

    /// Produces the source expression for an external-to-process channel by
    /// delegating to `deploy_e2o` with the two port names.
    ///
    /// The environment, external and process handles are unused.
    fn e2o_source(
        _compile_env: &Self::CompileEnv,
        _p1: &Self::External,
        p1_port: &Self::Port,
        _p2: &Self::Process,
        p2_port: &Self::Port,
    ) -> syn::Expr {
        let p1_port = p1_port.as_str();
        let p2_port = p2_port.as_str();
        deploy_e2o(
            RuntimeData::new("__hydro_lang_trybuild_cli"),
            p1_port,
            p2_port,
        )
    }

    /// Returns a closure that declares a client port on the external service
    /// `p1`, sends it to `p2`'s `p2_port`, and stores the client port in
    /// `p1.client_ports` under `p1_port`.
    ///
    /// - `many`: selects `declare_many_client` instead of `declare_client`.
    /// - `server_hint`: translated variant-for-variant into
    ///   `hydro_deploy::PortNetworkHint` and passed to `get_port_with_hint`.
    fn e2o_connect(
        p1: &Self::External,
        p1_port: &Self::Port,
        p2: &Self::Process,
        p2_port: &Self::Port,
        many: bool,
        server_hint: NetworkHint,
    ) -> Box<dyn FnOnce()> {
        let p1 = p1.clone();
        let p1_port = p1_port.clone();
        let p2 = p2.clone();
        let p2_port = p2_port.clone();

        Box::new(move || {
            let self_underlying_borrow = p1.underlying.borrow();
            let self_underlying = self_underlying_borrow.as_ref().unwrap();
            let source_port = if many {
                self_underlying
                    .try_read()
                    .unwrap()
                    .declare_many_client(self_underlying)
            } else {
                self_underlying
                    .try_read()
                    .unwrap()
                    .declare_client(self_underlying)
            };

            let other_underlying_borrow = p2.underlying.borrow();
            let other_underlying = other_underlying_borrow.as_ref().unwrap();
            let recipient_port = other_underlying.try_read().unwrap().get_port_with_hint(
                p2_port.clone(),
                match server_hint {
                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
                },
                other_underlying,
            );

            source_port.send_to(&recipient_port);

            // Keep the client port so it can later be fetched by port name.
            p1.client_ports
                .borrow_mut()
                .insert(p1_port.clone(), source_port);
        })
    }

    /// Produces the sink expression for a process-to-external channel by
    /// delegating to `deploy_o2e` with the two port names.
    ///
    /// The environment, process and external handles are unused.
    fn o2e_sink(
        _compile_env: &Self::CompileEnv,
        _p1: &Self::Process,
        p1_port: &Self::Port,
        _p2: &Self::External,
        p2_port: &Self::Port,
    ) -> syn::Expr {
        let p1_port = p1_port.as_str();
        let p2_port = p2_port.as_str();
        deploy_o2e(
            RuntimeData::new("__hydro_lang_trybuild_cli"),
            p1_port,
            p2_port,
        )
    }

    /// Returns a closure that sends `p1`'s `p1_port` to a client port newly
    /// declared on the external service `p2`, then stores that client port in
    /// `p2.client_ports` under `p2_port`.
    fn o2e_connect(
        p1: &Self::Process,
        p1_port: &Self::Port,
        p2: &Self::External,
        p2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        let p1 = p1.clone();
        let p1_port = p1_port.clone();
        let p2 = p2.clone();
        let p2_port = p2_port.clone();

        Box::new(move || {
            let self_underlying_borrow = p1.underlying.borrow();
            let self_underlying = self_underlying_borrow.as_ref().unwrap();
            let source_port = self_underlying
                .try_read()
                .unwrap()
                .get_port(p1_port.clone(), self_underlying);

            let other_underlying_borrow = p2.underlying.borrow();
            let other_underlying = other_underlying_borrow.as_ref().unwrap();
            let recipient_port = other_underlying
                .try_read()
                .unwrap()
                .declare_client(other_underlying);

            source_port.send_to(&recipient_port);

            // Keep the client port so it can later be fetched by port name.
            p2.client_ports
                .borrow_mut()
                .insert(p2_port.clone(), recipient_port);
        })
    }

    /// Returns the quoted member-id slice for cluster `of_cluster`, as produced
    /// by the free function `cluster_members`.
    fn cluster_ids(
        _env: &Self::CompileEnv,
        of_cluster: usize,
    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
    }

    /// Returns the quoted id of the current cluster member, as produced by the
    /// free function `cluster_self_id` (not this method).
    fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
    }
}
452
453pub trait DeployCrateWrapper {
454    fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
455
456    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
457    async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
458        self.underlying().read().await.stdout()
459    }
460
461    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
462    async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
463        self.underlying().read().await.stderr()
464    }
465
466    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
467    async fn stdout_filter(
468        &self,
469        prefix: impl Into<String>,
470    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
471        self.underlying().read().await.stdout_filter(prefix.into())
472    }
473
474    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
475    async fn stderr_filter(
476        &self,
477        prefix: impl Into<String>,
478    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
479        self.underlying().read().await.stderr_filter(prefix.into())
480    }
481
482    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
483    async fn tracing_results(&self) -> Option<TracingResults> {
484        self.underlying().read().await.tracing_results().cloned()
485    }
486}
487
/// Specification of a host plus build options, used as a process spec or as a
/// cluster member spec for [`HydroDeploy`].
///
/// The `From` impls, `new` and `into_process_spec` all start with every
/// optional field unset and both feature lists empty.
#[derive(Clone)]
pub struct TrybuildHost {
    /// Host handle this spec was created from.
    pub host: Arc<dyn Host>,
    /// Optional display name; the `display_name` builder panics if it is set twice.
    pub display_name: Option<String>,
    /// Optional rustflags string; the `rustflags` builder panics if it is set twice.
    pub rustflags: Option<String>,
    /// Replaced wholesale by the `additional_hydro_features` builder.
    // NOTE(review): presumably extra features for the hydro dependency of the generated crate — confirm where this is consumed.
    pub additional_hydro_features: Vec<String>,
    /// Appended to by the `features` builder.
    pub features: Vec<String>,
    /// Optional tracing options; the `tracing` builder panics if it is set twice.
    pub tracing: Option<TracingOptions>,
    /// Set by the `ProcessSpec`/`ClusterSpec` `build` functions to a string
    /// derived from the caller's name hint and the node id.
    pub name_hint: Option<String>,
    /// Set by the cluster `build` function to the member's index; `None` otherwise.
    pub cluster_idx: Option<usize>,
}
499
500impl From<Arc<dyn Host>> for TrybuildHost {
501    fn from(host: Arc<dyn Host>) -> Self {
502        Self {
503            host,
504            display_name: None,
505            rustflags: None,
506            additional_hydro_features: vec![],
507            features: vec![],
508            tracing: None,
509            name_hint: None,
510            cluster_idx: None,
511        }
512    }
513}
514
515impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
516    fn from(host: Arc<H>) -> Self {
517        Self {
518            host,
519            display_name: None,
520            rustflags: None,
521            additional_hydro_features: vec![],
522            features: vec![],
523            tracing: None,
524            name_hint: None,
525            cluster_idx: None,
526        }
527    }
528}
529
530impl TrybuildHost {
531    pub fn new(host: Arc<dyn Host>) -> Self {
532        Self {
533            host,
534            display_name: None,
535            rustflags: None,
536            additional_hydro_features: vec![],
537            features: vec![],
538            tracing: None,
539            name_hint: None,
540            cluster_idx: None,
541        }
542    }
543
544    pub fn display_name(self, display_name: impl Into<String>) -> Self {
545        if self.display_name.is_some() {
546            panic!("{} already set", name_of!(display_name in Self));
547        }
548
549        Self {
550            display_name: Some(display_name.into()),
551            ..self
552        }
553    }
554
555    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
556        if self.rustflags.is_some() {
557            panic!("{} already set", name_of!(rustflags in Self));
558        }
559
560        Self {
561            rustflags: Some(rustflags.into()),
562            ..self
563        }
564    }
565
566    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
567        Self {
568            additional_hydro_features,
569            ..self
570        }
571    }
572
573    pub fn features(self, features: Vec<String>) -> Self {
574        Self {
575            features: self.features.into_iter().chain(features).collect(),
576            ..self
577        }
578    }
579
580    pub fn tracing(self, tracing: TracingOptions) -> Self {
581        if self.tracing.is_some() {
582            panic!("{} already set", name_of!(tracing in Self));
583        }
584
585        Self {
586            tracing: Some(tracing),
587            ..self
588        }
589    }
590}
591
592impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
593    type ProcessSpec = TrybuildHost;
594    fn into_process_spec(self) -> TrybuildHost {
595        TrybuildHost {
596            host: self,
597            display_name: None,
598            rustflags: None,
599            additional_hydro_features: vec![],
600            features: vec![],
601            tracing: None,
602            name_hint: None,
603            cluster_idx: None,
604        }
605    }
606}
607
608impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
609    type ProcessSpec = TrybuildHost;
610    fn into_process_spec(self) -> TrybuildHost {
611        TrybuildHost {
612            host: self,
613            display_name: None,
614            rustflags: None,
615            additional_hydro_features: vec![],
616            features: vec![],
617            tracing: None,
618            name_hint: None,
619            cluster_idx: None,
620        }
621    }
622}
623
/// Handle for an external (non-Hydro) endpoint in a [`HydroDeploy`] deployment.
///
/// All mutable state sits behind `Rc<RefCell<..>>`, so clones share it.
#[derive(Clone)]
pub struct DeployExternal {
    /// Counter used by `next_port` to generate `port_{n}` names.
    next_port: Rc<RefCell<usize>>,
    /// Host passed to the deployment when the custom service is created.
    host: Arc<dyn Host>,
    /// The custom service; `None` until `instantiate` has run.
    underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
    /// Client ports keyed by port name, filled in by the `e2o`/`o2e` connect closures.
    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
    /// Maps a registration key to the port name recorded by `register`.
    allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
}
632
633impl DeployExternal {
634    pub fn take_port(&self, key: usize) -> CustomClientPort {
635        self.client_ports
636            .borrow_mut()
637            .remove(self.allocated_ports.borrow().get(&key).unwrap())
638            .unwrap()
639    }
640}
641
642impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
643    fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
644        self.allocated_ports.borrow_mut().insert(key, port);
645    }
646
647    fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
648        self.client_ports
649            .borrow()
650            .get(self.allocated_ports.borrow().get(&key).unwrap())
651            .unwrap()
652            .clone()
653    }
654
655    fn as_bytes_bidi(
656        &self,
657        key: usize,
658    ) -> impl Future<
659        Output = (
660            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
661            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
662        ),
663    > + 'a {
664        let port = self.raw_port(key);
665
666        async move {
667            let (source, sink) = port.connect().await.into_source_sink();
668            (
669                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
670                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
671            )
672        }
673    }
674
675    fn as_bincode_bidi<InT, OutT>(
676        &self,
677        key: usize,
678    ) -> impl Future<
679        Output = (
680            Pin<Box<dyn Stream<Item = OutT>>>,
681            Pin<Box<dyn Sink<InT, Error = Error>>>,
682        ),
683    > + 'a
684    where
685        InT: Serialize + 'static,
686        OutT: DeserializeOwned + 'static,
687    {
688        let port = self.raw_port(key);
689        async move {
690            let (source, sink) = port.connect().await.into_source_sink();
691            (
692                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
693                    as Pin<Box<dyn Stream<Item = OutT>>>,
694                Box::pin(
695                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
696                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
697            )
698        }
699    }
700
701    fn as_bincode_sink<T: Serialize + 'static>(
702        &self,
703        key: usize,
704    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
705        let port = self.raw_port(key);
706        async move {
707            let sink = port.connect().await.into_sink();
708            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
709                as Pin<Box<dyn Sink<T, Error = Error>>>
710        }
711    }
712
713    fn as_bincode_source<T: DeserializeOwned + 'static>(
714        &self,
715        key: usize,
716    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
717        let port = self.raw_port(key);
718        async move {
719            let source = port.connect().await.into_source();
720            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
721                as Pin<Box<dyn Stream<Item = T>>>
722        }
723    }
724}
725
726impl Node for DeployExternal {
727    type Port = String;
728    type Meta = HashMap<usize, Vec<u32>>;
729    type InstantiateEnv = Deployment;
730
731    fn next_port(&self) -> Self::Port {
732        let next_port = *self.next_port.borrow();
733        *self.next_port.borrow_mut() += 1;
734
735        format!("port_{}", next_port)
736    }
737
738    fn instantiate(
739        &self,
740        env: &mut Self::InstantiateEnv,
741        _meta: &mut Self::Meta,
742        _graph: DfirGraph,
743        _extra_stmts: Vec<syn::Stmt>,
744    ) {
745        let service = env.CustomService(self.host.clone(), vec![]);
746        *self.underlying.borrow_mut() = Some(service);
747    }
748
749    fn update_meta(&mut self, _meta: &Self::Meta) {}
750}
751
752impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
753    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
754        DeployExternal {
755            next_port: Rc::new(RefCell::new(0)),
756            host: self,
757            underlying: Rc::new(RefCell::new(None)),
758            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
759            client_ports: Rc::new(RefCell::new(HashMap::new())),
760        }
761    }
762}
763
764impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
765    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
766        DeployExternal {
767            next_port: Rc::new(RefCell::new(0)),
768            host: self,
769            underlying: Rc::new(RefCell::new(None)),
770            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
771            client_ports: Rc::new(RefCell::new(HashMap::new())),
772        }
773    }
774}
775
/// How the service for a node is obtained at instantiation time.
pub enum CrateOrTrybuild {
    /// A ready-made [`RustCrate`] service spec, added to the deployment as-is.
    Crate(RustCrate),
    /// A host spec for which a trybuild binary is generated from the node's
    /// graph before the service is created.
    Trybuild(TrybuildHost),
}
780
/// Handle for a single process in a [`HydroDeploy`] deployment.
///
/// All mutable state sits behind `Rc<RefCell<..>>`, so clones share it.
#[derive(Clone)]
pub struct DeployNode {
    /// Node id; reported as `subgraph_id` in the metadata sent by `update_meta`.
    id: usize,
    /// Counter used by `next_port` to generate `port_{n}` names.
    next_port: Rc<RefCell<usize>>,
    /// Pending service spec; taken (left as `None`) by `instantiate`.
    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
    /// The deployed service; `None` until `instantiate` has run.
    underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
}
788
789impl DeployCrateWrapper for DeployNode {
790    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
791        self.underlying.borrow().as_ref().unwrap().clone()
792    }
793}
794
795impl Node for DeployNode {
796    type Port = String;
797    type Meta = HashMap<usize, Vec<u32>>;
798    type InstantiateEnv = Deployment;
799
800    fn next_port(&self) -> String {
801        let next_port = *self.next_port.borrow();
802        *self.next_port.borrow_mut() += 1;
803
804        format!("port_{}", next_port)
805    }
806
807    fn update_meta(&mut self, meta: &Self::Meta) {
808        let underlying_node = self.underlying.borrow();
809        let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
810        n.update_meta(HydroMeta {
811            clusters: meta.clone(),
812            cluster_id: None,
813            subgraph_id: self.id,
814        });
815    }
816
817    fn instantiate(
818        &self,
819        env: &mut Self::InstantiateEnv,
820        _meta: &mut Self::Meta,
821        graph: DfirGraph,
822        extra_stmts: Vec<syn::Stmt>,
823    ) {
824        let service = match self.service_spec.borrow_mut().take().unwrap() {
825            CrateOrTrybuild::Crate(c) => c,
826            CrateOrTrybuild::Trybuild(trybuild) => {
827                let (bin_name, config) =
828                    create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
829                create_trybuild_service(
830                    trybuild,
831                    &config.project_dir,
832                    &config.target_dir,
833                    &config.features,
834                    &bin_name,
835                )
836            }
837        };
838
839        *self.underlying.borrow_mut() = Some(env.add_service(service));
840    }
841}
842
/// Handle for one deployed member of a [`DeployCluster`].
#[derive(Clone)]
pub struct DeployClusterNode {
    /// The member's deployed service.
    underlying: Arc<RwLock<RustCrateService>>,
}
847
848impl DeployCrateWrapper for DeployClusterNode {
849    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
850        self.underlying.clone()
851    }
852}
853
/// Handle for a cluster of processes in a [`HydroDeploy`] deployment.
///
/// Mutable state sits behind `Rc<RefCell<..>>`, so clones share it.
#[derive(Clone)]
pub struct DeployCluster {
    /// Cluster id; used as the key in the metadata map and as `subgraph_id`.
    id: usize,
    /// Counter used by `next_port` to generate `port_{n}` names.
    next_port: Rc<RefCell<usize>>,
    /// Pending per-member service specs; taken (left as `None`) by `instantiate`.
    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
    /// Deployed members, in spec order; empty until `instantiate` has run.
    members: Rc<RefCell<Vec<DeployClusterNode>>>,
    /// Name hint handed to `create_graph_trybuild` when a trybuild binary is generated.
    name_hint: Option<String>,
}
862
863impl DeployCluster {
864    pub fn members(&self) -> Vec<DeployClusterNode> {
865        self.members.borrow().clone()
866    }
867}
868
869impl Node for DeployCluster {
870    type Port = String;
871    type Meta = HashMap<usize, Vec<u32>>;
872    type InstantiateEnv = Deployment;
873
874    fn next_port(&self) -> String {
875        let next_port = *self.next_port.borrow();
876        *self.next_port.borrow_mut() += 1;
877
878        format!("port_{}", next_port)
879    }
880
881    fn instantiate(
882        &self,
883        env: &mut Self::InstantiateEnv,
884        meta: &mut Self::Meta,
885        graph: DfirGraph,
886        extra_stmts: Vec<syn::Stmt>,
887    ) {
888        let has_trybuild = self
889            .cluster_spec
890            .borrow()
891            .as_ref()
892            .unwrap()
893            .iter()
894            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
895
896        let maybe_trybuild = if has_trybuild {
897            Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
898        } else {
899            None
900        };
901
902        let cluster_nodes = self
903            .cluster_spec
904            .borrow_mut()
905            .take()
906            .unwrap()
907            .into_iter()
908            .map(|spec| {
909                let service = match spec {
910                    CrateOrTrybuild::Crate(c) => c,
911                    CrateOrTrybuild::Trybuild(trybuild) => {
912                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
913                        create_trybuild_service(
914                            trybuild,
915                            &config.project_dir,
916                            &config.target_dir,
917                            &config.features,
918                            bin_name,
919                        )
920                    }
921                };
922
923                env.add_service(service)
924            })
925            .collect::<Vec<_>>();
926        meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
927        *self.members.borrow_mut() = cluster_nodes
928            .into_iter()
929            .map(|n| DeployClusterNode { underlying: n })
930            .collect();
931    }
932
933    fn update_meta(&mut self, meta: &Self::Meta) {
934        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
935            let mut n = node.underlying.try_write().unwrap();
936            n.update_meta(HydroMeta {
937                clusters: meta.clone(),
938                cluster_id: Some(cluster_id as u32),
939                subgraph_id: self.id,
940            });
941        }
942    }
943}
944
/// Process spec that wraps an existing [`RustCrate`] service spec, so the
/// process is deployed from that crate rather than from a generated trybuild
/// binary.
#[derive(Clone)]
pub struct DeployProcessSpec(RustCrate);
947
948impl DeployProcessSpec {
949    pub fn new(t: RustCrate) -> Self {
950        Self(t)
951    }
952}
953
954impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
955    fn build(self, id: usize, _name_hint: &str) -> DeployNode {
956        DeployNode {
957            id,
958            next_port: Rc::new(RefCell::new(0)),
959            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0)))),
960            underlying: Rc::new(RefCell::new(None)),
961        }
962    }
963}
964
965impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
966    fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
967        self.name_hint = Some(format!("{} (process {id})", name_hint));
968        DeployNode {
969            id,
970            next_port: Rc::new(RefCell::new(0)),
971            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
972            underlying: Rc::new(RefCell::new(None)),
973        }
974    }
975}
976
977#[derive(Clone)]
978pub struct DeployClusterSpec(Vec<RustCrate>);
979
980impl DeployClusterSpec {
981    pub fn new(crates: Vec<RustCrate>) -> Self {
982        Self(crates)
983    }
984}
985
986impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
987    fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
988        DeployCluster {
989            id,
990            next_port: Rc::new(RefCell::new(0)),
991            cluster_spec: Rc::new(RefCell::new(Some(
992                self.0.into_iter().map(CrateOrTrybuild::Crate).collect(),
993            ))),
994            members: Rc::new(RefCell::new(vec![])),
995            name_hint: None,
996        }
997    }
998}
999
1000impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1001    fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1002        let name_hint = format!("{} (cluster {id})", name_hint);
1003        DeployCluster {
1004            id,
1005            next_port: Rc::new(RefCell::new(0)),
1006            cluster_spec: Rc::new(RefCell::new(Some(
1007                self.into_iter()
1008                    .enumerate()
1009                    .map(|(idx, b)| {
1010                        let mut b = b.into();
1011                        b.name_hint = Some(name_hint.clone());
1012                        b.cluster_idx = Some(idx);
1013                        CrateOrTrybuild::Trybuild(b)
1014                    })
1015                    .collect(),
1016            ))),
1017            members: Rc::new(RefCell::new(vec![])),
1018            name_hint: Some(name_hint),
1019        }
1020    }
1021}
1022
1023fn create_trybuild_service(
1024    trybuild: TrybuildHost,
1025    dir: &std::path::PathBuf,
1026    target_dir: &std::path::PathBuf,
1027    features: &Option<Vec<String>>,
1028    bin_name: &str,
1029) -> RustCrate {
1030    let mut ret = RustCrate::new(dir, trybuild.host)
1031        .target_dir(target_dir)
1032        .bin(bin_name)
1033        .no_default_features();
1034
1035    if let Some(display_name) = trybuild.display_name {
1036        ret = ret.display_name(display_name);
1037    } else if let Some(name_hint) = trybuild.name_hint {
1038        if let Some(cluster_idx) = trybuild.cluster_idx {
1039            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1040        } else {
1041            ret = ret.display_name(name_hint);
1042        }
1043    }
1044
1045    if let Some(rustflags) = trybuild.rustflags {
1046        ret = ret.rustflags(rustflags);
1047    }
1048
1049    if let Some(tracing) = trybuild.tracing {
1050        ret = ret.tracing(tracing);
1051    }
1052
1053    ret = ret.features(
1054        trybuild
1055            .additional_hydro_features
1056            .into_iter()
1057            .map(|runtime_feature| {
1058                assert!(
1059                    HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1060                    "{runtime_feature} is not a valid Hydro runtime feature"
1061                );
1062                format!("hydro___feature_{runtime_feature}")
1063            })
1064            .chain(trybuild.features),
1065    );
1066
1067    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1068    ret = ret.config("build.incremental = false");
1069
1070    if let Some(features) = features {
1071        ret = ret.features(features);
1072    }
1073
1074    ret
1075}