Skip to main content

hydro_lang/deploy/
deploy_graph_containerized_ecs.rs

1//! Deployment backend for ECS that generates manifests describing the binaries,
2//! ports, and service naming needed to package and orchestrate Hydro applications.
3
4use std::cell::RefCell;
5use std::collections::BTreeMap;
6use std::pin::Pin;
7use std::rc::Rc;
8
9use bytes::Bytes;
10use dfir_lang::graph::DfirGraph;
11use futures::{Sink, Stream};
12use proc_macro2::Span;
13use serde::{Deserialize, Serialize};
14use stageleft::QuotedWithContext;
15use syn::parse_quote;
16use tracing::{instrument, trace};
17
/// Manifest for exporting: describes all processes and clusters of a
/// deployment together with their build and port configuration.
///
/// Built by `EcsDeploy::export`; derives `Serialize`/`Deserialize` so it can be
/// written out and read back by external tooling.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HydroManifest {
    /// Process definitions (single-instance services), keyed by the name hint
    /// that `EcsDeploy::export` receives for each process.
    pub processes: BTreeMap<String, ProcessManifest>,
    /// Cluster definitions (multi-instance services), keyed by the name hint
    /// that `EcsDeploy::export` receives for each cluster.
    pub clusters: BTreeMap<String, ClusterManifest>,
}
26
/// Information the build toolchain needs to compile a trybuild binary.
///
/// Populated by `build_info_from_config` from a binary name and a
/// `TrybuildConfig`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildConfig {
    /// Path to the trybuild project directory (lossily converted to a string)
    pub project_dir: String,
    /// Path to the target directory (lossily converted to a string)
    pub target_dir: String,
    /// Example/binary name to build
    pub bin_name: String,
    /// Package name containing the example (for -p flag); derived as
    /// `{project-dir-name}-hydro-trybuild` with `_` replaced by `-`
    pub package_name: String,
    /// Features to enable; always starts with `hydro___feature_ecs_runtime`,
    /// followed by any features from the trybuild config
    pub features: Vec<String>,
}
41
/// Information about an exposed port.
///
/// Serialized by serde as an internally tagged value: a `protocol` field holds
/// the lowercased variant name (e.g. `"tcp"`) next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", rename_all = "lowercase")]
pub enum PortInfo {
    /// A TCP listener.
    Tcp {
        /// The port number the listener is bound to.
        port: u16,
    },
}
52
/// Manifest entry for a single process
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessManifest {
    /// Build toolchain info for this binary
    pub build: BuildConfig,
    /// Ports that need to be exposed, keyed by external port identifier
    /// (the shared handle under which the port was recorded)
    pub ports: BTreeMap<String, PortInfo>,
    /// Task family name (used for ECS service discovery); `export` fills this
    /// with the process's generated name
    pub task_family: String,
}
63
/// Manifest entry for a cluster (multiple instances of the same service)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterManifest {
    /// Build configuration for this cluster (same binary for all instances)
    pub build: BuildConfig,
    /// Ports that need to be exposed, keyed by external port identifier
    pub ports: BTreeMap<String, PortInfo>,
    /// Default number of instances; `export` copies this from the count the
    /// cluster spec was created with
    pub default_count: usize,
    /// Task family prefix (instances will be named {prefix}-0, {prefix}-1, etc.);
    /// `export` fills this with the cluster's generated name
    pub task_family_prefix: String,
}
76
77use super::deploy_runtime_containerized_ecs::*;
78use crate::compile::builder::ExternalPortId;
79use crate::compile::deploy::DeployResult;
80use crate::compile::deploy_provider::{
81    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
82};
83use crate::compile::trybuild::generate::create_graph_trybuild;
84use crate::location::dynamic::LocationId;
85use crate::location::member_id::TaglessMemberId;
86use crate::location::{LocationKey, MembershipEvent, NetworkHint};
87
/// Represents a process running in an ECS deployment.
///
/// The `Rc`-wrapped fields are shared between clones, so every clone sees the
/// same port counter, exposed-port map, and stored trybuild config.
#[derive(Clone)]
pub struct EcsDeployProcess {
    /// Location key this process was built for.
    id: LocationKey,
    /// Name produced by `get_ecs_image_name` from the name hint and `id`;
    /// exported as the manifest's `task_family`.
    name: String,
    /// Next port number that `next_port()` will hand out (starts at 10001).
    next_port: Rc<RefCell<u16>>,

