hydro_deploy/localhost/
mod.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::ops::Deref;
5use std::sync::Arc;
6
7use anyhow::{Context, Result, bail};
8use async_process::{Command, Stdio};
9use async_trait::async_trait;
10use hydro_deploy_integration::ServerBindConfig;
11
12use crate::progress::ProgressTracker;
13use crate::rust_crate::build::BuildOutput;
14use crate::rust_crate::tracing_options::TracingOptions;
15use crate::{
16    ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary, LaunchedHost,
17    ResourceBatch, ResourceResult, ServerStrategy,
18};
19
20pub mod launched_binary;
21pub use launched_binary::*;
22
/// A host that stands for the machine the deployment is being driven from.
#[derive(Debug)]
pub struct LocalhostHost {
    /// Identifier of this host.
    pub id: usize,
    /// Set on hosts produced by [`LocalhostHost::client_only`]; `false` for
    /// hosts produced by [`LocalhostHost::new`].
    client_only: bool,
}

impl LocalhostHost {
    /// Creates a localhost host with the given `id` that is not client-only.
    pub fn new(id: usize) -> Self {
        Self {
            client_only: false,
            id,
        }
    }

    /// Returns a fresh host with the same `id` as `self`, marked as
    /// client-only. `self` is left untouched.
    pub fn client_only(&self) -> Self {
        Self {
            client_only: true,
            id: self.id,
        }
    }
}
44
45#[async_trait]
46impl Host for LocalhostHost {
47    fn target_type(&self) -> HostTargetType {
48        HostTargetType::Local
49    }
50
51    fn request_port(&self, _bind_type: &ServerStrategy) {}
52    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
53    fn request_custom_binary(&self) {}
54
55    fn id(&self) -> usize {
56        self.id
57    }
58
59    fn as_any(&self) -> &dyn std::any::Any {
60        self
61    }
62
63    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
64        Some(Arc::new(LaunchedLocalhost))
65    }
66
67    fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
68        Arc::new(LaunchedLocalhost)
69    }
70
71    fn strategy_as_server<'a>(
72        &'a self,
73        connection_from: &dyn Host,
74    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
75        if self.client_only {
76            anyhow::bail!("Localhost cannot be a server if it is client only")
77        }
78
79        if connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id)) {
80            Ok((
81                ClientStrategy::UnixSocket(self.id),
82                Box::new(|_| ServerStrategy::UnixSocket),
83            ))
84        } else if connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self)) {
85            Ok((
86                ClientStrategy::InternalTcpPort(self),
87                Box::new(|_| ServerStrategy::InternalTcpPort),
88            ))
89        } else {
90            anyhow::bail!("Could not find a strategy to connect to localhost")
91        }
92    }
93
94    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
95        match typ {
96            ClientStrategy::UnixSocket(id) => {
97                #[cfg(unix)]
98                {
99                    self.id == id
100                }
101
102                #[cfg(not(unix))]
103                {
104                    let _ = id;
105                    false
106                }
107            }
108            ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
109            ClientStrategy::ForwardedTcpPort(_) => true,
110        }
111    }
112}
113
114struct LaunchedLocalhost;
115
116#[async_trait]
117impl LaunchedHost for LaunchedLocalhost {
118    fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig {
119        match bind_type {
120            ServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
121            ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort("127.0.0.1".to_string()),
122            ServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
123            ServerStrategy::Demux(demux) => {
124                let mut config_map = HashMap::new();
125                for (key, underlying) in demux {
126                    config_map.insert(*key, self.server_config(underlying));
127                }
128
129                ServerBindConfig::Demux(config_map)
130            }
131            ServerStrategy::Merge(merge) => {
132                let mut configs = vec![];
133                for underlying in merge {
134                    configs.push(self.server_config(underlying));
135                }
136
137                ServerBindConfig::Merge(configs)
138            }
139            ServerStrategy::Tagged(underlying, id) => {
140                ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
141            }
142            ServerStrategy::Null => ServerBindConfig::Null,
143        }
144    }
145
146    async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
147        Ok(())
148    }
149
150    async fn launch_binary(
151        &self,
152        id: String,
153        binary: &BuildOutput,
154        args: &[String],
155        tracing: Option<TracingOptions>,
156    ) -> Result<Box<dyn LaunchedBinary>> {
157        let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
158            if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
159                // dtrace
160                ProgressTracker::println(
161                    format!("[{id} tracing] Profiling binary with `dtrace`.",),
162                );
163                let dtrace_outfile = tempfile::NamedTempFile::new()?;
164
165                // TODO(mingwei): use std `intersperse` when stabilized.
166                let inner_command = itertools::Itertools::intersperse(
167                    std::iter::once(binary.bin_path.to_str().unwrap())
168                        .chain(args.iter().map(Deref::deref))
169                        .map(|s| shell_escape::unix::escape(s.into())),
170                    Cow::Borrowed(" "),
171                )
172                .collect::<String>();
173
174                let mut command = Command::new("dtrace");
175                command
176                    .arg("-o")
177                    .arg(dtrace_outfile.as_ref())
178                    .arg("-n")
179                    .arg(format!(
180                        "profile-{} /pid == $target/ {{ @[ustack()] = count(); }}",
181                        tracing.frequency
182                    ))
183                    .arg("-c")
184                    .arg(&*shell_escape::unix::escape(inner_command.into()));
185                (Some(dtrace_outfile), command)
186            }
187            // else if cfg!(target_family = "windows") {
188            //     // blondie_dtrace
189            //     ProgressTracker::println(&format!(
190            //         "[{id} tracing] Profiling binary with `blondie`. `TracingOptions::frequency` is ignored. Ensure that this is run as admin.",
191            //     ));
192            //     ProgressTracker::println(&format!(
193            //         "[{id} tracing] Install `blondie` via `cargo install blondie --all-features`.",
194            //     ));
195            //     let _ = tracing;
196            //     let mut command = Command::new("blondie");
197            //     command
198            //         .arg("-o")
199            //         .arg(format!(
200            //             "./blondie-{}.stacks",
201            //             nanoid::nanoid!(5), // TODO!
202            //         ))
203            //         .arg("folded-text")
204            //         .arg(&binary.bin_path)
205            //         .args(args);
206            //     command
207            // }
208            else if cfg!(target_family = "unix") {
209                // perf
210                ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
211                let perf_outfile = tempfile::NamedTempFile::new()?;
212
213                let mut command = Command::new("perf");
214                command
215                    .args([
216                        "record",
217                        "-F",
218                        &tracing.frequency.to_string(),
219                        "-e",
220                        "cycles:u",
221                        "--call-graph",
222                        "dwarf,65528",
223                        "-o",
224                    ])
225                    .arg(perf_outfile.as_ref())
226                    .arg(&binary.bin_path)
227                    .args(args);
228
229                (Some(perf_outfile), command)
230            } else {
231                bail!(
232                    "Unknown OS for perf/dtrace tracing: {}",
233                    std::env::consts::OS
234                );
235            }
236        } else {
237            let mut command = Command::new(&binary.bin_path);
238            command.args(args);
239            (None, command)
240        };
241
242        command
243            .stdin(Stdio::piped())
244            .stdout(Stdio::piped())
245            .stderr(Stdio::piped());
246
247        #[cfg(not(target_family = "unix"))]
248        command.kill_on_drop(true);
249
250        ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
251
252        let child = command
253            .spawn()
254            .with_context(|| format!("Failed to execute command: {:?}", command))?;
255
256        Ok(Box::new(LaunchedLocalhostBinary::new(
257            child,
258            id,
259            tracing,
260            maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
261        )))
262    }
263
264    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
265        Ok(*addr)
266    }
267}