hydro_deploy/
lib.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use hydro_deploy_integration::ServerBindConfig;
8use rust_crate::build::BuildOutput;
9use rust_crate::tracing_options::TracingOptions;
10use tokio::sync::{mpsc, oneshot};
11
12pub mod deployment;
13pub use deployment::Deployment;
14
15pub mod progress;
16
17pub mod localhost;
18pub use localhost::LocalhostHost;
19
20pub mod ssh;
21
22pub mod gcp;
23pub use gcp::GcpComputeEngineHost;
24
25pub mod azure;
26pub use azure::AzureHost;
27
28pub mod rust_crate;
29pub use rust_crate::RustCrate;
30
31pub mod custom_service;
32pub use custom_service::CustomService;
33
34pub mod terraform;
35
36pub mod util;
37
38#[derive(Default)]
39pub struct ResourcePool {
40    pub terraform: terraform::TerraformPool,
41}
42
43pub struct ResourceBatch {
44    pub terraform: terraform::TerraformBatch,
45}
46
47impl ResourceBatch {
48    fn new() -> ResourceBatch {
49        ResourceBatch {
50            terraform: terraform::TerraformBatch::default(),
51        }
52    }
53
54    async fn provision(
55        self,
56        pool: &mut ResourcePool,
57        last_result: Option<Arc<ResourceResult>>,
58    ) -> Result<ResourceResult> {
59        Ok(ResourceResult {
60            terraform: self.terraform.provision(&mut pool.terraform).await?,
61            _last_result: last_result,
62        })
63    }
64}
65
66#[derive(Debug)]
67pub struct ResourceResult {
68    pub terraform: terraform::TerraformResult,
69    _last_result: Option<Arc<ResourceResult>>,
70}
71
/// Tracing output of a launched binary, as exposed through
/// `LaunchedBinary::tracing_results`.
///
/// Derives `Debug` (in addition to `Clone`) so the value can be logged and
/// used in `{:?}` assertions like the other public result types.
#[derive(Debug, Clone)]
pub struct TracingResults {
    /// Raw bytes of the folded tracing data.
    /// NOTE(review): presumably folded stack samples; confirm the exact format
    /// against the code that produces it.
    pub folded_data: Vec<u8>,
}
76
77#[async_trait]
78pub trait LaunchedBinary: Send + Sync {
79    fn stdin(&self) -> mpsc::UnboundedSender<String>;
80
81    /// Provides a oneshot channel to handshake with the binary,
82    /// with the guarantee that as long as deploy is holding on
83    /// to a handle, none of the messages will also be broadcast
84    /// to the user-facing [`LaunchedBinary::stdout`] channel.
85    fn deploy_stdout(&self) -> oneshot::Receiver<String>;
86
87    fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
88    fn stderr(&self) -> mpsc::UnboundedReceiver<String>;
89    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
90    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
91
92    fn tracing_results(&self) -> Option<&TracingResults>;
93
94    fn exit_code(&self) -> Option<i32>;
95
96    /// Wait for the process to stop on its own. Returns the exit code.
97    async fn wait(&mut self) -> Result<i32>;
98    /// If the process is still running, force stop it. Then run post-run tasks.
99    async fn stop(&mut self) -> Result<()>;
100}
101
102#[async_trait]
103pub trait LaunchedHost: Send + Sync {
104    /// Given a pre-selected network type, computes concrete information needed for a service
105    /// to listen to network connections (such as the IP address to bind to).
106    fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig;
107
108    async fn copy_binary(&self, binary: &BuildOutput) -> Result<()>;
109
110    async fn launch_binary(
111        &self,
112        id: String,
113        binary: &BuildOutput,
114        args: &[String],
115        perf: Option<TracingOptions>,
116    ) -> Result<Box<dyn LaunchedBinary>>;
117
118    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
119}
120
/// Types of connections that a host can make to another host.
///
/// Derives `Debug` so that a strategy can be logged or shown in error
/// messages and test assertions.
#[derive(Debug)]
pub enum ServerStrategy {
    /// Listen on a Unix socket.
    UnixSocket,
    /// Listen on a TCP port that is internal to the host's network.
    InternalTcpPort,
    /// Listen on an externally reachable TCP port.
    ExternalTcpPort(
        /// The port number to bind to, which must be explicit to open the firewall.
        u16,
    ),
    /// A set of strategies keyed by a `u32`.
    /// NOTE(review): presumably the key selects which inner strategy handles a
    /// given message; confirm against the users of this variant.
    Demux(HashMap<u32, ServerStrategy>),
    /// A list of strategies that are combined into one.
    Merge(Vec<ServerStrategy>),
    /// An inner strategy paired with a `u32` tag.
    Tagged(Box<ServerStrategy>, u32),
    /// No strategy.
    Null,
}
134
135/// Like BindType, but includes metadata for determining whether a connection is possible.
136pub enum ClientStrategy<'a> {
137    UnixSocket(
138        /// Unique identifier for the host this socket will be on.
139        usize,
140    ),
141    InternalTcpPort(
142        /// The host that this port is available on.
143        &'a dyn Host,
144    ),
145    ForwardedTcpPort(
146        /// The host that this port is available on.
147        &'a dyn Host,
148    ),
149}
150
/// Kind of target that a host represents, as reported by `Host::target_type`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HostTargetType {
    /// NOTE(review): presumably the machine running the deployment itself; confirm.
    Local,
    /// NOTE(review): presumably a (remote) Linux machine; confirm.
    Linux,
}
156
157pub type HostStrategyGetter = Box<dyn FnOnce(&dyn std::any::Any) -> ServerStrategy>;
158
159pub trait Host: Send + Sync {
160    fn target_type(&self) -> HostTargetType;
161
162    fn request_port(&self, bind_type: &ServerStrategy);
163
164    /// An identifier for this host, which is unique within a deployment.
165    fn id(&self) -> usize;
166
167    /// Configures the host to support copying and running a custom binary.
168    fn request_custom_binary(&self);
169
170    /// Makes requests for physical resources (servers) that this host needs to run.
171    ///
172    /// This should be called before `provision` is called.
173    fn collect_resources(&self, resource_batch: &mut ResourceBatch);
174
175    /// Connects to the acquired resources and prepares the host to run services.
176    ///
177    /// This should be called after `collect_resources` is called.
178    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost>;
179
180    fn launched(&self) -> Option<Arc<dyn LaunchedHost>>;
181
182    /// Identifies a network type that this host can use for connections if it is the server.
183    /// The host will be `None` if the connection is from the same host as the target.
184    fn strategy_as_server<'a>(
185        &'a self,
186        connection_from: &dyn Host,
187    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)>;
188
189    /// Determines whether this host can connect to another host using the given strategy.
190    fn can_connect_to(&self, typ: ClientStrategy) -> bool;
191
192    /// Returns a reference to the host as a trait object.
193    fn as_any(&self) -> &dyn std::any::Any;
194}
195
196#[async_trait]
197pub trait Service: Send + Sync {
198    /// Makes requests for physical resources server ports that this service needs to run.
199    /// This should **not** recursively call `collect_resources` on the host, since
200    /// we guarantee that `collect_resources` is only called once per host.
201    ///
202    /// This should also perform any "free", non-blocking computations (compilations),
203    /// because the `deploy` method will be called after these resources are allocated.
204    fn collect_resources(&self, resource_batch: &mut ResourceBatch);
205
206    /// Connects to the acquired resources and prepares the service to be launched.
207    async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()>;
208
209    /// Launches the service, which should start listening for incoming network
210    /// connections. The service should not start computing at this point.
211    async fn ready(&mut self) -> Result<()>;
212
213    /// Starts the service by having it connect to other services and start computations.
214    async fn start(&mut self) -> Result<()>;
215
216    /// Stops the service by having it disconnect from other services and stop computations.
217    async fn stop(&mut self) -> Result<()>;
218}
219
220pub trait ServiceBuilder {
221    type Service: Service + 'static;
222    fn build(self, id: usize) -> Self::Service;
223}
224
225impl<S: Service + 'static, T: FnOnce(usize) -> S> ServiceBuilder for T {
226    type Service = S;
227    fn build(self, id: usize) -> Self::Service {
228        self(id)
229    }
230}