1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::query_parameters::{BuildImageOptions, TagImageOptions};
10use bytes::Bytes;
11use dfir_lang::graph::DfirGraph;
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use http_body_util::Full;
14use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
15use hydro_deploy::{LinuxCompileType, RustCrate};
16use nanoid::nanoid;
17use proc_macro2::Span;
18use serde_json::{Map, Value, json};
19use sinktools::lazy::LazySink;
20use stageleft::QuotedWithContext;
21use syn::parse_quote;
22use tar::{Builder, Header};
23use tokio::net::TcpStream;
24use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
25use tracing::{Instrument, instrument, trace};
26
27use super::deploy_runtime_containerized_ecs::*;
28
/// Per-task settings consumed by `generate_cloudformation_template` when it
/// emits one task definition and one service per entry.
struct TaskConfig {
    /// Used as the task definition family, the container name, the service
    /// name and the `CONTAINER_NAME` environment variable.
    task_name: String,
    /// Value placed in the container definition's `Image` property.
    image_uri: String,
    /// Exported to the container as the `DEPLOYMENT_INSTANCE` environment variable.
    deployment_instance: String,
    /// Container ports listed (as TCP) in the task definition's `PortMappings`.
    exposed_ports: Vec<u16>,
    /// Value of the `awslogs-region` log driver option.
    region: String,
}
37
38fn generate_cloudformation_template(
40 deployment_instance: &str,
41 tasks: &[TaskConfig],
42) -> anyhow::Result<String> {
43 let mut resources: Map<String, Value> = Map::new();
44
45 resources.insert(
47 "EcsTaskExecutionRole".to_string(),
48 json!({
49 "Type": "AWS::IAM::Role",
50 "Properties": {
51 "RoleName": format!("hydro-exec-{}", deployment_instance),
52 "AssumeRolePolicyDocument": {
53 "Version": "2012-10-17",
54 "Statement": [{
55 "Effect": "Allow",
56 "Principal": { "Service": "ecs-tasks.amazonaws.com" },
57 "Action": "sts:AssumeRole"
58 }]
59 },
60 "ManagedPolicyArns": [
61 "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
62 ],
63 "Policies": [{
64 "PolicyName": "CloudWatchLogsCreateLogGroup",
65 "PolicyDocument": {
66 "Version": "2012-10-17",
67 "Statement": [{
68 "Effect": "Allow",
69 "Action": "logs:CreateLogGroup",
70 "Resource": "*"
71 }]
72 }
73 }]
74 }
75 }),
76 );
77
78 resources.insert(
79 "EcsTaskRole".to_string(),
80 json!({
81 "Type": "AWS::IAM::Role",
82 "Properties": {
83 "RoleName": format!("hydro-task-{}", deployment_instance),
84 "AssumeRolePolicyDocument": {
85 "Version": "2012-10-17",
86 "Statement": [{
87 "Effect": "Allow",
88 "Principal": { "Service": "ecs-tasks.amazonaws.com" },
89 "Action": "sts:AssumeRole"
90 }]
91 },
92 "Policies": [{
93 "PolicyName": "HydroEcsAccess",
94 "PolicyDocument": {
95 "Version": "2012-10-17",
96 "Statement": [{
97 "Effect": "Allow",
98 "Action": [
99 "ecs:ListTasks",
100 "ecs:DescribeTasks",
101 "ec2:DescribeNetworkInterfaces"
102 ],
103 "Resource": "*"
104 }]
105 }
106 }]
107 }
108 }),
109 );
110
111 resources.insert(
112 "VPC".to_string(),
113 json!({
114 "Type": "AWS::EC2::VPC",
115 "Properties": {
116 "CidrBlock": "10.0.0.0/16",
117 "EnableDnsSupport": true,
118 "EnableDnsHostnames": true,
119 "Tags": [{ "Key": "Name", "Value": format!("hydro-vpc-{}", deployment_instance) }]
120 }
121 }),
122 );
123
124 resources.insert("Subnet".to_string(), json!({
125 "Type": "AWS::EC2::Subnet",
126 "Properties": {
127 "VpcId": { "Ref": "VPC" },
128 "CidrBlock": "10.0.1.0/24",
129 "MapPublicIpOnLaunch": true,
130 "Tags": [{ "Key": "Name", "Value": format!("hydro-subnet-{}", deployment_instance) }]
131 }
132 }));
133
134 resources.insert(
135 "InternetGateway".to_string(),
136 json!({
137 "Type": "AWS::EC2::InternetGateway",
138 "Properties": {
139 "Tags": [{ "Key": "Name", "Value": format!("hydro-igw-{}", deployment_instance) }]
140 }
141 }),
142 );
143
144 resources.insert(
145 "VPCGatewayAttachment".to_string(),
146 json!({
147 "Type": "AWS::EC2::VPCGatewayAttachment",
148 "Properties": {
149 "VpcId": { "Ref": "VPC" },
150 "InternetGatewayId": { "Ref": "InternetGateway" }
151 }
152 }),
153 );
154
155 resources.insert(
156 "RouteTable".to_string(),
157 json!({
158 "Type": "AWS::EC2::RouteTable",
159 "Properties": {
160 "VpcId": { "Ref": "VPC" },
161 "Tags": [{ "Key": "Name", "Value": format!("hydro-rt-{}", deployment_instance) }]
162 }
163 }),
164 );
165
166 resources.insert(
167 "Route".to_string(),
168 json!({
169 "Type": "AWS::EC2::Route",
170 "DependsOn": "VPCGatewayAttachment",
171 "Properties": {
172 "RouteTableId": { "Ref": "RouteTable" },
173 "DestinationCidrBlock": "0.0.0.0/0",
174 "GatewayId": { "Ref": "InternetGateway" }
175 }
176 }),
177 );
178
179 resources.insert(
180 "SubnetRouteTableAssociation".to_string(),
181 json!({
182 "Type": "AWS::EC2::SubnetRouteTableAssociation",
183 "Properties": {
184 "SubnetId": { "Ref": "Subnet" },
185 "RouteTableId": { "Ref": "RouteTable" }
186 }
187 }),
188 );
189
190 resources.insert(
191 "SecurityGroup".to_string(),
192 json!({
193 "Type": "AWS::EC2::SecurityGroup",
194 "Properties": {
195 "GroupDescription": "Security group for Hydro ECS tasks",
196 "VpcId": { "Ref": "VPC" },
197 "SecurityGroupIngress": [{
198 "IpProtocol": "-1",
199 "CidrIp": "0.0.0.0/0"
200 }],
201 "Tags": [{ "Key": "Name", "Value": format!("hydro-sg-{}", deployment_instance) }]
202 }
203 }),
204 );
205
206 resources.insert(
207 "EcsCluster".to_string(),
208 json!({
209 "Type": "AWS::ECS::Cluster",
210 "Properties": {
211 "ClusterName": format!("hydro-{}", deployment_instance)
212 }
213 }),
214 );
215
216 for task in tasks {
218 let safe_name = task.task_name.replace('-', "");
219
220 let port_mappings: Vec<Value> = task
222 .exposed_ports
223 .iter()
224 .map(|p| {
225 json!({
226 "ContainerPort": *p as i32,
227 "Protocol": "tcp"
228 })
229 })
230 .collect();
231
232 resources.insert(format!("TaskDef{}", safe_name), json!({
234 "Type": "AWS::ECS::TaskDefinition",
235 "Properties": {
236 "Family": task.task_name,
237 "RequiresCompatibilities": ["FARGATE"],
238 "NetworkMode": "awsvpc",
239 "Cpu": "256",
240 "Memory": "512",
241 "ExecutionRoleArn": { "Fn::GetAtt": ["EcsTaskExecutionRole", "Arn"] },
242 "TaskRoleArn": { "Fn::GetAtt": ["EcsTaskRole", "Arn"] },
243 "ContainerDefinitions": [{
244 "Name": task.task_name,
245 "Image": task.image_uri,
246 "PortMappings": port_mappings,
247 "LogConfiguration": {
248 "LogDriver": "awslogs",
249 "Options": {
250 "awslogs-group": "/ecs/hydro",
251 "awslogs-region": task.region,
252 "awslogs-stream-prefix": "ecs",
253 "awslogs-create-group": "true"
254 }
255 },
256 "Environment": [
257 { "Name": "CONTAINER_NAME", "Value": task.task_name },
258 { "Name": "DEPLOYMENT_INSTANCE", "Value": task.deployment_instance },
259 { "Name": "RUST_LOG", "Value": "trace,aws_runtime=info,aws_sdk_ecs=info,aws_smithy_runtime=info,aws_smithy_runtime_api=info,aws_config=info,hyper_util=info,aws_smithy_http_client=info,aws_sigv4=info" },
260 { "Name": "RUST_BACKTRACE", "Value": "1" },
261 { "Name": "NO_COLOR", "Value": "1" }
262 ]
263 }]
264 }
265 }));
266
267 resources.insert(
269 format!("EcsService{}", safe_name),
270 json!({
271 "Type": "AWS::ECS::Service",
272 "DependsOn": ["Route", format!("TaskDef{}", safe_name)],
273 "Properties": {
274 "ServiceName": task.task_name,
275 "Cluster": { "Ref": "EcsCluster" },
276 "TaskDefinition": { "Ref": format!("TaskDef{}", safe_name) },
277 "DesiredCount": 1,
278 "LaunchType": "FARGATE",
279 "NetworkConfiguration": {
280 "AwsvpcConfiguration": {
281 "Subnets": [{ "Ref": "Subnet" }],
282 "SecurityGroups": [{ "Ref": "SecurityGroup" }],
283 "AssignPublicIp": "ENABLED"
284 }
285 }
286 }
287 }),
288 );
289 }
290
291 let template = json!({
292 "AWSTemplateFormatVersion": "2010-09-09",
293 "Description": "Hydro ECS Infrastructure",
294 "Resources": resources,
295 "Outputs": {
296 "ClusterName": {
297 "Value": { "Ref": "EcsCluster" }
298 }
299 }
300 });
301
302 Ok(serde_json::to_string(&template)?)
303}
304
305#[instrument(level = "trace", skip_all, fields(%deployment_instance))]
306async fn deploy_stack(
307 deployment_instance: &str,
308 tasks: &[TaskConfig],
309) -> Result<String, anyhow::Error> {
310 use aws_config::BehaviorVersion;
311 use aws_sdk_cloudformation::Client as CfnClient;
312 use aws_sdk_cloudformation::types::{Capability, StackStatus};
313
314 let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
315 let cfn_client = CfnClient::new(&config);
316
317 let stack_name = format!("hydro-{}", deployment_instance);
318 let template = generate_cloudformation_template(deployment_instance, tasks)?;
319
320 trace!(name: "creating_stack", %stack_name);
321 cfn_client
322 .create_stack()
323 .stack_name(&stack_name)
324 .template_body(&template)
325 .capabilities(Capability::CapabilityNamedIam)
326 .send()
327 .await?;
328
329 trace!(name: "waiting_for_stack", %stack_name);
331 loop {
332 let describe = cfn_client
333 .describe_stacks()
334 .stack_name(&stack_name)
335 .send()
336 .await?;
337
338 let stack = describe
339 .stacks()
340 .first()
341 .ok_or_else(|| anyhow::anyhow!("Stack not found"))?;
342
343 match stack.stack_status() {
344 Some(StackStatus::CreateComplete) => {
345 trace!(name: "stack_created", %stack_name);
346 break;
347 }
348 Some(StackStatus::CreateFailed)
349 | Some(StackStatus::RollbackComplete)
350 | Some(StackStatus::RollbackFailed) => {
351 return Err(anyhow::anyhow!(
352 "Stack creation failed: {:?}",
353 stack.stack_status_reason()
354 ));
355 }
356 status => {
357 trace!(name: "stack_status", ?status);
358 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
359 }
360 }
361 }
362
363 let describe = cfn_client
365 .describe_stacks()
366 .stack_name(&stack_name)
367 .send()
368 .await?;
369
370 let stack = describe
371 .stacks()
372 .first()
373 .ok_or_else(|| anyhow::anyhow!("Stack not found"))?;
374
375 let cluster_name = stack
376 .outputs()
377 .iter()
378 .find(|o| o.output_key() == Some("ClusterName"))
379 .and_then(|o| o.output_value())
380 .map(|s| s.to_string())
381 .ok_or_else(|| anyhow::anyhow!("Missing ClusterName output"))?;
382
383 Ok(cluster_name)
384}
385use crate::compile::builder::ExternalPortId;
386use crate::compile::deploy::DeployResult;
387use crate::compile::deploy_provider::{
388 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
389};
390use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
391use crate::location::dynamic::LocationId;
392use crate::location::member_id::TaglessMemberId;
393use crate::location::{MembershipEvent, NetworkHint};
394
395#[derive(Clone, Debug)]
397pub struct DockerNetworkEcs {
398 _name: String,
399}
400
401impl DockerNetworkEcs {
402 pub fn new(name: String) -> Self {
404 Self {
405 _name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
406 }
407 }
408}
409
/// A single Hydro process deployed as an ECS task.
///
/// Mutable state lives behind `Rc<RefCell<..>>`, so clones of this value share it.
#[derive(Clone)]
pub struct DockerDeployProcessEcs {
    /// Recorded in the tracing spans of this node's methods.
    id: usize,
    /// Passed to `create_graph_trybuild` in `instantiate`; also recorded in tracing spans.
    name: String,
    /// Next port number that `next_port` will hand out.
    next_port: Rc<RefCell<u16>>,
    /// Crate to build for this process; filled in by `instantiate`.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Shared list of ports for this process (not read within this impl).
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Shared slot for the container/task name; `None` until it is assigned elsewhere.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Optional compilation options carried along for the image build.
    compilation_options: Option<String>,

