hydro_lang/deploy/
deploy_graph.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::io::Error;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use dfir_lang::graph::DfirGraph;
10use dfir_rs::bytes::Bytes;
11use dfir_rs::futures::{Sink, SinkExt, Stream, StreamExt};
12use dfir_rs::util::deploy::{ConnectedSink, ConnectedSource};
13use hydro_deploy::custom_service::CustomClientPort;
14use hydro_deploy::rust_crate::RustCrateService;
15use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
16use hydro_deploy::rust_crate::tracing_options::TracingOptions;
17use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
18use nameof::name_of;
19use serde::Serialize;
20use serde::de::DeserializeOwned;
21use stageleft::{QuotedWithContext, RuntimeData};
22use tokio::sync::RwLock;
23
24use super::trybuild::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
25use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort};
26use crate::deploy_runtime::*;
27
28pub struct HydroDeploy {}
29
30impl<'a> Deploy<'a> for HydroDeploy {
31    type InstantiateEnv = Deployment;
32    type CompileEnv = ();
33    type Process = DeployNode;
34    type Cluster = DeployCluster;
35    type ExternalProcess = DeployExternal;
36    type Meta = HashMap<usize, Vec<u32>>;
37    type GraphId = ();
38    type Port = String;
39    type ExternalRawPort = CustomClientPort;
40
41    fn allocate_process_port(process: &Self::Process) -> Self::Port {
42        process.next_port()
43    }
44
45    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
46        cluster.next_port()
47    }
48
49    fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port {
50        external.next_port()
51    }
52
53    fn o2o_sink_source(
54        _env: &(),
55        _p1: &Self::Process,
56        p1_port: &Self::Port,
57        _p2: &Self::Process,
58        p2_port: &Self::Port,
59    ) -> (syn::Expr, syn::Expr) {
60        let p1_port = p1_port.as_str();
61        let p2_port = p2_port.as_str();
62        deploy_o2o(
63            RuntimeData::new("__hydro_lang_trybuild_cli"),
64            p1_port,
65            p2_port,
66        )
67    }
68
69    fn o2o_connect(
70        p1: &Self::Process,
71        p1_port: &Self::Port,
72        p2: &Self::Process,
73        p2_port: &Self::Port,
74    ) -> Box<dyn FnOnce()> {
75        let p1 = p1.clone();
76        let p1_port = p1_port.clone();
77        let p2 = p2.clone();
78        let p2_port = p2_port.clone();
79
80        Box::new(move || {
81            let self_underlying_borrow = p1.underlying.borrow();
82            let self_underlying = self_underlying_borrow.as_ref().unwrap();
83            let source_port = self_underlying
84                .try_read()
85                .unwrap()
86                .get_port(p1_port.clone(), self_underlying);
87
88            let other_underlying_borrow = p2.underlying.borrow();
89            let other_underlying = other_underlying_borrow.as_ref().unwrap();
90            let recipient_port = other_underlying
91                .try_read()
92                .unwrap()
93                .get_port(p2_port.clone(), other_underlying);
94
95            source_port.send_to(&recipient_port)
96        })
97    }
98
99    fn o2m_sink_source(
100        _env: &(),
101        _p1: &Self::Process,
102        p1_port: &Self::Port,
103        _c2: &Self::Cluster,
104        c2_port: &Self::Port,
105    ) -> (syn::Expr, syn::Expr) {
106        let p1_port = p1_port.as_str();
107        let c2_port = c2_port.as_str();
108        deploy_o2m(
109            RuntimeData::new("__hydro_lang_trybuild_cli"),
110            p1_port,
111            c2_port,
112        )
113    }
114
115    fn o2m_connect(
116        p1: &Self::Process,
117        p1_port: &Self::Port,
118        c2: &Self::Cluster,
119        c2_port: &Self::Port,
120    ) -> Box<dyn FnOnce()> {
121        let p1 = p1.clone();
122        let p1_port = p1_port.clone();
123        let c2 = c2.clone();
124        let c2_port = c2_port.clone();
125
126        Box::new(move || {
127            let self_underlying_borrow = p1.underlying.borrow();
128            let self_underlying = self_underlying_borrow.as_ref().unwrap();
129            let source_port = self_underlying
130                .try_read()
131                .unwrap()
132                .get_port(p1_port.clone(), self_underlying);
133
134            let recipient_port = DemuxSink {
135                demux: c2
136                    .members
137                    .borrow()
138                    .iter()
139                    .enumerate()
140                    .map(|(id, c)| {
141                        let n = c.underlying.try_read().unwrap();
142                        (
143                            id as u32,
144                            Arc::new(n.get_port(c2_port.clone(), &c.underlying))
145                                as Arc<dyn RustCrateSink + 'static>,
146                        )
147                    })
148                    .collect(),
149            };
150
151            source_port.send_to(&recipient_port)
152        })
153    }
154
155    fn m2o_sink_source(
156        _env: &(),
157        _c1: &Self::Cluster,
158        c1_port: &Self::Port,
159        _p2: &Self::Process,
160        p2_port: &Self::Port,
161    ) -> (syn::Expr, syn::Expr) {
162        let c1_port = c1_port.as_str();
163        let p2_port = p2_port.as_str();
164        deploy_m2o(
165            RuntimeData::new("__hydro_lang_trybuild_cli"),
166            c1_port,
167            p2_port,
168        )
169    }
170
171    fn m2o_connect(
172        c1: &Self::Cluster,
173        c1_port: &Self::Port,
174        p2: &Self::Process,
175        p2_port: &Self::Port,
176    ) -> Box<dyn FnOnce()> {
177        let c1 = c1.clone();
178        let c1_port = c1_port.clone();
179        let p2 = p2.clone();
180        let p2_port = p2_port.clone();
181
182        Box::new(move || {
183            let other_underlying_borrow = p2.underlying.borrow();
184            let other_underlying = other_underlying_borrow.as_ref().unwrap();
185            let recipient_port = other_underlying
186                .try_read()
187                .unwrap()
188                .get_port(p2_port.clone(), other_underlying)
189                .merge();
190
191            for (i, node) in c1.members.borrow().iter().enumerate() {
192                let source_port = node
193                    .underlying
194                    .try_read()
195                    .unwrap()
196                    .get_port(c1_port.clone(), &node.underlying);
197
198                TaggedSource {
199                    source: Arc::new(source_port),
200                    tag: i as u32,
201                }
202                .send_to(&recipient_port);
203            }
204        })
205    }
206
207    fn m2m_sink_source(
208        _env: &(),
209        _c1: &Self::Cluster,
210        c1_port: &Self::Port,
211        _c2: &Self::Cluster,
212        c2_port: &Self::Port,
213    ) -> (syn::Expr, syn::Expr) {
214        let c1_port = c1_port.as_str();
215        let c2_port = c2_port.as_str();
216        deploy_m2m(
217            RuntimeData::new("__hydro_lang_trybuild_cli"),
218            c1_port,
219            c2_port,
220        )
221    }
222
223    fn m2m_connect(
224        c1: &Self::Cluster,
225        c1_port: &Self::Port,
226        c2: &Self::Cluster,
227        c2_port: &Self::Port,
228    ) -> Box<dyn FnOnce()> {
229        let c1 = c1.clone();
230        let c1_port = c1_port.clone();
231        let c2 = c2.clone();
232        let c2_port = c2_port.clone();
233
234        Box::new(move || {
235            for (i, sender) in c1.members.borrow().iter().enumerate() {
236                let source_port = sender
237                    .underlying
238                    .try_read()
239                    .unwrap()
240                    .get_port(c1_port.clone(), &sender.underlying);
241
242                let recipient_port = DemuxSink {
243                    demux: c2
244                        .members
245                        .borrow()
246                        .iter()
247                        .enumerate()
248                        .map(|(id, c)| {
249                            let n = c.underlying.try_read().unwrap();
250                            (
251                                id as u32,
252                                Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
253                                    as Arc<dyn RustCrateSink + 'static>,
254                            )
255                        })
256                        .collect(),
257                };
258
259                TaggedSource {
260                    source: Arc::new(source_port),
261                    tag: i as u32,
262                }
263                .send_to(&recipient_port);
264            }
265        })
266    }
267
268    fn e2o_source(
269        _compile_env: &Self::CompileEnv,
270        _p1: &Self::ExternalProcess,
271        p1_port: &Self::Port,
272        _p2: &Self::Process,
273        p2_port: &Self::Port,
274    ) -> syn::Expr {
275        let p1_port = p1_port.as_str();
276        let p2_port = p2_port.as_str();
277        deploy_e2o(
278            RuntimeData::new("__hydro_lang_trybuild_cli"),
279            p1_port,
280            p2_port,
281        )
282    }
283
284    fn e2o_connect(
285        p1: &Self::ExternalProcess,
286        p1_port: &Self::Port,
287        p2: &Self::Process,
288        p2_port: &Self::Port,
289    ) -> Box<dyn FnOnce()> {
290        let p1 = p1.clone();
291        let p1_port = p1_port.clone();
292        let p2 = p2.clone();
293        let p2_port = p2_port.clone();
294
295        Box::new(move || {
296            let self_underlying_borrow = p1.underlying.borrow();
297            let self_underlying = self_underlying_borrow.as_ref().unwrap();
298            let source_port = self_underlying
299                .try_read()
300                .unwrap()
301                .declare_client(self_underlying);
302
303            let other_underlying_borrow = p2.underlying.borrow();
304            let other_underlying = other_underlying_borrow.as_ref().unwrap();
305            let recipient_port = other_underlying
306                .try_read()
307                .unwrap()
308                .get_port(p2_port.clone(), other_underlying);
309
310            source_port.send_to(&recipient_port);
311
312            p1.client_ports
313                .borrow_mut()
314                .insert(p1_port.clone(), source_port);
315        })
316    }
317
318    fn o2e_sink(
319        _compile_env: &Self::CompileEnv,
320        _p1: &Self::Process,
321        p1_port: &Self::Port,
322        _p2: &Self::ExternalProcess,
323        p2_port: &Self::Port,
324    ) -> syn::Expr {
325        let p1_port = p1_port.as_str();
326        let p2_port = p2_port.as_str();
327        deploy_o2e(
328            RuntimeData::new("__hydro_lang_trybuild_cli"),
329            p1_port,
330            p2_port,
331        )
332    }
333
334    fn o2e_connect(
335        p1: &Self::Process,
336        p1_port: &Self::Port,
337        p2: &Self::ExternalProcess,
338        p2_port: &Self::Port,
339    ) -> Box<dyn FnOnce()> {
340        let p1 = p1.clone();
341        let p1_port = p1_port.clone();
342        let p2 = p2.clone();
343        let p2_port = p2_port.clone();
344
345        Box::new(move || {
346            let self_underlying_borrow = p1.underlying.borrow();
347            let self_underlying = self_underlying_borrow.as_ref().unwrap();
348            let source_port = self_underlying
349                .try_read()
350                .unwrap()
351                .get_port(p1_port.clone(), self_underlying);
352
353            let other_underlying_borrow = p2.underlying.borrow();
354            let other_underlying = other_underlying_borrow.as_ref().unwrap();
355            let recipient_port = other_underlying
356                .try_read()
357                .unwrap()
358                .declare_client(other_underlying);
359
360            source_port.send_to(&recipient_port);
361
362            p2.client_ports
363                .borrow_mut()
364                .insert(p2_port.clone(), recipient_port);
365        })
366    }
367
368    fn cluster_ids(
369        _env: &Self::CompileEnv,
370        of_cluster: usize,
371    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
372        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
373    }
374
375    fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
376        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
377    }
378}
379
380pub trait DeployCrateWrapper {
381    fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
382
383    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
384    async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
385        self.underlying().read().await.stdout()
386    }
387
388    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
389    async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
390        self.underlying().read().await.stderr()
391    }
392
393    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
394    async fn stdout_filter(
395        &self,
396        prefix: impl Into<String>,
397    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
398        self.underlying().read().await.stdout_filter(prefix.into())
399    }
400
401    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
402    async fn stderr_filter(
403        &self,
404        prefix: impl Into<String>,
405    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
406        self.underlying().read().await.stderr_filter(prefix.into())
407    }
408
409    #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
410    async fn tracing_results(&self) -> Option<TracingResults> {
411        self.underlying().read().await.tracing_results().cloned()
412    }
413}
414
415#[derive(Clone)]
416pub struct TrybuildHost {
417    pub host: Arc<dyn Host>,
418    pub display_name: Option<String>,
419    pub rustflags: Option<String>,
420    pub additional_hydro_features: Vec<String>,
421    pub tracing: Option<TracingOptions>,
422    pub name_hint: Option<String>,
423    pub cluster_idx: Option<usize>,
424}
425
426impl From<Arc<dyn Host>> for TrybuildHost {
427    fn from(host: Arc<dyn Host>) -> Self {
428        Self {
429            host,
430            display_name: None,
431            rustflags: None,
432            additional_hydro_features: vec![],
433            tracing: None,
434            name_hint: None,
435            cluster_idx: None,
436        }
437    }
438}
439
440impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
441    fn from(host: Arc<H>) -> Self {
442        Self {
443            host,
444            display_name: None,
445            rustflags: None,
446            additional_hydro_features: vec![],
447            tracing: None,
448            name_hint: None,
449            cluster_idx: None,
450        }
451    }
452}
453
454impl TrybuildHost {
455    pub fn new(host: Arc<dyn Host>) -> Self {
456        Self {
457            host,
458            display_name: None,
459            rustflags: None,
460            additional_hydro_features: vec![],
461            tracing: None,
462            name_hint: None,
463            cluster_idx: None,
464        }
465    }
466
467    pub fn display_name(self, display_name: impl Into<String>) -> Self {
468        if self.display_name.is_some() {
469            panic!("{} already set", name_of!(display_name in Self));
470        }
471
472        Self {
473            display_name: Some(display_name.into()),
474            ..self
475        }
476    }
477
478    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
479        if self.rustflags.is_some() {
480            panic!("{} already set", name_of!(rustflags in Self));
481        }
482
483        Self {
484            rustflags: Some(rustflags.into()),
485            ..self
486        }
487    }
488
489    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
490        Self {
491            additional_hydro_features,
492            ..self
493        }
494    }
495
496    pub fn tracing(self, tracing: TracingOptions) -> Self {
497        if self.tracing.is_some() {
498            panic!("{} already set", name_of!(tracing in Self));
499        }
500
501        Self {
502            tracing: Some(tracing),
503            ..self
504        }
505    }
506}
507
508impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
509    type ProcessSpec = TrybuildHost;
510    fn into_process_spec(self) -> TrybuildHost {
511        TrybuildHost {
512            host: self,
513            display_name: None,
514            rustflags: None,
515            additional_hydro_features: vec![],
516            tracing: None,
517            name_hint: None,
518            cluster_idx: None,
519        }
520    }
521}
522
523impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
524    type ProcessSpec = TrybuildHost;
525    fn into_process_spec(self) -> TrybuildHost {
526        TrybuildHost {
527            host: self,
528            display_name: None,
529            rustflags: None,
530            additional_hydro_features: vec![],
531            tracing: None,
532            name_hint: None,
533            cluster_idx: None,
534        }
535    }
536}
537
538#[derive(Clone)]
539pub struct DeployExternal {
540    next_port: Rc<RefCell<usize>>,
541    host: Arc<dyn Host>,
542    underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
543    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
544    allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
545}
546
547impl DeployExternal {
548    pub fn take_port(&self, key: usize) -> CustomClientPort {
549        self.client_ports
550            .borrow_mut()
551            .remove(self.allocated_ports.borrow().get(&key).unwrap())
552            .unwrap()
553    }
554}
555
556impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
557    fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
558        self.allocated_ports.borrow_mut().insert(key, port);
559    }
560
561    fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy>::ExternalRawPort {
562        self.client_ports
563            .borrow_mut()
564            .remove(self.allocated_ports.borrow().get(&key).unwrap())
565            .unwrap()
566    }
567
568    fn as_bytes_sink(
569        &self,
570        key: usize,
571    ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a {
572        let port = self.raw_port(key);
573        async move {
574            let sink = port.connect().await.into_sink();
575            sink as Pin<Box<dyn Sink<Bytes, Error = Error>>>
576        }
577    }
578
579    fn as_bincode_sink<T: Serialize + 'static>(
580        &self,
581        key: usize,
582    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
583        let port = self.raw_port(key);
584        async move {
585            let sink = port.connect().await.into_sink();
586            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
587                as Pin<Box<dyn Sink<T, Error = Error>>>
588        }
589    }
590
591    fn as_bytes_source(
592        &self,
593        key: usize,
594    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a {
595        let port = self.raw_port(key);
596        async move {
597            let source = port.connect().await.into_source();
598            Box::pin(source.map(|r| r.unwrap().freeze())) as Pin<Box<dyn Stream<Item = Bytes>>>
599        }
600    }
601
602    fn as_bincode_source<T: DeserializeOwned + 'static>(
603        &self,
604        key: usize,
605    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
606        let port = self.raw_port(key);
607        async move {
608            let source = port.connect().await.into_source();
609            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
610                as Pin<Box<dyn Stream<Item = T>>>
611        }
612    }
613}
614
615impl Node for DeployExternal {
616    type Port = String;
617    type Meta = HashMap<usize, Vec<u32>>;
618    type InstantiateEnv = Deployment;
619
620    fn next_port(&self) -> Self::Port {
621        let next_port = *self.next_port.borrow();
622        *self.next_port.borrow_mut() += 1;
623
624        format!("port_{}", next_port)
625    }
626
627    fn instantiate(
628        &self,
629        env: &mut Self::InstantiateEnv,
630        _meta: &mut Self::Meta,
631        _graph: DfirGraph,
632        _extra_stmts: Vec<syn::Stmt>,
633    ) {
634        let service = env.CustomService(self.host.clone(), vec![]);
635        *self.underlying.borrow_mut() = Some(service);
636    }
637
638    fn update_meta(&mut self, _meta: &Self::Meta) {}
639}
640
641impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
642    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
643        DeployExternal {
644            next_port: Rc::new(RefCell::new(0)),
645            host: self,
646            underlying: Rc::new(RefCell::new(None)),
647            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
648            client_ports: Rc::new(RefCell::new(HashMap::new())),
649        }
650    }
651}
652
653impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
654    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
655        DeployExternal {
656            next_port: Rc::new(RefCell::new(0)),
657            host: self,
658            underlying: Rc::new(RefCell::new(None)),
659            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
660            client_ports: Rc::new(RefCell::new(HashMap::new())),
661        }
662    }
663}
664
665pub enum CrateOrTrybuild {
666    Crate(RustCrate),
667    Trybuild(TrybuildHost),
668}
669
670#[derive(Clone)]
671pub struct DeployNode {
672    id: usize,
673    next_port: Rc<RefCell<usize>>,
674    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
675    underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
676}
677
678impl DeployCrateWrapper for DeployNode {
679    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
680        self.underlying.borrow().as_ref().unwrap().clone()
681    }
682}
683
684impl Node for DeployNode {
685    type Port = String;
686    type Meta = HashMap<usize, Vec<u32>>;
687    type InstantiateEnv = Deployment;
688
689    fn next_port(&self) -> String {
690        let next_port = *self.next_port.borrow();
691        *self.next_port.borrow_mut() += 1;
692
693        format!("port_{}", next_port)
694    }
695
696    fn update_meta(&mut self, meta: &Self::Meta) {
697        let underlying_node = self.underlying.borrow();
698        let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
699        n.update_meta(HydroMeta {
700            clusters: meta.clone(),
701            cluster_id: None,
702            subgraph_id: self.id,
703        });
704    }
705
706    fn instantiate(
707        &self,
708        env: &mut Self::InstantiateEnv,
709        _meta: &mut Self::Meta,
710        graph: DfirGraph,
711        extra_stmts: Vec<syn::Stmt>,
712    ) {
713        let service = match self.service_spec.borrow_mut().take().unwrap() {
714            CrateOrTrybuild::Crate(c) => c,
715            CrateOrTrybuild::Trybuild(trybuild) => {
716                let (bin_name, (dir, target_dir, features)) =
717                    create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
718                create_trybuild_service(trybuild, &dir, &target_dir, &features, &bin_name)
719            }
720        };
721
722        *self.underlying.borrow_mut() = Some(env.add_service(service));
723    }
724}
725
726#[derive(Clone)]
727pub struct DeployClusterNode {
728    underlying: Arc<RwLock<RustCrateService>>,
729}
730
731impl DeployCrateWrapper for DeployClusterNode {
732    fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
733        self.underlying.clone()
734    }
735}
736
737#[derive(Clone)]
738pub struct DeployCluster {
739    id: usize,
740    next_port: Rc<RefCell<usize>>,
741    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
742    members: Rc<RefCell<Vec<DeployClusterNode>>>,
743    name_hint: Option<String>,
744}
745
746impl DeployCluster {
747    pub fn members(&self) -> Vec<DeployClusterNode> {
748        self.members.borrow().clone()
749    }
750}
751
752impl Node for DeployCluster {
753    type Port = String;
754    type Meta = HashMap<usize, Vec<u32>>;
755    type InstantiateEnv = Deployment;
756
757    fn next_port(&self) -> String {
758        let next_port = *self.next_port.borrow();
759        *self.next_port.borrow_mut() += 1;
760
761        format!("port_{}", next_port)
762    }
763
764    fn instantiate(
765        &self,
766        env: &mut Self::InstantiateEnv,
767        meta: &mut Self::Meta,
768        graph: DfirGraph,
769        extra_stmts: Vec<syn::Stmt>,
770    ) {
771        let has_trybuild = self
772            .cluster_spec
773            .borrow()
774            .as_ref()
775            .unwrap()
776            .iter()
777            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
778
779        let maybe_trybuild = if has_trybuild {
780            Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
781        } else {
782            None
783        };
784
785        let cluster_nodes = self
786            .cluster_spec
787            .borrow_mut()
788            .take()
789            .unwrap()
790            .into_iter()
791            .map(|spec| {
792                let service = match spec {
793                    CrateOrTrybuild::Crate(c) => c,
794                    CrateOrTrybuild::Trybuild(trybuild) => {
795                        let (bin_name, (dir, target_dir, features)) =
796                            maybe_trybuild.as_ref().unwrap();
797                        create_trybuild_service(trybuild, dir, target_dir, features, bin_name)
798                    }
799                };
800
801                env.add_service(service)
802            })
803            .collect::<Vec<_>>();
804        meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
805        *self.members.borrow_mut() = cluster_nodes
806            .into_iter()
807            .map(|n| DeployClusterNode { underlying: n })
808            .collect();
809    }
810
811    fn update_meta(&mut self, meta: &Self::Meta) {
812        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
813            let mut n = node.underlying.try_write().unwrap();
814            n.update_meta(HydroMeta {
815                clusters: meta.clone(),
816                cluster_id: Some(cluster_id as u32),
817                subgraph_id: self.id,
818            });
819        }
820    }
821}
822
823#[derive(Clone)]
824pub struct DeployProcessSpec(RustCrate);
825
826impl DeployProcessSpec {
827    pub fn new(t: RustCrate) -> Self {
828        Self(t)
829    }
830}
831
832impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
833    fn build(self, id: usize, _name_hint: &str) -> DeployNode {
834        DeployNode {
835            id,
836            next_port: Rc::new(RefCell::new(0)),
837            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0)))),
838            underlying: Rc::new(RefCell::new(None)),
839        }
840    }
841}
842
843impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
844    fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
845        self.name_hint = Some(format!("{} (process {id})", name_hint));
846        DeployNode {
847            id,
848            next_port: Rc::new(RefCell::new(0)),
849            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
850            underlying: Rc::new(RefCell::new(None)),
851        }
852    }
853}
854
855#[derive(Clone)]
856pub struct DeployClusterSpec(Vec<RustCrate>);
857
858impl DeployClusterSpec {
859    pub fn new(crates: Vec<RustCrate>) -> Self {
860        Self(crates)
861    }
862}
863
864impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
865    fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
866        DeployCluster {
867            id,
868            next_port: Rc::new(RefCell::new(0)),
869            cluster_spec: Rc::new(RefCell::new(Some(
870                self.0.into_iter().map(CrateOrTrybuild::Crate).collect(),
871            ))),
872            members: Rc::new(RefCell::new(vec![])),
873            name_hint: None,
874        }
875    }
876}
877
878impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
879    fn build(self, id: usize, name_hint: &str) -> DeployCluster {
880        let name_hint = format!("{} (cluster {id})", name_hint);
881        DeployCluster {
882            id,
883            next_port: Rc::new(RefCell::new(0)),
884            cluster_spec: Rc::new(RefCell::new(Some(
885                self.into_iter()
886                    .enumerate()
887                    .map(|(idx, b)| {
888                        let mut b = b.into();
889                        b.name_hint = Some(name_hint.clone());
890                        b.cluster_idx = Some(idx);
891                        CrateOrTrybuild::Trybuild(b)
892                    })
893                    .collect(),
894            ))),
895            members: Rc::new(RefCell::new(vec![])),
896            name_hint: Some(name_hint),
897        }
898    }
899}
900
901fn create_trybuild_service(
902    trybuild: TrybuildHost,
903    dir: &std::path::PathBuf,
904    target_dir: &std::path::PathBuf,
905    features: &Option<Vec<String>>,
906    bin_name: &str,
907) -> RustCrate {
908    let mut ret = RustCrate::new(dir, trybuild.host)
909        .target_dir(target_dir)
910        .bin(bin_name)
911        .no_default_features();
912
913    if let Some(display_name) = trybuild.display_name {
914        ret = ret.display_name(display_name);
915    } else if let Some(name_hint) = trybuild.name_hint {
916        if let Some(cluster_idx) = trybuild.cluster_idx {
917            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
918        } else {
919            ret = ret.display_name(name_hint);
920        }
921    }
922
923    if let Some(rustflags) = trybuild.rustflags {
924        ret = ret.rustflags(rustflags);
925    }
926
927    if let Some(tracing) = trybuild.tracing {
928        ret = ret.tracing(tracing);
929    }
930
931    ret = ret.features(
932        trybuild
933            .additional_hydro_features
934            .into_iter()
935            .map(|runtime_feature| {
936                assert!(
937                    HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
938                    "{runtime_feature} is not a valid Hydro runtime feature"
939                );
940                format!("hydro___feature_{runtime_feature}")
941            }),
942    );
943
944    if let Some(features) = features {
945        ret = ret.features(features);
946    }
947
948    ret
949}