1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, bail};
7use async_trait::async_trait;
8use futures::Future;
9use hydro_deploy_integration::{InitConfig, ServerPort};
10use serde::Serialize;
11use tokio::sync::{RwLock, mpsc};
12
13use super::build::{BuildError, BuildOutput, BuildParams, build_crate_memoized};
14use super::ports::{self, RustCratePortConfig};
15use super::tracing_options::TracingOptions;
16use crate::progress::ProgressTracker;
17use crate::{
18 BaseServerStrategy, Host, LaunchedBinary, LaunchedHost, PortNetworkHint, ResourceBatch,
19 ResourceResult, ServerStrategy, Service, TracingResults,
20};
21
22pub struct RustCrateService {
23 id: usize,
24 pub(super) on: Arc<dyn Host>,
25 build_params: BuildParams,
26 tracing: Option<TracingOptions>,
27 args: Option<Vec<String>>,
28 display_id: Option<String>,
29 external_ports: Vec<u16>,
30
31 meta: Option<String>,
32
33 pub(super) port_to_server: HashMap<String, ports::ServerConfig>,
35
36 pub(super) port_to_bind: HashMap<String, ServerStrategy>,
38
39 launched_host: Option<Arc<dyn LaunchedHost>>,
40
41 pub(super) server_defns: Arc<RwLock<HashMap<String, ServerPort>>>,
45
46 launched_binary: Option<Box<dyn LaunchedBinary>>,
47 started: bool,
48}
49
50impl RustCrateService {
51 #[expect(clippy::too_many_arguments, reason = "internal code")]
52 pub fn new(
53 id: usize,
54 src: PathBuf,
55 on: Arc<dyn Host>,
56 bin: Option<String>,
57 example: Option<String>,
58 profile: Option<String>,
59 rustflags: Option<String>,
60 target_dir: Option<PathBuf>,
61 build_env: Vec<(String, String)>,
62 no_default_features: bool,
63 tracing: Option<TracingOptions>,
64 features: Option<Vec<String>>,
65 config: Option<String>,
66 args: Option<Vec<String>>,
67 display_id: Option<String>,
68 external_ports: Vec<u16>,
69 ) -> Self {
70 let target_type = on.target_type();
71
72 let build_params = BuildParams::new(
73 src,
74 bin,
75 example,
76 profile,
77 rustflags,
78 target_dir,
79 build_env,
80 no_default_features,
81 target_type,
82 features,
83 config,
84 );
85
86 Self {
87 id,
88 on,
89 build_params,
90 tracing,
91 args,
92 display_id,
93 external_ports,
94 meta: None,
95 port_to_server: HashMap::new(),
96 port_to_bind: HashMap::new(),
97 launched_host: None,
98 server_defns: Arc::new(RwLock::new(HashMap::new())),
99 launched_binary: None,
100 started: false,
101 }
102 }
103
104 pub fn update_meta<T: Serialize>(&mut self, meta: T) {
105 if self.launched_binary.is_some() {
106 panic!("Cannot update meta after binary has been launched")
107 }
108
109 self.meta = Some(serde_json::to_string(&meta).unwrap());
110 }
111
112 pub fn get_port(
113 &self,
114 name: String,
115 self_arc: &Arc<RwLock<RustCrateService>>,
116 ) -> RustCratePortConfig {
117 RustCratePortConfig {
118 service: Arc::downgrade(self_arc),
119 service_host: self.on.clone(),
120 service_server_defns: self.server_defns.clone(),
121 network_hint: PortNetworkHint::Auto,
122 port: name,
123 merge: false,
124 }
125 }
126
127 pub fn get_port_with_hint(
128 &self,
129 name: String,
130 network_hint: PortNetworkHint,
131 self_arc: &Arc<RwLock<RustCrateService>>,
132 ) -> RustCratePortConfig {
133 RustCratePortConfig {
134 service: Arc::downgrade(self_arc),
135 service_host: self.on.clone(),
136 service_server_defns: self.server_defns.clone(),
137 network_hint,
138 port: name,
139 merge: false,
140 }
141 }
142
143 pub fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
144 self.launched_binary.as_ref().unwrap().stdout()
145 }
146
147 pub fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
148 self.launched_binary.as_ref().unwrap().stderr()
149 }
150
151 pub fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
152 self.launched_binary.as_ref().unwrap().stdout_filter(prefix)
153 }
154
155 pub fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
156 self.launched_binary.as_ref().unwrap().stderr_filter(prefix)
157 }
158
159 pub fn tracing_results(&self) -> Option<&TracingResults> {
160 self.launched_binary.as_ref().unwrap().tracing_results()
161 }
162
163 pub fn exit_code(&self) -> Option<i32> {
164 self.launched_binary.as_ref().unwrap().exit_code()
165 }
166
167 fn build(
168 &self,
169 ) -> impl use<> + 'static + Future<Output = Result<&'static BuildOutput, BuildError>> {
170 build_crate_memoized(self.build_params.clone())
172 }
173}
174
175#[async_trait]
176impl Service for RustCrateService {
177 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
178 if self.launched_host.is_some() {
179 return;
180 }
181
182 tokio::task::spawn(self.build());
183
184 let host = &self.on;
185
186 host.request_custom_binary();
187 for (_, bind_type) in self.port_to_bind.iter() {
188 host.request_port(bind_type);
189 }
190
191 for port in self.external_ports.iter() {
192 host.request_port_base(&BaseServerStrategy::ExternalTcpPort(*port));
193 }
194 }
195
196 async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
197 if self.launched_host.is_some() {
198 return Ok(());
199 }
200
201 ProgressTracker::with_group(
202 self.display_id
203 .clone()
204 .unwrap_or_else(|| format!("service/{}", self.id)),
205 None,
206 || async {
207 let built = ProgressTracker::leaf("build", self.build()).await?;
208
209 let host = &self.on;
210 let launched = host.provision(resource_result);
211
212 launched.copy_binary(built).await?;
213
214 self.launched_host = Some(launched);
215 Ok(())
216 },
217 )
218 .await
219 }
220
221 async fn ready(&mut self) -> Result<()> {
222 if self.launched_binary.is_some() {
223 return Ok(());
224 }
225
226 ProgressTracker::with_group(
227 self.display_id
228 .clone()
229 .unwrap_or_else(|| format!("service/{}", self.id)),
230 None,
231 || async {
232 let launched_host = self.launched_host.as_ref().unwrap();
233
234 let built = self.build().await?;
235 let args = self.args.as_ref().cloned().unwrap_or_default();
236
237 let binary = launched_host
238 .launch_binary(
239 self.display_id
240 .clone()
241 .unwrap_or_else(|| format!("service/{}", self.id)),
242 built,
243 &args,
244 self.tracing.clone(),
245 )
246 .await?;
247
248 let mut bind_config = HashMap::new();
249 for (port_name, bind_type) in self.port_to_bind.iter() {
250 bind_config.insert(port_name.clone(), launched_host.server_config(bind_type));
251 }
252
253 let formatted_bind_config =
254 serde_json::to_string::<InitConfig>(&(bind_config, self.meta.clone())).unwrap();
255
256 let stdout_receiver = binary.deploy_stdout();
258
259 binary.stdin().send(format!("{formatted_bind_config}\n"))?;
260
261 let ready_line = ProgressTracker::leaf(
262 "waiting for ready",
263 tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
264 )
265 .await
266 .context("Timed out waiting for ready")?
267 .context("Program unexpectedly quit")?;
268 if let Some(line_rest) = ready_line.strip_prefix("ready: ") {
269 *self.server_defns.try_write().unwrap() =
270 serde_json::from_str(line_rest).unwrap();
271 } else {
272 bail!("expected ready");
273 }
274
275 self.launched_binary = Some(binary);
276
277 Ok(())
278 },
279 )
280 .await
281 }
282
283 async fn start(&mut self) -> Result<()> {
284 if self.started {
285 return Ok(());
286 }
287
288 let mut sink_ports = HashMap::new();
289 for (port_name, outgoing) in self.port_to_server.drain() {
290 sink_ports.insert(port_name.clone(), outgoing.load_instantiated(&|p| p).await);
291 }
292
293 let formatted_defns = serde_json::to_string(&sink_ports).unwrap();
294
295 let stdout_receiver = self.launched_binary.as_ref().unwrap().deploy_stdout();
296
297 self.launched_binary
298 .as_ref()
299 .unwrap()
300 .stdin()
301 .send(format!("start: {formatted_defns}\n"))
302 .unwrap();
303
304 let start_ack_line = ProgressTracker::leaf(
305 self.display_id
306 .clone()
307 .unwrap_or_else(|| format!("service/{}", self.id))
308 + " / waiting for ack start",
309 tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
310 )
311 .await??;
312 if !start_ack_line.starts_with("ack start") {
313 bail!("expected ack start");
314 }
315
316 self.started = true;
317 Ok(())
318 }
319
320 async fn stop(&mut self) -> Result<()> {
321 ProgressTracker::with_group(
322 self.display_id
323 .clone()
324 .unwrap_or_else(|| format!("service/{}", self.id)),
325 None,
326 || async {
327 let launched_binary = self.launched_binary.as_mut().unwrap();
328 launched_binary.stdin().send("stop\n".to_string())?;
329
330 let timeout_result = ProgressTracker::leaf(
331 "waiting for exit",
332 tokio::time::timeout(Duration::from_secs(60), launched_binary.wait()),
333 )
334 .await;
335 match timeout_result {
336 Err(_timeout) => {} Ok(Err(unexpected_error)) => return Err(unexpected_error), Ok(Ok(_exit_status)) => {}
339 }
340 launched_binary.stop().await?;
341
342 Ok(())
343 },
344 )
345 .await
346 }
347}