hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{MembershipEvent, NetworkHint};
41
/// Represents a Docker network used by a deployment.
///
/// The stored name is the caller-supplied prefix plus a random suffix (see [`DockerNetwork::new`]).
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Full network name, as sent to Docker when the network is created and removed.
    name: String,
}
47
48impl DockerNetwork {
49    /// creates a new docker network (will actually be created when deployment.start() is called).
50    pub fn new(name: String) -> Self {
51        Self {
52            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53        }
54    }
55}
56
/// Represents a process running in a docker container
#[derive(Clone)]
pub struct DockerDeployProcess {
    /// Numeric identifier of this process; recorded in trace spans.
    id: usize,
    /// Process name. Used as the image name when provisioning, as the input to the container
    /// name when starting, and as the host part of connection targets.
    name: String,
    /// Shared counter holding the next port number handed out by `next_port`.
    next_port: Rc<RefCell<u16>>,
    /// Crate to build for this process; filled in by `instantiate` and taken out when the
    /// image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Ports emitted as `EXPOSE <port>/tcp` directives in the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Name of the running container; `None` until the deployment has been started.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Optional rustflags applied when the crate is built.
    compilation_options: Option<String>,

    /// Extra build configuration entries, each forwarded to `RustCrate::config`.
    config: Vec<String>,

    /// Docker network associated with this process.
    network: DockerNetwork,
}
75
76impl Node for DockerDeployProcess {
77    type Port = u16;
78    type Meta = ();
79    type InstantiateEnv = DockerDeploy;
80
81    #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
82    fn next_port(&self) -> Self::Port {
83        let port = {
84            let mut borrow = self.next_port.borrow_mut();
85            let port = *borrow;
86            *borrow += 1;
87            port
88        };
89
90        port
91    }
92
93    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
94    fn update_meta(&self, _meta: &Self::Meta) {}
95
96    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
97    fn instantiate(
98        &self,
99        _env: &mut Self::InstantiateEnv,
100        meta: &mut Self::Meta,
101        graph: DfirGraph,
102        extra_stmts: Vec<syn::Stmt>,
103    ) {
104        let (bin_name, config) = create_graph_trybuild(
105            graph,
106            extra_stmts,
107            &Some(self.name.clone()),
108            true,
109            LinkingMode::Static,
110        );
111
112        let mut ret = RustCrate::new(config.project_dir)
113            .target_dir(config.target_dir)
114            .example(bin_name.clone())
115            .no_default_features();
116
117        ret = ret.display_name("test_display_name");
118
119        ret = ret.features(vec!["hydro___feature_docker_runtime".to_string()]);
120
121        if let Some(features) = config.features {
122            ret = ret.features(features);
123        }
124
125        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
126        ret = ret.config("build.incremental = false");
127
128        *self.rust_crate.borrow_mut() = Some(ret);
129    }
130}
131
/// Represents a logical cluster, which can be a variable amount of individual containers.
#[derive(Clone)]
pub struct DockerDeployCluster {
    /// Numeric identifier of this cluster; recorded in trace spans.
    id: usize,
    /// Cluster name. Used as the image name when provisioning and as the input to each
    /// member's container name when starting.
    name: String,
    /// Shared counter holding the next port number handed out by `next_port`.
    next_port: Rc<RefCell<u16>>,
    /// Crate to build for this cluster; filled in by `instantiate` and taken out when the
    /// image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Names of the containers started for this cluster; one entry is pushed per member
    /// when the deployment is started.
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Optional rustflags applied when the crate is built.
    compilation_options: Option<String>,

    /// Extra build configuration entries, each forwarded to `RustCrate::config`.
    config: Vec<String>,