    /// Ports recorded by `e2o_source` / `e2o_many_source`, keyed by shared handle.
    exposed_ports: Rc<RefCell<BTreeMap<String, PortInfo>>>,

    /// `(bin_name, config)` stored by `instantiate` and read by `EcsDeploy::export`;
    /// `None` until `instantiate` has run.
    trybuild_config:
        Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
}
100
101impl Node for EcsDeployProcess {
102    type Port = u16;
103    type Meta = ();
104    type InstantiateEnv = EcsDeploy;
105
106    #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
107    fn next_port(&self) -> Self::Port {
108        let port = {
109            let mut borrow = self.next_port.borrow_mut();
110            let port = *borrow;
111            *borrow += 1;
112            port
113        };
114
115        port
116    }
117
118    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
119    fn update_meta(&self, _meta: &Self::Meta) {}
120
121    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
122    fn instantiate(
123        &self,
124        _env: &mut Self::InstantiateEnv,
125        meta: &mut Self::Meta,
126        graph: DfirGraph,
127        extra_stmts: &[syn::Stmt],
128        sidecars: &[syn::Expr],
129    ) {
130        let (bin_name, config) = create_graph_trybuild(
131            graph,
132            extra_stmts,
133            sidecars,
134            Some(&self.name),
135            crate::compile::trybuild::generate::DeployMode::Containerized,
136            crate::compile::trybuild::generate::LinkingMode::Static,
137        );
138
139        // Store the trybuild config for export
140        *self.trybuild_config.borrow_mut() = Some((bin_name, config));
141    }
142}
143
/// Represents a logical cluster, which can be a variable number of individual containers.
///
/// The `Rc`-wrapped fields are shared between clones.
#[derive(Clone)]
pub struct EcsDeployCluster {
    /// Location key this cluster was built for.
    id: LocationKey,
    /// Name produced by `get_ecs_image_name` from the name hint and `id`;
    /// exported as the manifest's `task_family_prefix`.
    name: String,
    /// Next port number that `next_port()` will hand out (starts at 10001).
    next_port: Rc<RefCell<u16>>,

    /// Exported as the cluster's `ports`. No code visible in this file inserts
    /// into it, so it is exported as created (empty) unless populated elsewhere.
    exposed_ports: Rc<RefCell<BTreeMap<String, PortInfo>>>,

    /// Instance count the cluster spec was created with; exported as `default_count`.
    count: usize,