    /// Extra configuration entries carried along for the image build.
    config: Vec<String>,

    /// Network handle this process was created with.
    network: DockerNetworkEcs,
}
428
429impl Node for DockerDeployProcessEcs {
430 type Port = u16;
431 type Meta = ();
432 type InstantiateEnv = DockerDeployEcs;
433
434 #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
435 fn next_port(&self) -> Self::Port {
436 let port = {
437 let mut borrow = self.next_port.borrow_mut();
438 let port = *borrow;
439 *borrow += 1;
440 port
441 };
442
443 port
444 }
445
446 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
447 fn update_meta(&self, _meta: &Self::Meta) {}
448
449 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
450 fn instantiate(
451 &self,
452 _env: &mut Self::InstantiateEnv,
453 meta: &mut Self::Meta,
454 graph: DfirGraph,
455 extra_stmts: Vec<syn::Stmt>,
456 ) {
457 let (bin_name, config) = create_graph_trybuild(
458 graph,
459 extra_stmts.clone(),
460 &Some(self.name.clone()),
461 true,
462 LinkingMode::Static,
463 );
464
465 let mut ret = RustCrate::new(config.project_dir)
466 .target_dir(config.target_dir)
467 .example(bin_name.clone())
468 .no_default_features();
469
470 ret = ret.display_name("test_display_name");
471
472 ret = ret.features(vec!["hydro___feature_ecs_runtime".to_string()]);
473
474 if let Some(features) = config.features {
475 ret = ret.features(features);
476 }
477
478 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
479 ret = ret.config("build.incremental = false");
480
481 *self.rust_crate.borrow_mut() = Some(ret);
482 }
483}
484
/// A cluster of Hydro nodes that all run the same binary.
///
/// Mutable state lives behind `Rc<RefCell<..>>`, so clones of this value share it.
#[derive(Clone)]
pub struct DockerDeployClusterEcs {
    /// Recorded in the tracing spans of this node's methods.
    id: usize,
    /// Passed to `create_graph_trybuild` in `instantiate`; also recorded in tracing spans.
    name: String,
    /// Next port number that `next_port` will hand out.
    next_port: Rc<RefCell<u16>>,
    /// Crate to build for this cluster; filled in by `instantiate`.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Shared list of container/task names for the cluster (not written within this impl).
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Optional compilation options carried along for the image build.
    compilation_options: Option<String>,