    /// Number of containers started (and later killed/removed) for this cluster.
    count: usize,
}
148
149impl Node for DockerDeployCluster {
150    type Port = u16;
151    type Meta = ();
152    type InstantiateEnv = DockerDeploy;
153
154    #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
155    fn next_port(&self) -> Self::Port {
156        let port = {
157            let mut borrow = self.next_port.borrow_mut();
158            let port = *borrow;
159            *borrow += 1;
160            port
161        };
162
163        port
164    }
165
166    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
167    fn update_meta(&self, _meta: &Self::Meta) {}
168
169    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, extra_stmts = extra_stmts.len()))]
170    fn instantiate(
171        &self,
172        _env: &mut Self::InstantiateEnv,
173        _meta: &mut Self::Meta,
174        graph: DfirGraph,
175        extra_stmts: Vec<syn::Stmt>,
176    ) {
177        let (bin_name, config) = create_graph_trybuild(
178            graph,
179            extra_stmts,
180            &Some(self.name.clone()),
181            true,
182            LinkingMode::Static,
183        );
184
185        let mut ret = RustCrate::new(config.project_dir)
186            .target_dir(config.target_dir)
187            .example(bin_name.clone())
188            .no_default_features();
189
190        ret = ret.display_name("test_display_name");
191
192        ret = ret.features(vec!["hydro___feature_docker_runtime".to_string()]);
193
194        if let Some(features) = config.features {
195            ret = ret.features(features);
196        }
197
198        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
199        ret = ret.config("build.incremental = false");
200
201        *self.rust_crate.borrow_mut() = Some(ret);
202    }
203}
204
/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
#[derive(Clone, Debug)]
pub struct DockerDeployExternal {
    /// Name of the external process; recorded in trace spans.
    name: String,
    /// Shared counter holding the next port number handed out by `next_port`.
    next_port: Rc<RefCell<u16>>,

    /// Maps each registered external port id to the local port it was registered with.
    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,