    /// Stored trybuild config for export; `None` until `instantiate` has run.
    trybuild_config:
        Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
}
159
160impl Node for EcsDeployCluster {
161    type Port = u16;
162    type Meta = ();
163    type InstantiateEnv = EcsDeploy;
164
165    #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
166    fn next_port(&self) -> Self::Port {
167        let port = {
168            let mut borrow = self.next_port.borrow_mut();
169            let port = *borrow;
170            *borrow += 1;
171            port
172        };
173
174        port
175    }
176
177    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
178    fn update_meta(&self, _meta: &Self::Meta) {}
179
180    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, extra_stmts = extra_stmts.len()))]
181    fn instantiate(
182        &self,
183        _env: &mut Self::InstantiateEnv,
184        _meta: &mut Self::Meta,
185        graph: DfirGraph,
186        extra_stmts: &[syn::Stmt],
187        sidecars: &[syn::Expr],
188    ) {
189        let (bin_name, config) = create_graph_trybuild(
190            graph,
191            extra_stmts,
192            sidecars,
193            Some(&self.name),
194            crate::compile::trybuild::generate::DeployMode::Containerized,
195            crate::compile::trybuild::generate::LinkingMode::Static,
196        );
197
198        // Store the trybuild config for export
199        *self.trybuild_config.borrow_mut() = Some((bin_name, config));
200    }
201}
202
/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
#[derive(Clone, Debug)]
pub struct EcsDeployExternal {
    /// Name given to `EcsDeploy::add_external`; only used in trace output in this file.
    name: String,
    /// Next port number that `next_port()` will hand out (starts at 10000);
    /// the counter is shared between clones.
    next_port: Rc<RefCell<u16>>,
}
209
210impl Node for EcsDeployExternal {
211    type Port = u16;
212    type Meta = ();
213    type InstantiateEnv = EcsDeploy;
214
215    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
216    fn next_port(&self) -> Self::Port {
217        let port = {
218            let mut borrow = self.next_port.borrow_mut();
219            let port = *borrow;
220            *borrow += 1;
221            port
222        };
223
224        port
225    }
226
227    #[instrument(level = "trace", skip_all, fields(name = self.name))]
228    fn update_meta(&self, _meta: &Self::Meta) {}
229
230    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
231    fn instantiate(
232        &self,
233        _env: &mut Self::InstantiateEnv,
234        meta: &mut Self::Meta,
235        graph: DfirGraph,
236        extra_stmts: &[syn::Stmt],
237        sidecars: &[syn::Expr],
238    ) {
239        trace!(name: "surface", surface = graph.surface_syntax_string());
240    }
241}
242
/// A pinned, boxed stream yielding `Out` paired with a pinned, boxed sink that
/// accepts `In` and reports errors of type `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
247
/// Port registration for external nodes in an ECS deployment.
///
/// `register` does nothing beyond its tracing span, and every `as_*` accessor
/// is a stub whose returned future panics with `unimplemented!()` once polled.
impl<'a> RegisterPort<'a, EcsDeploy> for EcsDeployExternal {
    /// No-op apart from the tracing span that records the port id and port.
    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {}

    /// Not implemented: the returned future panics when polled.
    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
    fn as_bytes_bidi(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<
        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
    > + 'a {
        async { unimplemented!() }
    }

    /// Not implemented: the returned future panics when polled.
    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
    fn as_bincode_bidi<InT, OutT>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
    where
        InT: Serialize + 'static,
        OutT: serde::de::DeserializeOwned + 'static,
    {
        async { unimplemented!() }
    }

    /// Not implemented: the returned future panics when polled.
    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
    fn as_bincode_sink<T>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
    where
        T: Serialize + 'static,
    {
        async { unimplemented!() }
    }

