hydro_deploy/
lib.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use anyhow::Result;
8use async_trait::async_trait;
9use hydro_deploy_integration::ServerBindConfig;
10use rust_crate::build::BuildOutput;
11use rust_crate::tracing_options::TracingOptions;
12use tokio::sync::{mpsc, oneshot};
13
14pub mod deployment;
15pub use deployment::Deployment;
16
17pub mod progress;
18
19pub mod localhost;
20pub use localhost::LocalhostHost;
21
22pub mod ssh;
23
24pub mod gcp;
25pub use gcp::GcpComputeEngineHost;
26
27pub mod azure;
28pub use azure::AzureHost;
29
30pub mod rust_crate;
31pub use rust_crate::RustCrate;
32
33pub mod custom_service;
34pub use custom_service::CustomService;
35
36pub mod terraform;
37
38pub mod util;
39
40#[derive(Default)]
41pub struct ResourcePool {
42    pub terraform: terraform::TerraformPool,
43}
44
45pub struct ResourceBatch {
46    pub terraform: terraform::TerraformBatch,
47}
48
49impl ResourceBatch {
50    fn new() -> ResourceBatch {
51        ResourceBatch {
52            terraform: terraform::TerraformBatch::default(),
53        }
54    }
55
56    async fn provision(
57        self,
58        pool: &mut ResourcePool,
59        last_result: Option<Arc<ResourceResult>>,
60    ) -> Result<ResourceResult> {
61        Ok(ResourceResult {
62            terraform: self.terraform.provision(&mut pool.terraform).await?,
63            _last_result: last_result,
64        })
65    }
66}
67
68#[derive(Debug)]
69pub struct ResourceResult {
70    pub terraform: terraform::TerraformResult,
71    _last_result: Option<Arc<ResourceResult>>,
72}
73
74#[derive(Clone)]
75pub struct TracingResults {
76    pub folded_data: Vec<u8>,
77}
78
79#[async_trait]
80pub trait LaunchedBinary: Send + Sync {
81    fn stdin(&self) -> mpsc::UnboundedSender<String>;
82
83    /// Provides a oneshot channel to handshake with the binary,
84    /// with the guarantee that as long as deploy is holding on
85    /// to a handle, none of the messages will also be broadcast
86    /// to the user-facing [`LaunchedBinary::stdout`] channel.
87    fn deploy_stdout(&self) -> oneshot::Receiver<String>;
88
89    fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
90    fn stderr(&self) -> mpsc::UnboundedReceiver<String>;
91    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
92    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
93
94    fn tracing_results(&self) -> Option<&TracingResults>;
95
96    fn exit_code(&self) -> Option<i32>;
97
98    /// Wait for the process to stop on its own. Returns the exit code.
99    async fn wait(&mut self) -> Result<i32>;
100    /// If the process is still running, force stop it. Then run post-run tasks.
101    async fn stop(&mut self) -> Result<()>;
102}
103
104#[async_trait]
105pub trait LaunchedHost: Send + Sync {
106    /// Given a pre-selected network type, computes concrete information needed for a service
107    /// to listen to network connections (such as the IP address to bind to).
108    fn base_server_config(&self, strategy: &BaseServerStrategy) -> ServerBindConfig;
109
110    fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig {
111        match strategy {
112            ServerStrategy::Direct(b) => self.base_server_config(b),
113            ServerStrategy::Many(b) => {
114                ServerBindConfig::MultiConnection(Box::new(self.base_server_config(b)))
115            }
116            ServerStrategy::Demux(demux) => {
117                let mut config_map = HashMap::new();
118                for (key, underlying) in demux {
119                    config_map.insert(*key, self.server_config(underlying));
120                }
121
122                ServerBindConfig::Demux(config_map)
123            }
124            ServerStrategy::Merge(merge) => {
125                let mut configs = vec![];
126                for underlying in merge {
127                    configs.push(self.server_config(underlying));
128                }
129
130                ServerBindConfig::Merge(configs)
131            }
132            ServerStrategy::Tagged(underlying, id) => {
133                ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
134            }
135            ServerStrategy::Null => ServerBindConfig::Null,
136        }
137    }
138
139    async fn copy_binary(&self, binary: &BuildOutput) -> Result<()>;
140
141    async fn launch_binary(
142        &self,
143        id: String,
144        binary: &BuildOutput,
145        args: &[String],
146        perf: Option<TracingOptions>,
147    ) -> Result<Box<dyn LaunchedBinary>>;
148
149    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
150}
151
152pub enum BaseServerStrategy {
153    UnixSocket,
154    InternalTcpPort(Option<u16>),
155    ExternalTcpPort(
156        /// The port number to bind to, which must be explicit to open the firewall.
157        u16,
158    ),
159}
160
161/// Types of connection that a service can receive when configured as the server.
162pub enum ServerStrategy {
163    Direct(BaseServerStrategy),
164    Many(BaseServerStrategy),
165    Demux(HashMap<u32, ServerStrategy>),
166    Merge(Vec<ServerStrategy>),
167    Tagged(Box<ServerStrategy>, u32),
168    Null,
169}
170
171/// Like BindType, but includes metadata for determining whether a connection is possible.
172pub enum ClientStrategy<'a> {
173    UnixSocket(
174        /// Unique identifier for the host this socket will be on.
175        usize,
176    ),
177    InternalTcpPort(
178        /// The host that this port is available on.
179        &'a dyn Host,
180    ),
181    ForwardedTcpPort(
182        /// The host that this port is available on.
183        &'a dyn Host,
184    ),
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
188pub enum HostTargetType {
189    Local,
190    Linux,
191}
192
193#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
194pub enum PortNetworkHint {
195    Auto,
196    TcpPort(Option<u16>),
197}
198
199pub type HostStrategyGetter = Box<dyn FnOnce(&dyn Any) -> BaseServerStrategy>;
200
201pub trait Host: Any + Send + Sync + Debug {
202    fn target_type(&self) -> HostTargetType;
203
204    fn request_port_base(&self, bind_type: &BaseServerStrategy);
205
206    fn request_port(&self, bind_type: &ServerStrategy) {
207        match bind_type {
208            ServerStrategy::Direct(base) => self.request_port_base(base),
209            ServerStrategy::Many(base) => self.request_port_base(base),
210            ServerStrategy::Demux(demux) => {
211                for bind_type in demux.values() {
212                    self.request_port(bind_type);
213                }
214            }
215            ServerStrategy::Merge(merge) => {
216                for bind_type in merge {
217                    self.request_port(bind_type);
218                }
219            }
220            ServerStrategy::Tagged(underlying, _) => {
221                self.request_port(underlying);
222            }
223            ServerStrategy::Null => {}
224        }
225    }
226
227    /// An identifier for this host, which is unique within a deployment.
228    fn id(&self) -> usize;
229
230    /// Configures the host to support copying and running a custom binary.
231    fn request_custom_binary(&self);
232
233    /// Makes requests for physical resources (servers) that this host needs to run.
234    ///
235    /// This should be called before `provision` is called.
236    fn collect_resources(&self, resource_batch: &mut ResourceBatch);
237
238    /// Connects to the acquired resources and prepares the host to run services.
239    ///
240    /// This should be called after `collect_resources` is called.
241    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost>;
242
243    fn launched(&self) -> Option<Arc<dyn LaunchedHost>>;
244
245    /// Identifies a network type that this host can use for connections if it is the server.
246    /// The host will be `None` if the connection is from the same host as the target.
247    fn strategy_as_server<'a>(
248        &'a self,
249        connection_from: &dyn Host,
250        server_tcp_port_hint: PortNetworkHint,
251    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)>;
252
253    /// Determines whether this host can connect to another host using the given strategy.
254    fn can_connect_to(&self, typ: ClientStrategy) -> bool;
255}
256
257#[async_trait]
258pub trait Service: Send + Sync {
259    /// Makes requests for physical resources server ports that this service needs to run.
260    /// This should **not** recursively call `collect_resources` on the host, since
261    /// we guarantee that `collect_resources` is only called once per host.
262    ///
263    /// This should also perform any "free", non-blocking computations (compilations),
264    /// because the `deploy` method will be called after these resources are allocated.
265    fn collect_resources(&self, resource_batch: &mut ResourceBatch);
266
267    /// Connects to the acquired resources and prepares the service to be launched.
268    async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()>;
269
270    /// Launches the service, which should start listening for incoming network
271    /// connections. The service should not start computing at this point.
272    async fn ready(&mut self) -> Result<()>;
273
274    /// Starts the service by having it connect to other services and start computations.
275    async fn start(&mut self) -> Result<()>;
276
277    /// Stops the service by having it disconnect from other services and stop computations.
278    async fn stop(&mut self) -> Result<()>;
279}
280
281pub trait ServiceBuilder {
282    type Service: Service + 'static;
283    fn build(self, id: usize) -> Self::Service;
284}
285
286impl<S: Service + 'static, T: FnOnce(usize) -> S> ServiceBuilder for T {
287    type Service = S;
288    fn build(self, id: usize) -> Self::Service {
289        self(id)
290    }
291}