    /// Extra configuration entries carried along for the image build.
    config: Vec<String>,

    /// Member count for the cluster — presumably the number of tasks to launch; verify against the caller.
    count: usize,
}
501
502impl Node for DockerDeployClusterEcs {
503 type Port = u16;
504 type Meta = ();
505 type InstantiateEnv = DockerDeployEcs;
506
507 #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
508 fn next_port(&self) -> Self::Port {
509 let port = {
510 let mut borrow = self.next_port.borrow_mut();
511 let port = *borrow;
512 *borrow += 1;
513 port
514 };
515
516 port
517 }
518
519 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
520 fn update_meta(&self, _meta: &Self::Meta) {}
521
522 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, extra_stmts = extra_stmts.len()))]
523 fn instantiate(
524 &self,
525 _env: &mut Self::InstantiateEnv,
526 _meta: &mut Self::Meta,
527 graph: DfirGraph,
528 extra_stmts: Vec<syn::Stmt>,
529 ) {
530 let (bin_name, config) = create_graph_trybuild(
531 graph,
532 extra_stmts.clone(),
533 &Some(self.name.clone()),
534 true,
535 LinkingMode::Static,
536 );
537
538 let mut ret = RustCrate::new(config.project_dir)
539 .target_dir(config.target_dir)
540 .example(bin_name.clone())
541 .no_default_features();
542
543 ret = ret.display_name("test_display_name");
544
545 ret = ret.features(vec!["hydro___feature_ecs_runtime".to_string()]);
546
547 if let Some(features) = config.features {
548 ret = ret.features(features);
549 }
550
551 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
552 ret = ret.config("build.incremental = false");
553
554 *self.rust_crate.borrow_mut() = Some(ret);
555 }
556}
557
/// An external (non-deployed) endpoint of the deployment.
#[derive(Clone, Debug)]
pub struct DockerDeployExternalEcs {
    /// Recorded in the tracing spans of this node's methods.
    name: String,
    /// Next port number that `next_port` will hand out.
    next_port: Rc<RefCell<u16>>,

    /// Maps each external port id to a port number.
    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,

