1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result, bail};
6use async_trait::async_trait;
7use futures::Future;
8use hydro_deploy_integration::{InitConfig, ServerPort};
9use serde::Serialize;
10use tokio::sync::{RwLock, mpsc};
11
12use super::build::{BuildError, BuildOutput, BuildParams, build_crate_memoized};
13use super::ports::{self, RustCratePortConfig};
14use super::tracing_options::TracingOptions;
15use crate::progress::ProgressTracker;
16use crate::{
17 BaseServerStrategy, Host, LaunchedBinary, LaunchedHost, PortNetworkHint, ResourceBatch,
18 ResourceResult, ServerStrategy, Service, TracingResults,
19};
20
21pub struct RustCrateService {
22 id: usize,
23 pub(super) on: Arc<dyn Host>,
24 build_params: BuildParams,
25 tracing: Option<TracingOptions>,
26 args: Option<Vec<String>>,
27 display_id: Option<String>,
28 external_ports: Vec<u16>,
29
30 meta: Option<String>,
31
32 pub(super) port_to_server: HashMap<String, ports::ServerConfig>,
34
35 pub(super) port_to_bind: HashMap<String, ServerStrategy>,
37
38 launched_host: Option<Arc<dyn LaunchedHost>>,
39
40 pub(super) server_defns: Arc<RwLock<HashMap<String, ServerPort>>>,
44
45 launched_binary: Option<Box<dyn LaunchedBinary>>,
46 started: bool,
47}
48
49impl RustCrateService {
50 pub fn new(
51 id: usize,
52 on: Arc<dyn Host>,
53 build_params: BuildParams,
54 tracing: Option<TracingOptions>,
55 args: Option<Vec<String>>,
56 display_id: Option<String>,
57 external_ports: Vec<u16>,
58 ) -> Self {
59 Self {
60 id,
61 on,
62 build_params,
63 tracing,
64 args,
65 display_id,
66 external_ports,
67 meta: None,
68 port_to_server: HashMap::new(),
69 port_to_bind: HashMap::new(),
70 launched_host: None,
71 server_defns: Arc::new(RwLock::new(HashMap::new())),
72 launched_binary: None,
73 started: false,
74 }
75 }
76
77 pub fn update_meta<T: Serialize>(&mut self, meta: T) {
78 if self.launched_binary.is_some() {
79 panic!("Cannot update meta after binary has been launched")
80 }
81
82 self.meta = Some(serde_json::to_string(&meta).unwrap());
83 }
84
85 pub fn get_port(
86 &self,
87 name: String,
88 self_arc: &Arc<RwLock<RustCrateService>>,
89 ) -> RustCratePortConfig {
90 RustCratePortConfig {
91 service: Arc::downgrade(self_arc),
92 service_host: self.on.clone(),
93 service_server_defns: self.server_defns.clone(),
94 network_hint: PortNetworkHint::Auto,
95 port: name,
96 merge: false,
97 }
98 }
99
100 pub fn get_port_with_hint(
101 &self,
102 name: String,
103 network_hint: PortNetworkHint,
104 self_arc: &Arc<RwLock<RustCrateService>>,
105 ) -> RustCratePortConfig {
106 RustCratePortConfig {
107 service: Arc::downgrade(self_arc),
108 service_host: self.on.clone(),
109 service_server_defns: self.server_defns.clone(),
110 network_hint,
111 port: name,
112 merge: false,
113 }
114 }
115
116 pub fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
117 self.launched_binary.as_ref().unwrap().stdout()
118 }
119
120 pub fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
121 self.launched_binary.as_ref().unwrap().stderr()
122 }
123
124 pub fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
125 self.launched_binary.as_ref().unwrap().stdout_filter(prefix)
126 }
127
128 pub fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
129 self.launched_binary.as_ref().unwrap().stderr_filter(prefix)
130 }
131
132 pub fn tracing_results(&self) -> Option<&TracingResults> {
133 self.launched_binary.as_ref().unwrap().tracing_results()
134 }
135
136 pub fn exit_code(&self) -> Option<i32> {
137 self.launched_binary.as_ref().unwrap().exit_code()
138 }
139
140 fn build(
141 &self,
142 ) -> impl use<> + 'static + Future<Output = Result<&'static BuildOutput, BuildError>> {
143 build_crate_memoized(self.build_params.clone())
145 }
146}
147
148#[async_trait]
149impl Service for RustCrateService {
150 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
151 if self.launched_host.is_some() {
152 return;
153 }
154
155 tokio::task::spawn(self.build());
156
157 let host = &self.on;
158
159 host.request_custom_binary();
160 for (_, bind_type) in self.port_to_bind.iter() {
161 host.request_port(bind_type);
162 }
163
164 for port in self.external_ports.iter() {
165 host.request_port_base(&BaseServerStrategy::ExternalTcpPort(*port));
166 }
167 }
168
169 async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
170 if self.launched_host.is_some() {
171 return Ok(());
172 }
173
174 ProgressTracker::with_group(
175 self.display_id
176 .clone()
177 .unwrap_or_else(|| format!("service/{}", self.id)),
178 None,
179 || async {
180 let built = self.build().await?;
181
182 let host = &self.on;
183 let launched = host.provision(resource_result);
184
185 launched.copy_binary(built).await?;
186
187 self.launched_host = Some(launched);
188 Ok(())
189 },
190 )
191 .await
192 }
193
194 async fn ready(&mut self) -> Result<()> {
195 if self.launched_binary.is_some() {
196 return Ok(());
197 }
198
199 ProgressTracker::with_group(
200 self.display_id
201 .clone()
202 .unwrap_or_else(|| format!("service/{}", self.id)),
203 None,
204 || async {
205 let launched_host = self.launched_host.as_ref().unwrap();
206
207 let built = self.build().await?;
208 let args = self.args.as_ref().cloned().unwrap_or_default();
209
210 let binary = launched_host
211 .launch_binary(
212 self.display_id
213 .clone()
214 .unwrap_or_else(|| format!("service/{}", self.id)),
215 built,
216 &args,
217 self.tracing.clone(),
218 )
219 .await?;
220
221 let mut bind_config = HashMap::new();
222 for (port_name, bind_type) in self.port_to_bind.iter() {
223 bind_config.insert(port_name.clone(), launched_host.server_config(bind_type));
224 }
225
226 let formatted_bind_config =
227 serde_json::to_string::<InitConfig>(&(bind_config, self.meta.clone())).unwrap();
228
229 let stdout_receiver = binary.deploy_stdout();
231
232 binary.stdin().send(format!("{formatted_bind_config}\n"))?;
233
234 let ready_line = ProgressTracker::leaf(
235 "waiting for ready",
236 tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
237 )
238 .await
239 .context("Timed out waiting for ready")?
240 .context("Program unexpectedly quit")?;
241 if let Some(line_rest) = ready_line.strip_prefix("ready: ") {
242 *self.server_defns.try_write().unwrap() =
243 serde_json::from_str(line_rest).unwrap();
244 } else {
245 bail!("expected ready");
246 }
247
248 self.launched_binary = Some(binary);
249
250 Ok(())
251 },
252 )
253 .await
254 }
255
256 async fn start(&mut self) -> Result<()> {
257 if self.started {
258 return Ok(());
259 }
260
261 let mut sink_ports = HashMap::new();
262 for (port_name, outgoing) in self.port_to_server.drain() {
263 sink_ports.insert(port_name.clone(), outgoing.load_instantiated(&|p| p).await);
264 }
265
266 let formatted_defns = serde_json::to_string(&sink_ports).unwrap();
267
268 let stdout_receiver = self.launched_binary.as_ref().unwrap().deploy_stdout();
269
270 self.launched_binary
271 .as_ref()
272 .unwrap()
273 .stdin()
274 .send(format!("start: {formatted_defns}\n"))
275 .unwrap();
276
277 let start_ack_line = ProgressTracker::leaf(
278 self.display_id
279 .clone()
280 .unwrap_or_else(|| format!("service/{}", self.id))
281 + " / waiting for ack start",
282 tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
283 )
284 .await??;
285 if !start_ack_line.starts_with("ack start") {
286 bail!("expected ack start");
287 }
288
289 self.started = true;
290 Ok(())
291 }
292
293 async fn stop(&mut self) -> Result<()> {
294 ProgressTracker::with_group(
295 self.display_id
296 .clone()
297 .unwrap_or_else(|| format!("service/{}", self.id)),
298 None,
299 || async {
300 let launched_binary = self.launched_binary.as_mut().unwrap();
301 launched_binary.stdin().send("stop\n".to_string())?;
302
303 let timeout_result = ProgressTracker::leaf(
304 "waiting for exit",
305 tokio::time::timeout(Duration::from_secs(60), launched_binary.wait()),
306 )
307 .await;
308 match timeout_result {
309 Err(_timeout) => {} Ok(Err(unexpected_error)) => return Err(unexpected_error), Ok(Ok(_exit_status)) => {}
312 }
313 launched_binary.stop().await?;
314
315 Ok(())
316 },
317 )
318 .await
319 }
320}