    /// Not implemented: the returned future panics when polled.
    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
    fn as_bincode_source<T>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
    where
        T: serde::de::DeserializeOwned + 'static,
    {
        async { unimplemented!() }
    }
}
296
/// Represents an AWS ECS deployment.
///
/// A stateless unit struct: it hands out process/cluster/external specs and
/// exports a [`HydroManifest`] from a finished deploy result.
pub struct EcsDeploy;
299
300impl Default for EcsDeploy {
301    fn default() -> Self {
302        Self::new()
303    }
304}
305
306impl EcsDeploy {
307    /// Creates a new ecs deployment.
308    pub fn new() -> Self {
309        Self
310    }
311
312    /// Add an internal ecs process to the deployment.
313    pub fn add_ecs_process(&mut self) -> EcsDeployProcessSpec {
314        EcsDeployProcessSpec
315    }
316
317    /// Add an internal ecs cluster to the deployment.
318    pub fn add_ecs_cluster(&mut self, count: usize) -> EcsDeployClusterSpec {
319        EcsDeployClusterSpec { count }
320    }
321
322    /// Add an external process to the deployment.
323    pub fn add_external(&self, name: String) -> EcsDeployExternalSpec {
324        EcsDeployExternalSpec { name }
325    }
326
327    /// Export a deployment manifest describing each process and cluster: its
328    /// binary name, exposed ports, and ECS task-family naming.
329    ///
330    /// The returned [`HydroManifest`] is typically serialized to JSON and
331    /// consumed by a build script (to compile the trybuild binaries) and a
332    /// deployment tool (CDK, custom scripts, etc.) to create container images
333    /// and orchestrate services.
334    #[instrument(level = "trace", skip_all)]
335    pub fn export(&self, nodes: &DeployResult<'_, Self>) -> HydroManifest {
336        let mut manifest = HydroManifest {
337            processes: BTreeMap::new(),
338            clusters: BTreeMap::new(),
339        };
340
341        // processes
342        for (location_id, name_hint, process) in nodes.get_all_processes() {
343            let LocationId::Process(_) = location_id else {
344                unreachable!()
345            };
346
347            let (bin_name, trybuild_config) = process
348                .trybuild_config
349                .borrow()
350                .clone()
351                .expect("trybuild_config should be set after instantiate");
352
353            manifest.processes.insert(
354                name_hint.to_owned(),
355                ProcessManifest {
356                    build: build_info_from_config(&bin_name, &trybuild_config),
357                    ports: process.exposed_ports.borrow().clone(),
358                    task_family: process.name.clone(),
359                },
360            );
361        }
362
363        // clusters
364        for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
365            let LocationId::Cluster(_) = location_id else {
366                unreachable!()
367            };
368
369            let (bin_name, trybuild_config) = cluster
370                .trybuild_config
371                .borrow()
372                .clone()
373                .expect("trybuild_config should be set after instantiate");
374
375            manifest.clusters.insert(
376                name_hint.to_owned(),
377                ClusterManifest {
378                    build: build_info_from_config(&bin_name, &trybuild_config),
379                    ports: cluster.exposed_ports.borrow().clone(),
380                    default_count: cluster.count,
381                    task_family_prefix: cluster.name.clone(),
382                },
383            );
384        }
385
386        manifest
387    }
388}
389
390fn build_info_from_config(
391    bin_name: &str,
392    config: &crate::compile::trybuild::generate::TrybuildConfig,
393) -> BuildConfig {
394    let mut features = vec!["hydro___feature_ecs_runtime".to_owned()];
395    if let Some(extra) = &config.features {
396        features.extend(extra.clone());
397    }
398    let crate_name = config
399        .project_dir
400        .file_name()
401        .and_then(|n| n.to_str())
402        .unwrap_or("unknown")
403        .replace("_", "-");
404
405    let package_name = format!("{}-hydro-trybuild", crate_name);
406
407    BuildConfig {
408        project_dir: config.project_dir.to_string_lossy().into_owned(),
409        target_dir: config.target_dir.to_string_lossy().into_owned(),
410        bin_name: bin_name.to_owned(),
411        package_name,
412        features,
413    }
414}
415
416impl<'a> Deploy<'a> for EcsDeploy {
417    type InstantiateEnv = Self;
418    type Process = EcsDeployProcess;
419    type Cluster = EcsDeployCluster;
420    type External = EcsDeployExternal;
421    type Meta = ();
422
423    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
424    fn o2o_sink_source(
425        _env: &mut Self::InstantiateEnv,
426        p1: &Self::Process,
427        p1_port: &<Self::Process as Node>::Port,
428        p2: &Self::Process,
429        p2_port: &<Self::Process as Node>::Port,
430        name: Option<&str>,
431        networking_info: &crate::networking::NetworkingInfo,
432    ) -> (syn::Expr, syn::Expr) {
433        match networking_info {
434            crate::networking::NetworkingInfo::Tcp {
435                fault: crate::networking::TcpFault::FailStop,
436            } => {}
437            _ => panic!("Unsupported networking info: {:?}", networking_info),
438        }
439
440        deploy_containerized_o2o(
441            &p2.name,
442            name.expect("channel name is required for containerized deployment"),
443        )
444    }
445
446    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
447    fn o2o_connect(
448        p1: &Self::Process,
449        p1_port: &<Self::Process as Node>::Port,
450        p2: &Self::Process,
451        p2_port: &<Self::Process as Node>::Port,
452    ) -> Box<dyn FnOnce()> {
453        let serialized = format!(
454            "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
455            p1.name, p2.name
456        );
457
458        Box::new(move || {
459            trace!(name: "o2o_connect thunk", %serialized);
460        })
461    }
462
463    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
464    fn o2m_sink_source(
465        _env: &mut Self::InstantiateEnv,
466        p1: &Self::Process,
467        p1_port: &<Self::Process as Node>::Port,
468        c2: &Self::Cluster,
469        c2_port: &<Self::Cluster as Node>::Port,
470        name: Option<&str>,
471        networking_info: &crate::networking::NetworkingInfo,
472    ) -> (syn::Expr, syn::Expr) {
473        match networking_info {
474            crate::networking::NetworkingInfo::Tcp {
475                fault: crate::networking::TcpFault::FailStop,
476            } => {}
477            _ => panic!("Unsupported networking info: {:?}", networking_info),
478        }
479
480        deploy_containerized_o2m(
481            name.expect("channel name is required for containerized deployment"),
482        )
483    }
484
485    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
486    fn o2m_connect(
487        p1: &Self::Process,
488        p1_port: &<Self::Process as Node>::Port,
489        c2: &Self::Cluster,
490        c2_port: &<Self::Cluster as Node>::Port,
491    ) -> Box<dyn FnOnce()> {
492        let serialized = format!(
493            "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
494            p1.name, c2.name
495        );
496
497        Box::new(move || {
498            trace!(name: "o2m_connect thunk", %serialized);
499        })
500    }
501
502    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
503    fn m2o_sink_source(
504        _env: &mut Self::InstantiateEnv,
505        c1: &Self::Cluster,
506        c1_port: &<Self::Cluster as Node>::Port,
507        p2: &Self::Process,
508        p2_port: &<Self::Process as Node>::Port,
509        name: Option<&str>,
510        networking_info: &crate::networking::NetworkingInfo,
511    ) -> (syn::Expr, syn::Expr) {
512        match networking_info {
513            crate::networking::NetworkingInfo::Tcp {
514                fault: crate::networking::TcpFault::FailStop,
515            } => {}
516            _ => panic!("Unsupported networking info: {:?}", networking_info),
517        }
518
519        deploy_containerized_m2o(
520            &p2.name,
521            name.expect("channel name is required for containerized deployment"),
522        )
523    }
524
525    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
526    fn m2o_connect(
527        c1: &Self::Cluster,
528        c1_port: &<Self::Cluster as Node>::Port,
529        p2: &Self::Process,
530        p2_port: &<Self::Process as Node>::Port,
531    ) -> Box<dyn FnOnce()> {
532        let serialized = format!(
533            "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
534            c1.name, p2.name
535        );
536
537        Box::new(move || {
538            trace!(name: "m2o_connect thunk", %serialized);
539        })
540    }
541
542    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
543    fn m2m_sink_source(
544        _env: &mut Self::InstantiateEnv,
545        c1: &Self::Cluster,
546        c1_port: &<Self::Cluster as Node>::Port,
547        c2: &Self::Cluster,
548        c2_port: &<Self::Cluster as Node>::Port,
549        name: Option<&str>,
550        networking_info: &crate::networking::NetworkingInfo,
551    ) -> (syn::Expr, syn::Expr) {
552        match networking_info {
553            crate::networking::NetworkingInfo::Tcp {
554                fault: crate::networking::TcpFault::FailStop,
555            } => {}
556            _ => panic!("Unsupported networking info: {:?}", networking_info),
557        }
558
559        deploy_containerized_m2m(
560            name.expect("channel name is required for containerized deployment"),
561        )
562    }
563
564    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
565    fn m2m_connect(
566        c1: &Self::Cluster,
567        c1_port: &<Self::Cluster as Node>::Port,
568        c2: &Self::Cluster,
569        c2_port: &<Self::Cluster as Node>::Port,
570    ) -> Box<dyn FnOnce()> {
571        let serialized = format!(
572            "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
573            c1.name, c2.name
574        );
575
576        Box::new(move || {
577            trace!(name: "m2m_connect thunk", %serialized);
578        })
579    }
580
581    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
582    fn e2o_many_source(
583        extra_stmts: &mut Vec<syn::Stmt>,
584        p2: &Self::Process,
585        p2_port: &<Self::Process as Node>::Port,
586        codec_type: &syn::Type,
587        shared_handle: String,
588    ) -> syn::Expr {
589        p2.exposed_ports
590            .borrow_mut()
591            .insert(shared_handle.clone(), PortInfo::Tcp { port: *p2_port });
592
593        let socket_ident = syn::Ident::new(
594            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
595            Span::call_site(),
596        );
597
598        let source_ident = syn::Ident::new(
599            &format!("__hydro_deploy_many_{}_source", &shared_handle),
600            Span::call_site(),
601        );
602
603        let sink_ident = syn::Ident::new(
604            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
605            Span::call_site(),
606        );
607
608        let membership_ident = syn::Ident::new(
609            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
610            Span::call_site(),
611        );
612
613        let bind_addr = format!("0.0.0.0:{}", p2_port);
614
615        extra_stmts.push(syn::parse_quote! {
616            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
617        });
618
619        let root = crate::staging_util::get_this_crate();
620
621        extra_stmts.push(syn::parse_quote! {
622            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
623        });
624
625        parse_quote!(#source_ident)
626    }
627
628    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
629    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
630        let sink_ident = syn::Ident::new(
631            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
632            Span::call_site(),
633        );
634        parse_quote!(#sink_ident)
635    }
636
637    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?codec_type, %shared_handle))]
638    fn e2o_source(
639        extra_stmts: &mut Vec<syn::Stmt>,
640        p1: &Self::External,
641        p1_port: &<Self::External as Node>::Port,
642        p2: &Self::Process,
643        p2_port: &<Self::Process as Node>::Port,
644        codec_type: &syn::Type,
645        shared_handle: String,
646    ) -> syn::Expr {
647        // Record the port for manifest export
648        p2.exposed_ports
649            .borrow_mut()
650            .insert(shared_handle.clone(), PortInfo::Tcp { port: *p2_port });
651
652        let source_ident = syn::Ident::new(
653            &format!("__hydro_deploy_{}_source", &shared_handle),
654            Span::call_site(),
655        );
656
657        let bind_addr = format!("0.0.0.0:{}", p2_port);
658
659        // Always use LazySinkSource for external connections - it creates both sink and source
660        // which is needed for bidirectional connections (unpaired: false)
661        let socket_ident = syn::Ident::new(
662            &format!("__hydro_deploy_{}_socket", &shared_handle),
663            Span::call_site(),
664        );
665
666        let sink_ident = syn::Ident::new(
667            &format!("__hydro_deploy_{}_sink", &shared_handle),
668            Span::call_site(),
669        );
670
671        extra_stmts.push(syn::parse_quote! {
672            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
673        });
674
675        let create_expr = deploy_containerized_external_sink_source_ident(bind_addr, socket_ident);
676
677        extra_stmts.push(syn::parse_quote! {
678            let (#sink_ident, #source_ident) = (#create_expr).split();
679        });
680
681        parse_quote!(#source_ident)
682    }
683
684    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
685    fn e2o_connect(
686        p1: &Self::External,
687        p1_port: &<Self::External as Node>::Port,
688        p2: &Self::Process,
689        p2_port: &<Self::Process as Node>::Port,
690        many: bool,
691        server_hint: NetworkHint,
692    ) -> Box<dyn FnOnce()> {
693        let serialized = format!(
694            "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
695            p1.name, p2.name
696        );
697
698        Box::new(move || {
699            trace!(name: "e2o_connect thunk", %serialized);
700        })
701    }
702
703    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
704    fn o2e_sink(
705        p1: &Self::Process,
706        p1_port: &<Self::Process as Node>::Port,
707        p2: &Self::External,
708        p2_port: &<Self::External as Node>::Port,
709        shared_handle: String,
710    ) -> syn::Expr {
711        let sink_ident = syn::Ident::new(
712            &format!("__hydro_deploy_{}_sink", &shared_handle),
713            Span::call_site(),
714        );
715        parse_quote!(#sink_ident)
716    }
717
718    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
719    fn cluster_ids(
720        of_cluster: LocationKey,
721    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
722        cluster_ids()
723    }
724
725    #[instrument(level = "trace", skip_all)]
726    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
727        cluster_self_id()
728    }
729
730    #[instrument(level = "trace", skip_all, fields(?location_id))]
731    fn cluster_membership_stream(
732        _env: &mut Self::InstantiateEnv,
733        _at_location: &LocationId,
734        location_id: &LocationId,
735    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
736    {
737        cluster_membership_stream(location_id)
738    }
739}
740
741#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location))]
742fn get_ecs_image_name(name_hint: &str, location: LocationKey) -> String {
743    let name_hint = name_hint
744        .split("::")
745        .last()
746        .unwrap()
747        .to_ascii_lowercase()
748        .replace(".", "-")
749        .replace("_", "-")
750        .replace("::", "-");
751
752    format!("hy-{name_hint}-{location}")
753}
754
/// Spec for a process running in an ECS deployment; carries no configuration
/// and is turned into an `EcsDeployProcess` by its `ProcessSpec::build` impl.
#[derive(Clone)]
pub struct EcsDeployProcessSpec;
758
759impl<'a> ProcessSpec<'a, EcsDeploy> for EcsDeployProcessSpec {
760    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
761    fn build(self, id: LocationKey, name_hint: &'_ str) -> <EcsDeploy as Deploy<'a>>::Process {
762        EcsDeployProcess {
763            id,
764            name: get_ecs_image_name(name_hint, id),
765            next_port: Rc::new(RefCell::new(10001)),
766            exposed_ports: Rc::new(RefCell::new(BTreeMap::new())),
767            trybuild_config: Rc::new(RefCell::new(None)),
768        }
769    }
770}
771
/// Represents a Cluster running across `count` ECS tasks.
#[derive(Clone)]
pub struct EcsDeployClusterSpec {
    /// Instance count passed to `EcsDeploy::add_ecs_cluster`; copied into the
    /// built `EcsDeployCluster`.
    count: usize,
}
777
778impl<'a> ClusterSpec<'a, EcsDeploy> for EcsDeployClusterSpec {
779    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
780    fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::Cluster {
781        EcsDeployCluster {
782            id,
783            name: get_ecs_image_name(name_hint, id),
784            next_port: Rc::new(RefCell::new(10001)),
785            exposed_ports: Rc::new(RefCell::new(BTreeMap::new())),
786            count: self.count,
787            trybuild_config: Rc::new(RefCell::new(None)),
788        }
789    }
790}
791
/// Represents an external process outside of the management of hydro deploy.
pub struct EcsDeployExternalSpec {
    /// Name passed to `EcsDeploy::add_external`; becomes the built node's name.
    name: String,
}
796
797impl<'a> ExternalSpec<'a, EcsDeploy> for EcsDeployExternalSpec {
798    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
799    fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::External {
800        EcsDeployExternal {
801            name: self.name,
802            next_port: Rc::new(RefCell::new(10000)),
803        }
804    }
805}