1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
/// Handle to the Docker network that a deployment's containers are attached to.
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Name passed to Docker when the network is created, when containers are
    /// attached to it, and when it is removed.
    name: String,
}
47
48impl DockerNetwork {
49 pub fn new(name: String) -> Self {
51 Self {
52 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53 }
54 }
55}
56
/// A single Hydro process that is deployed as one Docker container.
#[derive(Clone)]
pub struct DockerDeployProcess {
    /// Location key of this process; in this file it is only used as a tracing field.
    key: LocationKey,
    /// Name of the process. Used as the trybuild name hint, as the Docker image
    /// tag, as the input for the container name, and as the host name peers
    /// connect to.
    name: String,
    /// Next port number handed out by [`Node::next_port`].
    next_port: Rc<RefCell<u16>>,
    /// Crate to build for this process; filled by [`Node::instantiate`] and
    /// taken out again when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Container ports that external clients connect to; each one becomes an
    /// `EXPOSE` line in the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Name of the running container; `None` until `DockerDeploy::start` sets it.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Optional flags forwarded to the crate build as rustflags.
    compilation_options: Option<String>,

    /// Extra configuration entries, each forwarded to `RustCrate::config`.
    config: Vec<String>,

    /// Docker network associated with this process.
    network: DockerNetwork,
}
75
76impl Node for DockerDeployProcess {
77 type Port = u16;
78 type Meta = ();
79 type InstantiateEnv = DockerDeploy;
80
81 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82 fn next_port(&self) -> Self::Port {
83 let port = {
84 let mut borrow = self.next_port.borrow_mut();
85 let port = *borrow;
86 *borrow += 1;
87 port
88 };
89
90 port
91 }
92
93 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94 fn update_meta(&self, _meta: &Self::Meta) {}
95
96 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97 fn instantiate(
98 &self,
99 _env: &mut Self::InstantiateEnv,
100 meta: &mut Self::Meta,
101 graph: DfirGraph,
102 extra_stmts: &[syn::Stmt],
103 sidecars: &[syn::Expr],
104 ) {
105 let (bin_name, config) = create_graph_trybuild(
106 graph,
107 extra_stmts,
108 sidecars,
109 Some(&self.name),
110 crate::compile::trybuild::generate::DeployMode::Containerized,
111 LinkingMode::Static,
112 );
113
114 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
115 .target_dir(config.target_dir)
116 .example(bin_name)
117 .no_default_features();
118
119 ret = ret.display_name("test_display_name");
120
121 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123 if let Some(features) = config.features {
124 ret = ret.features(features);
125 }
126
127 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128 ret = ret.config("build.incremental = false");
129
130 *self.rust_crate.borrow_mut() = Some(ret);
131 }
132}
133
/// A Hydro cluster that is deployed as `count` Docker containers sharing one image.
#[derive(Clone)]
pub struct DockerDeployCluster {
    /// Location key of this cluster; in this file it is only used as a tracing field.
    key: LocationKey,
    /// Name of the cluster. Used as the trybuild name hint, as the Docker image
    /// tag, and as the input for each member's container name.
    name: String,
    /// Next port number handed out by [`Node::next_port`].
    next_port: Rc<RefCell<u16>>,
    /// Crate to build for this cluster; filled by [`Node::instantiate`] and
    /// taken out again when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Names of the started member containers, appended to by `DockerDeploy::start`.
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Optional flags forwarded to the crate build as rustflags.
    compilation_options: Option<String>,

    /// Extra configuration entries, each forwarded to `RustCrate::config`.
    config: Vec<String>,

