1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
45use anyhow::Result;
6use async_trait::async_trait;
7use hydro_deploy_integration::ServerBindConfig;
8use rust_crate::build::BuildOutput;
9use rust_crate::tracing_options::TracingOptions;
10use tokio::sync::{mpsc, oneshot};
1112pub mod deployment;
13pub use deployment::Deployment;
1415pub mod progress;
1617pub mod localhost;
18pub use localhost::LocalhostHost;
1920pub mod ssh;
2122pub mod gcp;
23pub use gcp::GcpComputeEngineHost;
2425pub mod azure;
26pub use azure::AzureHost;
2728pub mod rust_crate;
29pub use rust_crate::RustCrate;
3031pub mod custom_service;
32pub use custom_service::CustomService;
3334pub mod terraform;
3536pub mod util;
3738#[derive(Default)]
39pub struct ResourcePool {
40pub terraform: terraform::TerraformPool,
41}
4243pub struct ResourceBatch {
44pub terraform: terraform::TerraformBatch,
45}
4647impl ResourceBatch {
48fn new() -> ResourceBatch {
49 ResourceBatch {
50 terraform: terraform::TerraformBatch::default(),
51 }
52 }
5354async fn provision(
55self,
56 pool: &mut ResourcePool,
57 last_result: Option<Arc<ResourceResult>>,
58 ) -> Result<ResourceResult> {
59Ok(ResourceResult {
60 terraform: self.terraform.provision(&mut pool.terraform).await?,
61 _last_result: last_result,
62 })
63 }
64}
6566#[derive(Debug)]
67pub struct ResourceResult {
68pub terraform: terraform::TerraformResult,
69 _last_result: Option<Arc<ResourceResult>>,
70}
7172#[derive(Clone)]
73pub struct TracingResults {
74pub folded_data: Vec<u8>,
75}
7677#[async_trait]
78pub trait LaunchedBinary: Send + Sync {
79fn stdin(&self) -> mpsc::UnboundedSender<String>;
8081/// Provides a oneshot channel to handshake with the binary,
82 /// with the guarantee that as long as deploy is holding on
83 /// to a handle, none of the messages will also be broadcast
84 /// to the user-facing [`LaunchedBinary::stdout`] channel.
85fn deploy_stdout(&self) -> oneshot::Receiver<String>;
8687fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
88fn stderr(&self) -> mpsc::UnboundedReceiver<String>;
89fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
90fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
9192fn tracing_results(&self) -> Option<&TracingResults>;
9394fn exit_code(&self) -> Option<i32>;
9596/// Wait for the process to stop on its own. Returns the exit code.
97async fn wait(&mut self) -> Result<i32>;
98/// If the process is still running, force stop it. Then run post-run tasks.
99async fn stop(&mut self) -> Result<()>;
100}
101102#[async_trait]
103pub trait LaunchedHost: Send + Sync {
104/// Given a pre-selected network type, computes concrete information needed for a service
105 /// to listen to network connections (such as the IP address to bind to).
106fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig;
107108async fn copy_binary(&self, binary: &BuildOutput) -> Result<()>;
109110async fn launch_binary(
111&self,
112 id: String,
113 binary: &BuildOutput,
114 args: &[String],
115 perf: Option<TracingOptions>,
116 ) -> Result<Box<dyn LaunchedBinary>>;
117118async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
119}
120121/// Types of connections that a host can make to another host.
122pub enum ServerStrategy {
123 UnixSocket,
124 InternalTcpPort,
125 ExternalTcpPort(
126/// The port number to bind to, which must be explicit to open the firewall.
127u16,
128 ),
129 Demux(HashMap<u32, ServerStrategy>),
130 Merge(Vec<ServerStrategy>),
131 Tagged(Box<ServerStrategy>, u32),
132 Null,
133}
134135/// Like BindType, but includes metadata for determining whether a connection is possible.
136pub enum ClientStrategy<'a> {
137 UnixSocket(
138/// Unique identifier for the host this socket will be on.
139usize,
140 ),
141 InternalTcpPort(
142/// The host that this port is available on.
143&'a dyn Host,
144 ),
145 ForwardedTcpPort(
146/// The host that this port is available on.
147&'a dyn Host,
148 ),
149}
150151#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
152pub enum HostTargetType {
153 Local,
154 Linux,
155}
156157pub type HostStrategyGetter = Box<dyn FnOnce(&dyn std::any::Any) -> ServerStrategy>;
158159pub trait Host: Send + Sync {
160fn target_type(&self) -> HostTargetType;
161162fn request_port(&self, bind_type: &ServerStrategy);
163164/// An identifier for this host, which is unique within a deployment.
165fn id(&self) -> usize;
166167/// Configures the host to support copying and running a custom binary.
168fn request_custom_binary(&self);
169170/// Makes requests for physical resources (servers) that this host needs to run.
171 ///
172 /// This should be called before `provision` is called.
173fn collect_resources(&self, resource_batch: &mut ResourceBatch);
174175/// Connects to the acquired resources and prepares the host to run services.
176 ///
177 /// This should be called after `collect_resources` is called.
178fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost>;
179180fn launched(&self) -> Option<Arc<dyn LaunchedHost>>;
181182/// Identifies a network type that this host can use for connections if it is the server.
183 /// The host will be `None` if the connection is from the same host as the target.
184fn strategy_as_server<'a>(
185&'a self,
186 connection_from: &dyn Host,
187 ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)>;
188189/// Determines whether this host can connect to another host using the given strategy.
190fn can_connect_to(&self, typ: ClientStrategy) -> bool;
191192/// Returns a reference to the host as a trait object.
193fn as_any(&self) -> &dyn std::any::Any;
194}
195196#[async_trait]
197pub trait Service: Send + Sync {
198/// Makes requests for physical resources server ports that this service needs to run.
199 /// This should **not** recursively call `collect_resources` on the host, since
200 /// we guarantee that `collect_resources` is only called once per host.
201 ///
202 /// This should also perform any "free", non-blocking computations (compilations),
203 /// because the `deploy` method will be called after these resources are allocated.
204fn collect_resources(&self, resource_batch: &mut ResourceBatch);
205206/// Connects to the acquired resources and prepares the service to be launched.
207async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()>;
208209/// Launches the service, which should start listening for incoming network
210 /// connections. The service should not start computing at this point.
211async fn ready(&mut self) -> Result<()>;
212213/// Starts the service by having it connect to other services and start computations.
214async fn start(&mut self) -> Result<()>;
215216/// Stops the service by having it disconnect from other services and stop computations.
217async fn stop(&mut self) -> Result<()>;
218}
219220pub trait ServiceBuilder {
221type Service: Service + 'static;
222fn build(self, id: usize) -> Self::Service;
223}
224225impl<S: Service + 'static, T: FnOnce(usize) -> S> ServiceBuilder for T {
226type Service = S;
227fn build(self, id: usize) -> Self::Service {
228self(id)
229 }
230}