1use std::cell::RefCell;
5use std::collections::BTreeMap;
6use std::pin::Pin;
7use std::rc::Rc;
8
9use bytes::Bytes;
10use dfir_lang::graph::DfirGraph;
11use futures::{Sink, Stream};
12use proc_macro2::Span;
13use serde::{Deserialize, Serialize};
14use stageleft::QuotedWithContext;
15use syn::parse_quote;
16use tracing::{instrument, trace};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct HydroManifest {
21 pub processes: BTreeMap<String, ProcessManifest>,
23 pub clusters: BTreeMap<String, ClusterManifest>,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct BuildConfig {
30 pub project_dir: String,
32 pub target_dir: String,
34 pub bin_name: String,
36 pub package_name: String,
38 pub features: Vec<String>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(tag = "protocol", rename_all = "lowercase")]
45pub enum PortInfo {
46 Tcp {
48 port: u16,
50 },
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ProcessManifest {
56 pub build: BuildConfig,
58 pub ports: BTreeMap<String, PortInfo>,
60 pub task_family: String,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ClusterManifest {
67 pub build: BuildConfig,
69 pub ports: BTreeMap<String, PortInfo>,
71 pub default_count: usize,
73 pub task_family_prefix: String,
75}
76
77use super::deploy_runtime_containerized_ecs::*;
78use crate::compile::builder::ExternalPortId;
79use crate::compile::deploy::DeployResult;
80use crate::compile::deploy_provider::{
81 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
82};
83use crate::compile::trybuild::generate::create_graph_trybuild;
84use crate::location::dynamic::LocationId;
85use crate::location::member_id::TaglessMemberId;
86use crate::location::{LocationKey, MembershipEvent, NetworkHint};
87
88#[derive(Clone)]
90pub struct EcsDeployProcess {
91 id: LocationKey,
92 name: String,
93 next_port: Rc<RefCell<u16>>,
94
95 exposed_ports: Rc<RefCell<BTreeMap<String, PortInfo>>>,
96
97 trybuild_config:
98 Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
99}
100
101impl Node for EcsDeployProcess {
102 type Port = u16;
103 type Meta = ();
104 type InstantiateEnv = EcsDeploy;
105
106 #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
107 fn next_port(&self) -> Self::Port {
108 let port = {
109 let mut borrow = self.next_port.borrow_mut();
110 let port = *borrow;
111 *borrow += 1;
112 port
113 };
114
115 port
116 }
117
118 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
119 fn update_meta(&self, _meta: &Self::Meta) {}
120
121 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
122 fn instantiate(
123 &self,
124 _env: &mut Self::InstantiateEnv,
125 meta: &mut Self::Meta,
126 graph: DfirGraph,
127 extra_stmts: &[syn::Stmt],
128 sidecars: &[syn::Expr],
129 ) {
130 let (bin_name, config) = create_graph_trybuild(
131 graph,
132 extra_stmts,
133 sidecars,
134 Some(&self.name),
135 crate::compile::trybuild::generate::DeployMode::Containerized,
136 crate::compile::trybuild::generate::LinkingMode::Static,
137 );
138
139 *self.trybuild_config.borrow_mut() = Some((bin_name, config));
141 }
142}
143
144#[derive(Clone)]
146pub struct EcsDeployCluster {
147 id: LocationKey,
148 name: String,
149 next_port: Rc<RefCell<u16>>,
150
151 exposed_ports: Rc<RefCell<BTreeMap<String, PortInfo>>>,
152
153 count: usize,
154
155 trybuild_config:
157 Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
158}
159
160impl Node for EcsDeployCluster {
161 type Port = u16;
162 type Meta = ();
163 type InstantiateEnv = EcsDeploy;
164
165 #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
166 fn next_port(&self) -> Self::Port {
167 let port = {
168 let mut borrow = self.next_port.borrow_mut();
169 let port = *borrow;
170 *borrow += 1;
171 port
172 };
173
174 port
175 }
176
177 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
178 fn update_meta(&self, _meta: &Self::Meta) {}
179
180 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, extra_stmts = extra_stmts.len()))]
181 fn instantiate(
182 &self,
183 _env: &mut Self::InstantiateEnv,
184 _meta: &mut Self::Meta,
185 graph: DfirGraph,
186 extra_stmts: &[syn::Stmt],
187 sidecars: &[syn::Expr],
188 ) {
189 let (bin_name, config) = create_graph_trybuild(
190 graph,
191 extra_stmts,
192 sidecars,
193 Some(&self.name),
194 crate::compile::trybuild::generate::DeployMode::Containerized,
195 crate::compile::trybuild::generate::LinkingMode::Static,
196 );
197
198 *self.trybuild_config.borrow_mut() = Some((bin_name, config));
200 }
201}
202
203#[derive(Clone, Debug)]
205pub struct EcsDeployExternal {
206 name: String,
207 next_port: Rc<RefCell<u16>>,
208}
209
210impl Node for EcsDeployExternal {
211 type Port = u16;
212 type Meta = ();
213 type InstantiateEnv = EcsDeploy;
214
215 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
216 fn next_port(&self) -> Self::Port {
217 let port = {
218 let mut borrow = self.next_port.borrow_mut();
219 let port = *borrow;
220 *borrow += 1;
221 port
222 };
223
224 port
225 }
226
227 #[instrument(level = "trace", skip_all, fields(name = self.name))]
228 fn update_meta(&self, _meta: &Self::Meta) {}
229
230 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
231 fn instantiate(
232 &self,
233 _env: &mut Self::InstantiateEnv,
234 meta: &mut Self::Meta,
235 graph: DfirGraph,
236 extra_stmts: &[syn::Stmt],
237 sidecars: &[syn::Expr],
238 ) {
239 trace!(name: "surface", surface = graph.surface_syntax_string());
240 }
241}
242
243type DynSourceSink<Out, In, InErr> = (
244 Pin<Box<dyn Stream<Item = Out>>>,
245 Pin<Box<dyn Sink<In, Error = InErr>>>,
246);
247
248impl<'a> RegisterPort<'a, EcsDeploy> for EcsDeployExternal {
249 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
250 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {}
251
252 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
253 fn as_bytes_bidi(
254 &self,
255 _external_port_id: ExternalPortId,
256 ) -> impl Future<
257 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
258 > + 'a {
259 async { unimplemented!() }
260 }
261
262 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
263 fn as_bincode_bidi<InT, OutT>(
264 &self,
265 _external_port_id: ExternalPortId,
266 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
267 where
268 InT: Serialize + 'static,
269 OutT: serde::de::DeserializeOwned + 'static,
270 {
271 async { unimplemented!() }
272 }
273
274 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
275 fn as_bincode_sink<T>(
276 &self,
277 _external_port_id: ExternalPortId,
278 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
279 where
280 T: Serialize + 'static,
281 {
282 async { unimplemented!() }
283 }
284
285 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
286 fn as_bincode_source<T>(
287 &self,
288 _external_port_id: ExternalPortId,
289 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
290 where
291 T: serde::de::DeserializeOwned + 'static,
292 {
293 async { unimplemented!() }
294 }
295}
296
297pub struct EcsDeploy;
299
300impl Default for EcsDeploy {
301 fn default() -> Self {
302 Self::new()
303 }
304}
305
306impl EcsDeploy {
307 pub fn new() -> Self {
309 Self
310 }
311
312 pub fn add_ecs_process(&mut self) -> EcsDeployProcessSpec {
314 EcsDeployProcessSpec
315 }
316
317 pub fn add_ecs_cluster(&mut self, count: usize) -> EcsDeployClusterSpec {
319 EcsDeployClusterSpec { count }
320 }
321
322 pub fn add_external(&self, name: String) -> EcsDeployExternalSpec {
324 EcsDeployExternalSpec { name }
325 }
326
327 #[instrument(level = "trace", skip_all)]
335 pub fn export(&self, nodes: &DeployResult<'_, Self>) -> HydroManifest {
336 let mut manifest = HydroManifest {
337 processes: BTreeMap::new(),
338 clusters: BTreeMap::new(),
339 };
340
341 for (location_id, name_hint, process) in nodes.get_all_processes() {
343 let LocationId::Process(_) = location_id else {
344 unreachable!()
345 };
346
347 let (bin_name, trybuild_config) = process
348 .trybuild_config
349 .borrow()
350 .clone()
351 .expect("trybuild_config should be set after instantiate");
352
353 manifest.processes.insert(
354 name_hint.to_owned(),
355 ProcessManifest {
356 build: build_info_from_config(&bin_name, &trybuild_config),
357 ports: process.exposed_ports.borrow().clone(),
358 task_family: process.name.clone(),
359 },
360 );
361 }
362
363 for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
365 let LocationId::Cluster(_) = location_id else {
366 unreachable!()
367 };
368
369 let (bin_name, trybuild_config) = cluster
370 .trybuild_config
371 .borrow()
372 .clone()
373 .expect("trybuild_config should be set after instantiate");
374
375 manifest.clusters.insert(
376 name_hint.to_owned(),
377 ClusterManifest {
378 build: build_info_from_config(&bin_name, &trybuild_config),
379 ports: cluster.exposed_ports.borrow().clone(),
380 default_count: cluster.count,
381 task_family_prefix: cluster.name.clone(),
382 },
383 );
384 }
385
386 manifest
387 }
388}
389
390fn build_info_from_config(
391 bin_name: &str,
392 config: &crate::compile::trybuild::generate::TrybuildConfig,
393) -> BuildConfig {
394 let mut features = vec!["hydro___feature_ecs_runtime".to_owned()];
395 if let Some(extra) = &config.features {
396 features.extend(extra.clone());
397 }
398 let crate_name = config
399 .project_dir
400 .file_name()
401 .and_then(|n| n.to_str())
402 .unwrap_or("unknown")
403 .replace("_", "-");
404
405 let package_name = format!("{}-hydro-trybuild", crate_name);
406
407 BuildConfig {
408 project_dir: config.project_dir.to_string_lossy().into_owned(),
409 target_dir: config.target_dir.to_string_lossy().into_owned(),
410 bin_name: bin_name.to_owned(),
411 package_name,
412 features,
413 }
414}
415
416impl<'a> Deploy<'a> for EcsDeploy {
417 type InstantiateEnv = Self;
418 type Process = EcsDeployProcess;
419 type Cluster = EcsDeployCluster;
420 type External = EcsDeployExternal;
421 type Meta = ();
422
423 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
424 fn o2o_sink_source(
425 _env: &mut Self::InstantiateEnv,
426 p1: &Self::Process,
427 p1_port: &<Self::Process as Node>::Port,
428 p2: &Self::Process,
429 p2_port: &<Self::Process as Node>::Port,
430 name: Option<&str>,
431 networking_info: &crate::networking::NetworkingInfo,
432 ) -> (syn::Expr, syn::Expr) {
433 match networking_info {
434 crate::networking::NetworkingInfo::Tcp {
435 fault: crate::networking::TcpFault::FailStop,
436 } => {}
437 _ => panic!("Unsupported networking info: {:?}", networking_info),
438 }
439
440 deploy_containerized_o2o(
441 &p2.name,
442 name.expect("channel name is required for containerized deployment"),
443 )
444 }
445
446 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
447 fn o2o_connect(
448 p1: &Self::Process,
449 p1_port: &<Self::Process as Node>::Port,
450 p2: &Self::Process,
451 p2_port: &<Self::Process as Node>::Port,
452 ) -> Box<dyn FnOnce()> {
453 let serialized = format!(
454 "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
455 p1.name, p2.name
456 );
457
458 Box::new(move || {
459 trace!(name: "o2o_connect thunk", %serialized);
460 })
461 }
462
463 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
464 fn o2m_sink_source(
465 _env: &mut Self::InstantiateEnv,
466 p1: &Self::Process,
467 p1_port: &<Self::Process as Node>::Port,
468 c2: &Self::Cluster,
469 c2_port: &<Self::Cluster as Node>::Port,
470 name: Option<&str>,
471 networking_info: &crate::networking::NetworkingInfo,
472 ) -> (syn::Expr, syn::Expr) {
473 match networking_info {
474 crate::networking::NetworkingInfo::Tcp {
475 fault: crate::networking::TcpFault::FailStop,
476 } => {}
477 _ => panic!("Unsupported networking info: {:?}", networking_info),
478 }
479
480 deploy_containerized_o2m(
481 name.expect("channel name is required for containerized deployment"),
482 )
483 }
484
485 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
486 fn o2m_connect(
487 p1: &Self::Process,
488 p1_port: &<Self::Process as Node>::Port,
489 c2: &Self::Cluster,
490 c2_port: &<Self::Cluster as Node>::Port,
491 ) -> Box<dyn FnOnce()> {
492 let serialized = format!(
493 "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
494 p1.name, c2.name
495 );
496
497 Box::new(move || {
498 trace!(name: "o2m_connect thunk", %serialized);
499 })
500 }
501
502 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
503 fn m2o_sink_source(
504 _env: &mut Self::InstantiateEnv,
505 c1: &Self::Cluster,
506 c1_port: &<Self::Cluster as Node>::Port,
507 p2: &Self::Process,
508 p2_port: &<Self::Process as Node>::Port,
509 name: Option<&str>,
510 networking_info: &crate::networking::NetworkingInfo,
511 ) -> (syn::Expr, syn::Expr) {
512 match networking_info {
513 crate::networking::NetworkingInfo::Tcp {
514 fault: crate::networking::TcpFault::FailStop,
515 } => {}
516 _ => panic!("Unsupported networking info: {:?}", networking_info),
517 }
518
519 deploy_containerized_m2o(
520 &p2.name,
521 name.expect("channel name is required for containerized deployment"),
522 )
523 }
524
525 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
526 fn m2o_connect(
527 c1: &Self::Cluster,
528 c1_port: &<Self::Cluster as Node>::Port,
529 p2: &Self::Process,
530 p2_port: &<Self::Process as Node>::Port,
531 ) -> Box<dyn FnOnce()> {
532 let serialized = format!(
533 "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
534 c1.name, p2.name
535 );
536
537 Box::new(move || {
538 trace!(name: "m2o_connect thunk", %serialized);
539 })
540 }
541
542 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
543 fn m2m_sink_source(
544 _env: &mut Self::InstantiateEnv,
545 c1: &Self::Cluster,
546 c1_port: &<Self::Cluster as Node>::Port,
547 c2: &Self::Cluster,
548 c2_port: &<Self::Cluster as Node>::Port,
549 name: Option<&str>,
550 networking_info: &crate::networking::NetworkingInfo,
551 ) -> (syn::Expr, syn::Expr) {
552 match networking_info {
553 crate::networking::NetworkingInfo::Tcp {
554 fault: crate::networking::TcpFault::FailStop,
555 } => {}
556 _ => panic!("Unsupported networking info: {:?}", networking_info),
557 }
558
559 deploy_containerized_m2m(
560 name.expect("channel name is required for containerized deployment"),
561 )
562 }
563
564 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
565 fn m2m_connect(
566 c1: &Self::Cluster,
567 c1_port: &<Self::Cluster as Node>::Port,
568 c2: &Self::Cluster,
569 c2_port: &<Self::Cluster as Node>::Port,
570 ) -> Box<dyn FnOnce()> {
571 let serialized = format!(
572 "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
573 c1.name, c2.name
574 );
575
576 Box::new(move || {
577 trace!(name: "m2m_connect thunk", %serialized);
578 })
579 }
580
581 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
582 fn e2o_many_source(
583 extra_stmts: &mut Vec<syn::Stmt>,
584 p2: &Self::Process,
585 p2_port: &<Self::Process as Node>::Port,
586 codec_type: &syn::Type,
587 shared_handle: String,
588 ) -> syn::Expr {
589 p2.exposed_ports
590 .borrow_mut()
591 .insert(shared_handle.clone(), PortInfo::Tcp { port: *p2_port });
592
593 let socket_ident = syn::Ident::new(
594 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
595 Span::call_site(),
596 );
597
598 let source_ident = syn::Ident::new(
599 &format!("__hydro_deploy_many_{}_source", &shared_handle),
600 Span::call_site(),
601 );
602
603 let sink_ident = syn::Ident::new(
604 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
605 Span::call_site(),
606 );
607
608 let membership_ident = syn::Ident::new(
609 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
610 Span::call_site(),
611 );
612
613 let bind_addr = format!("0.0.0.0:{}", p2_port);
614
615 extra_stmts.push(syn::parse_quote! {
616 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
617 });
618
619 let root = crate::staging_util::get_this_crate();
620
621 extra_stmts.push(syn::parse_quote! {
622 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
623 });
624
625 parse_quote!(#source_ident)
626 }
627
628 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
629 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
630 let sink_ident = syn::Ident::new(
631 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
632 Span::call_site(),
633 );
634 parse_quote!(#sink_ident)
635 }
636
637 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?codec_type, %shared_handle))]
638 fn e2o_source(
639 extra_stmts: &mut Vec<syn::Stmt>,
640 p1: &Self::External,
641 p1_port: &<Self::External as Node>::Port,
642 p2: &Self::Process,
643 p2_port: &<Self::Process as Node>::Port,
644 codec_type: &syn::Type,
645 shared_handle: String,
646 ) -> syn::Expr {
647 p2.exposed_ports
649 .borrow_mut()
650 .insert(shared_handle.clone(), PortInfo::Tcp { port: *p2_port });
651
652 let source_ident = syn::Ident::new(
653 &format!("__hydro_deploy_{}_source", &shared_handle),
654 Span::call_site(),
655 );
656
657 let bind_addr = format!("0.0.0.0:{}", p2_port);
658
659 let socket_ident = syn::Ident::new(
662 &format!("__hydro_deploy_{}_socket", &shared_handle),
663 Span::call_site(),
664 );
665
666 let sink_ident = syn::Ident::new(
667 &format!("__hydro_deploy_{}_sink", &shared_handle),
668 Span::call_site(),
669 );
670
671 extra_stmts.push(syn::parse_quote! {
672 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
673 });
674
675 let create_expr = deploy_containerized_external_sink_source_ident(bind_addr, socket_ident);
676
677 extra_stmts.push(syn::parse_quote! {
678 let (#sink_ident, #source_ident) = (#create_expr).split();
679 });
680
681 parse_quote!(#source_ident)
682 }
683
684 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
685 fn e2o_connect(
686 p1: &Self::External,
687 p1_port: &<Self::External as Node>::Port,
688 p2: &Self::Process,
689 p2_port: &<Self::Process as Node>::Port,
690 many: bool,
691 server_hint: NetworkHint,
692 ) -> Box<dyn FnOnce()> {
693 let serialized = format!(
694 "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
695 p1.name, p2.name
696 );
697
698 Box::new(move || {
699 trace!(name: "e2o_connect thunk", %serialized);
700 })
701 }
702
703 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
704 fn o2e_sink(
705 p1: &Self::Process,
706 p1_port: &<Self::Process as Node>::Port,
707 p2: &Self::External,
708 p2_port: &<Self::External as Node>::Port,
709 shared_handle: String,
710 ) -> syn::Expr {
711 let sink_ident = syn::Ident::new(
712 &format!("__hydro_deploy_{}_sink", &shared_handle),
713 Span::call_site(),
714 );
715 parse_quote!(#sink_ident)
716 }
717
718 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
719 fn cluster_ids(
720 of_cluster: LocationKey,
721 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
722 cluster_ids()
723 }
724
725 #[instrument(level = "trace", skip_all)]
726 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
727 cluster_self_id()
728 }
729
730 #[instrument(level = "trace", skip_all, fields(?location_id))]
731 fn cluster_membership_stream(
732 _env: &mut Self::InstantiateEnv,
733 _at_location: &LocationId,
734 location_id: &LocationId,
735 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
736 {
737 cluster_membership_stream(location_id)
738 }
739}
740
741#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location))]
742fn get_ecs_image_name(name_hint: &str, location: LocationKey) -> String {
743 let name_hint = name_hint
744 .split("::")
745 .last()
746 .unwrap()
747 .to_ascii_lowercase()
748 .replace(".", "-")
749 .replace("_", "-")
750 .replace("::", "-");
751
752 format!("hy-{name_hint}-{location}")
753}
754
755#[derive(Clone)]
757pub struct EcsDeployProcessSpec;
758
759impl<'a> ProcessSpec<'a, EcsDeploy> for EcsDeployProcessSpec {
760 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
761 fn build(self, id: LocationKey, name_hint: &'_ str) -> <EcsDeploy as Deploy<'a>>::Process {
762 EcsDeployProcess {
763 id,
764 name: get_ecs_image_name(name_hint, id),
765 next_port: Rc::new(RefCell::new(10001)),
766 exposed_ports: Rc::new(RefCell::new(BTreeMap::new())),
767 trybuild_config: Rc::new(RefCell::new(None)),
768 }
769 }
770}
771
772#[derive(Clone)]
774pub struct EcsDeployClusterSpec {
775 count: usize,
776}
777
778impl<'a> ClusterSpec<'a, EcsDeploy> for EcsDeployClusterSpec {
779 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
780 fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::Cluster {
781 EcsDeployCluster {
782 id,
783 name: get_ecs_image_name(name_hint, id),
784 next_port: Rc::new(RefCell::new(10001)),
785 exposed_ports: Rc::new(RefCell::new(BTreeMap::new())),
786 count: self.count,
787 trybuild_config: Rc::new(RefCell::new(None)),
788 }
789 }
790}
791
792pub struct EcsDeployExternalSpec {
794 name: String,
795}
796
797impl<'a> ExternalSpec<'a, EcsDeploy> for EcsDeployExternalSpec {
798 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
799 fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::External {
800 EcsDeployExternal {
801 name: self.name,
802 next_port: Rc::new(RefCell::new(10000)),
803 }
804 }
805}