1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{MembershipEvent, NetworkHint};
41
42#[derive(Clone, Debug)]
44pub struct DockerNetwork {
45 name: String,
46}
47
48impl DockerNetwork {
49 pub fn new(name: String) -> Self {
51 Self {
52 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53 }
54 }
55}
56
57#[derive(Clone)]
59pub struct DockerDeployProcess {
60 id: usize,
61 name: String,
62 next_port: Rc<RefCell<u16>>,
63 rust_crate: Rc<RefCell<Option<RustCrate>>>,
64
65 exposed_ports: Rc<RefCell<Vec<u16>>>,
66
67 docker_container_name: Rc<RefCell<Option<String>>>,
68
69 compilation_options: Option<String>,
70
71 config: Vec<String>,
72
73 network: DockerNetwork,
74}
75
76impl Node for DockerDeployProcess {
77 type Port = u16;
78 type Meta = ();
79 type InstantiateEnv = DockerDeploy;
80
81 #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
82 fn next_port(&self) -> Self::Port {
83 let port = {
84 let mut borrow = self.next_port.borrow_mut();
85 let port = *borrow;
86 *borrow += 1;
87 port
88 };
89
90 port
91 }
92
93 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
94 fn update_meta(&self, _meta: &Self::Meta) {}
95
96 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
97 fn instantiate(
98 &self,
99 _env: &mut Self::InstantiateEnv,
100 meta: &mut Self::Meta,
101 graph: DfirGraph,
102 extra_stmts: Vec<syn::Stmt>,
103 ) {
104 let (bin_name, config) = create_graph_trybuild(
105 graph,
106 extra_stmts,
107 &Some(self.name.clone()),
108 true,
109 LinkingMode::Static,
110 );
111
112 let mut ret = RustCrate::new(config.project_dir)
113 .target_dir(config.target_dir)
114 .example(bin_name.clone())
115 .no_default_features();
116
117 ret = ret.display_name("test_display_name");
118
119 ret = ret.features(vec!["hydro___feature_docker_runtime".to_string()]);
120
121 if let Some(features) = config.features {
122 ret = ret.features(features);
123 }
124
125 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
126 ret = ret.config("build.incremental = false");
127
128 *self.rust_crate.borrow_mut() = Some(ret);
129 }
130}
131
132#[derive(Clone)]
134pub struct DockerDeployCluster {
135 id: usize,
136 name: String,
137 next_port: Rc<RefCell<u16>>,
138 rust_crate: Rc<RefCell<Option<RustCrate>>>,
139
140 docker_container_name: Rc<RefCell<Vec<String>>>,
141
142 compilation_options: Option<String>,
143
144 config: Vec<String>,
145
146 count: usize,
147}
148
149impl Node for DockerDeployCluster {
150 type Port = u16;
151 type Meta = ();
152 type InstantiateEnv = DockerDeploy;
153
154 #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
155 fn next_port(&self) -> Self::Port {
156 let port = {
157 let mut borrow = self.next_port.borrow_mut();
158 let port = *borrow;
159 *borrow += 1;
160 port
161 };
162
163 port
164 }
165
166 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
167 fn update_meta(&self, _meta: &Self::Meta) {}
168
169 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, extra_stmts = extra_stmts.len()))]
170 fn instantiate(
171 &self,
172 _env: &mut Self::InstantiateEnv,
173 _meta: &mut Self::Meta,
174 graph: DfirGraph,
175 extra_stmts: Vec<syn::Stmt>,
176 ) {
177 let (bin_name, config) = create_graph_trybuild(
178 graph,
179 extra_stmts,
180 &Some(self.name.clone()),
181 true,
182 LinkingMode::Static,
183 );
184
185 let mut ret = RustCrate::new(config.project_dir)
186 .target_dir(config.target_dir)
187 .example(bin_name.clone())
188 .no_default_features();
189
190 ret = ret.display_name("test_display_name");
191
192 ret = ret.features(vec!["hydro___feature_docker_runtime".to_string()]);
193
194 if let Some(features) = config.features {
195 ret = ret.features(features);
196 }
197
198 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
199 ret = ret.config("build.incremental = false");
200
201 *self.rust_crate.borrow_mut() = Some(ret);
202 }
203}
204
205#[derive(Clone, Debug)]
207pub struct DockerDeployExternal {
208 name: String,
209 next_port: Rc<RefCell<u16>>,
210
211 ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
212
213 #[expect(clippy::type_complexity, reason = "internal code")]
214 connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
215}
216
217impl Node for DockerDeployExternal {
218 type Port = u16;
219 type Meta = ();
220 type InstantiateEnv = DockerDeploy;
221
222 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
223 fn next_port(&self) -> Self::Port {
224 let port = {
225 let mut borrow = self.next_port.borrow_mut();
226 let port = *borrow;
227 *borrow += 1;
228 port
229 };
230
231 port
232 }
233
234 #[instrument(level = "trace", skip_all, fields(name = self.name))]
235 fn update_meta(&self, _meta: &Self::Meta) {}
236
237 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
238 fn instantiate(
239 &self,
240 _env: &mut Self::InstantiateEnv,
241 meta: &mut Self::Meta,
242 graph: DfirGraph,
243 extra_stmts: Vec<syn::Stmt>,
244 ) {
245 trace!(name: "surface", surface = graph.surface_syntax_string());
246 }
247}
248
249type DynSourceSink<Out, In, InErr> = (
250 Pin<Box<dyn Stream<Item = Out>>>,
251 Pin<Box<dyn Sink<In, Error = InErr>>>,
252);
253
254impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
255 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
256 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
257 self.ports.borrow_mut().insert(external_port_id, port);
258 }
259
260 fn as_bytes_bidi(
261 &self,
262 external_port_id: ExternalPortId,
263 ) -> impl Future<
264 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
265 > + 'a {
266 let _span =
267 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered(); async { todo!() }
269 }
270
271 fn as_bincode_bidi<InT, OutT>(
272 &self,
273 external_port_id: ExternalPortId,
274 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
275 where
276 InT: serde::Serialize + 'static,
277 OutT: serde::de::DeserializeOwned + 'static,
278 {
279 let _span =
280 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered(); async { todo!() }
282 }
283
284 fn as_bincode_sink<T>(
285 &self,
286 external_port_id: ExternalPortId,
287 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
288 where
289 T: serde::Serialize + 'static,
290 {
291 let guard =
292 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
293
294 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
295 let (docker_container_name, remote_port, _) = self
296 .connection_info
297 .borrow()
298 .get(&local_port)
299 .unwrap()
300 .clone();
301
302 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
303
304 async move {
305 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
306 let remote_ip_address = "localhost";
307
308 Box::pin(
309 LazySink::new(move || {
310 Box::pin(async move {
311 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
312
313 let stream =
314 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
315 .await?;
316
317 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
318
319 Result::<_, std::io::Error>::Ok(FramedWrite::new(
320 stream,
321 LengthDelimitedCodec::new(),
322 ))
323 })
324 })
325 .with(move |v| async move {
326 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
327 }),
328 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
329 }
330 .instrument(guard.exit())
331 }
332
333 fn as_bincode_source<T>(
334 &self,
335 external_port_id: ExternalPortId,
336 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
337 where
338 T: serde::de::DeserializeOwned + 'static,
339 {
340 let guard =
341 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
342
343 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
344 let (docker_container_name, remote_port, _) = self
345 .connection_info
346 .borrow()
347 .get(&local_port)
348 .unwrap()
349 .clone();
350
351 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
352
353 async move {
354
355 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
356 let remote_ip_address = "localhost";
357
358 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
359
360 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
361 .await
362 .unwrap();
363
364 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
365
366 Box::pin(
367 FramedRead::new(stream, LengthDelimitedCodec::new())
368 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
369 ) as Pin<Box<dyn Stream<Item = T>>>
370 }
371 .instrument(guard.exit())
372 }
373}
374
375#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
376async fn find_dynamically_allocated_docker_port(
377 docker_container_name: &str,
378 destination_port: u16,
379) -> u16 {
380 let docker = Docker::connect_with_local_defaults().unwrap();
381
382 let container_info = docker
383 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
384 .await
385 .unwrap();
386
387 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
388
389 let remote_port = container_info
391 .network_settings
392 .as_ref()
393 .unwrap()
394 .ports
395 .as_ref()
396 .unwrap()
397 .get(&format!("{destination_port}/tcp"))
398 .unwrap()
399 .as_ref()
400 .unwrap()
401 .iter()
402 .find(|v| v.host_ip == Some("0.0.0.0".to_string()))
403 .unwrap()
404 .host_port
405 .as_ref()
406 .unwrap()
407 .parse()
408 .unwrap();
409
410 remote_port
411}
412
413pub struct DockerDeploy {
415 docker_processes: Vec<DockerDeployProcessSpec>,
416 docker_clusters: Vec<DockerDeployClusterSpec>,
417 network: DockerNetwork,
418 deployment_instance: String,
419}
420
421#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
422async fn create_and_start_container(
423 docker: &Docker,
424 container_name: &str,
425 image_name: &str,
426 network_name: &str,
427 deployment_instance: &str,
428) -> Result<(), anyhow::Error> {
429 let config = ContainerCreateBody {
430 image: Some(image_name.to_string()),
431 hostname: Some(container_name.to_string()),
432 host_config: Some(HostConfig {
433 binds: Some(vec![
434 "/var/run/docker.sock:/var/run/docker.sock".to_string(),
435 ]),
436 publish_all_ports: Some(true),
437 port_bindings: Some(HashMap::new()), ..Default::default()
439 }),
440 env: Some(vec![
441 format!("CONTAINER_NAME={container_name}"),
442 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
443 format!("RUST_LOG=trace"),
444 ]),
445 networking_config: Some(NetworkingConfig {
446 endpoints_config: Some(HashMap::from([(
447 network_name.to_string(),
448 EndpointSettings {
449 ..Default::default()
450 },
451 )])),
452 }),
453 tty: Some(true),
454 ..Default::default()
455 };
456
457 let options = CreateContainerOptions {
458 name: Some(container_name.to_string()),
459 ..Default::default()
460 };
461
462 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
463 docker.create_container(Some(options), config).await?;
464 docker
465 .start_container(container_name, None::<StartContainerOptions>)
466 .await?;
467
468 Ok(())
469}
470
471#[instrument(level = "trace", skip_all, fields(%image_name))]
472async fn build_and_create_image(
473 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
474 compilation_options: &Option<String>,
475 config: &[String],
476 exposed_ports: &[u16],
477 image_name: &str,
478) -> Result<(), anyhow::Error> {
479 let mut rust_crate = rust_crate
480 .borrow_mut()
481 .take()
482 .unwrap()
483 .rustflags(compilation_options.clone().unwrap_or("".to_string()));
484
485 for cfg in config {
486 rust_crate = rust_crate.config(cfg);
487 }
488
489 let build_output = match build_crate_memoized(
490 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
491 )
492 .await
493 {
494 Ok(build_output) => build_output,
495 Err(BuildError::FailedToBuildCrate {
496 exit_status,
497 diagnostics,
498 text_lines,
499 stderr_lines,
500 }) => {
501 let diagnostics = diagnostics
502 .into_iter()
503 .map(|d| d.rendered.unwrap())
504 .collect::<Vec<_>>()
505 .join("\n");
506 let text_lines = text_lines.join("\n");
507 let stderr_lines = stderr_lines.join("\n");
508
509 anyhow::bail!(
510 r#"
511Failed to build crate {exit_status:?}
512--- diagnostics
513---
514{diagnostics}
515---
516---
517---
518
519--- text_lines
520---
521---
522{text_lines}
523---
524---
525---
526
527--- stderr_lines
528---
529---
530{stderr_lines}
531---
532---
533---"#
534 );
535 }
536 Err(err) => {
537 anyhow::bail!("Failed to build crate {err:?}");
538 }
539 };
540
541 let docker = Docker::connect_with_local_defaults()?;
542
543 let mut tar_data = Vec::new();
544 {
545 let mut tar = Builder::new(&mut tar_data);
546
547 let exposed_ports = exposed_ports
548 .iter()
549 .map(|port| format!("EXPOSE {port}/tcp"))
550 .collect::<Vec<_>>()
551 .join("\n");
552
553 let dockerfile_content = format!(
554 r#"
555 FROM scratch
556 {exposed_ports}
557 COPY app /app
558 CMD ["/app"]
559 "#,
560 );
561
562 trace!(name: "dockerfile", %dockerfile_content);
563
564 let mut header = Header::new_gnu();
565 header.set_path("Dockerfile")?;
566 header.set_size(dockerfile_content.len() as u64);
567 header.set_cksum();
568 tar.append(&header, dockerfile_content.as_bytes())?;
569
570 let mut header = Header::new_gnu();
571 header.set_path("app")?;
572 header.set_size(build_output.bin_data.len() as u64);
573 header.set_mode(0o755);
574 header.set_cksum();
575 tar.append(&header, &build_output.bin_data[..])?;
576
577 tar.finish()?;
578 }
579
580 let build_options = BuildImageOptions {
581 dockerfile: "Dockerfile".to_owned(),
582 t: Some(image_name.to_string()),
583 rm: true,
584 ..Default::default()
585 };
586
587 use bollard::errors::Error;
588
589 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
590 let mut build_stream = docker.build_image(build_options, None, Some(body));
591 while let Some(msg) = build_stream.next().await {
592 match msg {
593 Ok(_) => {}
594 Err(e) => match e {
595 Error::DockerStreamError { error } => {
596 return Err(anyhow::anyhow!(
597 "Docker build failed: DockerStreamError: {{ error: {error} }}"
598 ));
599 }
600 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
601 },
602 }
603 }
604
605 Ok(())
606}
607
608impl DockerDeploy {
609 pub fn new(network: DockerNetwork) -> Self {
611 Self {
612 docker_processes: Vec::new(),
613 docker_clusters: Vec::new(),
614 network,
615 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
616 }
617 }
618
619 pub fn add_localhost_docker(
621 &mut self,
622 compilation_options: Option<String>,
623 config: Vec<String>,
624 ) -> DockerDeployProcessSpec {
625 let process = DockerDeployProcessSpec {
626 compilation_options,
627 config,
628 network: self.network.clone(),
629 deployment_instance: self.deployment_instance.clone(),
630 };
631
632 self.docker_processes.push(process.clone());
633
634 process
635 }
636
637 pub fn add_localhost_docker_cluster(
639 &mut self,
640 compilation_options: Option<String>,
641 config: Vec<String>,
642 count: usize,
643 ) -> DockerDeployClusterSpec {
644 let cluster = DockerDeployClusterSpec {
645 compilation_options,
646 config,
647 count,
648 deployment_instance: self.deployment_instance.clone(),
649 };
650
651 self.docker_clusters.push(cluster.clone());
652
653 cluster
654 }
655
656 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
658 DockerDeployExternalSpec { name }
659 }
660
661 pub fn get_deployment_instance(&self) -> String {
663 self.deployment_instance.clone()
664 }
665
666 #[instrument(level = "trace", skip_all)]
668 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
669 for (_, _, process) in nodes.get_all_processes() {
670 let exposed_ports = process.exposed_ports.borrow().clone();
671
672 build_and_create_image(
673 &process.rust_crate,
674 &process.compilation_options,
675 &process.config,
676 &exposed_ports,
677 &process.name,
678 )
679 .await?;
680 }
681
682 for (_, _, cluster) in nodes.get_all_clusters() {
683 build_and_create_image(
684 &cluster.rust_crate,
685 &cluster.compilation_options,
686 &cluster.config,
687 &[], &cluster.name,
689 )
690 .await?;
691 }
692
693 Ok(())
694 }
695
696 #[instrument(level = "trace", skip_all)]
698 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
699 let docker = Docker::connect_with_local_defaults()?;
700
701 match docker
702 .create_network(NetworkCreateRequest {
703 name: self.network.name.clone(),
704 driver: Some("bridge".to_string()),
705 ..Default::default()
706 })
707 .await
708 {
709 Ok(v) => v.id,
710 Err(e) => {
711 panic!("Failed to create docker network: {e:?}");
712 }
713 };
714
715 for (_, _, process) in nodes.get_all_processes() {
716 let docker_container_name: String = get_docker_container_name(&process.name, None);
717 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
718
719 create_and_start_container(
720 &docker,
721 &docker_container_name,
722 &process.name,
723 &self.network.name,
724 &self.deployment_instance,
725 )
726 .await?;
727 }
728
729 for (_, _, cluster) in nodes.get_all_clusters() {
730 for num in 0..cluster.count {
731 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
732 cluster
733 .docker_container_name
734 .borrow_mut()
735 .push(docker_container_name.clone());
736
737 create_and_start_container(
738 &docker,
739 &docker_container_name,
740 &cluster.name,
741 &self.network.name,
742 &self.deployment_instance,
743 )
744 .await?;
745 }
746 }
747
748 Ok(())
749 }
750
751 #[instrument(level = "trace", skip_all)]
753 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
754 let docker = Docker::connect_with_local_defaults()?;
755
756 for (_, _, process) in nodes.get_all_processes() {
757 let docker_container_name: String = get_docker_container_name(&process.name, None);
758
759 docker
760 .kill_container(&docker_container_name, None::<KillContainerOptions>)
761 .await?;
762 }
763
764 for (_, _, cluster) in nodes.get_all_clusters() {
765 for num in 0..cluster.count {
766 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
767
768 docker
769 .kill_container(&docker_container_name, None::<KillContainerOptions>)
770 .await?;
771 }
772 }
773
774 Ok(())
775 }
776
777 #[instrument(level = "trace", skip_all)]
779 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
780 let docker = Docker::connect_with_local_defaults()?;
781
782 for (_, _, process) in nodes.get_all_processes() {
783 let docker_container_name: String = get_docker_container_name(&process.name, None);
784
785 docker
786 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
787 .await?;
788 }
789
790 for (_, _, cluster) in nodes.get_all_clusters() {
791 for num in 0..cluster.count {
792 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
793
794 docker
795 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
796 .await?;
797 }
798 }
799
800 docker
801 .remove_network(&self.network.name)
802 .await
803 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
804
805 use bollard::query_parameters::RemoveImageOptions;
806
807 for (_, _, process) in nodes.get_all_processes() {
808 docker
809 .remove_image(&process.name, None::<RemoveImageOptions>, None)
810 .await?;
811 }
812
813 for (_, _, cluster) in nodes.get_all_clusters() {
814 docker
815 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
816 .await?;
817 }
818
819 Ok(())
820 }
821}
822
823impl<'a> Deploy<'a> for DockerDeploy {
824 type Meta = ();
825 type InstantiateEnv = Self;
826
827 type Process = DockerDeployProcess;
828 type Cluster = DockerDeployCluster;
829 type External = DockerDeployExternal;
830
831 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
832 fn o2o_sink_source(
833 p1: &Self::Process,
834 p1_port: &<Self::Process as Node>::Port,
835 p2: &Self::Process,
836 p2_port: &<Self::Process as Node>::Port,
837 ) -> (syn::Expr, syn::Expr) {
838 let bind_addr = format!("0.0.0.0:{}", p2_port);
839 let target = format!("{}:{p2_port}", p2.name);
840
841 deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
842 }
843
844 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
845 fn o2o_connect(
846 p1: &Self::Process,
847 p1_port: &<Self::Process as Node>::Port,
848 p2: &Self::Process,
849 p2_port: &<Self::Process as Node>::Port,
850 ) -> Box<dyn FnOnce()> {
851 let serialized = format!(
852 "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
853 p1.name, p2.name
854 );
855
856 Box::new(move || {
857 trace!(name: "o2o_connect thunk", %serialized);
858 })
859 }
860
861 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
862 fn o2m_sink_source(
863 p1: &Self::Process,
864 p1_port: &<Self::Process as Node>::Port,
865 c2: &Self::Cluster,
866 c2_port: &<Self::Cluster as Node>::Port,
867 ) -> (syn::Expr, syn::Expr) {
868 deploy_containerized_o2m(*c2_port)
869 }
870
871 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
872 fn o2m_connect(
873 p1: &Self::Process,
874 p1_port: &<Self::Process as Node>::Port,
875 c2: &Self::Cluster,
876 c2_port: &<Self::Cluster as Node>::Port,
877 ) -> Box<dyn FnOnce()> {
878 let serialized = format!(
879 "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
880 p1.name, c2.name
881 );
882
883 Box::new(move || {
884 trace!(name: "o2m_connect thunk", %serialized);
885 })
886 }
887
888 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
889 fn m2o_sink_source(
890 c1: &Self::Cluster,
891 c1_port: &<Self::Cluster as Node>::Port,
892 p2: &Self::Process,
893 p2_port: &<Self::Process as Node>::Port,
894 ) -> (syn::Expr, syn::Expr) {
895 deploy_containerized_m2o(*p2_port, &p2.name)
896 }
897
898 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
899 fn m2o_connect(
900 c1: &Self::Cluster,
901 c1_port: &<Self::Cluster as Node>::Port,
902 p2: &Self::Process,
903 p2_port: &<Self::Process as Node>::Port,
904 ) -> Box<dyn FnOnce()> {
905 let serialized = format!(
906 "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
907 c1.name, p2.name
908 );
909
910 Box::new(move || {
911 trace!(name: "m2o_connect thunk", %serialized);
912 })
913 }
914
915 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
916 fn m2m_sink_source(
917 c1: &Self::Cluster,
918 c1_port: &<Self::Cluster as Node>::Port,
919 c2: &Self::Cluster,
920 c2_port: &<Self::Cluster as Node>::Port,
921 ) -> (syn::Expr, syn::Expr) {
922 deploy_containerized_m2m(*c2_port)
923 }
924
925 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
926 fn m2m_connect(
927 c1: &Self::Cluster,
928 c1_port: &<Self::Cluster as Node>::Port,
929 c2: &Self::Cluster,
930 c2_port: &<Self::Cluster as Node>::Port,
931 ) -> Box<dyn FnOnce()> {
932 let serialized = format!(
933 "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
934 c1.name, c2.name
935 );
936
937 Box::new(move || {
938 trace!(name: "m2m_connect thunk", %serialized);
939 })
940 }
941
942 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
943 fn e2o_many_source(
944 extra_stmts: &mut Vec<syn::Stmt>,
945 p2: &Self::Process,
946 p2_port: &<Self::Process as Node>::Port,
947 _codec_type: &syn::Type,
948 shared_handle: String,
949 ) -> syn::Expr {
950 todo!()
951 }
952
953 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
954 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
955 todo!()
956 }
957
958 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
959 fn e2o_source(
960 extra_stmts: &mut Vec<syn::Stmt>,
961 p1: &Self::External,
962 p1_port: &<Self::External as Node>::Port,
963 p2: &Self::Process,
964 p2_port: &<Self::Process as Node>::Port,
965 _codec_type: &syn::Type,
966 shared_handle: String,
967 ) -> syn::Expr {
968 p1.connection_info.borrow_mut().insert(
969 *p1_port,
970 (
971 p2.docker_container_name.clone(),
972 *p2_port,
973 p2.network.clone(),
974 ),
975 );
976
977 p2.exposed_ports.borrow_mut().push(*p2_port);
978
979 let socket_ident = syn::Ident::new(
980 &format!("__hydro_deploy_{}_socket", &shared_handle),
981 Span::call_site(),
982 );
983
984 let source_ident = syn::Ident::new(
985 &format!("__hydro_deploy_{}_source", &shared_handle),
986 Span::call_site(),
987 );
988
989 let sink_ident = syn::Ident::new(
990 &format!("__hydro_deploy_{}_sink", &shared_handle),
991 Span::call_site(),
992 );
993
994 let bind_addr = format!("0.0.0.0:{}", p2_port);
995
996 extra_stmts.push(syn::parse_quote! {
997 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
998 });
999
1000 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident.clone());
1001
1002 extra_stmts.push(syn::parse_quote! {
1003 let (#sink_ident, #source_ident) = (#create_expr).split();
1004 });
1005
1006 parse_quote!(#source_ident)
1007 }
1008
1009 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1010 fn e2o_connect(
1011 p1: &Self::External,
1012 p1_port: &<Self::External as Node>::Port,
1013 p2: &Self::Process,
1014 p2_port: &<Self::Process as Node>::Port,
1015 many: bool,
1016 server_hint: NetworkHint,
1017 ) -> Box<dyn FnOnce()> {
1018 let serialized = format!(
1019 "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
1020 p1.name, p2.name
1021 );
1022
1023 Box::new(move || {
1024 trace!(name: "e2o_connect thunk", %serialized);
1025 })
1026 }
1027
1028 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1029 fn o2e_sink(
1030 p1: &Self::Process,
1031 p1_port: &<Self::Process as Node>::Port,
1032 p2: &Self::External,
1033 p2_port: &<Self::External as Node>::Port,
1034 shared_handle: String,
1035 ) -> syn::Expr {
1036 let sink_ident = syn::Ident::new(
1037 &format!("__hydro_deploy_{}_sink", &shared_handle),
1038 Span::call_site(),
1039 );
1040 parse_quote!(#sink_ident)
1041 }
1042
1043 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1044 fn cluster_ids(
1045 of_cluster: usize,
1046 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1047 cluster_ids()
1048 }
1049
1050 #[instrument(level = "trace", skip_all)]
1051 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1052 cluster_self_id()
1053 }
1054
1055 #[instrument(level = "trace", skip_all, fields(?location_id))]
1056 fn cluster_membership_stream(
1057 location_id: &LocationId,
1058 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1059 {
1060 cluster_membership_stream(location_id)
1061 }
1062}
1063
1064const CONTAINER_ALPHABET: [char; 36] = [
1065 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
1066 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
1067];
1068
1069#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location, %deployment_instance))]
1070fn get_docker_image_name(name_hint: &str, location: usize, deployment_instance: &str) -> String {
1071 let name_hint = name_hint
1072 .split("::")
1073 .last()
1074 .unwrap()
1075 .to_string()
1076 .to_ascii_lowercase()
1077 .replace(".", "-")
1078 .replace("_", "-")
1079 .replace("::", "-");
1080
1081 let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1082
1083 format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location}")
1084}
1085
1086#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1087fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1088 if let Some(instance) = instance {
1089 format!("{image_name}-{instance}")
1090 } else {
1091 image_name.to_string()
1092 }
1093}
1094#[derive(Clone)]
1096pub struct DockerDeployProcessSpec {
1097 compilation_options: Option<String>,
1098 config: Vec<String>,
1099 network: DockerNetwork,
1100 deployment_instance: String,
1101}
1102
1103impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1104 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1105 fn build(self, id: usize, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1106 DockerDeployProcess {
1107 id,
1108 name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1109
1110 next_port: Rc::new(RefCell::new(1000)),
1111 rust_crate: Rc::new(RefCell::new(None)),
1112
1113 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1114
1115 docker_container_name: Rc::new(RefCell::new(None)),
1116
1117 compilation_options: self.compilation_options,
1118 config: self.config,
1119
1120 network: self.network.clone(),
1121 }
1122 }
1123}
1124
1125#[derive(Clone)]
1127pub struct DockerDeployClusterSpec {
1128 compilation_options: Option<String>,
1129 config: Vec<String>,
1130 count: usize,
1131 deployment_instance: String,
1132}
1133
1134impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1135 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1136 fn build(self, id: usize, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1137 DockerDeployCluster {
1138 id,
1139 name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1140
1141 next_port: Rc::new(RefCell::new(1000)),
1142 rust_crate: Rc::new(RefCell::new(None)),
1143
1144 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1145
1146 compilation_options: self.compilation_options,
1147 config: self.config,
1148
1149 count: self.count,
1150 }
1151 }
1152}
1153
1154pub struct DockerDeployExternalSpec {
1156 name: String,
1157}
1158
1159impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1160 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1161 fn build(self, id: usize, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1162 DockerDeployExternal {
1163 name: self.name,
1164 next_port: Rc::new(RefCell::new(10000)),
1165 ports: Rc::new(RefCell::new(HashMap::new())),
1166 connection_info: Rc::new(RefCell::new(HashMap::new())),
1167 }
1168 }
1169}