    /// Per local port: the (shared, possibly not-yet-set) container name of the peer, the
    /// peer's port inside the container, and a docker network.
    #[expect(clippy::type_complexity, reason = "internal code")]
    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
}
216
217impl Node for DockerDeployExternal {
218    type Port = u16;
219    type Meta = ();
220    type InstantiateEnv = DockerDeploy;
221
222    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
223    fn next_port(&self) -> Self::Port {
224        let port = {
225            let mut borrow = self.next_port.borrow_mut();
226            let port = *borrow;
227            *borrow += 1;
228            port
229        };
230
231        port
232    }
233
234    #[instrument(level = "trace", skip_all, fields(name = self.name))]
235    fn update_meta(&self, _meta: &Self::Meta) {}
236
237    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
238    fn instantiate(
239        &self,
240        _env: &mut Self::InstantiateEnv,
241        meta: &mut Self::Meta,
242        graph: DfirGraph,
243        extra_stmts: Vec<syn::Stmt>,
244    ) {
245        trace!(name: "surface", surface = graph.surface_syntax_string());
246    }
247}
248
/// A boxed, pinned (stream, sink) pair: a stream yielding `Out` items and a sink accepting
/// `In` items whose error type is `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
253
254impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
255    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
256    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
257        self.ports.borrow_mut().insert(external_port_id, port);
258    }
259
260    fn as_bytes_bidi(
261        &self,
262        external_port_id: ExternalPortId,
263    ) -> impl Future<
264        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
265    > + 'a {
266        let _span =
267            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered(); // the instrument macro doesn't work here because of lifetime issues?
268        async { todo!() }
269    }
270
271    fn as_bincode_bidi<InT, OutT>(
272        &self,
273        external_port_id: ExternalPortId,
274    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
275    where
276        InT: serde::Serialize + 'static,
277        OutT: serde::de::DeserializeOwned + 'static,
278    {
279        let _span =
280            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered(); // the instrument macro doesn't work here because of lifetime issues?
281        async { todo!() }
282    }
283
284    fn as_bincode_sink<T>(
285        &self,
286        external_port_id: ExternalPortId,
287    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
288    where
289        T: serde::Serialize + 'static,
290    {
291        let guard =
292            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
293
294        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
295        let (docker_container_name, remote_port, _) = self
296            .connection_info
297            .borrow()
298            .get(&local_port)
299            .unwrap()
300            .clone();
301
302        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
303
304        async move {
305            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
306            let remote_ip_address = "localhost";
307
308            Box::pin(
309                LazySink::new(move || {
310                    Box::pin(async move {
311                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
312
313                        let stream =
314                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
315                                .await?;
316
317                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
318
319                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
320                            stream,
321                            LengthDelimitedCodec::new(),
322                        ))
323                    })
324                })
325                .with(move |v| async move {
326                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
327                }),
328            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
329        }
330        .instrument(guard.exit())
331    }
332
333    fn as_bincode_source<T>(
334        &self,
335        external_port_id: ExternalPortId,
336    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
337    where
338        T: serde::de::DeserializeOwned + 'static,
339    {
340        let guard =
341            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
342
343        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
344        let (docker_container_name, remote_port, _) = self
345            .connection_info
346            .borrow()
347            .get(&local_port)
348            .unwrap()
349            .clone();
350
351        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
352
353        async move {
354
355            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
356            let remote_ip_address = "localhost";
357
358            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
359
360            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
361                .await
362                .unwrap();
363
364            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
365
366            Box::pin(
367                FramedRead::new(stream, LengthDelimitedCodec::new())
368                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
369            ) as Pin<Box<dyn Stream<Item = T>>>
370        }
371        .instrument(guard.exit())
372    }
373}
374
375#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
376async fn find_dynamically_allocated_docker_port(
377    docker_container_name: &str,
378    destination_port: u16,
379) -> u16 {
380    let docker = Docker::connect_with_local_defaults().unwrap();
381
382    let container_info = docker
383        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
384        .await
385        .unwrap();
386
387    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
388
389    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
390    let remote_port = container_info
391        .network_settings
392        .as_ref()
393        .unwrap()
394        .ports
395        .as_ref()
396        .unwrap()
397        .get(&format!("{destination_port}/tcp"))
398        .unwrap()
399        .as_ref()
400        .unwrap()
401        .iter()
402        .find(|v| v.host_ip == Some("0.0.0.0".to_string()))
403        .unwrap()
404        .host_port
405        .as_ref()
406        .unwrap()
407        .parse()
408        .unwrap();
409
410    remote_port
411}
412
/// For deploying to a local docker instance
pub struct DockerDeploy {
    /// Specs of every process added through `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpec>,
    /// Specs of every cluster added through `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpec>,
    /// Docker network that is created on `start`, joined by every container, and removed on
    /// `cleanup`.
    network: DockerNetwork,
    /// Random identifier generated in `new`; passed to each container as the
    /// `DEPLOYMENT_INSTANCE` environment variable.
    deployment_instance: String,
}
420
421#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
422async fn create_and_start_container(
423    docker: &Docker,
424    container_name: &str,
425    image_name: &str,
426    network_name: &str,
427    deployment_instance: &str,
428) -> Result<(), anyhow::Error> {
429    let config = ContainerCreateBody {
430        image: Some(image_name.to_string()),
431        hostname: Some(container_name.to_string()),
432        host_config: Some(HostConfig {
433            binds: Some(vec![
434                "/var/run/docker.sock:/var/run/docker.sock".to_string(),
435            ]),
436            publish_all_ports: Some(true),
437            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
438            ..Default::default()
439        }),
440        env: Some(vec![
441            format!("CONTAINER_NAME={container_name}"),
442            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
443            format!("RUST_LOG=trace"),
444        ]),
445        networking_config: Some(NetworkingConfig {
446            endpoints_config: Some(HashMap::from([(
447                network_name.to_string(),
448                EndpointSettings {
449                    ..Default::default()
450                },
451            )])),
452        }),
453        tty: Some(true),
454        ..Default::default()
455    };
456
457    let options = CreateContainerOptions {
458        name: Some(container_name.to_string()),
459        ..Default::default()
460    };
461
462    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
463    docker.create_container(Some(options), config).await?;
464    docker
465        .start_container(container_name, None::<StartContainerOptions>)
466        .await?;
467
468    Ok(())
469}
470
471#[instrument(level = "trace", skip_all, fields(%image_name))]
472async fn build_and_create_image(
473    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
474    compilation_options: &Option<String>,
475    config: &[String],
476    exposed_ports: &[u16],
477    image_name: &str,
478) -> Result<(), anyhow::Error> {
479    let mut rust_crate = rust_crate
480        .borrow_mut()
481        .take()
482        .unwrap()
483        .rustflags(compilation_options.clone().unwrap_or("".to_string()));
484
485    for cfg in config {
486        rust_crate = rust_crate.config(cfg);
487    }
488
489    let build_output = match build_crate_memoized(
490        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
491    )
492    .await
493    {
494        Ok(build_output) => build_output,
495        Err(BuildError::FailedToBuildCrate {
496            exit_status,
497            diagnostics,
498            text_lines,
499            stderr_lines,
500        }) => {
501            let diagnostics = diagnostics
502                .into_iter()
503                .map(|d| d.rendered.unwrap())
504                .collect::<Vec<_>>()
505                .join("\n");
506            let text_lines = text_lines.join("\n");
507            let stderr_lines = stderr_lines.join("\n");
508
509            anyhow::bail!(
510                r#"
511Failed to build crate {exit_status:?}
512--- diagnostics
513---
514{diagnostics}
515---
516---
517---
518
519--- text_lines
520---
521---
522{text_lines}
523---
524---
525---
526
527--- stderr_lines
528---
529---
530{stderr_lines}
531---
532---
533---"#
534            );
535        }
536        Err(err) => {
537            anyhow::bail!("Failed to build crate {err:?}");
538        }
539    };
540
541    let docker = Docker::connect_with_local_defaults()?;
542
543    let mut tar_data = Vec::new();
544    {
545        let mut tar = Builder::new(&mut tar_data);
546
547        let exposed_ports = exposed_ports
548            .iter()
549            .map(|port| format!("EXPOSE {port}/tcp"))
550            .collect::<Vec<_>>()
551            .join("\n");
552
553        let dockerfile_content = format!(
554            r#"
555                FROM scratch
556                {exposed_ports}
557                COPY app /app
558                CMD ["/app"]
559            "#,
560        );
561
562        trace!(name: "dockerfile", %dockerfile_content);
563
564        let mut header = Header::new_gnu();
565        header.set_path("Dockerfile")?;
566        header.set_size(dockerfile_content.len() as u64);
567        header.set_cksum();
568        tar.append(&header, dockerfile_content.as_bytes())?;
569
570        let mut header = Header::new_gnu();
571        header.set_path("app")?;
572        header.set_size(build_output.bin_data.len() as u64);
573        header.set_mode(0o755);
574        header.set_cksum();
575        tar.append(&header, &build_output.bin_data[..])?;
576
577        tar.finish()?;
578    }
579
580    let build_options = BuildImageOptions {
581        dockerfile: "Dockerfile".to_owned(),
582        t: Some(image_name.to_string()),
583        rm: true,
584        ..Default::default()
585    };
586
587    use bollard::errors::Error;
588
589    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
590    let mut build_stream = docker.build_image(build_options, None, Some(body));
591    while let Some(msg) = build_stream.next().await {
592        match msg {
593            Ok(_) => {}
594            Err(e) => match e {
595                Error::DockerStreamError { error } => {
596                    return Err(anyhow::anyhow!(
597                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
598                    ));
599                }
600                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
601            },
602        }
603    }
604
605    Ok(())
606}
607
608impl DockerDeploy {
609    /// Create a new deployment
610    pub fn new(network: DockerNetwork) -> Self {
611        Self {
612            docker_processes: Vec::new(),
613            docker_clusters: Vec::new(),
614            network,
615            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
616        }
617    }
618
619    /// Add an internal docker service to the deployment.
620    pub fn add_localhost_docker(
621        &mut self,
622        compilation_options: Option<String>,
623        config: Vec<String>,
624    ) -> DockerDeployProcessSpec {
625        let process = DockerDeployProcessSpec {
626            compilation_options,
627            config,
628            network: self.network.clone(),
629            deployment_instance: self.deployment_instance.clone(),
630        };
631
632        self.docker_processes.push(process.clone());
633
634        process
635    }
636
637    /// Add an internal docker cluster to the deployment.
638    pub fn add_localhost_docker_cluster(
639        &mut self,
640        compilation_options: Option<String>,
641        config: Vec<String>,
642        count: usize,
643    ) -> DockerDeployClusterSpec {
644        let cluster = DockerDeployClusterSpec {
645            compilation_options,
646            config,
647            count,
648            deployment_instance: self.deployment_instance.clone(),
649        };
650
651        self.docker_clusters.push(cluster.clone());
652
653        cluster
654    }
655
656    /// Add an external process to the deployment.
657    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
658        DockerDeployExternalSpec { name }
659    }
660
661    /// Get the deployment instance from this deployment.
662    pub fn get_deployment_instance(&self) -> String {
663        self.deployment_instance.clone()
664    }
665
666    /// Create docker images.
667    #[instrument(level = "trace", skip_all)]
668    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
669        for (_, _, process) in nodes.get_all_processes() {
670            let exposed_ports = process.exposed_ports.borrow().clone();
671
672            build_and_create_image(
673                &process.rust_crate,
674                &process.compilation_options,
675                &process.config,
676                &exposed_ports,
677                &process.name,
678            )
679            .await?;
680        }
681
682        for (_, _, cluster) in nodes.get_all_clusters() {
683            build_and_create_image(
684                &cluster.rust_crate,
685                &cluster.compilation_options,
686                &cluster.config,
687                &[], // clusters don't have exposed ports.
688                &cluster.name,
689            )
690            .await?;
691        }
692
693        Ok(())
694    }
695
696    /// Start the deployment, tell docker to create containers from the existing provisioned images.
697    #[instrument(level = "trace", skip_all)]
698    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
699        let docker = Docker::connect_with_local_defaults()?;
700
701        match docker
702            .create_network(NetworkCreateRequest {
703                name: self.network.name.clone(),
704                driver: Some("bridge".to_string()),
705                ..Default::default()
706            })
707            .await
708        {
709            Ok(v) => v.id,
710            Err(e) => {
711                panic!("Failed to create docker network: {e:?}");
712            }
713        };
714
715        for (_, _, process) in nodes.get_all_processes() {
716            let docker_container_name: String = get_docker_container_name(&process.name, None);
717            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
718
719            create_and_start_container(
720                &docker,
721                &docker_container_name,
722                &process.name,
723                &self.network.name,
724                &self.deployment_instance,
725            )
726            .await?;
727        }
728
729        for (_, _, cluster) in nodes.get_all_clusters() {
730            for num in 0..cluster.count {
731                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
732                cluster
733                    .docker_container_name
734                    .borrow_mut()
735                    .push(docker_container_name.clone());
736
737                create_and_start_container(
738                    &docker,
739                    &docker_container_name,
740                    &cluster.name,
741                    &self.network.name,
742                    &self.deployment_instance,
743                )
744                .await?;
745            }
746        }
747
748        Ok(())
749    }
750
751    /// Stop the deployment, destroy all containers
752    #[instrument(level = "trace", skip_all)]
753    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
754        let docker = Docker::connect_with_local_defaults()?;
755
756        for (_, _, process) in nodes.get_all_processes() {
757            let docker_container_name: String = get_docker_container_name(&process.name, None);
758
759            docker
760                .kill_container(&docker_container_name, None::<KillContainerOptions>)
761                .await?;
762        }
763
764        for (_, _, cluster) in nodes.get_all_clusters() {
765            for num in 0..cluster.count {
766                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
767
768                docker
769                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
770                    .await?;
771            }
772        }
773
774        Ok(())
775    }
776
777    /// remove containers, images, and networks.
778    #[instrument(level = "trace", skip_all)]
779    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
780        let docker = Docker::connect_with_local_defaults()?;
781
782        for (_, _, process) in nodes.get_all_processes() {
783            let docker_container_name: String = get_docker_container_name(&process.name, None);
784
785            docker
786                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
787                .await?;
788        }
789
790        for (_, _, cluster) in nodes.get_all_clusters() {
791            for num in 0..cluster.count {
792                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
793
794                docker
795                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
796                    .await?;
797            }
798        }
799
800        docker
801            .remove_network(&self.network.name)
802            .await
803            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
804
805        use bollard::query_parameters::RemoveImageOptions;
806
807        for (_, _, process) in nodes.get_all_processes() {
808            docker
809                .remove_image(&process.name, None::<RemoveImageOptions>, None)
810                .await?;
811        }
812
813        for (_, _, cluster) in nodes.get_all_clusters() {
814            docker
815                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
816                .await?;
817        }
818
819        Ok(())
820    }
821}
822
823impl<'a> Deploy<'a> for DockerDeploy {
824    type Meta = ();
825    type InstantiateEnv = Self;
826
827    type Process = DockerDeployProcess;
828    type Cluster = DockerDeployCluster;
829    type External = DockerDeployExternal;
830
831    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
832    fn o2o_sink_source(
833        p1: &Self::Process,
834        p1_port: &<Self::Process as Node>::Port,
835        p2: &Self::Process,
836        p2_port: &<Self::Process as Node>::Port,
837    ) -> (syn::Expr, syn::Expr) {
838        let bind_addr = format!("0.0.0.0:{}", p2_port);
839        let target = format!("{}:{p2_port}", p2.name);
840
841        deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
842    }
843
844    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
845    fn o2o_connect(
846        p1: &Self::Process,
847        p1_port: &<Self::Process as Node>::Port,
848        p2: &Self::Process,
849        p2_port: &<Self::Process as Node>::Port,
850    ) -> Box<dyn FnOnce()> {
851        let serialized = format!(
852            "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
853            p1.name, p2.name
854        );
855
856        Box::new(move || {
857            trace!(name: "o2o_connect thunk", %serialized);
858        })
859    }
860
861    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
862    fn o2m_sink_source(
863        p1: &Self::Process,
864        p1_port: &<Self::Process as Node>::Port,
865        c2: &Self::Cluster,
866        c2_port: &<Self::Cluster as Node>::Port,
867    ) -> (syn::Expr, syn::Expr) {
868        deploy_containerized_o2m(*c2_port)
869    }
870
871    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
872    fn o2m_connect(
873        p1: &Self::Process,
874        p1_port: &<Self::Process as Node>::Port,
875        c2: &Self::Cluster,
876        c2_port: &<Self::Cluster as Node>::Port,
877    ) -> Box<dyn FnOnce()> {
878        let serialized = format!(
879            "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
880            p1.name, c2.name
881        );
882
883        Box::new(move || {
884            trace!(name: "o2m_connect thunk", %serialized);
885        })
886    }
887
888    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
889    fn m2o_sink_source(
890        c1: &Self::Cluster,
891        c1_port: &<Self::Cluster as Node>::Port,
892        p2: &Self::Process,
893        p2_port: &<Self::Process as Node>::Port,
894    ) -> (syn::Expr, syn::Expr) {
895        deploy_containerized_m2o(*p2_port, &p2.name)
896    }
897
898    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
899    fn m2o_connect(
900        c1: &Self::Cluster,
901        c1_port: &<Self::Cluster as Node>::Port,
902        p2: &Self::Process,
903        p2_port: &<Self::Process as Node>::Port,
904    ) -> Box<dyn FnOnce()> {
905        let serialized = format!(
906            "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
907            c1.name, p2.name
908        );
909
910        Box::new(move || {
911            trace!(name: "m2o_connect thunk", %serialized);
912        })
913    }
914
915    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
916    fn m2m_sink_source(
917        c1: &Self::Cluster,
918        c1_port: &<Self::Cluster as Node>::Port,
919        c2: &Self::Cluster,
920        c2_port: &<Self::Cluster as Node>::Port,
921    ) -> (syn::Expr, syn::Expr) {
922        deploy_containerized_m2m(*c2_port)
923    }
924
925    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
926    fn m2m_connect(
927        c1: &Self::Cluster,
928        c1_port: &<Self::Cluster as Node>::Port,
929        c2: &Self::Cluster,
930        c2_port: &<Self::Cluster as Node>::Port,
931    ) -> Box<dyn FnOnce()> {
932        let serialized = format!(
933            "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
934            c1.name, c2.name
935        );
936
937        Box::new(move || {
938            trace!(name: "m2m_connect thunk", %serialized);
939        })
940    }
941
    /// Source side of a multi-connection external-to-process channel.
    ///
    /// # Panics
    ///
    /// Always: this backend does not implement it yet and the body is `todo!()`.
    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
    fn e2o_many_source(
        extra_stmts: &mut Vec<syn::Stmt>,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        _codec_type: &syn::Type,
        shared_handle: String,
    ) -> syn::Expr {
        todo!()
    }
952
    /// Sink side of a multi-connection external-to-process channel.
    ///
    /// # Panics
    ///
    /// Always: this backend does not implement it yet and the body is `todo!()`.
    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
        todo!()
    }
