hydro_lang/deploy/
deploy_graph_containerized_ecs.rs

//! Deployment backend for Hydro that builds Docker images and launches them as AWS ECS services provisioned through CloudFormation.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::query_parameters::{BuildImageOptions, TagImageOptions};
10use bytes::Bytes;
11use dfir_lang::graph::DfirGraph;
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use http_body_util::Full;
14use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
15use hydro_deploy::{LinuxCompileType, RustCrate};
16use nanoid::nanoid;
17use proc_macro2::Span;
18use serde_json::{Map, Value, json};
19use sinktools::lazy::LazySink;
20use stageleft::QuotedWithContext;
21use syn::parse_quote;
22use tar::{Builder, Header};
23use tokio::net::TcpStream;
24use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
25use tracing::{Instrument, instrument, trace};
26
27use super::deploy_runtime_containerized_ecs::*;
28
/// Per-task inputs used when generating the CloudFormation template.
///
/// One `TaskConfig` produces one ECS task definition and one ECS service.
#[derive(Debug, Clone)]
struct TaskConfig {
    /// Name used for the task family, the container, the service, and the
    /// `CONTAINER_NAME` environment variable.
    task_name: String,
    /// Container image reference placed in the container definition.
    image_uri: String,
    /// Value exported to the container as `DEPLOYMENT_INSTANCE`.
    deployment_instance: String,
    /// Container ports published as TCP port mappings.
    exposed_ports: Vec<u16>,
    /// AWS region passed to the `awslogs` log driver.
    region: String,
}
37
38/// Generate a complete CloudFormation template with all infrastructure and tasks
39fn generate_cloudformation_template(
40    deployment_instance: &str,
41    tasks: &[TaskConfig],
42) -> anyhow::Result<String> {
43    let mut resources: Map<String, Value> = Map::new();
44
45    // Base infrastructure resources
46    resources.insert(
47        "EcsTaskExecutionRole".to_string(),
48        json!({
49            "Type": "AWS::IAM::Role",
50            "Properties": {
51                "RoleName": format!("hydro-exec-{}", deployment_instance),
52                "AssumeRolePolicyDocument": {
53                    "Version": "2012-10-17",
54                    "Statement": [{
55                        "Effect": "Allow",
56                        "Principal": { "Service": "ecs-tasks.amazonaws.com" },
57                        "Action": "sts:AssumeRole"
58                    }]
59                },
60                "ManagedPolicyArns": [
61                    "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
62                ],
63                "Policies": [{
64                    "PolicyName": "CloudWatchLogsCreateLogGroup",
65                    "PolicyDocument": {
66                        "Version": "2012-10-17",
67                        "Statement": [{
68                            "Effect": "Allow",
69                            "Action": "logs:CreateLogGroup",
70                            "Resource": "*"
71                        }]
72                    }
73                }]
74            }
75        }),
76    );
77
78    resources.insert(
79        "EcsTaskRole".to_string(),
80        json!({
81            "Type": "AWS::IAM::Role",
82            "Properties": {
83                "RoleName": format!("hydro-task-{}", deployment_instance),
84                "AssumeRolePolicyDocument": {
85                    "Version": "2012-10-17",
86                    "Statement": [{
87                        "Effect": "Allow",
88                        "Principal": { "Service": "ecs-tasks.amazonaws.com" },
89                        "Action": "sts:AssumeRole"
90                    }]
91                },
92                "Policies": [{
93                    "PolicyName": "HydroEcsAccess",
94                    "PolicyDocument": {
95                        "Version": "2012-10-17",
96                        "Statement": [{
97                            "Effect": "Allow",
98                            "Action": [
99                                "ecs:ListTasks",
100                                "ecs:DescribeTasks",
101                                "ec2:DescribeNetworkInterfaces"
102                            ],
103                            "Resource": "*"
104                        }]
105                    }
106                }]
107            }
108        }),
109    );
110
111    resources.insert(
112        "VPC".to_string(),
113        json!({
114            "Type": "AWS::EC2::VPC",
115            "Properties": {
116                "CidrBlock": "10.0.0.0/16",
117                "EnableDnsSupport": true,
118                "EnableDnsHostnames": true,
119                "Tags": [{ "Key": "Name", "Value": format!("hydro-vpc-{}", deployment_instance) }]
120            }
121        }),
122    );
123
124    resources.insert("Subnet".to_string(), json!({
125        "Type": "AWS::EC2::Subnet",
126        "Properties": {
127            "VpcId": { "Ref": "VPC" },
128            "CidrBlock": "10.0.1.0/24",
129            "MapPublicIpOnLaunch": true,
130            "Tags": [{ "Key": "Name", "Value": format!("hydro-subnet-{}", deployment_instance) }]
131        }
132    }));
133
134    resources.insert(
135        "InternetGateway".to_string(),
136        json!({
137            "Type": "AWS::EC2::InternetGateway",
138            "Properties": {
139                "Tags": [{ "Key": "Name", "Value": format!("hydro-igw-{}", deployment_instance) }]
140            }
141        }),
142    );
143
144    resources.insert(
145        "VPCGatewayAttachment".to_string(),
146        json!({
147            "Type": "AWS::EC2::VPCGatewayAttachment",
148            "Properties": {
149                "VpcId": { "Ref": "VPC" },
150                "InternetGatewayId": { "Ref": "InternetGateway" }
151            }
152        }),
153    );
154
155    resources.insert(
156        "RouteTable".to_string(),
157        json!({
158            "Type": "AWS::EC2::RouteTable",
159            "Properties": {
160                "VpcId": { "Ref": "VPC" },
161                "Tags": [{ "Key": "Name", "Value": format!("hydro-rt-{}", deployment_instance) }]
162            }
163        }),
164    );
165
166    resources.insert(
167        "Route".to_string(),
168        json!({
169            "Type": "AWS::EC2::Route",
170            "DependsOn": "VPCGatewayAttachment",
171            "Properties": {
172                "RouteTableId": { "Ref": "RouteTable" },
173                "DestinationCidrBlock": "0.0.0.0/0",
174                "GatewayId": { "Ref": "InternetGateway" }
175            }
176        }),
177    );
178
179    resources.insert(
180        "SubnetRouteTableAssociation".to_string(),
181        json!({
182            "Type": "AWS::EC2::SubnetRouteTableAssociation",
183            "Properties": {
184                "SubnetId": { "Ref": "Subnet" },
185                "RouteTableId": { "Ref": "RouteTable" }
186            }
187        }),
188    );
189
190    resources.insert(
191        "SecurityGroup".to_string(),
192        json!({
193            "Type": "AWS::EC2::SecurityGroup",
194            "Properties": {
195                "GroupDescription": "Security group for Hydro ECS tasks",
196                "VpcId": { "Ref": "VPC" },
197                "SecurityGroupIngress": [{
198                    "IpProtocol": "-1",
199                    "CidrIp": "0.0.0.0/0"
200                }],
201                "Tags": [{ "Key": "Name", "Value": format!("hydro-sg-{}", deployment_instance) }]
202            }
203        }),
204    );
205
206    resources.insert(
207        "EcsCluster".to_string(),
208        json!({
209            "Type": "AWS::ECS::Cluster",
210            "Properties": {
211                "ClusterName": format!("hydro-{}", deployment_instance)
212            }
213        }),
214    );
215
216    // Generate task definitions and ECS services for each task
217    for task in tasks {
218        let safe_name = task.task_name.replace('-', "");
219
220        // Port mappings
221        let port_mappings: Vec<Value> = task
222            .exposed_ports
223            .iter()
224            .map(|p| {
225                json!({
226                    "ContainerPort": *p as i32,
227                    "Protocol": "tcp"
228                })
229            })
230            .collect();
231
232        // Task Definition
233        resources.insert(format!("TaskDef{}", safe_name), json!({
234            "Type": "AWS::ECS::TaskDefinition",
235            "Properties": {
236                "Family": task.task_name,
237                "RequiresCompatibilities": ["FARGATE"],
238                "NetworkMode": "awsvpc",
239                "Cpu": "256",
240                "Memory": "512",
241                "ExecutionRoleArn": { "Fn::GetAtt": ["EcsTaskExecutionRole", "Arn"] },
242                "TaskRoleArn": { "Fn::GetAtt": ["EcsTaskRole", "Arn"] },
243                "ContainerDefinitions": [{
244                    "Name": task.task_name,
245                    "Image": task.image_uri,
246                    "PortMappings": port_mappings,
247                    "LogConfiguration": {
248                        "LogDriver": "awslogs",
249                        "Options": {
250                            "awslogs-group": "/ecs/hydro",
251                            "awslogs-region": task.region,
252                            "awslogs-stream-prefix": "ecs",
253                            "awslogs-create-group": "true"
254                        }
255                    },
256                    "Environment": [
257                        { "Name": "CONTAINER_NAME", "Value": task.task_name },
258                        { "Name": "DEPLOYMENT_INSTANCE", "Value": task.deployment_instance },
259                        { "Name": "RUST_LOG", "Value": "trace,aws_runtime=info,aws_sdk_ecs=info,aws_smithy_runtime=info,aws_smithy_runtime_api=info,aws_config=info,hyper_util=info,aws_smithy_http_client=info,aws_sigv4=info" },
260                        { "Name": "RUST_BACKTRACE", "Value": "1" },
261                        { "Name": "NO_COLOR", "Value": "1" }
262                    ]
263                }]
264            }
265        }));
266
267        // ECS Service
268        resources.insert(
269            format!("EcsService{}", safe_name),
270            json!({
271                "Type": "AWS::ECS::Service",
272                "DependsOn": ["Route", format!("TaskDef{}", safe_name)],
273                "Properties": {
274                    "ServiceName": task.task_name,
275                    "Cluster": { "Ref": "EcsCluster" },
276                    "TaskDefinition": { "Ref": format!("TaskDef{}", safe_name) },
277                    "DesiredCount": 1,
278                    "LaunchType": "FARGATE",
279                    "NetworkConfiguration": {
280                        "AwsvpcConfiguration": {
281                            "Subnets": [{ "Ref": "Subnet" }],
282                            "SecurityGroups": [{ "Ref": "SecurityGroup" }],
283                            "AssignPublicIp": "ENABLED"
284                        }
285                    }
286                }
287            }),
288        );
289    }
290
291    let template = json!({
292        "AWSTemplateFormatVersion": "2010-09-09",
293        "Description": "Hydro ECS Infrastructure",
294        "Resources": resources,
295        "Outputs": {
296            "ClusterName": {
297                "Value": { "Ref": "EcsCluster" }
298            }
299        }
300    });
301
302    Ok(serde_json::to_string(&template)?)
303}
304
305#[instrument(level = "trace", skip_all, fields(%deployment_instance))]
306async fn deploy_stack(
307    deployment_instance: &str,
308    tasks: &[TaskConfig],
309) -> Result<String, anyhow::Error> {
310    use aws_config::BehaviorVersion;
311    use aws_sdk_cloudformation::Client as CfnClient;
312    use aws_sdk_cloudformation::types::{Capability, StackStatus};
313
314    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
315    let cfn_client = CfnClient::new(&config);
316
317    let stack_name = format!("hydro-{}", deployment_instance);
318    let template = generate_cloudformation_template(deployment_instance, tasks)?;
319
320    trace!(name: "creating_stack", %stack_name);
321    cfn_client
322        .create_stack()
323        .stack_name(&stack_name)
324        .template_body(&template)
325        .capabilities(Capability::CapabilityNamedIam)
326        .send()
327        .await?;
328
329    // Wait for stack creation
330    trace!(name: "waiting_for_stack", %stack_name);
331    loop {
332        let describe = cfn_client
333            .describe_stacks()
334            .stack_name(&stack_name)
335            .send()
336            .await?;
337
338        let stack = describe
339            .stacks()
340            .first()
341            .ok_or_else(|| anyhow::anyhow!("Stack not found"))?;
342
343        match stack.stack_status() {
344            Some(StackStatus::CreateComplete) => {
345                trace!(name: "stack_created", %stack_name);
346                break;
347            }
348            Some(StackStatus::CreateFailed)
349            | Some(StackStatus::RollbackComplete)
350            | Some(StackStatus::RollbackFailed) => {
351                return Err(anyhow::anyhow!(
352                    "Stack creation failed: {:?}",
353                    stack.stack_status_reason()
354                ));
355            }
356            status => {
357                trace!(name: "stack_status", ?status);
358                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
359            }
360        }
361    }
362
363    // Extract cluster name from outputs
364    let describe = cfn_client
365        .describe_stacks()
366        .stack_name(&stack_name)
367        .send()
368        .await?;
369
370    let stack = describe
371        .stacks()
372        .first()
373        .ok_or_else(|| anyhow::anyhow!("Stack not found"))?;
374
375    let cluster_name = stack
376        .outputs()
377        .iter()
378        .find(|o| o.output_key() == Some("ClusterName"))
379        .and_then(|o| o.output_value())
380        .map(|s| s.to_string())
381        .ok_or_else(|| anyhow::anyhow!("Missing ClusterName output"))?;
382
383    Ok(cluster_name)
384}
385use crate::compile::builder::ExternalPortId;
386use crate::compile::deploy::DeployResult;
387use crate::compile::deploy_provider::{
388    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
389};
390use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
391use crate::location::dynamic::LocationId;
392use crate::location::member_id::TaglessMemberId;
393use crate::location::{MembershipEvent, NetworkHint};
394
/// Named handle for the network associated with a deployment.
///
/// Only the generated name is kept; nothing in this type talks to Docker or AWS.
#[derive(Debug, Clone)]
pub struct DockerNetworkEcs {
    /// Network name, made unique by [`DockerNetworkEcs::new`].
    _name: String,
}
400
401impl DockerNetworkEcs {
402    /// creates a new docker network (will actually be created when deployment.start() is called).
403    pub fn new(name: String) -> Self {
404        Self {
405            _name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
406        }
407    }
408}
409
410/// Represents a process running in a docker container
411#[derive(Clone)]
412pub struct DockerDeployProcessEcs {
413    id: usize,
414    name: String,
415    next_port: Rc<RefCell<u16>>,
416    rust_crate: Rc<RefCell<Option<RustCrate>>>,
417
418    exposed_ports: Rc<RefCell<Vec<u16>>>,
419
420    docker_container_name: Rc<RefCell<Option<String>>>,
421
422    compilation_options: Option<String>,
423
424    config: Vec<String>,
425
426    network: DockerNetworkEcs,
427}
428
429impl Node for DockerDeployProcessEcs {
430    type Port = u16;
431    type Meta = ();
432    type InstantiateEnv = DockerDeployEcs;
433
434    #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
435    fn next_port(&self) -> Self::Port {
436        let port = {
437            let mut borrow = self.next_port.borrow_mut();
438            let port = *borrow;
439            *borrow += 1;
440            port
441        };
442
443        port
444    }
445
446    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
447    fn update_meta(&self, _meta: &Self::Meta) {}
448
449    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
450    fn instantiate(
451        &self,
452        _env: &mut Self::InstantiateEnv,
453        meta: &mut Self::Meta,
454        graph: DfirGraph,
455        extra_stmts: Vec<syn::Stmt>,
456    ) {
457        let (bin_name, config) = create_graph_trybuild(
458            graph,
459            extra_stmts.clone(),
460            &Some(self.name.clone()),
461            true,
462            LinkingMode::Static,
463        );
464
465        let mut ret = RustCrate::new(config.project_dir)
466            .target_dir(config.target_dir)
467            .example(bin_name.clone())
468            .no_default_features();
469
470        ret = ret.display_name("test_display_name");
471
472        ret = ret.features(vec!["hydro___feature_ecs_runtime".to_string()]);
473
474        if let Some(features) = config.features {
475            ret = ret.features(features);
476        }
477
478        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
479        ret = ret.config("build.incremental = false");
480
481        *self.rust_crate.borrow_mut() = Some(ret);
482    }
483}
484
485/// Represents a logical cluster, which can be a variable amount of individual containers.
486#[derive(Clone)]
487pub struct DockerDeployClusterEcs {
488    id: usize,
489    name: String,
490    next_port: Rc<RefCell<u16>>,
491    rust_crate: Rc<RefCell<Option<RustCrate>>>,
492
493    docker_container_name: Rc<RefCell<Vec<String>>>,
494
495    compilation_options: Option<String>,
496
497    config: Vec<String>,
498
499    count: usize,
500}
501
502impl Node for DockerDeployClusterEcs {
503    type Port = u16;
504    type Meta = ();
505    type InstantiateEnv = DockerDeployEcs;
506
507    #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
508    fn next_port(&self) -> Self::Port {
509        let port = {
510            let mut borrow = self.next_port.borrow_mut();
511            let port = *borrow;
512            *borrow += 1;
513            port
514        };
515
516        port
517    }
518
519    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
520    fn update_meta(&self, _meta: &Self::Meta) {}
521
522    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, extra_stmts = extra_stmts.len()))]
523    fn instantiate(
524        &self,
525        _env: &mut Self::InstantiateEnv,
526        _meta: &mut Self::Meta,
527        graph: DfirGraph,
528        extra_stmts: Vec<syn::Stmt>,
529    ) {
530        let (bin_name, config) = create_graph_trybuild(
531            graph,
532            extra_stmts.clone(),
533            &Some(self.name.clone()),
534            true,
535            LinkingMode::Static,
536        );
537
538        let mut ret = RustCrate::new(config.project_dir)
539            .target_dir(config.target_dir)
540            .example(bin_name.clone())
541            .no_default_features();
542
543        ret = ret.display_name("test_display_name");
544
545        ret = ret.features(vec!["hydro___feature_ecs_runtime".to_string()]);
546
547        if let Some(features) = config.features {
548            ret = ret.features(features);
549        }
550
551        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
552        ret = ret.config("build.incremental = false");
553
554        *self.rust_crate.borrow_mut() = Some(ret);
555    }
556}
557
558/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
559#[derive(Clone, Debug)]
560pub struct DockerDeployExternalEcs {
561    name: String,
562    next_port: Rc<RefCell<u16>>,
563
564    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
565
566    #[expect(clippy::type_complexity, reason = "internal code")]
567    connection_info:
568        Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetworkEcs)>>>,
569
570    deployment_instance: String,
571}
572
573impl Node for DockerDeployExternalEcs {
574    type Port = u16;
575    type Meta = ();
576    type InstantiateEnv = DockerDeployEcs;
577
578    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
579    fn next_port(&self) -> Self::Port {
580        let port = {
581            let mut borrow = self.next_port.borrow_mut();
582            let port = *borrow;
583            *borrow += 1;
584            port
585        };
586
587        port
588    }
589
590    #[instrument(level = "trace", skip_all, fields(name = self.name))]
591    fn update_meta(&self, _meta: &Self::Meta) {}
592
593    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
594    fn instantiate(
595        &self,
596        _env: &mut Self::InstantiateEnv,
597        meta: &mut Self::Meta,
598        graph: DfirGraph,
599        extra_stmts: Vec<syn::Stmt>,
600    ) {
601        trace!(name: "surface", surface = graph.surface_syntax_string());
602    }
603}
604
605type DynSourceSink<Out, In, InErr> = (
606    Pin<Box<dyn Stream<Item = Out>>>,
607    Pin<Box<dyn Sink<In, Error = InErr>>>,
608);
609
610impl<'a> RegisterPort<'a, DockerDeployEcs> for DockerDeployExternalEcs {
611    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
612    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
613        self.ports.borrow_mut().insert(external_port_id, port);
614    }
615
616    fn as_bytes_bidi(
617        &self,
618        external_port_id: ExternalPortId,
619    ) -> impl Future<
620        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
621    > + 'a {
622        let _span =
623            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered(); // the instrument macro doesn't work here because of lifetime issues?
624        async { todo!() }
625    }
626
627    fn as_bincode_bidi<InT, OutT>(
628        &self,
629        external_port_id: ExternalPortId,
630    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
631    where
632        InT: serde::Serialize + 'static,
633        OutT: serde::de::DeserializeOwned + 'static,
634    {
635        let _span =
636            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered(); // the instrument macro doesn't work here because of lifetime issues?
637        async { todo!() }
638    }
639
640    fn as_bincode_sink<T>(
641        &self,
642        external_port_id: ExternalPortId,
643    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
644    where
645        T: serde::Serialize + 'static,
646    {
647        let guard =
648            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
649
650        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
651        let (docker_container_name, remote_port, _network) = self
652            .connection_info
653            .borrow()
654            .get(&local_port)
655            .unwrap()
656            .clone();
657        let deployment_instance = self.deployment_instance.clone();
658
659        async move {
660            use aws_config::BehaviorVersion;
661            use aws_sdk_ecs::Client as EcsClient;
662
663            let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
664            let ecs_client = EcsClient::new(&config);
665
666            let task_name = docker_container_name.borrow().as_ref().unwrap().clone();
667            trace!(name: "query_ecs", %task_name);
668
669            let cluster_name = format!("hydro-{}", deployment_instance);
670
671            let task_arn = loop {
672                let tasks = ecs_client
673                    .list_tasks()
674                    .cluster(&cluster_name)
675                    .family(&task_name)
676                    .send()
677                    .await
678                    .unwrap();
679
680                if let Some(arn) = tasks.task_arns().first() {
681                    break arn.clone();
682                }
683
684                trace!(name: "waiting_for_task", %task_name);
685                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
686            };
687            trace!(name: "found_task", %task_arn);
688
689            let eni_id = loop {
690                let task_details = ecs_client
691                    .describe_tasks()
692                    .cluster(&cluster_name)
693                    .tasks(&task_arn)
694                    .send()
695                    .await
696                    .unwrap();
697
698                if let Some(eni) = task_details.tasks().first().and_then(|task| {
699                    task.attachments()
700                        .iter()
701                        .flat_map(|a| a.details())
702                        .find(|d| d.name() == Some("networkInterfaceId"))
703                        .and_then(|d| d.value())
704                }) {
705                    break eni.to_string();
706                }
707
708                trace!(name: "waiting_for_eni", %task_arn);
709                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
710            };
711            trace!(name: "found_eni", %eni_id);
712
713            let ec2_client = aws_sdk_ec2::Client::new(&config);
714
715            let remote_ip_address = loop {
716                let eni_info = ec2_client
717                    .describe_network_interfaces()
718                    .network_interface_ids(&eni_id)
719                    .send()
720                    .await
721                    .unwrap();
722
723                if let Some(ip) = eni_info
724                    .network_interfaces()
725                    .first()
726                    .and_then(|ni| ni.association())
727                    .and_then(|assoc| assoc.public_ip())
728                {
729                    break ip.to_string();
730                }
731
732                trace!(name: "waiting_for_public_ip", %eni_id);
733                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
734            };
735            trace!(name: "resolved_ip", %remote_ip_address);
736
737            Box::pin(
738                LazySink::new(move || {
739                    Box::pin(async move {
740                        trace!(name: "connecting", %remote_ip_address, %remote_port);
741
742                        let stream =
743                            TcpStream::connect(format!("{remote_ip_address}:{remote_port}"))
744                                .await?;
745
746                        trace!(name: "connected", %remote_ip_address, %remote_port);
747
748                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
749                            stream,
750                            LengthDelimitedCodec::new(),
751                        ))
752                    })
753                })
754                .with(move |v| async move { Ok(Bytes::from(bincode::serialize(&v).unwrap())) }),
755            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
756        }
757        .instrument(guard.exit())
758    }
759
760    fn as_bincode_source<T>(
761        &self,
762        external_port_id: ExternalPortId,
763    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
764    where
765        T: serde::de::DeserializeOwned + 'static,
766    {
767        let guard =
768            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
769
770        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
771        let (docker_container_name, remote_port, _network) = self
772            .connection_info
773            .borrow()
774            .get(&local_port)
775            .unwrap()
776            .clone();
777        let deployment_instance = self.deployment_instance.clone();
778
779        async move {
780            use aws_config::BehaviorVersion;
781            use aws_sdk_ecs::Client as EcsClient;
782
783            let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
784            let ecs_client = EcsClient::new(&config);
785
786            let task_name = docker_container_name.borrow().as_ref().unwrap().clone();
787            trace!(name: "query_ecs", %task_name);
788
789            let cluster_name = format!("hydro-{}", deployment_instance);
790
791            let task_arn = loop {
792                let tasks = ecs_client
793                    .list_tasks()
794                    .cluster(&cluster_name)
795                    .family(&task_name)
796                    .send()
797                    .await
798                    .unwrap();
799
800                if let Some(arn) = tasks.task_arns().first() {
801                    break arn.clone();
802                }
803
804                trace!(name: "waiting_for_task", %task_name);
805                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
806            };
807            trace!(name: "found_task", %task_arn);
808
809            let eni_id = loop {
810                let task_details = ecs_client
811                    .describe_tasks()
812                    .cluster(&cluster_name)
813                    .tasks(&task_arn)
814                    .send()
815                    .await
816                    .unwrap();
817
818                if let Some(eni) = task_details.tasks().first().and_then(|task| {
819                    task.attachments()
820                        .iter()
821                        .flat_map(|a| a.details())
822                        .find(|d| d.name() == Some("networkInterfaceId"))
823                        .and_then(|d| d.value())
824                }) {
825                    break eni.to_string();
826                }
827
828                trace!(name: "waiting_for_eni", %task_arn);
829                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
830            };
831            trace!(name: "found_eni", %eni_id);
832
833            let ec2_client = aws_sdk_ec2::Client::new(&config);
834
835            let remote_ip_address = loop {
836                let eni_info = ec2_client
837                    .describe_network_interfaces()
838                    .network_interface_ids(&eni_id)
839                    .send()
840                    .await
841                    .unwrap();
842
843                if let Some(ip) = eni_info
844                    .network_interfaces()
845                    .first()
846                    .and_then(|ni| ni.association())
847                    .and_then(|assoc| assoc.public_ip())
848                {
849                    break ip.to_string();
850                }
851
852                trace!(name: "waiting_for_public_ip", %eni_id);
853                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
854            };
855            trace!(name: "resolved_ip", %remote_ip_address);
856
857            trace!(name: "connecting", %remote_ip_address, %remote_port);
858
859            let stream = TcpStream::connect(format!("{remote_ip_address}:{remote_port}"))
860                .await
861                .unwrap();
862
863            trace!(name: "connected", %remote_ip_address, %remote_port);
864
865            Box::pin(
866                FramedRead::new(stream, LengthDelimitedCodec::new())
867                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
868            ) as Pin<Box<dyn Stream<Item = T>>>
869        }
870        .instrument(guard.exit())
871    }
872}
873
/// Deployment environment that ships Hydro processes and clusters to AWS ECS.
///
/// Images are built with the local Docker daemon and pushed to ECR in
/// [`DockerDeployEcs::provision`]; the ECS services are brought up through
/// `deploy_stack` in [`DockerDeployEcs::start`].
pub struct DockerDeployEcs {
    /// Specs of the processes registered via `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpecEcs>,
    /// Specs of the clusters registered via `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpecEcs>,
    /// Network handle that is cloned into every process spec.
    network: DockerNetworkEcs,
    /// Random identifier shared by the resource names of this deployment
    /// (for example the `hydro-{deployment_instance}` stack and cluster names).
    deployment_instance: String,
}
881
882#[instrument(level = "trace", skip_all, fields(%image_name))]
883async fn build_and_create_image(
884    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
885    compilation_options: &Option<String>,
886    config: &[String],
887    image_name: &str,
888) -> Result<(), anyhow::Error> {
889    let mut rust_crate = rust_crate
890        .borrow_mut()
891        .take()
892        .unwrap()
893        .rustflags(compilation_options.clone().unwrap_or("".to_string()));
894
895    for cfg in config {
896        rust_crate = rust_crate.config(cfg);
897    }
898
899    let build_output = match build_crate_memoized(
900        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Glibc)),
901    )
902    .await
903    {
904        Ok(build_output) => build_output,
905        Err(BuildError::FailedToBuildCrate {
906            exit_status,
907            diagnostics,
908            text_lines,
909            stderr_lines,
910        }) => {
911            let diagnostics = diagnostics
912                .into_iter()
913                .map(|d| d.rendered.unwrap())
914                .collect::<Vec<_>>()
915                .join("\n");
916            let text_lines = text_lines.join("\n");
917            let stderr_lines = stderr_lines.join("\n");
918
919            anyhow::bail!(
920                r#"
921Failed to build crate {exit_status:?}
922--- diagnostics
923---
924{diagnostics}
925---
926---
927---
928
929--- text_lines
930---
931---
932{text_lines}
933---
934---
935---
936
937--- stderr_lines
938---
939---
940{stderr_lines}
941---
942---
943---"#
944            );
945        }
946        Err(err) => {
947            anyhow::bail!("Failed to build crate {err:?}");
948        }
949    };
950
951    let docker = Docker::connect_with_local_defaults()?;
952
953    let mut tar_data = Vec::new();
954    {
955        let mut tar = Builder::new(&mut tar_data);
956
957        let dockerfile_content = br#"
958                    FROM debian:trixie-slim
959                    RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/*
960                    COPY app /app
961                    CMD ["/app"]
962                "#;
963        let mut header = Header::new_gnu();
964        header.set_path("Dockerfile")?;
965        header.set_size(dockerfile_content.len() as u64);
966        header.set_cksum();
967        tar.append(&header, &dockerfile_content[..])?;
968
969        let mut header = Header::new_gnu();
970        header.set_path("app")?;
971        header.set_size(build_output.bin_data.len() as u64);
972        header.set_mode(0o755);
973        header.set_cksum();
974        tar.append(&header, &build_output.bin_data[..])?;
975
976        tar.finish()?;
977    }
978
979    let build_options = BuildImageOptions {
980        dockerfile: "Dockerfile".to_owned(),
981        t: Some(image_name.to_string()),
982        rm: true,
983        ..Default::default()
984    };
985
986    use bollard::errors::Error;
987
988    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
989    let mut build_stream = docker.build_image(build_options, None, Some(body));
990    while let Some(msg) = build_stream.next().await {
991        match msg {
992            Ok(_) => {}
993            Err(e) => match e {
994                Error::DockerStreamError { error } => {
995                    return Err(anyhow::anyhow!(
996                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
997                    ));
998                }
999                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
1000            },
1001        }
1002    }
1003
1004    Ok(())
1005}
1006
1007#[instrument(level = "trace", skip_all, fields(%image_name))]
1008async fn upload_image_to_ecr(docker: &Docker, image_name: &str) -> Result<(), anyhow::Error> {
1009    use aws_config::BehaviorVersion;
1010    use aws_sdk_ecr::Client as EcrClient;
1011    use base64::Engine;
1012    use bollard::auth::DockerCredentials;
1013    use bollard::query_parameters::PushImageOptions;
1014
1015    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
1016    let ecr_client = EcrClient::new(&config);
1017
1018    // Create the ECR repository if it doesn't exist
1019    match ecr_client
1020        .create_repository()
1021        .repository_name(image_name)
1022        .send()
1023        .await
1024    {
1025        Ok(_) => trace!(name: "repository_created", %image_name),
1026        Err(error) => {
1027            // Repository might already exist, which is fine
1028            trace!(name: "repository_creation_result", ?error);
1029        }
1030    }
1031
1032    let auth_response = ecr_client.get_authorization_token().send().await?;
1033
1034    let auth_token = auth_response
1035        .authorization_data()
1036        .first()
1037        .ok_or_else(|| anyhow::anyhow!("No ECR authorization data"))?;
1038
1039    let endpoint = auth_token
1040        .proxy_endpoint()
1041        .ok_or_else(|| anyhow::anyhow!("No ECR endpoint"))?;
1042    let token = auth_token
1043        .authorization_token()
1044        .ok_or_else(|| anyhow::anyhow!("No ECR token"))?;
1045
1046    let decoded = String::from_utf8(base64::prelude::BASE64_STANDARD.decode(token)?)?;
1047    let (username, password) = decoded
1048        .split_once(':')
1049        .ok_or_else(|| anyhow::anyhow!("Invalid ECR token format"))?;
1050
1051    let registry = endpoint.trim_start_matches("https://");
1052    let ecr_image_name = format!("{registry}/{image_name}");
1053
1054    // Create the ECR repository if it doesn't exist
1055    match ecr_client
1056        .create_repository()
1057        .repository_name(image_name)
1058        .send()
1059        .await
1060    {
1061        Ok(_) => {}
1062        Err(e) => {
1063            // Ignore "RepositoryAlreadyExistsException" error
1064            let is_already_exists = e
1065                .as_service_error()
1066                .map(|se| se.is_repository_already_exists_exception())
1067                .unwrap_or(false);
1068            if !is_already_exists {
1069                return Err(anyhow::anyhow!("Failed to create ECR repository: {}", e));
1070            }
1071        }
1072    }
1073
1074    docker
1075        .tag_image(
1076            image_name,
1077            Some(TagImageOptions {
1078                repo: Some(ecr_image_name.clone()),
1079                ..Default::default()
1080            }),
1081        )
1082        .await?;
1083
1084    let mut push_stream = docker.push_image(
1085        &ecr_image_name,
1086        Some(PushImageOptions {
1087            ..Default::default()
1088        }),
1089        Some(DockerCredentials {
1090            username: Some(username.to_string()),
1091            password: Some(password.to_string()),
1092            ..Default::default()
1093        }),
1094    );
1095
1096    while let Some(msg) = push_stream.next().await {
1097        match msg {
1098            Ok(_) => {}
1099            Err(e) => match e {
1100                bollard::errors::Error::DockerStreamError { error } => {
1101                    return Err(anyhow::anyhow!(
1102                        "Docker push failed: DockerStreamError: {{ error: {error} }}"
1103                    ));
1104                }
1105                _ => return Err(anyhow::anyhow!("Docker push failed: {}", e)),
1106            },
1107        }
1108        msg?;
1109    }
1110
1111    Ok(())
1112}
1113
1114impl DockerDeployEcs {
1115    /// Create a new deployment
1116    pub fn new(network: DockerNetworkEcs) -> Self {
1117        Self {
1118            docker_processes: Vec::new(),
1119            docker_clusters: Vec::new(),
1120            network,
1121            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
1122        }
1123    }
1124
1125    /// Add an internal docker service to the deployment.
1126    pub fn add_localhost_docker(
1127        &mut self,
1128        compilation_options: Option<String>,
1129        config: Vec<String>,
1130    ) -> DockerDeployProcessSpecEcs {
1131        let process = DockerDeployProcessSpecEcs {
1132            compilation_options,
1133            config,
1134            network: self.network.clone(),
1135            deployment_instance: self.deployment_instance.clone(),
1136        };
1137
1138        self.docker_processes.push(process.clone());
1139
1140        process
1141    }
1142
1143    /// Add an internal docker cluster to the deployment.
1144    pub fn add_localhost_docker_cluster(
1145        &mut self,
1146        compilation_options: Option<String>,
1147        config: Vec<String>,
1148        count: usize,
1149    ) -> DockerDeployClusterSpecEcs {
1150        let cluster = DockerDeployClusterSpecEcs {
1151            compilation_options,
1152            config,
1153            count,
1154            deployment_instance: self.deployment_instance.clone(),
1155        };
1156
1157        self.docker_clusters.push(cluster.clone());
1158
1159        cluster
1160    }
1161
1162    /// Add an external process to the deployment.
1163    pub fn add_external(&self, name: String) -> DockerDeployExternalSpecEcs {
1164        DockerDeployExternalSpecEcs {
1165            name,
1166            deployment_instance: self.deployment_instance.clone(),
1167        }
1168    }
1169
1170    /// Get the deployment instance from this deployment.
1171    pub fn get_deployment_instance(&self) -> String {
1172        self.deployment_instance.clone()
1173    }
1174
1175    /// Create docker images.
1176    #[instrument(level = "trace", skip_all)]
1177    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
1178        for (_, _, process) in nodes.get_all_processes() {
1179            build_and_create_image(
1180                &process.rust_crate,
1181                &process.compilation_options,
1182                &process.config,
1183                &process.name,
1184            )
1185            .await?;
1186        }
1187
1188        for (_, _, cluster) in nodes.get_all_clusters() {
1189            build_and_create_image(
1190                &cluster.rust_crate,
1191                &cluster.compilation_options,
1192                &cluster.config,
1193                &cluster.name,
1194            )
1195            .await?;
1196        }
1197
1198        // upload created docker images to amazon aws ECR
1199        let docker = Docker::connect_with_local_defaults()?;
1200
1201        for (_, _, process) in nodes.get_all_processes() {
1202            upload_image_to_ecr(&docker, &process.name).await?;
1203        }
1204
1205        for (_, _, cluster) in nodes.get_all_clusters() {
1206            upload_image_to_ecr(&docker, &cluster.name).await?;
1207        }
1208
1209        Ok(())
1210    }
1211
1212    /// Start the deployment, create ECS tasks from the provisioned images.
1213    #[instrument(level = "trace", skip_all)]
1214    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
1215        use aws_config::BehaviorVersion;
1216        use aws_sdk_ecs::Client as EcsClient;
1217
1218        let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
1219        let region = config.region().unwrap().to_string();
1220
1221        // Get ECR registry URL
1222        let ecr_client = aws_sdk_ecr::Client::new(&config);
1223        let auth_response = ecr_client.get_authorization_token().send().await?;
1224        let auth_token = auth_response
1225            .authorization_data()
1226            .first()
1227            .ok_or_else(|| anyhow::anyhow!("No ECR authorization data"))?;
1228        let endpoint = auth_token
1229            .proxy_endpoint()
1230            .ok_or_else(|| anyhow::anyhow!("No ECR endpoint"))?;
1231        let registry = endpoint.trim_start_matches("https://");
1232
1233        // Build task configurations for CloudFormation template
1234        let mut tasks = Vec::new();
1235
1236        for (_, _, process) in nodes.get_all_processes() {
1237            let task_name = get_docker_container_name(&process.name, None);
1238            *process.docker_container_name.borrow_mut() = Some(task_name.clone());
1239
1240            let image_uri = format!("{registry}/{}", process.name);
1241            let exposed_ports = process.exposed_ports.borrow().clone();
1242
1243            tasks.push(TaskConfig {
1244                task_name,
1245                image_uri,
1246                deployment_instance: self.deployment_instance.clone(),
1247                exposed_ports,
1248                region: region.clone(),
1249            });
1250        }
1251
1252        for (_, _, cluster) in nodes.get_all_clusters() {
1253            let image_uri = format!("{registry}/{}", cluster.name);
1254
1255            for num in 0..cluster.count {
1256                let task_name = get_docker_container_name(&cluster.name, Some(num));
1257                cluster
1258                    .docker_container_name
1259                    .borrow_mut()
1260                    .push(task_name.clone());
1261
1262                tasks.push(TaskConfig {
1263                    task_name,
1264                    image_uri: image_uri.clone(),
1265                    deployment_instance: self.deployment_instance.clone(),
1266                    exposed_ports: vec![],
1267                    region: region.clone(),
1268                });
1269            }
1270        }
1271
1272        // Deploy everything via CloudFormation
1273        let cluster_name = deploy_stack(&self.deployment_instance, &tasks).await?;
1274        trace!(name: "stack_deployed", %cluster_name);
1275
1276        // Wait for all services to have running tasks
1277        let ecs_client = EcsClient::new(&config);
1278        trace!("waiting_for_services_to_stabilize");
1279
1280        for task in &tasks {
1281            trace!(name: "waiting_for_service", service_name = %task.task_name);
1282            loop {
1283                let services = ecs_client
1284                    .describe_services()
1285                    .cluster(&cluster_name)
1286                    .services(&task.task_name)
1287                    .send()
1288                    .await?;
1289
1290                if let Some(service) = services.services().first() {
1291                    let desired = service.desired_count();
1292                    let running = service.running_count();
1293                    if running >= desired && desired > 0 {
1294                        trace!(name: "service_running", service_name = %task.task_name, %running, %desired);
1295                        break;
1296                    }
1297                }
1298                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
1299            }
1300        }
1301
1302        Ok(())
1303    }
1304
1305    /// Stop the deployment, destroy all containers
1306    #[instrument(level = "trace", skip_all)]
1307    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
1308        use aws_config::BehaviorVersion;
1309        use aws_sdk_ecs::Client as EcsClient;
1310
1311        let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
1312        let ecs_client = EcsClient::new(&config);
1313        let cluster_name = format!("hydro-{}", self.deployment_instance);
1314
1315        // Stop ECS services by setting desired count to 0
1316        for (_, _, process) in nodes.get_all_processes() {
1317            let service_name = get_docker_container_name(&process.name, None);
1318            let _ = ecs_client
1319                .update_service()
1320                .cluster(&cluster_name)
1321                .service(&service_name)
1322                .desired_count(0)
1323                .send()
1324                .await;
1325        }
1326
1327        for (_, _, cluster) in nodes.get_all_clusters() {
1328            for num in 0..cluster.count {
1329                let service_name = get_docker_container_name(&cluster.name, Some(num));
1330                let _ = ecs_client
1331                    .update_service()
1332                    .cluster(&cluster_name)
1333                    .service(&service_name)
1334                    .desired_count(0)
1335                    .send()
1336                    .await;
1337            }
1338        }
1339
1340        Ok(())
1341    }
1342
1343    /// remove containers, images, and networks.
1344    #[instrument(level = "trace", skip_all)]
1345    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
1346        use aws_config::BehaviorVersion;
1347        use aws_sdk_cloudformation::types::StackStatus;
1348
1349        let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
1350
1351        // Delete CloudFormation stack first (handles VPC, IAM, ECS, Cloud Map)
1352        let cfn_client = aws_sdk_cloudformation::Client::new(&config);
1353        let stack_name = format!("hydro-{}", self.deployment_instance);
1354        let _ = cfn_client
1355            .delete_stack()
1356            .stack_name(&stack_name)
1357            .send()
1358            .await;
1359        trace!(name: "stack_deletion_initiated", %stack_name);
1360
1361        // Wait for stack deletion to complete
1362        while let Ok(resp) = cfn_client
1363            .describe_stacks()
1364            .stack_name(&stack_name)
1365            .send()
1366            .await
1367        {
1368            if let Some(stack) = resp.stacks().first() {
1369                match stack.stack_status() {
1370                    Some(StackStatus::DeleteComplete) => break,
1371                    Some(StackStatus::DeleteFailed) => {
1372                        trace!(name: "stack_deletion_failed", %stack_name);
1373                        break;
1374                    }
1375                    _ => {
1376                        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
1377                    }
1378                }
1379            } else {
1380                break;
1381            }
1382        }
1383        trace!(name: "stack_deleted", %stack_name);
1384
1385        // Delete ECR repositories after stack is deleted
1386        let ecr_client = aws_sdk_ecr::Client::new(&config);
1387        for (_, _, process) in nodes.get_all_processes() {
1388            let _ = ecr_client
1389                .delete_repository()
1390                .repository_name(&process.name)
1391                .force(true)
1392                .send()
1393                .await;
1394        }
1395        for (_, _, cluster) in nodes.get_all_clusters() {
1396            let _ = ecr_client
1397                .delete_repository()
1398                .repository_name(&cluster.name)
1399                .force(true)
1400                .send()
1401                .await;
1402        }
1403
1404        Ok(())
1405    }
1406}
1407
1408impl<'a> Deploy<'a> for DockerDeployEcs {
1409    type Meta = ();
1410    type InstantiateEnv = Self;
1411
1412    type Process = DockerDeployProcessEcs;
1413    type Cluster = DockerDeployClusterEcs;
1414    type External = DockerDeployExternalEcs;
1415
1416    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
1417    fn o2o_sink_source(
1418        p1: &Self::Process,
1419        p1_port: &<Self::Process as Node>::Port,
1420        p2: &Self::Process,
1421        p2_port: &<Self::Process as Node>::Port,
1422    ) -> (syn::Expr, syn::Expr) {
1423        // Pass container name for ECS API resolution
1424        deploy_containerized_o2o(&p2.name, *p2_port)
1425    }
1426
1427    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
1428    fn o2o_connect(
1429        p1: &Self::Process,
1430        p1_port: &<Self::Process as Node>::Port,
1431        p2: &Self::Process,
1432        p2_port: &<Self::Process as Node>::Port,
1433    ) -> Box<dyn FnOnce()> {
1434        let serialized = format!(
1435            "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
1436            p1.name, p2.name
1437        );
1438
1439        Box::new(move || {
1440            trace!(name: "o2o_connect thunk", %serialized);
1441        })
1442    }
1443
1444    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1445    fn o2m_sink_source(
1446        p1: &Self::Process,
1447        p1_port: &<Self::Process as Node>::Port,
1448        c2: &Self::Cluster,
1449        c2_port: &<Self::Cluster as Node>::Port,
1450    ) -> (syn::Expr, syn::Expr) {
1451        deploy_containerized_o2m(*c2_port)
1452    }
1453
1454    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1455    fn o2m_connect(
1456        p1: &Self::Process,
1457        p1_port: &<Self::Process as Node>::Port,
1458        c2: &Self::Cluster,
1459        c2_port: &<Self::Cluster as Node>::Port,
1460    ) -> Box<dyn FnOnce()> {
1461        let serialized = format!(
1462            "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
1463            p1.name, c2.name
1464        );
1465
1466        Box::new(move || {
1467            trace!(name: "o2m_connect thunk", %serialized);
1468        })
1469    }
1470
1471    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1472    fn m2o_sink_source(
1473        c1: &Self::Cluster,
1474        c1_port: &<Self::Cluster as Node>::Port,
1475        p2: &Self::Process,
1476        p2_port: &<Self::Process as Node>::Port,
1477    ) -> (syn::Expr, syn::Expr) {
1478        // Pass container name for ECS API resolution
1479        deploy_containerized_m2o(*p2_port, &p2.name)
1480    }
1481
1482    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1483    fn m2o_connect(
1484        c1: &Self::Cluster,
1485        c1_port: &<Self::Cluster as Node>::Port,
1486        p2: &Self::Process,
1487        p2_port: &<Self::Process as Node>::Port,
1488    ) -> Box<dyn FnOnce()> {
1489        let serialized = format!(
1490            "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
1491            c1.name, p2.name
1492        );
1493
1494        Box::new(move || {
1495            trace!(name: "m2o_connect thunk", %serialized);
1496        })
1497    }
1498
1499    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1500    fn m2m_sink_source(
1501        c1: &Self::Cluster,
1502        c1_port: &<Self::Cluster as Node>::Port,
1503        c2: &Self::Cluster,
1504        c2_port: &<Self::Cluster as Node>::Port,
1505    ) -> (syn::Expr, syn::Expr) {
1506        deploy_containerized_m2m(*c2_port)
1507    }
1508
1509    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1510    fn m2m_connect(
1511        c1: &Self::Cluster,
1512        c1_port: &<Self::Cluster as Node>::Port,
1513        c2: &Self::Cluster,
1514        c2_port: &<Self::Cluster as Node>::Port,
1515    ) -> Box<dyn FnOnce()> {
1516        let serialized = format!(
1517            "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
1518            c1.name, c2.name
1519        );
1520
1521        Box::new(move || {
1522            trace!(name: "m2m_connect thunk", %serialized);
1523        })
1524    }
1525
    /// Not implemented for the ECS backend yet: calling this panics via `todo!()`.
    ///
    /// The arguments are only recorded as fields of the tracing span.
    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, ?codec_type, %shared_handle, extra_stmts = extra_stmts.len()))]
    fn e2o_many_source(
        extra_stmts: &mut Vec<syn::Stmt>,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        codec_type: &syn::Type,
        shared_handle: String,
    ) -> syn::Expr {
        todo!()
    }
1536
    /// Not implemented for the ECS backend yet: calling this panics via `todo!()`.
    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
        todo!()
    }
1541
1542    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?codec_type, %shared_handle))]
1543    fn e2o_source(
1544        extra_stmts: &mut Vec<syn::Stmt>,
1545        p1: &Self::External,
1546        p1_port: &<Self::External as Node>::Port,
1547        p2: &Self::Process,
1548        p2_port: &<Self::Process as Node>::Port,
1549        codec_type: &syn::Type,
1550        shared_handle: String,
1551    ) -> syn::Expr {
1552        p1.connection_info.borrow_mut().insert(
1553            *p1_port,
1554            (
1555                p2.docker_container_name.clone(),
1556                *p2_port,
1557                p2.network.clone(),
1558            ),
1559        );
1560
1561        p2.exposed_ports.borrow_mut().push(*p2_port);
1562
1563        let socket_ident = syn::Ident::new(
1564            &format!("__hydro_deploy_{}_socket", &shared_handle),
1565            Span::call_site(),
1566        );
1567
1568        let source_ident = syn::Ident::new(
1569            &format!("__hydro_deploy_{}_source", &shared_handle),
1570            Span::call_site(),
1571        );
1572
1573        let sink_ident = syn::Ident::new(
1574            &format!("__hydro_deploy_{}_sink", &shared_handle),
1575            Span::call_site(),
1576        );
1577
1578        let bind_addr = format!("0.0.0.0:{}", p2_port);
1579
1580        extra_stmts.push(syn::parse_quote! {
1581            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1582        });
1583
1584        let create_expr =
1585            deploy_containerized_external_sink_source_ident(bind_addr, socket_ident.clone());
1586
1587        extra_stmts.push(syn::parse_quote! {
1588            let (#sink_ident, #source_ident) = (#create_expr).split();
1589        });
1590
1591        parse_quote!(#source_ident)
1592    }
1593
1594    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1595    fn e2o_connect(
1596        p1: &Self::External,
1597        p1_port: &<Self::External as Node>::Port,
1598        p2: &Self::Process,
1599        p2_port: &<Self::Process as Node>::Port,
1600        many: bool,
1601        server_hint: NetworkHint,
1602    ) -> Box<dyn FnOnce()> {
1603        let serialized = format!(
1604            "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
1605            p1.name, p2.name
1606        );
1607
1608        Box::new(move || {
1609            trace!(name: "e2o_connect thunk", %serialized);
1610        })
1611    }
1612
1613    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1614    fn o2e_sink(
1615        p1: &Self::Process,
1616        p1_port: &<Self::Process as Node>::Port,
1617        p2: &Self::External,
1618        p2_port: &<Self::External as Node>::Port,
1619        shared_handle: String,
1620    ) -> syn::Expr {
1621        let sink_ident = syn::Ident::new(
1622            &format!("__hydro_deploy_{}_sink", &shared_handle),
1623            Span::call_site(),
1624        );
1625        parse_quote!(#sink_ident)
1626    }
1627
    /// Delegates to the free function `cluster_ids`.
    ///
    /// `of_cluster` is only recorded in the tracing span; it is not passed on.
    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
    fn cluster_ids(
        of_cluster: usize,
    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
        cluster_ids()
    }
1634
    /// Delegates to the free function `cluster_self_id`.
    #[instrument(level = "trace", skip_all)]
    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
        cluster_self_id()
    }
1639
    /// Delegates to the free function `cluster_membership_stream` for `location_id`.
    #[instrument(level = "trace", skip_all, fields(?location_id))]
    fn cluster_membership_stream(
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
    {
        cluster_membership_stream(location_id)
    }
1647}
1648
/// Alphabet handed to `nanoid!` when generating identifiers: the ASCII digits followed
/// by the lowercase ASCII letters.
const CONTAINER_ALPHABET: [char; 36] = [
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1653
1654#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location, %deployment_instance))]
1655fn get_docker_image_name(name_hint: &str, location: usize, deployment_instance: &str) -> String {
1656    let name_hint = name_hint
1657        .split("::")
1658        .last()
1659        .unwrap()
1660        .to_string()
1661        .to_ascii_lowercase()
1662        .replace(".", "-")
1663        .replace("_", "-")
1664        .replace("::", "-");
1665
1666    let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1667
1668    format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location}")
1669}
1670
1671#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1672fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1673    if let Some(instance) = instance {
1674        format!("{image_name}-{instance}")
1675    } else {
1676        image_name.to_string()
1677    }
1678}
/// Represents a Process running in a docker container
#[derive(Clone)]
pub struct DockerDeployProcessSpecEcs {
    /// Optional rustflags applied when the process' crate is compiled.
    compilation_options: Option<String>,
    /// Entries applied one by one through the crate's `config` builder method.
    config: Vec<String>,
    /// Network handle copied from the owning deployment.
    network: DockerNetworkEcs,
    /// Identifier of the owning deployment; becomes part of the image name.
    deployment_instance: String,
}
1687
1688impl<'a> ProcessSpec<'a, DockerDeployEcs> for DockerDeployProcessSpecEcs {
1689    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1690    fn build(self, id: usize, name_hint: &'_ str) -> <DockerDeployEcs as Deploy<'a>>::Process {
1691        DockerDeployProcessEcs {
1692            id,
1693            name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1694
1695            next_port: Rc::new(RefCell::new(1000)),
1696            rust_crate: Rc::new(RefCell::new(None)),
1697
1698            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1699
1700            docker_container_name: Rc::new(RefCell::new(None)),
1701
1702            compilation_options: self.compilation_options,
1703            config: self.config,
1704
1705            network: self.network.clone(),
1706        }
1707    }
1708}
1709
/// Represents a Cluster running across `count` docker containers.
#[derive(Clone)]
pub struct DockerDeployClusterSpecEcs {
    /// Optional rustflags applied when the cluster's crate is compiled.
    compilation_options: Option<String>,
    /// Entries applied one by one through the crate's `config` builder method.
    config: Vec<String>,
    /// Number of cluster members.
    count: usize,
    /// Identifier of the owning deployment; becomes part of the image name.
    deployment_instance: String,
}
1718
1719impl<'a> ClusterSpec<'a, DockerDeployEcs> for DockerDeployClusterSpecEcs {
1720    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1721    fn build(self, id: usize, name_hint: &str) -> <DockerDeployEcs as Deploy<'a>>::Cluster {
1722        DockerDeployClusterEcs {
1723            id,
1724            name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1725
1726            next_port: Rc::new(RefCell::new(1000)),
1727            rust_crate: Rc::new(RefCell::new(None)),
1728
1729            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1730
1731            compilation_options: self.compilation_options,
1732            config: self.config,
1733
1734            count: self.count,
1735        }
1736    }
1737}
1738
/// Represents an external process outside of the management of hydro deploy.
pub struct DockerDeployExternalSpecEcs {
    /// Name given to the external process by the caller of `add_external`.
    name: String,
    /// Identifier of the deployment this external process talks to.
    deployment_instance: String,
}
1744
1745impl<'a> ExternalSpec<'a, DockerDeployEcs> for DockerDeployExternalSpecEcs {
1746    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1747    fn build(self, id: usize, name_hint: &str) -> <DockerDeployEcs as Deploy<'a>>::External {
1748        DockerDeployExternalEcs {
1749            name: self.name,
1750            next_port: Rc::new(RefCell::new(10000)),
1751            ports: Rc::new(RefCell::new(HashMap::new())),
1752            connection_info: Rc::new(RefCell::new(HashMap::new())),
1753            deployment_instance: self.deployment_instance,
1754        }
1755    }
1756}