    /// Number of containers started for this cluster.
    count: usize,
}
150
151impl Node for DockerDeployCluster {
152 type Port = u16;
153 type Meta = ();
154 type InstantiateEnv = DockerDeploy;
155
156 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157 fn next_port(&self) -> Self::Port {
158 let port = {
159 let mut borrow = self.next_port.borrow_mut();
160 let port = *borrow;
161 *borrow += 1;
162 port
163 };
164
165 port
166 }
167
168 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169 fn update_meta(&self, _meta: &Self::Meta) {}
170
171 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172 fn instantiate(
173 &self,
174 _env: &mut Self::InstantiateEnv,
175 _meta: &mut Self::Meta,
176 graph: DfirGraph,
177 extra_stmts: &[syn::Stmt],
178 sidecars: &[syn::Expr],
179 ) {
180 let (bin_name, config) = create_graph_trybuild(
181 graph,
182 extra_stmts,
183 sidecars,
184 Some(&self.name),
185 crate::compile::trybuild::generate::DeployMode::Containerized,
186 LinkingMode::Static,
187 );
188
189 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
190 .target_dir(config.target_dir)
191 .example(bin_name)
192 .no_default_features();
193
194 ret = ret.display_name("test_display_name");
195
196 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198 if let Some(features) = config.features {
199 ret = ret.features(features);
200 }
201
202 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203 ret = ret.config("build.incremental = false");
204
205 *self.rust_crate.borrow_mut() = Some(ret);
206 }
207}
208
/// An external (host-side) endpoint that talks to deployed containers over TCP.
#[derive(Clone, Debug)]
pub struct DockerDeployExternal {
    /// Name of the external; in this file it is only used in tracing output.
    name: String,
    /// Next port number handed out by [`Node::next_port`].
    next_port: Rc<RefCell<u16>>,

    /// Maps each registered external port id to this external's local port number.
    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,

