hydro_deploy/
lib.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use anyhow::Result;
8use async_trait::async_trait;
9use hydro_deploy_integration::ServerBindConfig;
10use rust_crate::build::BuildOutput;
11use rust_crate::tracing_options::TracingOptions;
12use tokio::sync::{mpsc, oneshot};
13
14pub mod deployment;
15pub use deployment::Deployment;
16
17pub mod progress;
18
19pub mod localhost;
20pub use localhost::LocalhostHost;
21
22pub mod ssh;
23
24pub mod gcp;
25pub use gcp::GcpComputeEngineHost;
26
27pub mod azure;
28pub use azure::AzureHost;
29
30pub mod aws;
31pub use aws::{AwsEc2Host, AwsNetwork};
32
33pub mod rust_crate;
34pub use rust_crate::RustCrate;
35
36pub mod custom_service;
37pub use custom_service::CustomService;
38
39pub mod terraform;
40
41pub mod util;
42
43#[derive(Default)]
44pub struct ResourcePool {
45    pub terraform: terraform::TerraformPool,
46}
47
48pub struct ResourceBatch {
49    pub terraform: terraform::TerraformBatch,
50}
51
52impl ResourceBatch {
53    fn new() -> ResourceBatch {
54        ResourceBatch {
55            terraform: terraform::TerraformBatch::default(),
56        }
57    }
58
59    async fn provision(
60        self,
61        pool: &mut ResourcePool,
62        last_result: Option<Arc<ResourceResult>>,
63    ) -> Result<ResourceResult> {
64        Ok(ResourceResult {
65            terraform: self.terraform.provision(&mut pool.terraform).await?,
66            _last_result: last_result,
67        })
68    }
69}
70
71#[derive(Debug)]
72pub struct ResourceResult {
73    pub terraform: terraform::TerraformResult,
74    _last_result: Option<Arc<ResourceResult>>,
75}
76
77#[derive(Clone)]
78pub struct TracingResults {
79    pub folded_data: Vec<u8>,
80}
81
82#[async_trait]
83pub trait LaunchedBinary: Send + Sync {
84    fn stdin(&self) -> mpsc::UnboundedSender<String>;
85
86    /// Provides a oneshot channel to handshake with the binary,
87    /// with the guarantee that as long as deploy is holding on
88    /// to a handle, none of the messages will also be broadcast
89    /// to the user-facing [`LaunchedBinary::stdout`] channel.
90    fn deploy_stdout(&self) -> oneshot::Receiver<String>;
91
92    fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
93    fn stderr(&self) -> mpsc::UnboundedReceiver<String>;
94    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
95    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
96
97    fn tracing_results(&self) -> Option<&TracingResults>;
98
99    fn exit_code(&self) -> Option<i32>;
100
101    /// Wait for the process to stop on its own. Returns the exit code.
102    async fn wait(&mut self) -> Result<i32>;
103    /// If the process is still running, force stop it. Then run post-run tasks.
104    async fn stop(&mut self) -> Result<()>;
105}
106
107#[async_trait]
108pub trait LaunchedHost: Send + Sync {
109    /// Given a pre-selected network type, computes concrete information needed for a service
110    /// to listen to network connections (such as the IP address to bind to).
111    fn base_server_config(&self, strategy: &BaseServerStrategy) -> ServerBindConfig;
112
113    fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig {
114        match strategy {
115            ServerStrategy::Direct(b) => self.base_server_config(b),
116            ServerStrategy::Many(b) => {
117                ServerBindConfig::MultiConnection(Box::new(self.base_server_config(b)))
118            }
119            ServerStrategy::Demux(demux) => {
120                let mut config_map = HashMap::new();
121                for (key, underlying) in demux {
122                    config_map.insert(*key, self.server_config(underlying));
123                }
124
125                ServerBindConfig::Demux(config_map)
126            }
127            ServerStrategy::Merge(merge) => {
128                let mut configs = vec![];
129                for underlying in merge {
130                    configs.push(self.server_config(underlying));
131                }
132
133                ServerBindConfig::Merge(configs)
134            }
135            ServerStrategy::Tagged(underlying, id) => {
136                ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
137            }
138            ServerStrategy::Null => ServerBindConfig::Null,
139        }
140    }
141
142    async fn copy_binary(&self, binary: &BuildOutput) -> Result<()>;
143
144    async fn launch_binary(
145        &self,
146        id: String,
147        binary: &BuildOutput,
148        args: &[String],
149        perf: Option<TracingOptions>,
150    ) -> Result<Box<dyn LaunchedBinary>>;
151
152    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
153}
154
155pub enum BaseServerStrategy {
156    UnixSocket,
157    InternalTcpPort(Option<u16>),
158    ExternalTcpPort(
159        /// The port number to bind to, which must be explicit to open the firewall.
160        u16,
161    ),
162}
163
164/// Types of connection that a service can receive when configured as the server.
165pub enum ServerStrategy {
166    Direct(BaseServerStrategy),
167    Many(BaseServerStrategy),
168    Demux(HashMap<u32, ServerStrategy>),
169    Merge(Vec<ServerStrategy>),
170    Tagged(Box<ServerStrategy>, u32),
171    Null,
172}
173
174/// Like BindType, but includes metadata for determining whether a connection is possible.
175pub enum ClientStrategy<'a> {
176    UnixSocket(
177        /// Unique identifier for the host this socket will be on.
178        usize,
179    ),
180    InternalTcpPort(
181        /// The host that this port is available on.
182        &'a dyn Host,
183    ),
184    ForwardedTcpPort(
185        /// The host that this port is available on.
186        &'a dyn Host,
187    ),
188}
189
190#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
191pub enum HostTargetType {
192    Local,
193    Linux,
194}
195
196#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
197pub enum PortNetworkHint {
198    Auto,
199    TcpPort(Option<u16>),
200}
201
202pub type HostStrategyGetter = Box<dyn FnOnce(&dyn Any) -> BaseServerStrategy>;
203
204pub trait Host: Any + Send + Sync + Debug {
205    fn target_type(&self) -> HostTargetType;
206
207    fn request_port_base(&self, bind_type: &BaseServerStrategy);
208
209    fn request_port(&self, bind_type: &ServerStrategy) {
210        match bind_type {
211            ServerStrategy::Direct(base) => self.request_port_base(base),
212            ServerStrategy::Many(base) => self.request_port_base(base),
213            ServerStrategy::Demux(demux) => {
214                for bind_type in demux.values() {
215                    self.request_port(bind_type);
216                }
217            }
218            ServerStrategy::Merge(merge) => {
219                for bind_type in merge {
220                    self.request_port(bind_type);
221                }
222            }
223            ServerStrategy::Tagged(underlying, _) => {
224                self.request_port(underlying);
225            }
226            ServerStrategy::Null => {}
227        }
228    }
229
230    /// An identifier for this host, which is unique within a deployment.
231    fn id(&self) -> usize;
232
233    /// Configures the host to support copying and running a custom binary.
234    fn request_custom_binary(&self);
235
236    /// Makes requests for physical resources (servers) that this host needs to run.
237    ///
238    /// This should be called before `provision` is called.
239    fn collect_resources(&self, resource_batch: &mut ResourceBatch);
240
241    /// Connects to the acquired resources and prepares the host to run services.
242    ///
243    /// This should be called after `collect_resources` is called.
244    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost>;
245
246    fn launched(&self) -> Option<Arc<dyn LaunchedHost>>;
247
248    /// Identifies a network type that this host can use for connections if it is the server.
249    /// The host will be `None` if the connection is from the same host as the target.
250    fn strategy_as_server<'a>(
251        &'a self,
252        connection_from: &dyn Host,
253        server_tcp_port_hint: PortNetworkHint,
254    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)>;
255
256    /// Determines whether this host can connect to another host using the given strategy.
257    fn can_connect_to(&self, typ: ClientStrategy) -> bool;
258}
259
260#[async_trait]
261pub trait Service: Send + Sync {
262    /// Makes requests for physical resources server ports that this service needs to run.
263    /// This should **not** recursively call `collect_resources` on the host, since
264    /// we guarantee that `collect_resources` is only called once per host.
265    ///
266    /// This should also perform any "free", non-blocking computations (compilations),
267    /// because the `deploy` method will be called after these resources are allocated.
268    fn collect_resources(&self, resource_batch: &mut ResourceBatch);
269
270    /// Connects to the acquired resources and prepares the service to be launched.
271    async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()>;
272
273    /// Launches the service, which should start listening for incoming network
274    /// connections. The service should not start computing at this point.
275    async fn ready(&mut self) -> Result<()>;
276
277    /// Starts the service by having it connect to other services and start computations.
278    async fn start(&mut self) -> Result<()>;
279
280    /// Stops the service by having it disconnect from other services and stop computations.
281    async fn stop(&mut self) -> Result<()>;
282}
283
284pub trait ServiceBuilder {
285    type Service: Service + 'static;
286    fn build(self, id: usize) -> Self::Service;
287}
288
289impl<S: Service + 'static, T: FnOnce(usize) -> S> ServiceBuilder for T {
290    type Service = S;
291    fn build(self, id: usize) -> Self::Service {
292        self(id)
293    }
294}