957
958    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
959    fn e2o_source(
960        extra_stmts: &mut Vec<syn::Stmt>,
961        p1: &Self::External,
962        p1_port: &<Self::External as Node>::Port,
963        p2: &Self::Process,
964        p2_port: &<Self::Process as Node>::Port,
965        _codec_type: &syn::Type,
966        shared_handle: String,
967    ) -> syn::Expr {
968        p1.connection_info.borrow_mut().insert(
969            *p1_port,
970            (
971                p2.docker_container_name.clone(),
972                *p2_port,
973                p2.network.clone(),
974            ),
975        );
976
977        p2.exposed_ports.borrow_mut().push(*p2_port);
978
979        let socket_ident = syn::Ident::new(
980            &format!("__hydro_deploy_{}_socket", &shared_handle),
981            Span::call_site(),
982        );
983
984        let source_ident = syn::Ident::new(
985            &format!("__hydro_deploy_{}_source", &shared_handle),
986            Span::call_site(),
987        );
988
989        let sink_ident = syn::Ident::new(
990            &format!("__hydro_deploy_{}_sink", &shared_handle),
991            Span::call_site(),
992        );
993
994        let bind_addr = format!("0.0.0.0:{}", p2_port);
995
996        extra_stmts.push(syn::parse_quote! {
997            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
998        });
999
1000        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident.clone());
1001
1002        extra_stmts.push(syn::parse_quote! {
1003            let (#sink_ident, #source_ident) = (#create_expr).split();
1004        });
1005
1006        parse_quote!(#source_ident)
1007    }
1008
1009    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1010    fn e2o_connect(
1011        p1: &Self::External,
1012        p1_port: &<Self::External as Node>::Port,
1013        p2: &Self::Process,
1014        p2_port: &<Self::Process as Node>::Port,
1015        many: bool,
1016        server_hint: NetworkHint,
1017    ) -> Box<dyn FnOnce()> {
1018        let serialized = format!(
1019            "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
1020            p1.name, p2.name
1021        );
1022
1023        Box::new(move || {
1024            trace!(name: "e2o_connect thunk", %serialized);
1025        })
1026    }
1027
1028    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1029    fn o2e_sink(
1030        p1: &Self::Process,
1031        p1_port: &<Self::Process as Node>::Port,
1032        p2: &Self::External,
1033        p2_port: &<Self::External as Node>::Port,
1034        shared_handle: String,
1035    ) -> syn::Expr {
1036        let sink_ident = syn::Ident::new(
1037            &format!("__hydro_deploy_{}_sink", &shared_handle),
1038            Span::call_site(),
1039        );
1040        parse_quote!(#sink_ident)
1041    }
1042
    /// Returns the quoted expression that yields the member ids of a cluster.
    ///
    /// Delegates to the free function `cluster_ids`. `of_cluster` is not passed on; it is
    /// only recorded as a field on the tracing span.
    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
    fn cluster_ids(
        of_cluster: usize,
    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
        // Resolves to the free function of the same name, not to this trait method.
        cluster_ids()
    }
1049
    /// Returns the quoted expression that yields the current cluster member's own id.
    ///
    /// Delegates to the free function `cluster_self_id`.
    #[instrument(level = "trace", skip_all)]
    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
        // Resolves to the free function of the same name, not to this trait method.
        cluster_self_id()
    }
1054
    /// Returns the quoted expression that yields a stream of `(member id, membership event)`
    /// pairs for the cluster at `location_id`.
    ///
    /// Delegates to the free function `cluster_membership_stream`.
    #[instrument(level = "trace", skip_all, fields(?location_id))]
    fn cluster_membership_stream(
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
    {
        // Resolves to the free function of the same name, not to this trait method.
        cluster_membership_stream(location_id)
    }
1062}
1063
/// Alphabet handed to `nanoid!` when generating the random suffixes used in Docker network
/// and image names: the ten digits followed by the 26 lowercase ASCII letters.
///
/// NOTE(review): uppercase letters are presumably left out because Docker image names must
/// be lowercase — confirm.
const CONTAINER_ALPHABET: [char; 36] = [
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1068
1069#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location, %deployment_instance))]
1070fn get_docker_image_name(name_hint: &str, location: usize, deployment_instance: &str) -> String {
1071    let name_hint = name_hint
1072        .split("::")
1073        .last()
1074        .unwrap()
1075        .to_string()
1076        .to_ascii_lowercase()
1077        .replace(".", "-")
1078        .replace("_", "-")
1079        .replace("::", "-");
1080
1081    let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1082
1083    format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location}")
1084}
1085
1086#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1087fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1088    if let Some(instance) = instance {
1089        format!("{image_name}-{instance}")
1090    } else {
1091        image_name.to_string()
1092    }
1093}
/// Represents a Process running in a docker container.
///
/// This is the specification that `ProcessSpec::build` turns into a `DockerDeployProcess`.
#[derive(Clone)]
pub struct DockerDeployProcessSpec {
    // Handed to the built process unchanged.
    // NOTE(review): presumably extra options for compiling the crate — confirm at the use site.
    compilation_options: Option<String>,
    // Handed to the built process unchanged.
    // NOTE(review): exact meaning of the entries is not visible here — confirm at the use site.
    config: Vec<String>,
    // Docker network handed to the built process.
    network: DockerNetwork,
    // Deployment identifier embedded in the generated Docker image name.
    deployment_instance: String,
}
1102
1103impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1104    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1105    fn build(self, id: usize, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1106        DockerDeployProcess {
1107            id,
1108            name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1109
1110            next_port: Rc::new(RefCell::new(1000)),
1111            rust_crate: Rc::new(RefCell::new(None)),
1112
1113            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1114
1115            docker_container_name: Rc::new(RefCell::new(None)),
1116
1117            compilation_options: self.compilation_options,
1118            config: self.config,
1119
1120            network: self.network.clone(),
1121        }
1122    }
1123}
1124
/// Represents a Cluster running across `count` docker containers.
///
/// This is the specification that `ClusterSpec::build` turns into a `DockerDeployCluster`.
#[derive(Clone)]
pub struct DockerDeployClusterSpec {
    // Handed to the built cluster unchanged.
    // NOTE(review): presumably extra options for compiling the crate — confirm at the use site.
    compilation_options: Option<String>,
    // Handed to the built cluster unchanged.
    // NOTE(review): exact meaning of the entries is not visible here — confirm at the use site.
    config: Vec<String>,
    // Number of containers in the cluster (see the type-level doc); handed to the built cluster.
    count: usize,
    // Deployment identifier embedded in the generated Docker image name.
    deployment_instance: String,
}
1133
1134impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1135    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1136    fn build(self, id: usize, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1137        DockerDeployCluster {
1138            id,
1139            name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1140
1141            next_port: Rc::new(RefCell::new(1000)),
1142            rust_crate: Rc::new(RefCell::new(None)),
1143
1144            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1145
1146            compilation_options: self.compilation_options,
1147            config: self.config,
1148
1149            count: self.count,
1150        }
1151    }
1152}
1153
/// Represents an external process outside of the management of hydro deploy.
///
/// This is the specification that `ExternalSpec::build` turns into a `DockerDeployExternal`.
pub struct DockerDeployExternalSpec {
    // Name given to the built external; it is used verbatim, not derived from a name hint.
    name: String,
}
1158
1159impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1160    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1161    fn build(self, id: usize, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1162        DockerDeployExternal {
1163            name: self.name,
1164            next_port: Rc::new(RefCell::new(10000)),
1165            ports: Rc::new(RefCell::new(HashMap::new())),
1166            connection_info: Rc::new(RefCell::new(HashMap::new())),
1167        }
1168    }
1169}