Skip to main content

hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
/// Handle to a docker network, identified by its name.
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Name under which the network is created in and removed from docker.
    name: String,
}
47
48impl DockerNetwork {
49    /// creates a new docker network (will actually be created when deployment.start() is called).
50    pub fn new(name: String) -> Self {
51        Self {
52            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53        }
54    }
55}
56
57/// Represents a process running in a docker container
58#[derive(Clone)]
59pub struct DockerDeployProcess {
60    key: LocationKey,
61    name: String,
62    next_port: Rc<RefCell<u16>>,
63    rust_crate: Rc<RefCell<Option<RustCrate>>>,
64
65    exposed_ports: Rc<RefCell<Vec<u16>>>,
66
67    docker_container_name: Rc<RefCell<Option<String>>>,
68
69    compilation_options: Option<String>,
70
71    config: Vec<String>,
72
73    network: DockerNetwork,
74}
75
76impl Node for DockerDeployProcess {
77    type Port = u16;
78    type Meta = ();
79    type InstantiateEnv = DockerDeploy;
80
81    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82    fn next_port(&self) -> Self::Port {
83        let port = {
84            let mut borrow = self.next_port.borrow_mut();
85            let port = *borrow;
86            *borrow += 1;
87            port
88        };
89
90        port
91    }
92
93    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94    fn update_meta(&self, _meta: &Self::Meta) {}
95
96    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97    fn instantiate(
98        &self,
99        _env: &mut Self::InstantiateEnv,
100        meta: &mut Self::Meta,
101        graph: DfirGraph,
102        extra_stmts: &[syn::Stmt],
103        sidecars: &[syn::Expr],
104    ) {
105        let (bin_name, config) = create_graph_trybuild(
106            graph,
107            extra_stmts,
108            sidecars,
109            Some(&self.name),
110            crate::compile::trybuild::generate::DeployMode::Containerized,
111            LinkingMode::Static,
112        );
113
114        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
115            .target_dir(config.target_dir)
116            .example(bin_name)
117            .no_default_features();
118
119        ret = ret.display_name("test_display_name");
120
121        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123        if let Some(features) = config.features {
124            ret = ret.features(features);
125        }
126
127        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128        ret = ret.config("build.incremental = false");
129
130        *self.rust_crate.borrow_mut() = Some(ret);
131    }
132}
133
134/// Represents a logical cluster, which can be a variable amount of individual containers.
135#[derive(Clone)]
136pub struct DockerDeployCluster {
137    key: LocationKey,
138    name: String,
139    next_port: Rc<RefCell<u16>>,
140    rust_crate: Rc<RefCell<Option<RustCrate>>>,
141
142    docker_container_name: Rc<RefCell<Vec<String>>>,
143
144    compilation_options: Option<String>,
145
146    config: Vec<String>,
147
148    count: usize,
149}
150
151impl Node for DockerDeployCluster {
152    type Port = u16;
153    type Meta = ();
154    type InstantiateEnv = DockerDeploy;
155
156    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157    fn next_port(&self) -> Self::Port {
158        let port = {
159            let mut borrow = self.next_port.borrow_mut();
160            let port = *borrow;
161            *borrow += 1;
162            port
163        };
164
165        port
166    }
167
168    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169    fn update_meta(&self, _meta: &Self::Meta) {}
170
171    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172    fn instantiate(
173        &self,
174        _env: &mut Self::InstantiateEnv,
175        _meta: &mut Self::Meta,
176        graph: DfirGraph,
177        extra_stmts: &[syn::Stmt],
178        sidecars: &[syn::Expr],
179    ) {
180        let (bin_name, config) = create_graph_trybuild(
181            graph,
182            extra_stmts,
183            sidecars,
184            Some(&self.name),
185            crate::compile::trybuild::generate::DeployMode::Containerized,
186            LinkingMode::Static,
187        );
188
189        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
190            .target_dir(config.target_dir)
191            .example(bin_name)
192            .no_default_features();
193
194        ret = ret.display_name("test_display_name");
195
196        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198        if let Some(features) = config.features {
199            ret = ret.features(features);
200        }
201
202        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203        ret = ret.config("build.incremental = false");
204
205        *self.rust_crate.borrow_mut() = Some(ret);
206    }
207}
208
209/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
210#[derive(Clone, Debug)]
211pub struct DockerDeployExternal {
212    name: String,
213    next_port: Rc<RefCell<u16>>,
214
215    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
216
217    #[expect(clippy::type_complexity, reason = "internal code")]
218    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
219}
220
221impl Node for DockerDeployExternal {
222    type Port = u16;
223    type Meta = ();
224    type InstantiateEnv = DockerDeploy;
225
226    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227    fn next_port(&self) -> Self::Port {
228        let port = {
229            let mut borrow = self.next_port.borrow_mut();
230            let port = *borrow;
231            *borrow += 1;
232            port
233        };
234
235        port
236    }
237
238    #[instrument(level = "trace", skip_all, fields(name = self.name))]
239    fn update_meta(&self, _meta: &Self::Meta) {}
240
241    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242    fn instantiate(
243        &self,
244        _env: &mut Self::InstantiateEnv,
245        meta: &mut Self::Meta,
246        graph: DfirGraph,
247        extra_stmts: &[syn::Stmt],
248        sidecars: &[syn::Expr],
249    ) {
250        trace!(name: "surface", surface = graph.surface_syntax_string());
251    }
252}
253
254type DynSourceSink<Out, In, InErr> = (
255    Pin<Box<dyn Stream<Item = Out>>>,
256    Pin<Box<dyn Sink<In, Error = InErr>>>,
257);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262        self.ports.borrow_mut().insert(external_port_id, port);
263    }
264
265    fn as_bytes_bidi(
266        &self,
267        external_port_id: ExternalPortId,
268    ) -> impl Future<
269        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270    > + 'a {
271        let guard =
272            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275        let (docker_container_name, remote_port, _) = self
276            .connection_info
277            .borrow()
278            .get(&local_port)
279            .unwrap()
280            .clone();
281
282        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284        async move {
285            let local_port =
286                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287            let remote_ip_address = "localhost";
288
289            trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292                .await
293                .unwrap();
294
295            trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297            let (rx, tx) = stream.into_split();
298
299            let source = Box::pin(
300                FramedRead::new(rx, LengthDelimitedCodec::new()),
301            ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303            let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304                as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306            (source, sink)
307        }
308        .instrument(guard.exit())
309    }
310
311    fn as_bincode_bidi<InT, OutT>(
312        &self,
313        external_port_id: ExternalPortId,
314    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315    where
316        InT: serde::Serialize + 'static,
317        OutT: serde::de::DeserializeOwned + 'static,
318    {
319        let guard =
320            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323        let (docker_container_name, remote_port, _) = self
324            .connection_info
325            .borrow()
326            .get(&local_port)
327            .unwrap()
328            .clone();
329
330        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332        async move {
333            let local_port =
334                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335            let remote_ip_address = "localhost";
336
337            trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340                .await
341                .unwrap();
342
343            trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345            let (rx, tx) = stream.into_split();
346
347            let source = Box::pin(
348                FramedRead::new(rx, LengthDelimitedCodec::new())
349                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350            ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352            let sink = Box::pin(
353                FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354                    Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355                }),
356            ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358            (source, sink)
359        }
360        .instrument(guard.exit())
361    }
362
363    fn as_bincode_sink<T>(
364        &self,
365        external_port_id: ExternalPortId,
366    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367    where
368        T: serde::Serialize + 'static,
369    {
370        let guard =
371            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374        let (docker_container_name, remote_port, _) = self
375            .connection_info
376            .borrow()
377            .get(&local_port)
378            .unwrap()
379            .clone();
380
381        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383        async move {
384            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385            let remote_ip_address = "localhost";
386
387            Box::pin(
388                LazySink::new(move || {
389                    Box::pin(async move {
390                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392                        let stream =
393                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394                                .await?;
395
396                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
399                            stream,
400                            LengthDelimitedCodec::new(),
401                        ))
402                    })
403                })
404                .with(move |v| async move {
405                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406                }),
407            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408        }
409        .instrument(guard.exit())
410    }
411
412    fn as_bincode_source<T>(
413        &self,
414        external_port_id: ExternalPortId,
415    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416    where
417        T: serde::de::DeserializeOwned + 'static,
418    {
419        let guard =
420            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423        let (docker_container_name, remote_port, _) = self
424            .connection_info
425            .borrow()
426            .get(&local_port)
427            .unwrap()
428            .clone();
429
430        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432        async move {
433
434            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435            let remote_ip_address = "localhost";
436
437            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440                .await
441                .unwrap();
442
443            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445            Box::pin(
446                FramedRead::new(stream, LengthDelimitedCodec::new())
447                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448            ) as Pin<Box<dyn Stream<Item = T>>>
449        }
450        .instrument(guard.exit())
451    }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456    docker_container_name: &str,
457    destination_port: u16,
458) -> u16 {
459    let docker = Docker::connect_with_local_defaults().unwrap();
460
461    let container_info = docker
462        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463        .await
464        .unwrap();
465
466    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
469    let remote_port = container_info
470        .network_settings
471        .as_ref()
472        .unwrap()
473        .ports
474        .as_ref()
475        .unwrap()
476        .get(&format!("{destination_port}/tcp"))
477        .unwrap()
478        .as_ref()
479        .unwrap()
480        .iter()
481        .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482        .unwrap()
483        .host_port
484        .as_ref()
485        .unwrap()
486        .parse()
487        .unwrap();
488
489    remote_port
490}
491
492/// For deploying to a local docker instance
493pub struct DockerDeploy {
494    docker_processes: Vec<DockerDeployProcessSpec>,
495    docker_clusters: Vec<DockerDeployClusterSpec>,
496    network: DockerNetwork,
497    deployment_instance: String,
498}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502    docker: &Docker,
503    container_name: &str,
504    image_name: &str,
505    network_name: &str,
506    deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508    let config = ContainerCreateBody {
509        image: Some(image_name.to_owned()),
510        hostname: Some(container_name.to_owned()),
511        host_config: Some(HostConfig {
512            binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513            publish_all_ports: Some(true),
514            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
515            ..Default::default()
516        }),
517        env: Some(vec![
518            format!("CONTAINER_NAME={container_name}"),
519            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520            format!("RUST_LOG=trace"),
521        ]),
522        networking_config: Some(NetworkingConfig {
523            endpoints_config: Some(HashMap::from([(
524                network_name.to_owned(),
525                EndpointSettings {
526                    ..Default::default()
527                },
528            )])),
529        }),
530        tty: Some(true),
531        ..Default::default()
532    };
533
534    let options = CreateContainerOptions {
535        name: Some(container_name.to_owned()),
536        ..Default::default()
537    };
538
539    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540    docker.create_container(Some(options), config).await?;
541    docker
542        .start_container(container_name, None::<StartContainerOptions>)
543        .await?;
544
545    Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551    compilation_options: Option<&str>,
552    config: &[String],
553    exposed_ports: &[u16],
554    image_name: &str,
555) -> Result<(), anyhow::Error> {
556    let mut rust_crate = rust_crate
557        .borrow_mut()
558        .take()
559        .unwrap()
560        .rustflags(compilation_options.unwrap_or_default());
561
562    for cfg in config {
563        rust_crate = rust_crate.config(cfg);
564    }
565
566    let build_output = match build_crate_memoized(
567        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568    )
569    .await
570    {
571        Ok(build_output) => build_output,
572        Err(BuildError::FailedToBuildCrate {
573            exit_status,
574            diagnostics,
575            text_lines,
576            stderr_lines,
577        }) => {
578            let diagnostics = diagnostics
579                .into_iter()
580                .map(|d| d.rendered.unwrap())
581                .collect::<Vec<_>>()
582                .join("\n");
583            let text_lines = text_lines.join("\n");
584            let stderr_lines = stderr_lines.join("\n");
585
586            anyhow::bail!(
587                r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611            );
612        }
613        Err(err) => {
614            anyhow::bail!("Failed to build crate {err:?}");
615        }
616    };
617
618    let docker = Docker::connect_with_local_defaults()?;
619
620    let mut tar_data = Vec::new();
621    {
622        let mut tar = Builder::new(&mut tar_data);
623
624        let exposed_ports = exposed_ports
625            .iter()
626            .map(|port| format!("EXPOSE {port}/tcp"))
627            .collect::<Vec<_>>()
628            .join("\n");
629
630        let dockerfile_content = format!(
631            r#"
632                FROM scratch
633                {exposed_ports}
634                COPY app /app
635                CMD ["/app"]
636            "#,
637        );
638
639        trace!(name: "dockerfile", %dockerfile_content);
640
641        let mut header = Header::new_gnu();
642        header.set_path("Dockerfile")?;
643        header.set_size(dockerfile_content.len() as u64);
644        header.set_cksum();
645        tar.append(&header, dockerfile_content.as_bytes())?;
646
647        let mut header = Header::new_gnu();
648        header.set_path("app")?;
649        header.set_size(build_output.bin_data.len() as u64);
650        header.set_mode(0o755);
651        header.set_cksum();
652        tar.append(&header, &build_output.bin_data[..])?;
653
654        tar.finish()?;
655    }
656
657    let build_options = BuildImageOptions {
658        dockerfile: "Dockerfile".to_owned(),
659        t: Some(image_name.to_owned()),
660        rm: true,
661        ..Default::default()
662    };
663
664    use bollard::errors::Error;
665
666    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667    let mut build_stream = docker.build_image(build_options, None, Some(body));
668    while let Some(msg) = build_stream.next().await {
669        match msg {
670            Ok(_) => {}
671            Err(e) => match e {
672                Error::DockerStreamError { error } => {
673                    return Err(anyhow::anyhow!(
674                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
675                    ));
676                }
677                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678            },
679        }
680    }
681
682    Ok(())
683}
684
685impl DockerDeploy {
686    /// Create a new deployment
687    pub fn new(network: DockerNetwork) -> Self {
688        Self {
689            docker_processes: Vec::new(),
690            docker_clusters: Vec::new(),
691            network,
692            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693        }
694    }
695
696    /// Add an internal docker service to the deployment.
697    pub fn add_localhost_docker(
698        &mut self,
699        compilation_options: Option<String>,
700        config: Vec<String>,
701    ) -> DockerDeployProcessSpec {
702        let process = DockerDeployProcessSpec {
703            compilation_options,
704            config,
705            network: self.network.clone(),
706            deployment_instance: self.deployment_instance.clone(),
707        };
708
709        self.docker_processes.push(process.clone());
710
711        process
712    }
713
714    /// Add an internal docker cluster to the deployment.
715    pub fn add_localhost_docker_cluster(
716        &mut self,
717        compilation_options: Option<String>,
718        config: Vec<String>,
719        count: usize,
720    ) -> DockerDeployClusterSpec {
721        let cluster = DockerDeployClusterSpec {
722            compilation_options,
723            config,
724            count,
725            deployment_instance: self.deployment_instance.clone(),
726        };
727
728        self.docker_clusters.push(cluster.clone());
729
730        cluster
731    }
732
733    /// Add an external process to the deployment.
734    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
735        DockerDeployExternalSpec { name }
736    }
737
738    /// Get the deployment instance from this deployment.
739    pub fn get_deployment_instance(&self) -> String {
740        self.deployment_instance.clone()
741    }
742
743    /// Create docker images.
744    #[instrument(level = "trace", skip_all)]
745    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746        for (_, _, process) in nodes.get_all_processes() {
747            let exposed_ports = process.exposed_ports.borrow().clone();
748
749            build_and_create_image(
750                &process.rust_crate,
751                process.compilation_options.as_deref(),
752                &process.config,
753                &exposed_ports,
754                &process.name,
755            )
756            .await?;
757        }
758
759        for (_, _, cluster) in nodes.get_all_clusters() {
760            build_and_create_image(
761                &cluster.rust_crate,
762                cluster.compilation_options.as_deref(),
763                &cluster.config,
764                &[], // clusters don't have exposed ports.
765                &cluster.name,
766            )
767            .await?;
768        }
769
770        Ok(())
771    }
772
773    /// Start the deployment, tell docker to create containers from the existing provisioned images.
774    #[instrument(level = "trace", skip_all)]
775    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776        let docker = Docker::connect_with_local_defaults()?;
777
778        match docker
779            .create_network(NetworkCreateRequest {
780                name: self.network.name.clone(),
781                driver: Some("bridge".to_owned()),
782                ..Default::default()
783            })
784            .await
785        {
786            Ok(v) => v.id,
787            Err(e) => {
788                panic!("Failed to create docker network: {e:?}");
789            }
790        };
791
792        for (_, _, process) in nodes.get_all_processes() {
793            let docker_container_name: String = get_docker_container_name(&process.name, None);
794            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796            create_and_start_container(
797                &docker,
798                &docker_container_name,
799                &process.name,
800                &self.network.name,
801                &self.deployment_instance,
802            )
803            .await?;
804        }
805
806        for (_, _, cluster) in nodes.get_all_clusters() {
807            for num in 0..cluster.count {
808                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809                cluster
810                    .docker_container_name
811                    .borrow_mut()
812                    .push(docker_container_name.clone());
813
814                create_and_start_container(
815                    &docker,
816                    &docker_container_name,
817                    &cluster.name,
818                    &self.network.name,
819                    &self.deployment_instance,
820                )
821                .await?;
822            }
823        }
824
825        Ok(())
826    }
827
828    /// Stop the deployment, destroy all containers
829    #[instrument(level = "trace", skip_all)]
830    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831        let docker = Docker::connect_with_local_defaults()?;
832
833        for (_, _, process) in nodes.get_all_processes() {
834            let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836            docker
837                .kill_container(&docker_container_name, None::<KillContainerOptions>)
838                .await?;
839        }
840
841        for (_, _, cluster) in nodes.get_all_clusters() {
842            for num in 0..cluster.count {
843                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845                docker
846                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
847                    .await?;
848            }
849        }
850
851        Ok(())
852    }
853
854    /// remove containers, images, and networks.
855    #[instrument(level = "trace", skip_all)]
856    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857        let docker = Docker::connect_with_local_defaults()?;
858
859        for (_, _, process) in nodes.get_all_processes() {
860            let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862            docker
863                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864                .await?;
865        }
866
867        for (_, _, cluster) in nodes.get_all_clusters() {
868            for num in 0..cluster.count {
869                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871                docker
872                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873                    .await?;
874            }
875        }
876
877        docker
878            .remove_network(&self.network.name)
879            .await
880            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882        use bollard::query_parameters::RemoveImageOptions;
883
884        for (_, _, process) in nodes.get_all_processes() {
885            docker
886                .remove_image(&process.name, None::<RemoveImageOptions>, None)
887                .await?;
888        }
889
890        for (_, _, cluster) in nodes.get_all_clusters() {
891            docker
892                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893                .await?;
894        }
895
896        Ok(())
897    }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901    type Meta = ();
902    type InstantiateEnv = Self;
903
904    type Process = DockerDeployProcess;
905    type Cluster = DockerDeployCluster;
906    type External = DockerDeployExternal;
907
908    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
909    fn o2o_sink_source(
910        _env: &mut Self::InstantiateEnv,
911        p1: &Self::Process,
912        p1_port: &<Self::Process as Node>::Port,
913        p2: &Self::Process,
914        p2_port: &<Self::Process as Node>::Port,
915        _name: Option<&str>,
916        networking_info: &crate::networking::NetworkingInfo,
917    ) -> (syn::Expr, syn::Expr) {
918        match networking_info {
919            crate::networking::NetworkingInfo::Tcp {
920                fault: crate::networking::TcpFault::FailStop,
921            } => {}
922            _ => panic!("Unsupported networking info: {:?}", networking_info),
923        }
924        let bind_addr = format!("0.0.0.0:{}", p2_port);
925        let target = format!("{}:{p2_port}", p2.name);
926
927        deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
928    }
929
930    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
931    fn o2o_connect(
932        p1: &Self::Process,
933        p1_port: &<Self::Process as Node>::Port,
934        p2: &Self::Process,
935        p2_port: &<Self::Process as Node>::Port,
936    ) -> Box<dyn FnOnce()> {
937        let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
938
939        Box::new(move || {
940            trace!(name: "o2o_connect thunk", %serialized);
941        })
942    }
943
944    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
945    fn o2m_sink_source(
946        _env: &mut Self::InstantiateEnv,
947        p1: &Self::Process,
948        p1_port: &<Self::Process as Node>::Port,
949        c2: &Self::Cluster,
950        c2_port: &<Self::Cluster as Node>::Port,
951        _name: Option<&str>,
952        networking_info: &crate::networking::NetworkingInfo,
953    ) -> (syn::Expr, syn::Expr) {
954        match networking_info {
955            crate::networking::NetworkingInfo::Tcp {
956                fault: crate::networking::TcpFault::FailStop,
957            } => {}
958            _ => panic!("Unsupported networking info: {:?}", networking_info),
959        }
960        deploy_containerized_o2m(*c2_port)
961    }
962
963    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
964    fn o2m_connect(
965        p1: &Self::Process,
966        p1_port: &<Self::Process as Node>::Port,
967        c2: &Self::Cluster,
968        c2_port: &<Self::Cluster as Node>::Port,
969    ) -> Box<dyn FnOnce()> {
970        let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
971
972        Box::new(move || {
973            trace!(name: "o2m_connect thunk", %serialized);
974        })
975    }
976
977    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
978    fn m2o_sink_source(
979        _env: &mut Self::InstantiateEnv,
980        c1: &Self::Cluster,
981        c1_port: &<Self::Cluster as Node>::Port,
982        p2: &Self::Process,
983        p2_port: &<Self::Process as Node>::Port,
984        _name: Option<&str>,
985        networking_info: &crate::networking::NetworkingInfo,
986    ) -> (syn::Expr, syn::Expr) {
987        match networking_info {
988            crate::networking::NetworkingInfo::Tcp {
989                fault: crate::networking::TcpFault::FailStop,
990            } => {}
991            _ => panic!("Unsupported networking info: {:?}", networking_info),
992        }
993        deploy_containerized_m2o(*p2_port, &p2.name)
994    }
995
996    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
997    fn m2o_connect(
998        c1: &Self::Cluster,
999        c1_port: &<Self::Cluster as Node>::Port,
1000        p2: &Self::Process,
1001        p2_port: &<Self::Process as Node>::Port,
1002    ) -> Box<dyn FnOnce()> {
1003        let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
1004
1005        Box::new(move || {
1006            trace!(name: "m2o_connect thunk", %serialized);
1007        })
1008    }
1009
1010    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1011    fn m2m_sink_source(
1012        _env: &mut Self::InstantiateEnv,
1013        c1: &Self::Cluster,
1014        c1_port: &<Self::Cluster as Node>::Port,
1015        c2: &Self::Cluster,
1016        c2_port: &<Self::Cluster as Node>::Port,
1017        _name: Option<&str>,
1018        networking_info: &crate::networking::NetworkingInfo,
1019    ) -> (syn::Expr, syn::Expr) {
1020        match networking_info {
1021            crate::networking::NetworkingInfo::Tcp {
1022                fault: crate::networking::TcpFault::FailStop,
1023            } => {}
1024            _ => panic!("Unsupported networking info: {:?}", networking_info),
1025        }
1026        deploy_containerized_m2m(*c2_port)
1027    }
1028
1029    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1030    fn m2m_connect(
1031        c1: &Self::Cluster,
1032        c1_port: &<Self::Cluster as Node>::Port,
1033        c2: &Self::Cluster,
1034        c2_port: &<Self::Cluster as Node>::Port,
1035    ) -> Box<dyn FnOnce()> {
1036        let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1037
1038        Box::new(move || {
1039            trace!(name: "m2m_connect thunk", %serialized);
1040        })
1041    }
1042
1043    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1044    fn e2o_many_source(
1045        extra_stmts: &mut Vec<syn::Stmt>,
1046        p2: &Self::Process,
1047        p2_port: &<Self::Process as Node>::Port,
1048        codec_type: &syn::Type,
1049        shared_handle: String,
1050    ) -> syn::Expr {
1051        p2.exposed_ports.borrow_mut().push(*p2_port);
1052
1053        let socket_ident = syn::Ident::new(
1054            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1055            Span::call_site(),
1056        );
1057
1058        let source_ident = syn::Ident::new(
1059            &format!("__hydro_deploy_many_{}_source", &shared_handle),
1060            Span::call_site(),
1061        );
1062
1063        let sink_ident = syn::Ident::new(
1064            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1065            Span::call_site(),
1066        );
1067
1068        let membership_ident = syn::Ident::new(
1069            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1070            Span::call_site(),
1071        );
1072
1073        let bind_addr = format!("0.0.0.0:{}", p2_port);
1074
1075        extra_stmts.push(syn::parse_quote! {
1076            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1077        });
1078
1079        let root = crate::staging_util::get_this_crate();
1080
1081        extra_stmts.push(syn::parse_quote! {
1082            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1083        });
1084
1085        parse_quote!(#source_ident)
1086    }
1087
1088    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1089    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1090        let sink_ident = syn::Ident::new(
1091            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1092            Span::call_site(),
1093        );
1094        parse_quote!(#sink_ident)
1095    }
1096
1097    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1098    fn e2o_source(
1099        extra_stmts: &mut Vec<syn::Stmt>,
1100        p1: &Self::External,
1101        p1_port: &<Self::External as Node>::Port,
1102        p2: &Self::Process,
1103        p2_port: &<Self::Process as Node>::Port,
1104        _codec_type: &syn::Type,
1105        shared_handle: String,
1106    ) -> syn::Expr {
1107        p1.connection_info.borrow_mut().insert(
1108            *p1_port,
1109            (
1110                p2.docker_container_name.clone(),
1111                *p2_port,
1112                p2.network.clone(),
1113            ),
1114        );
1115
1116        p2.exposed_ports.borrow_mut().push(*p2_port);
1117
1118        let socket_ident = syn::Ident::new(
1119            &format!("__hydro_deploy_{}_socket", &shared_handle),
1120            Span::call_site(),
1121        );
1122
1123        let source_ident = syn::Ident::new(
1124            &format!("__hydro_deploy_{}_source", &shared_handle),
1125            Span::call_site(),
1126        );
1127
1128        let sink_ident = syn::Ident::new(
1129            &format!("__hydro_deploy_{}_sink", &shared_handle),
1130            Span::call_site(),
1131        );
1132
1133        let bind_addr = format!("0.0.0.0:{}", p2_port);
1134
1135        extra_stmts.push(syn::parse_quote! {
1136            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1137        });
1138
1139        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1140
1141        extra_stmts.push(syn::parse_quote! {
1142            let (#sink_ident, #source_ident) = (#create_expr).split();
1143        });
1144
1145        parse_quote!(#source_ident)
1146    }
1147
1148    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1149    fn e2o_connect(
1150        p1: &Self::External,
1151        p1_port: &<Self::External as Node>::Port,
1152        p2: &Self::Process,
1153        p2_port: &<Self::Process as Node>::Port,
1154        many: bool,
1155        server_hint: NetworkHint,
1156    ) -> Box<dyn FnOnce()> {
1157        if server_hint != NetworkHint::Auto {
1158            panic!(
1159                "Docker deployment only supports NetworkHint::Auto, got {:?}",
1160                server_hint
1161            );
1162        }
1163
1164        // For many connections, we need to populate connection_info so as_bincode_bidi can find it
1165        if many {
1166            p1.connection_info.borrow_mut().insert(
1167                *p1_port,
1168                (
1169                    p2.docker_container_name.clone(),
1170                    *p2_port,
1171                    p2.network.clone(),
1172                ),
1173            );
1174        }
1175
1176        let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1177
1178        Box::new(move || {
1179            trace!(name: "e2o_connect thunk", %serialized);
1180        })
1181    }
1182
1183    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1184    fn o2e_sink(
1185        p1: &Self::Process,
1186        p1_port: &<Self::Process as Node>::Port,
1187        p2: &Self::External,
1188        p2_port: &<Self::External as Node>::Port,
1189        shared_handle: String,
1190    ) -> syn::Expr {
1191        let sink_ident = syn::Ident::new(
1192            &format!("__hydro_deploy_{}_sink", &shared_handle),
1193            Span::call_site(),
1194        );
1195        parse_quote!(#sink_ident)
1196    }
1197
1198    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1199    fn cluster_ids(
1200        of_cluster: LocationKey,
1201    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1202        cluster_ids()
1203    }
1204
1205    #[instrument(level = "trace", skip_all)]
1206    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1207        cluster_self_id()
1208    }
1209
1210    #[instrument(level = "trace", skip_all, fields(?location_id))]
1211    fn cluster_membership_stream(
1212        _env: &mut Self::InstantiateEnv,
1213        _at_location: &LocationId,
1214        location_id: &LocationId,
1215    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1216    {
1217        cluster_membership_stream(location_id)
1218    }
1219}
1220
/// Alphabet of the 36 characters `0`-`9` and `a`-`z`, passed to `nanoid!` when this file
/// generates the random 6-character suffixes of Docker network and image names.
const CONTAINER_ALPHABET: [char; 36] = [
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1225
1226#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1227fn get_docker_image_name(
1228    name_hint: &str,
1229    location_key: LocationKey,
1230    deployment_instance: &str,
1231) -> String {
1232    let name_hint = name_hint
1233        .split("::")
1234        .last()
1235        .unwrap()
1236        .to_ascii_lowercase()
1237        .replace(".", "-")
1238        .replace("_", "-")
1239        .replace("::", "-");
1240
1241    let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1242
1243    format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1244}
1245
1246#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1247fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1248    if let Some(instance) = instance {
1249        format!("{image_name}-{instance}")
1250    } else {
1251        image_name.to_owned()
1252    }
1253}
/// Specification of a process that runs in a Docker container.
///
/// `ProcessSpec::build` consumes this value and turns it into a `DockerDeployProcess`.
#[derive(Clone)]
pub struct DockerDeployProcessSpec {
    /// Forwarded unchanged to the built process.
    /// NOTE(review): presumably extra options for compiling the process binary; confirm at the
    /// place where the process reads them.
    compilation_options: Option<String>,
    /// Forwarded unchanged to the built process; its meaning is not visible in this section.
    config: Vec<String>,
    /// Docker network handed to the built process.
    network: DockerNetwork,
    /// Deployment-instance string embedded in the generated Docker image name.
    deployment_instance: String,
}
1262
1263impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1264    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1265    fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1266        DockerDeployProcess {
1267            key,
1268            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1269
1270            next_port: Rc::new(RefCell::new(1000)),
1271            rust_crate: Rc::new(RefCell::new(None)),
1272
1273            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1274
1275            docker_container_name: Rc::new(RefCell::new(None)),
1276
1277            compilation_options: self.compilation_options,
1278            config: self.config,
1279
1280            network: self.network.clone(),
1281        }
1282    }
1283}
1284
/// Specification of a cluster that runs across `count` Docker containers.
///
/// `ClusterSpec::build` consumes this value and turns it into a `DockerDeployCluster`.
#[derive(Clone)]
pub struct DockerDeployClusterSpec {
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): presumably extra options for compiling the cluster binary; confirm at the
    /// place where the cluster reads them.
    compilation_options: Option<String>,
    /// Forwarded unchanged to the built cluster; its meaning is not visible in this section.
    config: Vec<String>,
    /// Number of containers in the cluster; copied into the built cluster.
    count: usize,
    /// Deployment-instance string embedded in the generated Docker image name.
    deployment_instance: String,
}
1293
1294impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1295    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1296    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1297        DockerDeployCluster {
1298            key,
1299            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1300
1301            next_port: Rc::new(RefCell::new(1000)),
1302            rust_crate: Rc::new(RefCell::new(None)),
1303
1304            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1305
1306            compilation_options: self.compilation_options,
1307            config: self.config,
1308
1309            count: self.count,
1310        }
1311    }
1312}
1313
/// Specification of an external process that is outside of the management of hydro deploy.
///
/// `ExternalSpec::build` consumes this value and turns it into a `DockerDeployExternal`.
pub struct DockerDeployExternalSpec {
    /// Name given to the built external.
    name: String,
}
1318
1319impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1320    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1321    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1322        DockerDeployExternal {
1323            name: self.name,
1324            next_port: Rc::new(RefCell::new(10000)),
1325            ports: Rc::new(RefCell::new(HashMap::new())),
1326            connection_info: Rc::new(RefCell::new(HashMap::new())),
1327        }
1328    }
1329}