1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use anyhow::Result;
8use async_trait::async_trait;
9use hydro_deploy_integration::ServerBindConfig;
10use rust_crate::build::BuildOutput;
11use rust_crate::tracing_options::TracingOptions;
12use tokio::sync::{mpsc, oneshot};
13
14pub mod deployment;
15pub use deployment::Deployment;
16
17pub mod progress;
18
19pub mod localhost;
20pub use localhost::LocalhostHost;
21
22pub mod ssh;
23
24pub mod gcp;
25pub use gcp::GcpComputeEngineHost;
26
27pub mod azure;
28pub use azure::AzureHost;
29
30pub mod aws;
31pub use aws::{AwsEc2Host, AwsNetwork};
32
33pub mod rust_crate;
34pub use rust_crate::RustCrate;
35
36pub mod custom_service;
37pub use custom_service::CustomService;
38
39pub mod terraform;
40
41pub mod util;
42
43#[derive(Default)]
44pub struct ResourcePool {
45 pub terraform: terraform::TerraformPool,
46}
47
48pub struct ResourceBatch {
49 pub terraform: terraform::TerraformBatch,
50}
51
52impl ResourceBatch {
53 fn new() -> ResourceBatch {
54 ResourceBatch {
55 terraform: terraform::TerraformBatch::default(),
56 }
57 }
58
59 async fn provision(
60 self,
61 pool: &mut ResourcePool,
62 last_result: Option<Arc<ResourceResult>>,
63 ) -> Result<ResourceResult> {
64 Ok(ResourceResult {
65 terraform: self.terraform.provision(&mut pool.terraform).await?,
66 _last_result: last_result,
67 })
68 }
69}
70
71#[derive(Debug)]
72pub struct ResourceResult {
73 pub terraform: terraform::TerraformResult,
74 _last_result: Option<Arc<ResourceResult>>,
75}
76
77#[derive(Clone)]
78pub struct TracingResults {
79 pub folded_data: Vec<u8>,
80}
81
82#[async_trait]
83pub trait LaunchedBinary: Send + Sync {
84 fn stdin(&self) -> mpsc::UnboundedSender<String>;
85
86 fn deploy_stdout(&self) -> oneshot::Receiver<String>;
91
92 fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
93 fn stderr(&self) -> mpsc::UnboundedReceiver<String>;
94 fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
95 fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
96
97 fn tracing_results(&self) -> Option<&TracingResults>;
98
99 fn exit_code(&self) -> Option<i32>;
100
101 async fn wait(&mut self) -> Result<i32>;
103 async fn stop(&mut self) -> Result<()>;
105}
106
107#[async_trait]
108pub trait LaunchedHost: Send + Sync {
109 fn base_server_config(&self, strategy: &BaseServerStrategy) -> ServerBindConfig;
112
113 fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig {
114 match strategy {
115 ServerStrategy::Direct(b) => self.base_server_config(b),
116 ServerStrategy::Many(b) => {
117 ServerBindConfig::MultiConnection(Box::new(self.base_server_config(b)))
118 }
119 ServerStrategy::Demux(demux) => {
120 let mut config_map = HashMap::new();
121 for (key, underlying) in demux {
122 config_map.insert(*key, self.server_config(underlying));
123 }
124
125 ServerBindConfig::Demux(config_map)
126 }
127 ServerStrategy::Merge(merge) => {
128 let mut configs = vec![];
129 for underlying in merge {
130 configs.push(self.server_config(underlying));
131 }
132
133 ServerBindConfig::Merge(configs)
134 }
135 ServerStrategy::Tagged(underlying, id) => {
136 ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
137 }
138 ServerStrategy::Null => ServerBindConfig::Null,
139 }
140 }
141
142 async fn copy_binary(&self, binary: &BuildOutput) -> Result<()>;
143
144 async fn launch_binary(
145 &self,
146 id: String,
147 binary: &BuildOutput,
148 args: &[String],
149 perf: Option<TracingOptions>,
150 ) -> Result<Box<dyn LaunchedBinary>>;
151
152 async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
153}
154
155pub enum BaseServerStrategy {
156 UnixSocket,
157 InternalTcpPort(Option<u16>),
158 ExternalTcpPort(
159 u16,
161 ),
162}
163
164pub enum ServerStrategy {
166 Direct(BaseServerStrategy),
167 Many(BaseServerStrategy),
168 Demux(HashMap<u32, ServerStrategy>),
169 Merge(Vec<ServerStrategy>),
170 Tagged(Box<ServerStrategy>, u32),
171 Null,
172}
173
174pub enum ClientStrategy<'a> {
176 UnixSocket(
177 usize,
179 ),
180 InternalTcpPort(
181 &'a dyn Host,
183 ),
184 ForwardedTcpPort(
185 &'a dyn Host,
187 ),
188}
189
190#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
191pub enum HostTargetType {
192 Local,
193 Linux,
194}
195
196#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
197pub enum PortNetworkHint {
198 Auto,
199 TcpPort(Option<u16>),
200}
201
202pub type HostStrategyGetter = Box<dyn FnOnce(&dyn Any) -> BaseServerStrategy>;
203
204pub trait Host: Any + Send + Sync + Debug {
205 fn target_type(&self) -> HostTargetType;
206
207 fn request_port_base(&self, bind_type: &BaseServerStrategy);
208
209 fn request_port(&self, bind_type: &ServerStrategy) {
210 match bind_type {
211 ServerStrategy::Direct(base) => self.request_port_base(base),
212 ServerStrategy::Many(base) => self.request_port_base(base),
213 ServerStrategy::Demux(demux) => {
214 for bind_type in demux.values() {
215 self.request_port(bind_type);
216 }
217 }
218 ServerStrategy::Merge(merge) => {
219 for bind_type in merge {
220 self.request_port(bind_type);
221 }
222 }
223 ServerStrategy::Tagged(underlying, _) => {
224 self.request_port(underlying);
225 }
226 ServerStrategy::Null => {}
227 }
228 }
229
230 fn id(&self) -> usize;
232
233 fn request_custom_binary(&self);
235
236 fn collect_resources(&self, resource_batch: &mut ResourceBatch);
240
241 fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost>;
245
246 fn launched(&self) -> Option<Arc<dyn LaunchedHost>>;
247
248 fn strategy_as_server<'a>(
251 &'a self,
252 connection_from: &dyn Host,
253 server_tcp_port_hint: PortNetworkHint,
254 ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)>;
255
256 fn can_connect_to(&self, typ: ClientStrategy) -> bool;
258}
259
260#[async_trait]
261pub trait Service: Send + Sync {
262 fn collect_resources(&self, resource_batch: &mut ResourceBatch);
269
270 async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()>;
272
273 async fn ready(&mut self) -> Result<()>;
276
277 async fn start(&mut self) -> Result<()>;
279
280 async fn stop(&mut self) -> Result<()>;
282}
283
284pub trait ServiceBuilder {
285 type Service: Service + 'static;
286 fn build(self, id: usize) -> Self::Service;
287}
288
289impl<S: Service + 'static, T: FnOnce(usize) -> S> ServiceBuilder for T {
290 type Service = S;
291 fn build(self, id: usize) -> Self::Service {
292 self(id)
293 }
294}