hydro_deploy/rust_crate/
service.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, bail};
7use async_trait::async_trait;
8use futures::Future;
9use hydro_deploy_integration::{InitConfig, ServerPort};
10use serde::Serialize;
11use tokio::sync::{RwLock, mpsc};
12
13use super::build::{BuildError, BuildOutput, BuildParams, build_crate_memoized};
14use super::ports::{self, RustCratePortConfig};
15use super::tracing_options::TracingOptions;
16use crate::progress::ProgressTracker;
17use crate::{
18    BaseServerStrategy, Host, LaunchedBinary, LaunchedHost, PortNetworkHint, ResourceBatch,
19    ResourceResult, ServerStrategy, Service, TracingResults,
20};
21
22pub struct RustCrateService {
23    id: usize,
24    pub(super) on: Arc<dyn Host>,
25    build_params: BuildParams,
26    tracing: Option<TracingOptions>,
27    args: Option<Vec<String>>,
28    display_id: Option<String>,
29    external_ports: Vec<u16>,
30
31    meta: Option<String>,
32
33    /// Configuration for the ports this service will connect to as a client.
34    pub(super) port_to_server: HashMap<String, ports::ServerConfig>,
35
36    /// Configuration for the ports that this service will listen on a port for.
37    pub(super) port_to_bind: HashMap<String, ServerStrategy>,
38
39    launched_host: Option<Arc<dyn LaunchedHost>>,
40
41    /// A map of port names to config for how other services can connect to this one.
42    /// Only valid after `ready` has been called, only contains ports that are configured
43    /// in `server_ports`.
44    pub(super) server_defns: Arc<RwLock<HashMap<String, ServerPort>>>,
45
46    launched_binary: Option<Box<dyn LaunchedBinary>>,
47    started: bool,
48}
49
50impl RustCrateService {
51    #[expect(clippy::too_many_arguments, reason = "internal code")]
52    pub fn new(
53        id: usize,
54        src: PathBuf,
55        on: Arc<dyn Host>,
56        bin: Option<String>,
57        example: Option<String>,
58        profile: Option<String>,
59        rustflags: Option<String>,
60        target_dir: Option<PathBuf>,
61        build_env: Vec<(String, String)>,
62        no_default_features: bool,
63        tracing: Option<TracingOptions>,
64        features: Option<Vec<String>>,
65        config: Option<String>,
66        args: Option<Vec<String>>,
67        display_id: Option<String>,
68        external_ports: Vec<u16>,
69    ) -> Self {
70        let target_type = on.target_type();
71
72        let build_params = BuildParams::new(
73            src,
74            bin,
75            example,
76            profile,
77            rustflags,
78            target_dir,
79            build_env,
80            no_default_features,
81            target_type,
82            features,
83            config,
84        );
85
86        Self {
87            id,
88            on,
89            build_params,
90            tracing,
91            args,
92            display_id,
93            external_ports,
94            meta: None,
95            port_to_server: HashMap::new(),
96            port_to_bind: HashMap::new(),
97            launched_host: None,
98            server_defns: Arc::new(RwLock::new(HashMap::new())),
99            launched_binary: None,
100            started: false,
101        }
102    }
103
104    pub fn update_meta<T: Serialize>(&mut self, meta: T) {
105        if self.launched_binary.is_some() {
106            panic!("Cannot update meta after binary has been launched")
107        }
108
109        self.meta = Some(serde_json::to_string(&meta).unwrap());
110    }
111
112    pub fn get_port(
113        &self,
114        name: String,
115        self_arc: &Arc<RwLock<RustCrateService>>,
116    ) -> RustCratePortConfig {
117        RustCratePortConfig {
118            service: Arc::downgrade(self_arc),
119            service_host: self.on.clone(),
120            service_server_defns: self.server_defns.clone(),
121            network_hint: PortNetworkHint::Auto,
122            port: name,
123            merge: false,
124        }
125    }
126
127    pub fn get_port_with_hint(
128        &self,
129        name: String,
130        network_hint: PortNetworkHint,
131        self_arc: &Arc<RwLock<RustCrateService>>,
132    ) -> RustCratePortConfig {
133        RustCratePortConfig {
134            service: Arc::downgrade(self_arc),
135            service_host: self.on.clone(),
136            service_server_defns: self.server_defns.clone(),
137            network_hint,
138            port: name,
139            merge: false,
140        }
141    }
142
143    pub fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
144        self.launched_binary.as_ref().unwrap().stdout()
145    }
146
147    pub fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
148        self.launched_binary.as_ref().unwrap().stderr()
149    }
150
151    pub fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
152        self.launched_binary.as_ref().unwrap().stdout_filter(prefix)
153    }
154
155    pub fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
156        self.launched_binary.as_ref().unwrap().stderr_filter(prefix)
157    }
158
159    pub fn tracing_results(&self) -> Option<&TracingResults> {
160        self.launched_binary.as_ref().unwrap().tracing_results()
161    }
162
163    pub fn exit_code(&self) -> Option<i32> {
164        self.launched_binary.as_ref().unwrap().exit_code()
165    }
166
167    fn build(
168        &self,
169    ) -> impl use<> + 'static + Future<Output = Result<&'static BuildOutput, BuildError>> {
170        // Memoized, so no caching in `self` is needed.
171        build_crate_memoized(self.build_params.clone())
172    }
173}
174
175#[async_trait]
176impl Service for RustCrateService {
177    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
178        if self.launched_host.is_some() {
179            return;
180        }
181
182        tokio::task::spawn(self.build());
183
184        let host = &self.on;
185
186        host.request_custom_binary();
187        for (_, bind_type) in self.port_to_bind.iter() {
188            host.request_port(bind_type);
189        }
190
191        for port in self.external_ports.iter() {
192            host.request_port_base(&BaseServerStrategy::ExternalTcpPort(*port));
193        }
194    }
195
196    async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
197        if self.launched_host.is_some() {
198            return Ok(());
199        }
200
201        ProgressTracker::with_group(
202            self.display_id
203                .clone()
204                .unwrap_or_else(|| format!("service/{}", self.id)),
205            None,
206            || async {
207                let built = ProgressTracker::leaf("build", self.build()).await?;
208
209                let host = &self.on;
210                let launched = host.provision(resource_result);
211
212                launched.copy_binary(built).await?;
213
214                self.launched_host = Some(launched);
215                Ok(())
216            },
217        )
218        .await
219    }
220
221    async fn ready(&mut self) -> Result<()> {
222        if self.launched_binary.is_some() {
223            return Ok(());
224        }
225
226        ProgressTracker::with_group(
227            self.display_id
228                .clone()
229                .unwrap_or_else(|| format!("service/{}", self.id)),
230            None,
231            || async {
232                let launched_host = self.launched_host.as_ref().unwrap();
233
234                let built = self.build().await?;
235                let args = self.args.as_ref().cloned().unwrap_or_default();
236
237                let binary = launched_host
238                    .launch_binary(
239                        self.display_id
240                            .clone()
241                            .unwrap_or_else(|| format!("service/{}", self.id)),
242                        built,
243                        &args,
244                        self.tracing.clone(),
245                    )
246                    .await?;
247
248                let mut bind_config = HashMap::new();
249                for (port_name, bind_type) in self.port_to_bind.iter() {
250                    bind_config.insert(port_name.clone(), launched_host.server_config(bind_type));
251                }
252
253                let formatted_bind_config =
254                    serde_json::to_string::<InitConfig>(&(bind_config, self.meta.clone())).unwrap();
255
256                // request stdout before sending config so we don't miss the "ready" response
257                let stdout_receiver = binary.deploy_stdout();
258
259                binary.stdin().send(format!("{formatted_bind_config}\n"))?;
260
261                let ready_line = ProgressTracker::leaf(
262                    "waiting for ready",
263                    tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
264                )
265                .await
266                .context("Timed out waiting for ready")?
267                .context("Program unexpectedly quit")?;
268                if let Some(line_rest) = ready_line.strip_prefix("ready: ") {
269                    *self.server_defns.try_write().unwrap() =
270                        serde_json::from_str(line_rest).unwrap();
271                } else {
272                    bail!("expected ready");
273                }
274
275                self.launched_binary = Some(binary);
276
277                Ok(())
278            },
279        )
280        .await
281    }
282
283    async fn start(&mut self) -> Result<()> {
284        if self.started {
285            return Ok(());
286        }
287
288        let mut sink_ports = HashMap::new();
289        for (port_name, outgoing) in self.port_to_server.drain() {
290            sink_ports.insert(port_name.clone(), outgoing.load_instantiated(&|p| p).await);
291        }
292
293        let formatted_defns = serde_json::to_string(&sink_ports).unwrap();
294
295        let stdout_receiver = self.launched_binary.as_ref().unwrap().deploy_stdout();
296
297        self.launched_binary
298            .as_ref()
299            .unwrap()
300            .stdin()
301            .send(format!("start: {formatted_defns}\n"))
302            .unwrap();
303
304        let start_ack_line = ProgressTracker::leaf(
305            self.display_id
306                .clone()
307                .unwrap_or_else(|| format!("service/{}", self.id))
308                + " / waiting for ack start",
309            tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
310        )
311        .await??;
312        if !start_ack_line.starts_with("ack start") {
313            bail!("expected ack start");
314        }
315
316        self.started = true;
317        Ok(())
318    }
319
320    async fn stop(&mut self) -> Result<()> {
321        ProgressTracker::with_group(
322            self.display_id
323                .clone()
324                .unwrap_or_else(|| format!("service/{}", self.id)),
325            None,
326            || async {
327                let launched_binary = self.launched_binary.as_mut().unwrap();
328                launched_binary.stdin().send("stop\n".to_string())?;
329
330                let timeout_result = ProgressTracker::leaf(
331                    "waiting for exit",
332                    tokio::time::timeout(Duration::from_secs(60), launched_binary.wait()),
333                )
334                .await;
335                match timeout_result {
336                    Err(_timeout) => {} // `wait()` timed out, but stop will force quit.
337                    Ok(Err(unexpected_error)) => return Err(unexpected_error), // `wait()` errored.
338                    Ok(Ok(_exit_status)) => {}
339                }
340                launched_binary.stop().await?;
341
342                Ok(())
343            },
344        )
345        .await
346    }
347}