    /// Keyed by port number; each value is a shared optional name, a second
    /// port number and a network handle.
    /// NOTE(review): presumably the remote container name and remote port — confirm against the code that fills this map.
    #[expect(clippy::type_complexity, reason = "internal code")]
    connection_info:
        Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetworkEcs)>>>,

    /// Identifier of the deployment this external endpoint belongs to.
    deployment_instance: String,
}
572
573impl Node for DockerDeployExternalEcs {
574 type Port = u16;
575 type Meta = ();
576 type InstantiateEnv = DockerDeployEcs;
577
578 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
579 fn next_port(&self) -> Self::Port {
580 let port = {
581 let mut borrow = self.next_port.borrow_mut();
582 let port = *borrow;
583 *borrow += 1;
584 port
585 };
586
587 port
588 }
589
590 #[instrument(level = "trace", skip_all, fields(name = self.name))]
591 fn update_meta(&self, _meta: &Self::Meta) {}
592
593 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
594 fn instantiate(
595 &self,
596 _env: &mut Self::InstantiateEnv,
597 meta: &mut Self::Meta,
598 graph: DfirGraph,
599 extra_stmts: Vec<syn::Stmt>,
600 ) {
601 trace!(name: "surface", surface = graph.surface_syntax_string());
602 }
603}
604
/// A boxed, pinned pair of a stream yielding `Out` items and a sink accepting
/// `In` items whose error type is `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
609
610impl<'a> RegisterPort<'a, DockerDeployEcs> for DockerDeployExternalEcs {
611 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
612 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
613 self.ports.borrow_mut().insert(external_port_id, port);
614 }
615
616 fn as_bytes_bidi(
617 &self,
618 external_port_id: ExternalPortId,
619 ) -> impl Future<
620 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
621 > + 'a {
622 let _span =
623 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered(); async { todo!() }
625 }
626
627 fn as_bincode_bidi<InT, OutT>(
628 &self,
629 external_port_id: ExternalPortId,
630 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
631 where
632 InT: serde::Serialize + 'static,
633 OutT: serde::de::DeserializeOwned + 'static,
634 {
635 let _span =
636 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered(); async { todo!() }
638 }
639
640 fn as_bincode_sink<T>(
641 &self,
642 external_port_id: ExternalPortId,
643 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
644 where
645 T: serde::Serialize + 'static,
646 {
647 let guard =
648 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
649
650 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
651 let (docker_container_name, remote_port, _network) = self
652 .connection_info
653 .borrow()
654 .get(&local_port)
655 .unwrap()
656 .clone();
657 let deployment_instance = self.deployment_instance.clone();
658
659 async move {
660 use aws_config::BehaviorVersion;
661 use aws_sdk_ecs::Client as EcsClient;
662
663 let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
664 let ecs_client = EcsClient::new(&config);
665
666 let task_name = docker_container_name.borrow().as_ref().unwrap().clone();
667 trace!(name: "query_ecs", %task_name);
668
669 let cluster_name = format!("hydro-{}", deployment_instance);
670
671 let task_arn = loop {
672 let tasks = ecs_client
673 .list_tasks()
674 .cluster(&cluster_name)
675 .family(&task_name)
676 .send()
677 .await
678 .unwrap();
679
680 if let Some(arn) = tasks.task_arns().first() {
681 break arn.clone();
682 }
683
684 trace!(name: "waiting_for_task", %task_name);
685 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
686 };
687 trace!(name: "found_task", %task_arn);
688
689 let eni_id = loop {
690 let task_details = ecs_client
691 .describe_tasks()
692 .cluster(&cluster_name)
693 .tasks(&task_arn)
694 .send()
695 .await
696 .unwrap();
697
698 if let Some(eni) = task_details.tasks().first().and_then(|task| {
699 task.attachments()
700 .iter()
701 .flat_map(|a| a.details())
702 .find(|d| d.name() == Some("networkInterfaceId"))
703 .and_then(|d| d.value())
704 }) {
705 break eni.to_string();
706 }
707
708 trace!(name: "waiting_for_eni", %task_arn);
709 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
710 };
711 trace!(name: "found_eni", %eni_id);
712
713 let ec2_client = aws_sdk_ec2::Client::new(&config);
714
715 let remote_ip_address = loop {
716 let eni_info = ec2_client
717 .describe_network_interfaces()
718 .network_interface_ids(&eni_id)
719 .send()
720 .await
721 .unwrap();
722
723 if let Some(ip) = eni_info
724 .network_interfaces()
725 .first()
726 .and_then(|ni| ni.association())
727 .and_then(|assoc| assoc.public_ip())
728 {
729 break ip.to_string();
730 }
731
732 trace!(name: "waiting_for_public_ip", %eni_id);
733 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
734 };
735 trace!(name: "resolved_ip", %remote_ip_address);
736
737 Box::pin(
738 LazySink::new(move || {
739 Box::pin(async move {
740 trace!(name: "connecting", %remote_ip_address, %remote_port);
741
742 let stream =
743 TcpStream::connect(format!("{remote_ip_address}:{remote_port}"))
744 .await?;
745
746 trace!(name: "connected", %remote_ip_address, %remote_port);
747
748 Result::<_, std::io::Error>::Ok(FramedWrite::new(
749 stream,
750 LengthDelimitedCodec::new(),
751 ))
752 })
753 })
754 .with(move |v| async move { Ok(Bytes::from(bincode::serialize(&v).unwrap())) }),
755 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
756 }
757 .instrument(guard.exit())
758 }
759
760 fn as_bincode_source<T>(
761 &self,
762 external_port_id: ExternalPortId,
763 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
764 where
765 T: serde::de::DeserializeOwned + 'static,
766 {
767 let guard =
768 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
769
770 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
771 let (docker_container_name, remote_port, _network) = self
772 .connection_info
773 .borrow()
774 .get(&local_port)
775 .unwrap()
776 .clone();
777 let deployment_instance = self.deployment_instance.clone();
778
779 async move {
780 use aws_config::BehaviorVersion;
781 use aws_sdk_ecs::Client as EcsClient;
782
783 let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
784 let ecs_client = EcsClient::new(&config);
785
786 let task_name = docker_container_name.borrow().as_ref().unwrap().clone();
787 trace!(name: "query_ecs", %task_name);
788
789 let cluster_name = format!("hydro-{}", deployment_instance);
790
791 let task_arn = loop {
792 let tasks = ecs_client
793 .list_tasks()
794 .cluster(&cluster_name)
795 .family(&task_name)
796 .send()
797 .await
798 .unwrap();
799
800 if let Some(arn) = tasks.task_arns().first() {
801 break arn.clone();
802 }
803
804 trace!(name: "waiting_for_task", %task_name);
805 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
806 };
807 trace!(name: "found_task", %task_arn);
808
809 let eni_id = loop {
810 let task_details = ecs_client
811 .describe_tasks()
812 .cluster(&cluster_name)
813 .tasks(&task_arn)
814 .send()
815 .await
816 .unwrap();
817
818 if let Some(eni) = task_details.tasks().first().and_then(|task| {
819 task.attachments()
820 .iter()
821 .flat_map(|a| a.details())
822 .find(|d| d.name() == Some("networkInterfaceId"))
823 .and_then(|d| d.value())
824 }) {
825 break eni.to_string();
826 }
827
828 trace!(name: "waiting_for_eni", %task_arn);
829 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
830 };
831 trace!(name: "found_eni", %eni_id);
832
833 let ec2_client = aws_sdk_ec2::Client::new(&config);
834
835 let remote_ip_address = loop {
836 let eni_info = ec2_client
837 .describe_network_interfaces()
838 .network_interface_ids(&eni_id)
839 .send()
840 .await
841 .unwrap();
842
843 if let Some(ip) = eni_info
844 .network_interfaces()
845 .first()
846 .and_then(|ni| ni.association())
847 .and_then(|assoc| assoc.public_ip())
848 {
849 break ip.to_string();
850 }
851
852 trace!(name: "waiting_for_public_ip", %eni_id);
853 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
854 };
855 trace!(name: "resolved_ip", %remote_ip_address);
856
857 trace!(name: "connecting", %remote_ip_address, %remote_port);
858
859 let stream = TcpStream::connect(format!("{remote_ip_address}:{remote_port}"))
860 .await
861 .unwrap();
862
863 trace!(name: "connected", %remote_ip_address, %remote_port);
864
865 Box::pin(
866 FramedRead::new(stream, LengthDelimitedCodec::new())
867 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
868 ) as Pin<Box<dyn Stream<Item = T>>>
869 }
870 .instrument(guard.exit())
871 }
872}
873
/// Top-level handle for one ECS deployment: the node specs registered so far
/// plus the identifiers they share.
pub struct DockerDeployEcs {
    /// Process specs registered so far.
    docker_processes: Vec<DockerDeployProcessSpecEcs>,
    /// Cluster specs registered so far.
    docker_clusters: Vec<DockerDeployClusterSpecEcs>,
    /// Network handle for this deployment.
    network: DockerNetworkEcs,
    /// Identifier for this deployment.
    deployment_instance: String,
}
881
882#[instrument(level = "trace", skip_all, fields(%image_name))]
883async fn build_and_create_image(
884 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
885 compilation_options: &Option<String>,
886 config: &[String],
887 image_name: &str,
888) -> Result<(), anyhow::Error> {
889 let mut rust_crate = rust_crate
890 .borrow_mut()
891 .take()
892 .unwrap()
893 .rustflags(compilation_options.clone().unwrap_or("".to_string()));
894
895 for cfg in config {
896 rust_crate = rust_crate.config(cfg);
897 }
898
899 let build_output = match build_crate_memoized(
900 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Glibc)),
901 )
902 .await
903 {
904 Ok(build_output) => build_output,
905 Err(BuildError::FailedToBuildCrate {
906 exit_status,
907 diagnostics,
908 text_lines,
909 stderr_lines,
910 }) => {
911 let diagnostics = diagnostics
912 .into_iter()
913 .map(|d| d.rendered.unwrap())
914 .collect::<Vec<_>>()
915 .join("\n");
916 let text_lines = text_lines.join("\n");
917 let stderr_lines = stderr_lines.join("\n");
918
919 anyhow::bail!(
920 r#"
921Failed to build crate {exit_status:?}
922--- diagnostics
923---
924{diagnostics}
925---
926---
927---
928
929--- text_lines
930---
931---
932{text_lines}
933---
934---
935---
936
937--- stderr_lines
938---
939---
940{stderr_lines}
941---
942---
943---"#
944 );
945 }
946 Err(err) => {
947 anyhow::bail!("Failed to build crate {err:?}");
948 }
949 };
950
951 let docker = Docker::connect_with_local_defaults()?;
952
953 let mut tar_data = Vec::new();
954 {
955 let mut tar = Builder::new(&mut tar_data);
956
957 let dockerfile_content = br#"
958 FROM debian:trixie-slim
959 RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/*
960 COPY app /app
961 CMD ["/app"]
962 "#;
963 let mut header = Header::new_gnu();
964 header.set_path("Dockerfile")?;
965 header.set_size(dockerfile_content.len() as u64);
966 header.set_cksum();
967 tar.append(&header, &dockerfile_content[..])?;
968
969 let mut header = Header::new_gnu();
970 header.set_path("app")?;
971 header.set_size(build_output.bin_data.len() as u64);
972 header.set_mode(0o755);
973 header.set_cksum();
974 tar.append(&header, &build_output.bin_data[..])?;
975
976 tar.finish()?;
977 }
978
979 let build_options = BuildImageOptions {
980 dockerfile: "Dockerfile".to_owned(),
981 t: Some(image_name.to_string()),
982 rm: true,
983 ..Default::default()
984 };
985
986 use bollard::errors::Error;
987
988 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
989 let mut build_stream = docker.build_image(build_options, None, Some(body));
990 while let Some(msg) = build_stream.next().await {
991 match msg {
992 Ok(_) => {}
993 Err(e) => match e {
994 Error::DockerStreamError { error } => {
995 return Err(anyhow::anyhow!(
996 "Docker build failed: DockerStreamError: {{ error: {error} }}"
997 ));
998 }
999 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
1000 },
1001 }
1002 }
1003
1004 Ok(())
1005}
1006
1007#[instrument(level = "trace", skip_all, fields(%image_name))]
1008async fn upload_image_to_ecr(docker: &Docker, image_name: &str) -> Result<(), anyhow::Error> {
1009 use aws_config::BehaviorVersion;
1010 use aws_sdk_ecr::Client as EcrClient;
1011 use base64::Engine;
1012 use bollard::auth::DockerCredentials;
1013 use bollard::query_parameters::PushImageOptions;
1014
1015 let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
1016 let ecr_client = EcrClient::new(&config);
1017
1018 match ecr_client
1020 .create_repository()
1021 .repository_name(image_name)
1022 .send()
1023 .await
1024 {
1025 Ok(_) => trace!(name: "repository_created", %image_name),
1026 Err(error) => {
1027 trace!(name: "repository_creation_result", ?error);
1029 }
1030 }
1031
1032 let auth_response = ecr_client.get_authorization_token().send().await?;
1033
1034 let auth_token = auth_response
1035 .authorization_data()
1036 .first()
1037 .ok_or_else(|| anyhow::anyhow!("No ECR authorization data"))?;
1038
1039 let endpoint = auth_token
1040 .proxy_endpoint()
1041 .ok_or_else(|| anyhow::anyhow!("No ECR endpoint"))?;
1042 let token = auth_token
1043 .authorization_token()
1044 .ok_or_else(|| anyhow::anyhow!("No ECR token"))?;
1045
1046 let decoded = String::from_utf8(base64::prelude::BASE64_STANDARD.decode(token)?)?;
1047 let (username, password) = decoded
1048 .split_once(':')
1049 .ok_or_else(|| anyhow::anyhow!("Invalid ECR token format"))?;
1050
1051 let registry = endpoint.trim_start_matches("https://");
1052 let ecr_image_name = format!("{registry}/{image_name}");
1053
1054 match ecr_client
1056 .create_repository()
1057 .repository_name(image_name)
1058 .send()
1059 .await
1060 {
1061 Ok(_) => {}
1062 Err(e) => {
1063 let is_already_exists = e
1065 .as_service_error()
1066 .map(|se| se.is_repository_already_exists_exception())
1067 .unwrap_or(false);
1068 if !is_already_exists {
1069 return Err(anyhow::anyhow!("Failed to create ECR repository: {}", e));
1070 }
1071 }
1072 }
1073
1074 docker
1075 .tag_image(
1076 image_name,
1077 Some(TagImageOptions {
1078 repo: Some(ecr_image_name.clone()),
1079 ..Default::default()
1080 }),
1081 )
1082 .await?;
1083
1084 let mut push_stream = docker.push_image(
1085 &ecr_image_name,
1086 Some(PushImageOptions {
1087 ..Default::default()
1088 }),
1089 Some(DockerCredentials {
1090 username: Some(username.to_string()),
1091 password: Some(password.to_string()),
1092 ..Default::default()
1093 }),
1094 );
1095
1096 while let Some(msg) = push_stream.next().await {
1097 match msg {
1098 Ok(_) => {}
1099 Err(e) => match e {
1100 bollard::errors::Error::DockerStreamError { error } => {
1101 return Err(anyhow::anyhow!(
1102 "Docker push failed: DockerStreamError: {{ error: {error} }}"
1103 ));
1104 }
1105 _ => return Err(anyhow::anyhow!("Docker push failed: {}", e)),
1106 },
1107 }
1108 msg?;
1109 }
1110
1111 Ok(())
1112}
1113
1114impl DockerDeployEcs {
1115 pub fn new(network: DockerNetworkEcs) -> Self {
1117 Self {
1118 docker_processes: Vec::new(),
1119 docker_clusters: Vec::new(),
1120 network,
1121 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
1122 }
1123 }
1124
1125 pub fn add_localhost_docker(
1127 &mut self,
1128 compilation_options: Option<String>,
1129 config: Vec<String>,
1130 ) -> DockerDeployProcessSpecEcs {
1131 let process = DockerDeployProcessSpecEcs {
1132 compilation_options,
1133 config,
1134 network: self.network.clone(),
1135 deployment_instance: self.deployment_instance.clone(),
1136 };
1137
1138 self.docker_processes.push(process.clone());
1139
1140 process
1141 }
1142
1143 pub fn add_localhost_docker_cluster(
1145 &mut self,
1146 compilation_options: Option<String>,
1147 config: Vec<String>,
1148 count: usize,
1149 ) -> DockerDeployClusterSpecEcs {
1150 let cluster = DockerDeployClusterSpecEcs {
1151 compilation_options,
1152 config,
1153 count,
1154 deployment_instance: self.deployment_instance.clone(),
1155 };
1156
1157 self.docker_clusters.push(cluster.clone());
1158
1159 cluster
1160 }
1161
1162 pub fn add_external(&self, name: String) -> DockerDeployExternalSpecEcs {
1164 DockerDeployExternalSpecEcs {
1165 name,
1166 deployment_instance: self.deployment_instance.clone(),
1167 }
1168 }
1169
1170 pub fn get_deployment_instance(&self) -> String {
1172 self.deployment_instance.clone()
1173 }
1174
1175 #[instrument(level = "trace", skip_all)]
1177 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
1178 for (_, _, process) in nodes.get_all_processes() {
1179 build_and_create_image(
1180 &process.rust_crate,
1181 &process.compilation_options,
1182 &process.config,
1183 &process.name,
1184 )
1185 .await?;
1186 }
1187
1188 for (_, _, cluster) in nodes.get_all_clusters() {
1189 build_and_create_image(
1190 &cluster.rust_crate,
1191 &cluster.compilation_options,
1192 &cluster.config,
1193 &cluster.name,
1194 )
1195 .await?;
1196 }
1197
1198 let docker = Docker::connect_with_local_defaults()?;
1200
1201 for (_, _, process) in nodes.get_all_processes() {
1202 upload_image_to_ecr(&docker, &process.name).await?;
1203 }
1204
1205 for (_, _, cluster) in nodes.get_all_clusters() {
1206 upload_image_to_ecr(&docker, &cluster.name).await?;
1207 }
1208
1209 Ok(())
1210 }
1211
1212 #[instrument(level = "trace", skip_all)]
1214 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
1215 use aws_config::BehaviorVersion;
1216 use aws_sdk_ecs::Client as EcsClient;
1217
1218 let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
1219 let region = config.region().unwrap().to_string();
1220
1221 let ecr_client = aws_sdk_ecr::Client::new(&config);
1223 let auth_response = ecr_client.get_authorization_token().send().await?;
1224 let auth_token = auth_response
1225 .authorization_data()
1226 .first()
1227 .ok_or_else(|| anyhow::anyhow!("No ECR authorization data"))?;
1228 let endpoint = auth_token
1229 .proxy_endpoint()
1230 .ok_or_else(|| anyhow::anyhow!("No ECR endpoint"))?;
1231 let registry = endpoint.trim_start_matches("https://");
1232
1233 let mut tasks = Vec::new();
1235
1236 for (_, _, process) in nodes.get_all_processes() {
1237 let task_name = get_docker_container_name(&process.name, None);
1238 *process.docker_container_name.borrow_mut() = Some(task_name.clone());
1239
1240 let image_uri = format!("{registry}/{}", process.name);
1241 let exposed_ports = process.exposed_ports.borrow().clone();
1242
1243 tasks.push(TaskConfig {
1244 task_name,
1245 image_uri,
1246 deployment_instance: self.deployment_instance.clone(),
1247 exposed_ports,
1248 region: region.clone(),
1249 });
1250 }
1251
1252 for (_, _, cluster) in nodes.get_all_clusters() {
1253 let image_uri = format!("{registry}/{}", cluster.name);
1254
1255 for num in 0..cluster.count {
1256 let task_name = get_docker_container_name(&cluster.name, Some(num));
1257 cluster
1258 .docker_container_name
1259 .borrow_mut()
1260 .push(task_name.clone());
1261
1262 tasks.push(TaskConfig {
1263 task_name,
1264 image_uri: image_uri.clone(),
1265 deployment_instance: self.deployment_instance.clone(),
1266 exposed_ports: vec![],
1267 region: region.clone(),
1268 });
1269 }
1270 }
1271
1272 let cluster_name = deploy_stack(&self.deployment_instance, &tasks).await?;
1274 trace!(name: "stack_deployed", %cluster_name);
1275
1276 let ecs_client = EcsClient::new(&config);
1278 trace!("waiting_for_services_to_stabilize");
1279
1280 for task in &tasks {
1281 trace!(name: "waiting_for_service", service_name = %task.task_name);
1282 loop {
1283 let services = ecs_client
1284 .describe_services()
1285 .cluster(&cluster_name)
1286 .services(&task.task_name)
1287 .send()
1288 .await?;
1289
1290 if let Some(service) = services.services().first() {
1291 let desired = service.desired_count();
1292 let running = service.running_count();
1293 if running >= desired && desired > 0 {
1294 trace!(name: "service_running", service_name = %task.task_name, %running, %desired);
1295 break;
1296 }
1297 }
1298 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
1299 }
1300 }
1301
1302 Ok(())
1303 }
1304
1305 #[instrument(level = "trace", skip_all)]
1307 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
1308 use aws_config::BehaviorVersion;
1309 use aws_sdk_ecs::Client as EcsClient;
1310
1311 let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
1312 let ecs_client = EcsClient::new(&config);
1313 let cluster_name = format!("hydro-{}", self.deployment_instance);
1314
1315 for (_, _, process) in nodes.get_all_processes() {
1317 let service_name = get_docker_container_name(&process.name, None);
1318 let _ = ecs_client
1319 .update_service()
1320 .cluster(&cluster_name)
1321 .service(&service_name)
1322 .desired_count(0)
1323 .send()
1324 .await;
1325 }
1326
1327 for (_, _, cluster) in nodes.get_all_clusters() {
1328 for num in 0..cluster.count {
1329 let service_name = get_docker_container_name(&cluster.name, Some(num));
1330 let _ = ecs_client
1331 .update_service()
1332 .cluster(&cluster_name)
1333 .service(&service_name)
1334 .desired_count(0)
1335 .send()
1336 .await;
1337 }
1338 }
1339
1340 Ok(())
1341 }
1342
1343 #[instrument(level = "trace", skip_all)]
1345 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
1346 use aws_config::BehaviorVersion;
1347 use aws_sdk_cloudformation::types::StackStatus;
1348
1349 let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
1350
1351 let cfn_client = aws_sdk_cloudformation::Client::new(&config);
1353 let stack_name = format!("hydro-{}", self.deployment_instance);
1354 let _ = cfn_client
1355 .delete_stack()
1356 .stack_name(&stack_name)
1357 .send()
1358 .await;
1359 trace!(name: "stack_deletion_initiated", %stack_name);
1360
1361 while let Ok(resp) = cfn_client
1363 .describe_stacks()
1364 .stack_name(&stack_name)
1365 .send()
1366 .await
1367 {
1368 if let Some(stack) = resp.stacks().first() {
1369 match stack.stack_status() {
1370 Some(StackStatus::DeleteComplete) => break,
1371 Some(StackStatus::DeleteFailed) => {
1372 trace!(name: "stack_deletion_failed", %stack_name);
1373 break;
1374 }
1375 _ => {
1376 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
1377 }
1378 }
1379 } else {
1380 break;
1381 }
1382 }
1383 trace!(name: "stack_deleted", %stack_name);
1384
1385 let ecr_client = aws_sdk_ecr::Client::new(&config);
1387 for (_, _, process) in nodes.get_all_processes() {
1388 let _ = ecr_client
1389 .delete_repository()
1390 .repository_name(&process.name)
1391 .force(true)
1392 .send()
1393 .await;
1394 }
1395 for (_, _, cluster) in nodes.get_all_clusters() {
1396 let _ = ecr_client
1397 .delete_repository()
1398 .repository_name(&cluster.name)
1399 .force(true)
1400 .send()
1401 .await;
1402 }
1403
1404 Ok(())
1405 }
1406}
1407
1408impl<'a> Deploy<'a> for DockerDeployEcs {
1409 type Meta = ();
1410 type InstantiateEnv = Self;
1411
1412 type Process = DockerDeployProcessEcs;
1413 type Cluster = DockerDeployClusterEcs;
1414 type External = DockerDeployExternalEcs;
1415
1416 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
1417 fn o2o_sink_source(
1418 p1: &Self::Process,
1419 p1_port: &<Self::Process as Node>::Port,
1420 p2: &Self::Process,
1421 p2_port: &<Self::Process as Node>::Port,
1422 ) -> (syn::Expr, syn::Expr) {
1423 deploy_containerized_o2o(&p2.name, *p2_port)
1425 }
1426
1427 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
1428 fn o2o_connect(
1429 p1: &Self::Process,
1430 p1_port: &<Self::Process as Node>::Port,
1431 p2: &Self::Process,
1432 p2_port: &<Self::Process as Node>::Port,
1433 ) -> Box<dyn FnOnce()> {
1434 let serialized = format!(
1435 "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
1436 p1.name, p2.name
1437 );
1438
1439 Box::new(move || {
1440 trace!(name: "o2o_connect thunk", %serialized);
1441 })
1442 }
1443
1444 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1445 fn o2m_sink_source(
1446 p1: &Self::Process,
1447 p1_port: &<Self::Process as Node>::Port,
1448 c2: &Self::Cluster,
1449 c2_port: &<Self::Cluster as Node>::Port,
1450 ) -> (syn::Expr, syn::Expr) {
1451 deploy_containerized_o2m(*c2_port)
1452 }
1453
1454 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1455 fn o2m_connect(
1456 p1: &Self::Process,
1457 p1_port: &<Self::Process as Node>::Port,
1458 c2: &Self::Cluster,
1459 c2_port: &<Self::Cluster as Node>::Port,
1460 ) -> Box<dyn FnOnce()> {
1461 let serialized = format!(
1462 "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
1463 p1.name, c2.name
1464 );
1465
1466 Box::new(move || {
1467 trace!(name: "o2m_connect thunk", %serialized);
1468 })
1469 }
1470
1471 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1472 fn m2o_sink_source(
1473 c1: &Self::Cluster,
1474 c1_port: &<Self::Cluster as Node>::Port,
1475 p2: &Self::Process,
1476 p2_port: &<Self::Process as Node>::Port,
1477 ) -> (syn::Expr, syn::Expr) {
1478 deploy_containerized_m2o(*p2_port, &p2.name)
1480 }
1481
1482 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1483 fn m2o_connect(
1484 c1: &Self::Cluster,
1485 c1_port: &<Self::Cluster as Node>::Port,
1486 p2: &Self::Process,
1487 p2_port: &<Self::Process as Node>::Port,
1488 ) -> Box<dyn FnOnce()> {
1489 let serialized = format!(
1490 "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
1491 c1.name, p2.name
1492 );
1493
1494 Box::new(move || {
1495 trace!(name: "m2o_connect thunk", %serialized);
1496 })
1497 }
1498
1499 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1500 fn m2m_sink_source(
1501 c1: &Self::Cluster,
1502 c1_port: &<Self::Cluster as Node>::Port,
1503 c2: &Self::Cluster,
1504 c2_port: &<Self::Cluster as Node>::Port,
1505 ) -> (syn::Expr, syn::Expr) {
1506 deploy_containerized_m2m(*c2_port)
1507 }
1508
1509 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1510 fn m2m_connect(
1511 c1: &Self::Cluster,
1512 c1_port: &<Self::Cluster as Node>::Port,
1513 c2: &Self::Cluster,
1514 c2_port: &<Self::Cluster as Node>::Port,
1515 ) -> Box<dyn FnOnce()> {
1516 let serialized = format!(
1517 "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
1518 c1.name, c2.name
1519 );
1520
1521 Box::new(move || {
1522 trace!(name: "m2m_connect thunk", %serialized);
1523 })
1524 }
1525
1526 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, ?codec_type, %shared_handle, extra_stmts = extra_stmts.len()))]
1527 fn e2o_many_source(
1528 extra_stmts: &mut Vec<syn::Stmt>,
1529 p2: &Self::Process,
1530 p2_port: &<Self::Process as Node>::Port,
1531 codec_type: &syn::Type,
1532 shared_handle: String,
1533 ) -> syn::Expr {
1534 todo!()
1535 }
1536
1537 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1538 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1539 todo!()
1540 }
1541
1542 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?codec_type, %shared_handle))]
1543 fn e2o_source(
1544 extra_stmts: &mut Vec<syn::Stmt>,
1545 p1: &Self::External,
1546 p1_port: &<Self::External as Node>::Port,
1547 p2: &Self::Process,
1548 p2_port: &<Self::Process as Node>::Port,
1549 codec_type: &syn::Type,
1550 shared_handle: String,
1551 ) -> syn::Expr {
1552 p1.connection_info.borrow_mut().insert(
1553 *p1_port,
1554 (
1555 p2.docker_container_name.clone(),
1556 *p2_port,
1557 p2.network.clone(),
1558 ),
1559 );
1560
1561 p2.exposed_ports.borrow_mut().push(*p2_port);
1562
1563 let socket_ident = syn::Ident::new(
1564 &format!("__hydro_deploy_{}_socket", &shared_handle),
1565 Span::call_site(),
1566 );
1567
1568 let source_ident = syn::Ident::new(
1569 &format!("__hydro_deploy_{}_source", &shared_handle),
1570 Span::call_site(),
1571 );
1572
1573 let sink_ident = syn::Ident::new(
1574 &format!("__hydro_deploy_{}_sink", &shared_handle),
1575 Span::call_site(),
1576 );
1577
1578 let bind_addr = format!("0.0.0.0:{}", p2_port);
1579
1580 extra_stmts.push(syn::parse_quote! {
1581 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1582 });
1583
1584 let create_expr =
1585 deploy_containerized_external_sink_source_ident(bind_addr, socket_ident.clone());
1586
1587 extra_stmts.push(syn::parse_quote! {
1588 let (#sink_ident, #source_ident) = (#create_expr).split();
1589 });
1590
1591 parse_quote!(#source_ident)
1592 }
1593
1594 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1595 fn e2o_connect(
1596 p1: &Self::External,
1597 p1_port: &<Self::External as Node>::Port,
1598 p2: &Self::Process,
1599 p2_port: &<Self::Process as Node>::Port,
1600 many: bool,
1601 server_hint: NetworkHint,
1602 ) -> Box<dyn FnOnce()> {
1603 let serialized = format!(
1604 "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
1605 p1.name, p2.name
1606 );
1607
1608 Box::new(move || {
1609 trace!(name: "e2o_connect thunk", %serialized);
1610 })
1611 }
1612
1613 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1614 fn o2e_sink(
1615 p1: &Self::Process,
1616 p1_port: &<Self::Process as Node>::Port,
1617 p2: &Self::External,
1618 p2_port: &<Self::External as Node>::Port,
1619 shared_handle: String,
1620 ) -> syn::Expr {
1621 let sink_ident = syn::Ident::new(
1622 &format!("__hydro_deploy_{}_sink", &shared_handle),
1623 Span::call_site(),
1624 );
1625 parse_quote!(#sink_ident)
1626 }
1627
1628 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1629 fn cluster_ids(
1630 of_cluster: usize,
1631 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1632 cluster_ids()
1633 }
1634
1635 #[instrument(level = "trace", skip_all)]
1636 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1637 cluster_self_id()
1638 }
1639
1640 #[instrument(level = "trace", skip_all, fields(?location_id))]
1641 fn cluster_membership_stream(
1642 location_id: &LocationId,
1643 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1644 {
1645 cluster_membership_stream(location_id)
1646 }
1647}
1648
/// The 36 lowercase ASCII alphanumerics (`0-9`, then `a-z`).
///
/// Used as the `nanoid` alphabet when `get_docker_image_name` generates the unique tag of an
/// image name.
const CONTAINER_ALPHABET: [char; 36] = [
    // Digits.
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
    // Letters.
    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
    'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1653
1654#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location, %deployment_instance))]
1655fn get_docker_image_name(name_hint: &str, location: usize, deployment_instance: &str) -> String {
1656 let name_hint = name_hint
1657 .split("::")
1658 .last()
1659 .unwrap()
1660 .to_string()
1661 .to_ascii_lowercase()
1662 .replace(".", "-")
1663 .replace("_", "-")
1664 .replace("::", "-");
1665
1666 let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1667
1668 format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location}")
1669}
1670
1671#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1672fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1673 if let Some(instance) = instance {
1674 format!("{image_name}-{instance}")
1675 } else {
1676 image_name.to_string()
1677 }
1678}
/// Settings for a single process, captured before the process is instantiated.
///
/// `ProcessSpec::build` turns this into a `DockerDeployProcessEcs`.
#[derive(Clone)]
pub struct DockerDeployProcessSpecEcs {
    /// Copied unchanged into the built process's `compilation_options`.
    compilation_options: Option<String>,
    /// Copied unchanged into the built process's `config`.
    config: Vec<String>,
    /// Network handle carried over to the built process.
    network: DockerNetworkEcs,
    /// Deployment instance identifier; becomes part of the generated image name.
    deployment_instance: String,
}
1687
1688impl<'a> ProcessSpec<'a, DockerDeployEcs> for DockerDeployProcessSpecEcs {
1689 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1690 fn build(self, id: usize, name_hint: &'_ str) -> <DockerDeployEcs as Deploy<'a>>::Process {
1691 DockerDeployProcessEcs {
1692 id,
1693 name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1694
1695 next_port: Rc::new(RefCell::new(1000)),
1696 rust_crate: Rc::new(RefCell::new(None)),
1697
1698 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1699
1700 docker_container_name: Rc::new(RefCell::new(None)),
1701
1702 compilation_options: self.compilation_options,
1703 config: self.config,
1704
1705 network: self.network.clone(),
1706 }
1707 }
1708}
1709
/// Settings for a cluster, captured before the cluster is instantiated.
///
/// `ClusterSpec::build` turns this into a `DockerDeployClusterEcs`.
#[derive(Clone)]
pub struct DockerDeployClusterSpecEcs {
    /// Copied unchanged into the built cluster's `compilation_options`.
    compilation_options: Option<String>,
    /// Copied unchanged into the built cluster's `config`.
    config: Vec<String>,
    /// Number of cluster members; copied into the built cluster's `count`.
    count: usize,
    /// Deployment instance identifier; becomes part of the generated image name.
    deployment_instance: String,
}
1718
1719impl<'a> ClusterSpec<'a, DockerDeployEcs> for DockerDeployClusterSpecEcs {
1720 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1721 fn build(self, id: usize, name_hint: &str) -> <DockerDeployEcs as Deploy<'a>>::Cluster {
1722 DockerDeployClusterEcs {
1723 id,
1724 name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1725
1726 next_port: Rc::new(RefCell::new(1000)),
1727 rust_crate: Rc::new(RefCell::new(None)),
1728
1729 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1730
1731 compilation_options: self.compilation_options,
1732 config: self.config,
1733
1734 count: self.count,
1735 }
1736 }
1737}
1738
/// Settings for an external endpoint, captured before it is instantiated.
///
/// `ExternalSpec::build` turns this into a `DockerDeployExternalEcs`.
pub struct DockerDeployExternalSpecEcs {
    /// Name given to the built external.
    name: String,
    /// Deployment instance identifier, carried over to the built external.
    deployment_instance: String,
}
1744
1745impl<'a> ExternalSpec<'a, DockerDeployEcs> for DockerDeployExternalSpecEcs {
1746 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1747 fn build(self, id: usize, name_hint: &str) -> <DockerDeployEcs as Deploy<'a>>::External {
1748 DockerDeployExternalEcs {
1749 name: self.name,
1750 next_port: Rc::new(RefCell::new(10000)),
1751 ports: Rc::new(RefCell::new(HashMap::new())),
1752 connection_info: Rc::new(RefCell::new(HashMap::new())),
1753 deployment_instance: self.deployment_instance,
1754 }
1755 }
1756}