    /// Maps a local port number to the process it is wired to: the shared cell
    /// holding the target container's name, the port on that container, and the
    /// Docker network of the target process.
    #[expect(clippy::type_complexity, reason = "internal code")]
    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
}
220
221impl Node for DockerDeployExternal {
222 type Port = u16;
223 type Meta = ();
224 type InstantiateEnv = DockerDeploy;
225
226 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227 fn next_port(&self) -> Self::Port {
228 let port = {
229 let mut borrow = self.next_port.borrow_mut();
230 let port = *borrow;
231 *borrow += 1;
232 port
233 };
234
235 port
236 }
237
238 #[instrument(level = "trace", skip_all, fields(name = self.name))]
239 fn update_meta(&self, _meta: &Self::Meta) {}
240
241 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242 fn instantiate(
243 &self,
244 _env: &mut Self::InstantiateEnv,
245 meta: &mut Self::Meta,
246 graph: DfirGraph,
247 extra_stmts: &[syn::Stmt],
248 sidecars: &[syn::Expr],
249 ) {
250 trace!(name: "surface", surface = graph.surface_syntax_string());
251 }
252}
253
/// A boxed, pinned `(source, sink)` pair: a stream yielding `Out` items and a
/// sink accepting `In` items whose error type is `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262 self.ports.borrow_mut().insert(external_port_id, port);
263 }
264
265 fn as_bytes_bidi(
266 &self,
267 external_port_id: ExternalPortId,
268 ) -> impl Future<
269 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270 > + 'a {
271 let guard =
272 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275 let (docker_container_name, remote_port, _) = self
276 .connection_info
277 .borrow()
278 .get(&local_port)
279 .unwrap()
280 .clone();
281
282 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284 async move {
285 let local_port =
286 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287 let remote_ip_address = "localhost";
288
289 trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292 .await
293 .unwrap();
294
295 trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297 let (rx, tx) = stream.into_split();
298
299 let source = Box::pin(
300 FramedRead::new(rx, LengthDelimitedCodec::new()),
301 ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303 let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304 as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306 (source, sink)
307 }
308 .instrument(guard.exit())
309 }
310
311 fn as_bincode_bidi<InT, OutT>(
312 &self,
313 external_port_id: ExternalPortId,
314 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315 where
316 InT: serde::Serialize + 'static,
317 OutT: serde::de::DeserializeOwned + 'static,
318 {
319 let guard =
320 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323 let (docker_container_name, remote_port, _) = self
324 .connection_info
325 .borrow()
326 .get(&local_port)
327 .unwrap()
328 .clone();
329
330 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332 async move {
333 let local_port =
334 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335 let remote_ip_address = "localhost";
336
337 trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340 .await
341 .unwrap();
342
343 trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345 let (rx, tx) = stream.into_split();
346
347 let source = Box::pin(
348 FramedRead::new(rx, LengthDelimitedCodec::new())
349 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350 ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352 let sink = Box::pin(
353 FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354 Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355 }),
356 ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358 (source, sink)
359 }
360 .instrument(guard.exit())
361 }
362
363 fn as_bincode_sink<T>(
364 &self,
365 external_port_id: ExternalPortId,
366 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367 where
368 T: serde::Serialize + 'static,
369 {
370 let guard =
371 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374 let (docker_container_name, remote_port, _) = self
375 .connection_info
376 .borrow()
377 .get(&local_port)
378 .unwrap()
379 .clone();
380
381 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383 async move {
384 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385 let remote_ip_address = "localhost";
386
387 Box::pin(
388 LazySink::new(move || {
389 Box::pin(async move {
390 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392 let stream =
393 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394 .await?;
395
396 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398 Result::<_, std::io::Error>::Ok(FramedWrite::new(
399 stream,
400 LengthDelimitedCodec::new(),
401 ))
402 })
403 })
404 .with(move |v| async move {
405 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406 }),
407 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408 }
409 .instrument(guard.exit())
410 }
411
412 fn as_bincode_source<T>(
413 &self,
414 external_port_id: ExternalPortId,
415 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416 where
417 T: serde::de::DeserializeOwned + 'static,
418 {
419 let guard =
420 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423 let (docker_container_name, remote_port, _) = self
424 .connection_info
425 .borrow()
426 .get(&local_port)
427 .unwrap()
428 .clone();
429
430 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432 async move {
433
434 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435 let remote_ip_address = "localhost";
436
437 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440 .await
441 .unwrap();
442
443 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445 Box::pin(
446 FramedRead::new(stream, LengthDelimitedCodec::new())
447 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448 ) as Pin<Box<dyn Stream<Item = T>>>
449 }
450 .instrument(guard.exit())
451 }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456 docker_container_name: &str,
457 destination_port: u16,
458) -> u16 {
459 let docker = Docker::connect_with_local_defaults().unwrap();
460
461 let container_info = docker
462 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463 .await
464 .unwrap();
465
466 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468 let remote_port = container_info
470 .network_settings
471 .as_ref()
472 .unwrap()
473 .ports
474 .as_ref()
475 .unwrap()
476 .get(&format!("{destination_port}/tcp"))
477 .unwrap()
478 .as_ref()
479 .unwrap()
480 .iter()
481 .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482 .unwrap()
483 .host_port
484 .as_ref()
485 .unwrap()
486 .parse()
487 .unwrap();
488
489 remote_port
490}
491
/// Deployment backend that runs Hydro processes and clusters as local Docker containers.
pub struct DockerDeploy {
    /// Process specs handed out by `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpec>,
    /// Cluster specs handed out by `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpec>,
    /// Docker network that is created on start and removed on cleanup.
    network: DockerNetwork,
    /// Id generated in `new`; passed to every container through the
    /// `DEPLOYMENT_INSTANCE` environment variable.
    deployment_instance: String,
}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502 docker: &Docker,
503 container_name: &str,
504 image_name: &str,
505 network_name: &str,
506 deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508 let config = ContainerCreateBody {
509 image: Some(image_name.to_owned()),
510 hostname: Some(container_name.to_owned()),
511 host_config: Some(HostConfig {
512 binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513 publish_all_ports: Some(true),
514 port_bindings: Some(HashMap::new()), ..Default::default()
516 }),
517 env: Some(vec![
518 format!("CONTAINER_NAME={container_name}"),
519 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520 format!("RUST_LOG=trace"),
521 ]),
522 networking_config: Some(NetworkingConfig {
523 endpoints_config: Some(HashMap::from([(
524 network_name.to_owned(),
525 EndpointSettings {
526 ..Default::default()
527 },
528 )])),
529 }),
530 tty: Some(true),
531 ..Default::default()
532 };
533
534 let options = CreateContainerOptions {
535 name: Some(container_name.to_owned()),
536 ..Default::default()
537 };
538
539 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540 docker.create_container(Some(options), config).await?;
541 docker
542 .start_container(container_name, None::<StartContainerOptions>)
543 .await?;
544
545 Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551 compilation_options: Option<&str>,
552 config: &[String],
553 exposed_ports: &[u16],
554 image_name: &str,
555) -> Result<(), anyhow::Error> {
556 let mut rust_crate = rust_crate
557 .borrow_mut()
558 .take()
559 .unwrap()
560 .rustflags(compilation_options.unwrap_or_default());
561
562 for cfg in config {
563 rust_crate = rust_crate.config(cfg);
564 }
565
566 let build_output = match build_crate_memoized(
567 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568 )
569 .await
570 {
571 Ok(build_output) => build_output,
572 Err(BuildError::FailedToBuildCrate {
573 exit_status,
574 diagnostics,
575 text_lines,
576 stderr_lines,
577 }) => {
578 let diagnostics = diagnostics
579 .into_iter()
580 .map(|d| d.rendered.unwrap())
581 .collect::<Vec<_>>()
582 .join("\n");
583 let text_lines = text_lines.join("\n");
584 let stderr_lines = stderr_lines.join("\n");
585
586 anyhow::bail!(
587 r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611 );
612 }
613 Err(err) => {
614 anyhow::bail!("Failed to build crate {err:?}");
615 }
616 };
617
618 let docker = Docker::connect_with_local_defaults()?;
619
620 let mut tar_data = Vec::new();
621 {
622 let mut tar = Builder::new(&mut tar_data);
623
624 let exposed_ports = exposed_ports
625 .iter()
626 .map(|port| format!("EXPOSE {port}/tcp"))
627 .collect::<Vec<_>>()
628 .join("\n");
629
630 let dockerfile_content = format!(
631 r#"
632 FROM scratch
633 {exposed_ports}
634 COPY app /app
635 CMD ["/app"]
636 "#,
637 );
638
639 trace!(name: "dockerfile", %dockerfile_content);
640
641 let mut header = Header::new_gnu();
642 header.set_path("Dockerfile")?;
643 header.set_size(dockerfile_content.len() as u64);
644 header.set_cksum();
645 tar.append(&header, dockerfile_content.as_bytes())?;
646
647 let mut header = Header::new_gnu();
648 header.set_path("app")?;
649 header.set_size(build_output.bin_data.len() as u64);
650 header.set_mode(0o755);
651 header.set_cksum();
652 tar.append(&header, &build_output.bin_data[..])?;
653
654 tar.finish()?;
655 }
656
657 let build_options = BuildImageOptions {
658 dockerfile: "Dockerfile".to_owned(),
659 t: Some(image_name.to_owned()),
660 rm: true,
661 ..Default::default()
662 };
663
664 use bollard::errors::Error;
665
666 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667 let mut build_stream = docker.build_image(build_options, None, Some(body));
668 while let Some(msg) = build_stream.next().await {
669 match msg {
670 Ok(_) => {}
671 Err(e) => match e {
672 Error::DockerStreamError { error } => {
673 return Err(anyhow::anyhow!(
674 "Docker build failed: DockerStreamError: {{ error: {error} }}"
675 ));
676 }
677 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678 },
679 }
680 }
681
682 Ok(())
683}
684
685impl DockerDeploy {
686 pub fn new(network: DockerNetwork) -> Self {
688 Self {
689 docker_processes: Vec::new(),
690 docker_clusters: Vec::new(),
691 network,
692 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693 }
694 }
695
696 pub fn add_localhost_docker(
698 &mut self,
699 compilation_options: Option<String>,
700 config: Vec<String>,
701 ) -> DockerDeployProcessSpec {
702 let process = DockerDeployProcessSpec {
703 compilation_options,
704 config,
705 network: self.network.clone(),
706 deployment_instance: self.deployment_instance.clone(),
707 };
708
709 self.docker_processes.push(process.clone());
710
711 process
712 }
713
714 pub fn add_localhost_docker_cluster(
716 &mut self,
717 compilation_options: Option<String>,
718 config: Vec<String>,
719 count: usize,
720 ) -> DockerDeployClusterSpec {
721 let cluster = DockerDeployClusterSpec {
722 compilation_options,
723 config,
724 count,
725 deployment_instance: self.deployment_instance.clone(),
726 };
727
728 self.docker_clusters.push(cluster.clone());
729
730 cluster
731 }
732
733 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
735 DockerDeployExternalSpec { name }
736 }
737
738 pub fn get_deployment_instance(&self) -> String {
740 self.deployment_instance.clone()
741 }
742
743 #[instrument(level = "trace", skip_all)]
745 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746 for (_, _, process) in nodes.get_all_processes() {
747 let exposed_ports = process.exposed_ports.borrow().clone();
748
749 build_and_create_image(
750 &process.rust_crate,
751 process.compilation_options.as_deref(),
752 &process.config,
753 &exposed_ports,
754 &process.name,
755 )
756 .await?;
757 }
758
759 for (_, _, cluster) in nodes.get_all_clusters() {
760 build_and_create_image(
761 &cluster.rust_crate,
762 cluster.compilation_options.as_deref(),
763 &cluster.config,
764 &[], &cluster.name,
766 )
767 .await?;
768 }
769
770 Ok(())
771 }
772
773 #[instrument(level = "trace", skip_all)]
775 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776 let docker = Docker::connect_with_local_defaults()?;
777
778 match docker
779 .create_network(NetworkCreateRequest {
780 name: self.network.name.clone(),
781 driver: Some("bridge".to_owned()),
782 ..Default::default()
783 })
784 .await
785 {
786 Ok(v) => v.id,
787 Err(e) => {
788 panic!("Failed to create docker network: {e:?}");
789 }
790 };
791
792 for (_, _, process) in nodes.get_all_processes() {
793 let docker_container_name: String = get_docker_container_name(&process.name, None);
794 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796 create_and_start_container(
797 &docker,
798 &docker_container_name,
799 &process.name,
800 &self.network.name,
801 &self.deployment_instance,
802 )
803 .await?;
804 }
805
806 for (_, _, cluster) in nodes.get_all_clusters() {
807 for num in 0..cluster.count {
808 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809 cluster
810 .docker_container_name
811 .borrow_mut()
812 .push(docker_container_name.clone());
813
814 create_and_start_container(
815 &docker,
816 &docker_container_name,
817 &cluster.name,
818 &self.network.name,
819 &self.deployment_instance,
820 )
821 .await?;
822 }
823 }
824
825 Ok(())
826 }
827
828 #[instrument(level = "trace", skip_all)]
830 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831 let docker = Docker::connect_with_local_defaults()?;
832
833 for (_, _, process) in nodes.get_all_processes() {
834 let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836 docker
837 .kill_container(&docker_container_name, None::<KillContainerOptions>)
838 .await?;
839 }
840
841 for (_, _, cluster) in nodes.get_all_clusters() {
842 for num in 0..cluster.count {
843 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845 docker
846 .kill_container(&docker_container_name, None::<KillContainerOptions>)
847 .await?;
848 }
849 }
850
851 Ok(())
852 }
853
854 #[instrument(level = "trace", skip_all)]
856 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857 let docker = Docker::connect_with_local_defaults()?;
858
859 for (_, _, process) in nodes.get_all_processes() {
860 let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862 docker
863 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864 .await?;
865 }
866
867 for (_, _, cluster) in nodes.get_all_clusters() {
868 for num in 0..cluster.count {
869 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871 docker
872 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873 .await?;
874 }
875 }
876
877 docker
878 .remove_network(&self.network.name)
879 .await
880 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882 use bollard::query_parameters::RemoveImageOptions;
883
884 for (_, _, process) in nodes.get_all_processes() {
885 docker
886 .remove_image(&process.name, None::<RemoveImageOptions>, None)
887 .await?;
888 }
889
890 for (_, _, cluster) in nodes.get_all_clusters() {
891 docker
892 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893 .await?;
894 }
895
896 Ok(())
897 }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901 type Meta = ();
902 type InstantiateEnv = Self;
903
904 type Process = DockerDeployProcess;
905 type Cluster = DockerDeployCluster;
906 type External = DockerDeployExternal;
907
908 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
909 fn o2o_sink_source(
910 _env: &mut Self::InstantiateEnv,
911 p1: &Self::Process,
912 p1_port: &<Self::Process as Node>::Port,
913 p2: &Self::Process,
914 p2_port: &<Self::Process as Node>::Port,
915 _name: Option<&str>,
916 networking_info: &crate::networking::NetworkingInfo,
917 ) -> (syn::Expr, syn::Expr) {
918 match networking_info {
919 crate::networking::NetworkingInfo::Tcp {
920 fault: crate::networking::TcpFault::FailStop,
921 } => {}
922 _ => panic!("Unsupported networking info: {:?}", networking_info),
923 }
924 let bind_addr = format!("0.0.0.0:{}", p2_port);
925 let target = format!("{}:{p2_port}", p2.name);
926
927 deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
928 }
929
930 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
931 fn o2o_connect(
932 p1: &Self::Process,
933 p1_port: &<Self::Process as Node>::Port,
934 p2: &Self::Process,
935 p2_port: &<Self::Process as Node>::Port,
936 ) -> Box<dyn FnOnce()> {
937 let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
938
939 Box::new(move || {
940 trace!(name: "o2o_connect thunk", %serialized);
941 })
942 }
943
944 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
945 fn o2m_sink_source(
946 _env: &mut Self::InstantiateEnv,
947 p1: &Self::Process,
948 p1_port: &<Self::Process as Node>::Port,
949 c2: &Self::Cluster,
950 c2_port: &<Self::Cluster as Node>::Port,
951 _name: Option<&str>,
952 networking_info: &crate::networking::NetworkingInfo,
953 ) -> (syn::Expr, syn::Expr) {
954 match networking_info {
955 crate::networking::NetworkingInfo::Tcp {
956 fault: crate::networking::TcpFault::FailStop,
957 } => {}
958 _ => panic!("Unsupported networking info: {:?}", networking_info),
959 }
960 deploy_containerized_o2m(*c2_port)
961 }
962
963 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
964 fn o2m_connect(
965 p1: &Self::Process,
966 p1_port: &<Self::Process as Node>::Port,
967 c2: &Self::Cluster,
968 c2_port: &<Self::Cluster as Node>::Port,
969 ) -> Box<dyn FnOnce()> {
970 let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
971
972 Box::new(move || {
973 trace!(name: "o2m_connect thunk", %serialized);
974 })
975 }
976
977 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
978 fn m2o_sink_source(
979 _env: &mut Self::InstantiateEnv,
980 c1: &Self::Cluster,
981 c1_port: &<Self::Cluster as Node>::Port,
982 p2: &Self::Process,
983 p2_port: &<Self::Process as Node>::Port,
984 _name: Option<&str>,
985 networking_info: &crate::networking::NetworkingInfo,
986 ) -> (syn::Expr, syn::Expr) {
987 match networking_info {
988 crate::networking::NetworkingInfo::Tcp {
989 fault: crate::networking::TcpFault::FailStop,
990 } => {}
991 _ => panic!("Unsupported networking info: {:?}", networking_info),
992 }
993 deploy_containerized_m2o(*p2_port, &p2.name)
994 }
995
996 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
997 fn m2o_connect(
998 c1: &Self::Cluster,
999 c1_port: &<Self::Cluster as Node>::Port,
1000 p2: &Self::Process,
1001 p2_port: &<Self::Process as Node>::Port,
1002 ) -> Box<dyn FnOnce()> {
1003 let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
1004
1005 Box::new(move || {
1006 trace!(name: "m2o_connect thunk", %serialized);
1007 })
1008 }
1009
1010 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1011 fn m2m_sink_source(
1012 _env: &mut Self::InstantiateEnv,
1013 c1: &Self::Cluster,
1014 c1_port: &<Self::Cluster as Node>::Port,
1015 c2: &Self::Cluster,
1016 c2_port: &<Self::Cluster as Node>::Port,
1017 _name: Option<&str>,
1018 networking_info: &crate::networking::NetworkingInfo,
1019 ) -> (syn::Expr, syn::Expr) {
1020 match networking_info {
1021 crate::networking::NetworkingInfo::Tcp {
1022 fault: crate::networking::TcpFault::FailStop,
1023 } => {}
1024 _ => panic!("Unsupported networking info: {:?}", networking_info),
1025 }
1026 deploy_containerized_m2m(*c2_port)
1027 }
1028
1029 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1030 fn m2m_connect(
1031 c1: &Self::Cluster,
1032 c1_port: &<Self::Cluster as Node>::Port,
1033 c2: &Self::Cluster,
1034 c2_port: &<Self::Cluster as Node>::Port,
1035 ) -> Box<dyn FnOnce()> {
1036 let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1037
1038 Box::new(move || {
1039 trace!(name: "m2m_connect thunk", %serialized);
1040 })
1041 }
1042
1043 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1044 fn e2o_many_source(
1045 extra_stmts: &mut Vec<syn::Stmt>,
1046 p2: &Self::Process,
1047 p2_port: &<Self::Process as Node>::Port,
1048 codec_type: &syn::Type,
1049 shared_handle: String,
1050 ) -> syn::Expr {
1051 p2.exposed_ports.borrow_mut().push(*p2_port);
1052
1053 let socket_ident = syn::Ident::new(
1054 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1055 Span::call_site(),
1056 );
1057
1058 let source_ident = syn::Ident::new(
1059 &format!("__hydro_deploy_many_{}_source", &shared_handle),
1060 Span::call_site(),
1061 );
1062
1063 let sink_ident = syn::Ident::new(
1064 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1065 Span::call_site(),
1066 );
1067
1068 let membership_ident = syn::Ident::new(
1069 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1070 Span::call_site(),
1071 );
1072
1073 let bind_addr = format!("0.0.0.0:{}", p2_port);
1074
1075 extra_stmts.push(syn::parse_quote! {
1076 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1077 });
1078
1079 let root = crate::staging_util::get_this_crate();
1080
1081 extra_stmts.push(syn::parse_quote! {
1082 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1083 });
1084
1085 parse_quote!(#source_ident)
1086 }
1087
1088 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1089 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1090 let sink_ident = syn::Ident::new(
1091 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1092 Span::call_site(),
1093 );
1094 parse_quote!(#sink_ident)
1095 }
1096
1097 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1098 fn e2o_source(
1099 extra_stmts: &mut Vec<syn::Stmt>,
1100 p1: &Self::External,
1101 p1_port: &<Self::External as Node>::Port,
1102 p2: &Self::Process,
1103 p2_port: &<Self::Process as Node>::Port,
1104 _codec_type: &syn::Type,
1105 shared_handle: String,
1106 ) -> syn::Expr {
1107 p1.connection_info.borrow_mut().insert(
1108 *p1_port,
1109 (
1110 p2.docker_container_name.clone(),
1111 *p2_port,
1112 p2.network.clone(),
1113 ),
1114 );
1115
1116 p2.exposed_ports.borrow_mut().push(*p2_port);
1117
1118 let socket_ident = syn::Ident::new(
1119 &format!("__hydro_deploy_{}_socket", &shared_handle),
1120 Span::call_site(),
1121 );
1122
1123 let source_ident = syn::Ident::new(
1124 &format!("__hydro_deploy_{}_source", &shared_handle),
1125 Span::call_site(),
1126 );
1127
1128 let sink_ident = syn::Ident::new(
1129 &format!("__hydro_deploy_{}_sink", &shared_handle),
1130 Span::call_site(),
1131 );
1132
1133 let bind_addr = format!("0.0.0.0:{}", p2_port);
1134
1135 extra_stmts.push(syn::parse_quote! {
1136 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1137 });
1138
1139 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1140
1141 extra_stmts.push(syn::parse_quote! {
1142 let (#sink_ident, #source_ident) = (#create_expr).split();
1143 });
1144
1145 parse_quote!(#source_ident)
1146 }
1147
1148 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1149 fn e2o_connect(
1150 p1: &Self::External,
1151 p1_port: &<Self::External as Node>::Port,
1152 p2: &Self::Process,
1153 p2_port: &<Self::Process as Node>::Port,
1154 many: bool,
1155 server_hint: NetworkHint,
1156 ) -> Box<dyn FnOnce()> {
1157 if server_hint != NetworkHint::Auto {
1158 panic!(
1159 "Docker deployment only supports NetworkHint::Auto, got {:?}",
1160 server_hint
1161 );
1162 }
1163
1164 if many {
1166 p1.connection_info.borrow_mut().insert(
1167 *p1_port,
1168 (
1169 p2.docker_container_name.clone(),
1170 *p2_port,
1171 p2.network.clone(),
1172 ),
1173 );
1174 }
1175
1176 let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1177
1178 Box::new(move || {
1179 trace!(name: "e2o_connect thunk", %serialized);
1180 })
1181 }
1182
1183 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1184 fn o2e_sink(
1185 p1: &Self::Process,
1186 p1_port: &<Self::Process as Node>::Port,
1187 p2: &Self::External,
1188 p2_port: &<Self::External as Node>::Port,
1189 shared_handle: String,
1190 ) -> syn::Expr {
1191 let sink_ident = syn::Ident::new(
1192 &format!("__hydro_deploy_{}_sink", &shared_handle),
1193 Span::call_site(),
1194 );
1195 parse_quote!(#sink_ident)
1196 }
1197
1198 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1199 fn cluster_ids(
1200 of_cluster: LocationKey,
1201 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1202 cluster_ids()
1203 }
1204
1205 #[instrument(level = "trace", skip_all)]
1206 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1207 cluster_self_id()
1208 }
1209
1210 #[instrument(level = "trace", skip_all, fields(?location_id))]
1211 fn cluster_membership_stream(
1212 _env: &mut Self::InstantiateEnv,
1213 _at_location: &LocationId,
1214 location_id: &LocationId,
1215 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1216 {
1217 cluster_membership_stream(location_id)
1218 }
1219}
1220
/// Alphabet used for the random suffixes in generated names: the ASCII digits followed by
/// the lowercase ASCII letters.
const CONTAINER_ALPHABET: [char; 36] = {
    // Spelled as one literal so the character set is readable at a glance; the array type
    // pins its length to 36 at compile time.
    const SYMBOLS: &[u8; 36] = b"0123456789abcdefghijklmnopqrstuvwxyz";

    let mut alphabet = ['0'; 36];
    let mut index = 0;
    // `for` loops are not available in const evaluation, hence the manual counter.
    while index < SYMBOLS.len() {
        alphabet[index] = SYMBOLS[index] as char;
        index += 1;
    }
    alphabet
};
1225
1226#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1227fn get_docker_image_name(
1228 name_hint: &str,
1229 location_key: LocationKey,
1230 deployment_instance: &str,
1231) -> String {
1232 let name_hint = name_hint
1233 .split("::")
1234 .last()
1235 .unwrap()
1236 .to_ascii_lowercase()
1237 .replace(".", "-")
1238 .replace("_", "-")
1239 .replace("::", "-");
1240
1241 let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1242
1243 format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1244}
1245
1246#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
/// Derives a container name from `image_name`.
///
/// With `Some(instance)` the index is appended as `{image_name}-{instance}`; with `None`
/// the image name is returned unchanged, as an owned `String`.
fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
    match instance {
        Some(instance) => format!("{image_name}-{instance}"),
        None => image_name.to_owned(),
    }
}
1254#[derive(Clone)]
1256pub struct DockerDeployProcessSpec {
1257 compilation_options: Option<String>,
1258 config: Vec<String>,
1259 network: DockerNetwork,
1260 deployment_instance: String,
1261}
1262
1263impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1264 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1265 fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1266 DockerDeployProcess {
1267 key,
1268 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1269
1270 next_port: Rc::new(RefCell::new(1000)),
1271 rust_crate: Rc::new(RefCell::new(None)),
1272
1273 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1274
1275 docker_container_name: Rc::new(RefCell::new(None)),
1276
1277 compilation_options: self.compilation_options,
1278 config: self.config,
1279
1280 network: self.network.clone(),
1281 }
1282 }
1283}
1284
/// Specification from which a `DockerDeployCluster` is built (see its `ClusterSpec`
/// implementation).
///
/// Derives `Debug` in addition to `Clone` so the spec can be logged and embedded in other
/// `Debug` types.
#[derive(Clone, Debug)]
pub struct DockerDeployClusterSpec {
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): presumably extra compiler options — confirm where it is consumed.
    compilation_options: Option<String>,
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): exact meaning of the entries is not visible here — confirm.
    config: Vec<String>,
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): presumably the number of cluster members — confirm.
    count: usize,
    /// Becomes part of the generated image name.
    deployment_instance: String,
}
1293
1294impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1295 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1296 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1297 DockerDeployCluster {
1298 key,
1299 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1300
1301 next_port: Rc::new(RefCell::new(1000)),
1302 rust_crate: Rc::new(RefCell::new(None)),
1303
1304 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1305
1306 compilation_options: self.compilation_options,
1307 config: self.config,
1308
1309 count: self.count,
1310 }
1311 }
1312}
1313
/// Specification from which a `DockerDeployExternal` is built (see its `ExternalSpec`
/// implementation).
///
/// Derives `Clone` like the sibling process/cluster specs, and `Debug` so the spec can be
/// logged and embedded in other `Debug` types.
#[derive(Clone, Debug)]
pub struct DockerDeployExternalSpec {
    /// Name given to the built external.
    name: String,
}
1318
1319impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1320 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1321 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1322 DockerDeployExternal {
1323 name: self.name,
1324 next_port: Rc::new(RefCell::new(10000)),
1325 ports: Rc::new(RefCell::new(HashMap::new())),
1326 connection_info: Rc::new(RefCell::new(HashMap::new())),
1327 }
1328 }
1329}