hydro_deploy/rust_crate/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result, bail};
6use async_trait::async_trait;
7use futures::Future;
8use hydro_deploy_integration::{InitConfig, ServerPort};
9use serde::Serialize;
10use tokio::sync::{RwLock, mpsc};
11
12use super::build::{BuildError, BuildOutput, BuildParams, build_crate_memoized};
13use super::ports::{self, RustCratePortConfig};
14use super::tracing_options::TracingOptions;
15use crate::progress::ProgressTracker;
16use crate::{
17    BaseServerStrategy, Host, LaunchedBinary, LaunchedHost, PortNetworkHint, ResourceBatch,
18    ResourceResult, ServerStrategy, Service, TracingResults,
19};
20
21pub struct RustCrateService {
22    id: usize,
23    pub(super) on: Arc<dyn Host>,
24    build_params: BuildParams,
25    tracing: Option<TracingOptions>,
26    args: Option<Vec<String>>,
27    display_id: Option<String>,
28    external_ports: Vec<u16>,
29
30    meta: Option<String>,
31
32    /// Configuration for the ports this service will connect to as a client.
33    pub(super) port_to_server: HashMap<String, ports::ServerConfig>,
34
35    /// Configuration for the ports that this service will listen on a port for.
36    pub(super) port_to_bind: HashMap<String, ServerStrategy>,
37
38    launched_host: Option<Arc<dyn LaunchedHost>>,
39
40    /// A map of port names to config for how other services can connect to this one.
41    /// Only valid after `ready` has been called, only contains ports that are configured
42    /// in `server_ports`.
43    pub(super) server_defns: Arc<RwLock<HashMap<String, ServerPort>>>,
44
45    launched_binary: Option<Box<dyn LaunchedBinary>>,
46    started: bool,
47}
48
49impl RustCrateService {
50    pub fn new(
51        id: usize,
52        on: Arc<dyn Host>,
53        build_params: BuildParams,
54        tracing: Option<TracingOptions>,
55        args: Option<Vec<String>>,
56        display_id: Option<String>,
57        external_ports: Vec<u16>,
58    ) -> Self {
59        Self {
60            id,
61            on,
62            build_params,
63            tracing,
64            args,
65            display_id,
66            external_ports,
67            meta: None,
68            port_to_server: HashMap::new(),
69            port_to_bind: HashMap::new(),
70            launched_host: None,
71            server_defns: Arc::new(RwLock::new(HashMap::new())),
72            launched_binary: None,
73            started: false,
74        }
75    }
76
77    pub fn update_meta<T: Serialize>(&mut self, meta: T) {
78        if self.launched_binary.is_some() {
79            panic!("Cannot update meta after binary has been launched")
80        }
81
82        self.meta = Some(serde_json::to_string(&meta).unwrap());
83    }
84
85    pub fn get_port(
86        &self,
87        name: String,
88        self_arc: &Arc<RwLock<RustCrateService>>,
89    ) -> RustCratePortConfig {
90        RustCratePortConfig {
91            service: Arc::downgrade(self_arc),
92            service_host: self.on.clone(),
93            service_server_defns: self.server_defns.clone(),
94            network_hint: PortNetworkHint::Auto,
95            port: name,
96            merge: false,
97        }
98    }
99
100    pub fn get_port_with_hint(
101        &self,
102        name: String,
103        network_hint: PortNetworkHint,
104        self_arc: &Arc<RwLock<RustCrateService>>,
105    ) -> RustCratePortConfig {
106        RustCratePortConfig {
107            service: Arc::downgrade(self_arc),
108            service_host: self.on.clone(),
109            service_server_defns: self.server_defns.clone(),
110            network_hint,
111            port: name,
112            merge: false,
113        }
114    }
115
116    pub fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
117        self.launched_binary.as_ref().unwrap().stdout()
118    }
119
120    pub fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
121        self.launched_binary.as_ref().unwrap().stderr()
122    }
123
124    pub fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
125        self.launched_binary.as_ref().unwrap().stdout_filter(prefix)
126    }
127
128    pub fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
129        self.launched_binary.as_ref().unwrap().stderr_filter(prefix)
130    }
131
132    pub fn tracing_results(&self) -> Option<&TracingResults> {
133        self.launched_binary.as_ref().unwrap().tracing_results()
134    }
135
136    pub fn exit_code(&self) -> Option<i32> {
137        self.launched_binary.as_ref().unwrap().exit_code()
138    }
139
140    fn build(
141        &self,
142    ) -> impl use<> + 'static + Future<Output = Result<&'static BuildOutput, BuildError>> {
143        // Memoized, so no caching in `self` is needed.
144        build_crate_memoized(self.build_params.clone())
145    }
146}
147
148#[async_trait]
149impl Service for RustCrateService {
150    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
151        if self.launched_host.is_some() {
152            return;
153        }
154
155        tokio::task::spawn(self.build());
156
157        let host = &self.on;
158
159        host.request_custom_binary();
160        for (_, bind_type) in self.port_to_bind.iter() {
161            host.request_port(bind_type);
162        }
163
164        for port in self.external_ports.iter() {
165            host.request_port_base(&BaseServerStrategy::ExternalTcpPort(*port));
166        }
167    }
168
169    async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
170        if self.launched_host.is_some() {
171            return Ok(());
172        }
173
174        ProgressTracker::with_group(
175            self.display_id
176                .clone()
177                .unwrap_or_else(|| format!("service/{}", self.id)),
178            None,
179            || async {
180                let built = self.build().await?;
181
182                let host = &self.on;
183                let launched = host.provision(resource_result);
184
185                launched.copy_binary(built).await?;
186
187                self.launched_host = Some(launched);
188                Ok(())
189            },
190        )
191        .await
192    }
193
194    async fn ready(&mut self) -> Result<()> {
195        if self.launched_binary.is_some() {
196            return Ok(());
197        }
198
199        ProgressTracker::with_group(
200            self.display_id
201                .clone()
202                .unwrap_or_else(|| format!("service/{}", self.id)),
203            None,
204            || async {
205                let launched_host = self.launched_host.as_ref().unwrap();
206
207                let built = self.build().await?;
208                let args = self.args.as_ref().cloned().unwrap_or_default();
209
210                let binary = launched_host
211                    .launch_binary(
212                        self.display_id
213                            .clone()
214                            .unwrap_or_else(|| format!("service/{}", self.id)),
215                        built,
216                        &args,
217                        self.tracing.clone(),
218                    )
219                    .await?;
220
221                let mut bind_config = HashMap::new();
222                for (port_name, bind_type) in self.port_to_bind.iter() {
223                    bind_config.insert(port_name.clone(), launched_host.server_config(bind_type));
224                }
225
226                let formatted_bind_config =
227                    serde_json::to_string::<InitConfig>(&(bind_config, self.meta.clone())).unwrap();
228
229                // request stdout before sending config so we don't miss the "ready" response
230                let stdout_receiver = binary.deploy_stdout();
231
232                binary.stdin().send(format!("{formatted_bind_config}\n"))?;
233
234                let ready_line = ProgressTracker::leaf(
235                    "waiting for ready",
236                    tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
237                )
238                .await
239                .context("Timed out waiting for ready")?
240                .context("Program unexpectedly quit")?;
241                if let Some(line_rest) = ready_line.strip_prefix("ready: ") {
242                    *self.server_defns.try_write().unwrap() =
243                        serde_json::from_str(line_rest).unwrap();
244                } else {
245                    bail!("expected ready");
246                }
247
248                self.launched_binary = Some(binary);
249
250                Ok(())
251            },
252        )
253        .await
254    }
255
256    async fn start(&mut self) -> Result<()> {
257        if self.started {
258            return Ok(());
259        }
260
261        let mut sink_ports = HashMap::new();
262        for (port_name, outgoing) in self.port_to_server.drain() {
263            sink_ports.insert(port_name.clone(), outgoing.load_instantiated(&|p| p).await);
264        }
265
266        let formatted_defns = serde_json::to_string(&sink_ports).unwrap();
267
268        let stdout_receiver = self.launched_binary.as_ref().unwrap().deploy_stdout();
269
270        self.launched_binary
271            .as_ref()
272            .unwrap()
273            .stdin()
274            .send(format!("start: {formatted_defns}\n"))
275            .unwrap();
276
277        let start_ack_line = ProgressTracker::leaf(
278            self.display_id
279                .clone()
280                .unwrap_or_else(|| format!("service/{}", self.id))
281                + " / waiting for ack start",
282            tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
283        )
284        .await??;
285        if !start_ack_line.starts_with("ack start") {
286            bail!("expected ack start");
287        }
288
289        self.started = true;
290        Ok(())
291    }
292
293    async fn stop(&mut self) -> Result<()> {
294        ProgressTracker::with_group(
295            self.display_id
296                .clone()
297                .unwrap_or_else(|| format!("service/{}", self.id)),
298            None,
299            || async {
300                let launched_binary = self.launched_binary.as_mut().unwrap();
301                launched_binary.stdin().send("stop\n".to_string())?;
302
303                let timeout_result = ProgressTracker::leaf(
304                    "waiting for exit",
305                    tokio::time::timeout(Duration::from_secs(60), launched_binary.wait()),
306                )
307                .await;
308                match timeout_result {
309                    Err(_timeout) => {} // `wait()` timed out, but stop will force quit.
310                    Ok(Err(unexpected_error)) => return Err(unexpected_error), // `wait()` errored.
311                    Ok(Ok(_exit_status)) => {}
312                }
313                launched_binary.stop().await?;
314
315                Ok(())
316            },
317        )
318        .await
319    }
320}