hydro_deploy/localhost/
mod.rs

1use std::net::SocketAddr;
2use std::sync::{Arc, OnceLock};
3
4use anyhow::{Result, bail};
5use async_process::{Command, Stdio};
6use async_trait::async_trait;
7use hydro_deploy_integration::ServerBindConfig;
8
9use crate::progress::ProgressTracker;
10use crate::rust_crate::build::BuildOutput;
11use crate::rust_crate::tracing_options::TracingOptions;
12use crate::{
13    BaseServerStrategy, ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary,
14    LaunchedHost, PortNetworkHint, ResourceBatch, ResourceResult,
15};
16
17pub mod launched_binary;
18pub use launched_binary::*;
19
20#[cfg(any(target_os = "macos", target_family = "windows"))]
21mod samply;
22
23static LOCAL_LIBDIR: OnceLock<String> = OnceLock::new();
24
25#[derive(Debug)]
26pub struct LocalhostHost {
27    pub id: usize,
28    client_only: bool,
29}
30
31impl LocalhostHost {
32    pub fn new(id: usize) -> LocalhostHost {
33        LocalhostHost {
34            id,
35            client_only: false,
36        }
37    }
38
39    pub fn client_only(&self) -> LocalhostHost {
40        LocalhostHost {
41            id: self.id,
42            client_only: true,
43        }
44    }
45}
46
47#[async_trait]
48impl Host for LocalhostHost {
49    fn target_type(&self) -> HostTargetType {
50        HostTargetType::Local
51    }
52
53    fn request_port_base(&self, _bind_type: &BaseServerStrategy) {}
54    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
55    fn request_custom_binary(&self) {}
56
57    fn id(&self) -> usize {
58        self.id
59    }
60
61    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
62        Some(Arc::new(LaunchedLocalhost))
63    }
64
65    fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
66        Arc::new(LaunchedLocalhost)
67    }
68
69    fn strategy_as_server<'a>(
70        &'a self,
71        connection_from: &dyn Host,
72        network_hint: PortNetworkHint,
73    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
74        if self.client_only {
75            anyhow::bail!("Localhost cannot be a server if it is client only")
76        }
77
78        if matches!(network_hint, PortNetworkHint::Auto)
79            && connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id))
80        {
81            Ok((
82                ClientStrategy::UnixSocket(self.id),
83                Box::new(|_| BaseServerStrategy::UnixSocket),
84            ))
85        } else if matches!(
86            network_hint,
87            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
88        ) && connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self))
89        {
90            Ok((
91                ClientStrategy::InternalTcpPort(self),
92                Box::new(move |_| {
93                    BaseServerStrategy::InternalTcpPort(match network_hint {
94                        PortNetworkHint::Auto => None,
95                        PortNetworkHint::TcpPort(port) => port,
96                    })
97                }),
98            ))
99        } else {
100            anyhow::bail!("Could not find a strategy to connect to localhost")
101        }
102    }
103
104    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
105        match typ {
106            ClientStrategy::UnixSocket(id) => {
107                #[cfg(unix)]
108                {
109                    self.id == id
110                }
111
112                #[cfg(not(unix))]
113                {
114                    let _ = id;
115                    false
116                }
117            }
118            ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
119            ClientStrategy::ForwardedTcpPort(_) => true,
120        }
121    }
122}
123
124struct LaunchedLocalhost;
125
126#[async_trait]
127impl LaunchedHost for LaunchedLocalhost {
128    fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
129        match bind_type {
130            BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
131            BaseServerStrategy::InternalTcpPort(port) => {
132                ServerBindConfig::TcpPort("127.0.0.1".to_string(), *port)
133            }
134            BaseServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
135        }
136    }
137
138    async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
139        Ok(())
140    }
141
142    async fn launch_binary(
143        &self,
144        id: String,
145        binary: &BuildOutput,
146        args: &[String],
147        tracing: Option<TracingOptions>,
148    ) -> Result<Box<dyn LaunchedBinary>> {
149        let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
150            if cfg!(any(target_os = "macos", target_family = "windows")) {
151                // samply
152                ProgressTracker::println(
153                    format!("[{id} tracing] Profiling binary with `samply`.",),
154                );
155                let samply_outfile = tempfile::NamedTempFile::new()?;
156
157                let mut command = Command::new("samply");
158                command
159                    .arg("record")
160                    .arg("--save-only")
161                    .arg("--output")
162                    .arg(samply_outfile.as_ref())
163                    .arg(&binary.bin_path)
164                    .args(args);
165                (Some(samply_outfile), command)
166            } else if cfg!(target_family = "unix") {
167                // perf
168                ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
169                let perf_outfile = tempfile::NamedTempFile::new()?;
170
171                let mut command = Command::new("perf");
172                command
173                    .args([
174                        "record",
175                        "-F",
176                        &tracing.frequency.to_string(),
177                        "-e",
178                        "cycles:u",
179                        "--call-graph",
180                        "dwarf,65528",
181                        "-o",
182                    ])
183                    .arg(perf_outfile.as_ref())
184                    .arg(&binary.bin_path)
185                    .args(args);
186
187                (Some(perf_outfile), command)
188            } else {
189                bail!(
190                    "Unknown OS for samply/perf tracing: {}",
191                    std::env::consts::OS
192                );
193            }
194        } else {
195            let mut command = Command::new(&binary.bin_path);
196            command.args(args);
197            (None, command)
198        };
199
200        // from cargo (https://github.com/rust-lang/cargo/blob/master/crates/cargo-util/src/paths.rs#L38)
201        let dylib_path_var = if cfg!(windows) {
202            "PATH"
203        } else if cfg!(target_os = "macos") {
204            "DYLD_FALLBACK_LIBRARY_PATH"
205        } else if cfg!(target_os = "aix") {
206            "LIBPATH"
207        } else {
208            "LD_LIBRARY_PATH"
209        };
210
211        let local_libdir = LOCAL_LIBDIR.get_or_init(|| {
212            std::process::Command::new("rustc")
213                .arg("--print")
214                .arg("target-libdir")
215                .output()
216                .map(|output| String::from_utf8(output.stdout).unwrap().trim().to_string())
217                .unwrap()
218        });
219
220        command.env(
221            dylib_path_var,
222            std::env::var_os(dylib_path_var).map_or_else(
223                || {
224                    std::env::join_paths(
225                        [
226                            binary.shared_library_path.as_ref(),
227                            Some(&std::path::PathBuf::from(local_libdir)),
228                        ]
229                        .into_iter()
230                        .flatten(),
231                    )
232                    .unwrap()
233                },
234                |paths| {
235                    let mut paths = std::env::split_paths(&paths).collect::<Vec<_>>();
236                    paths.insert(0, std::path::PathBuf::from(local_libdir));
237                    if let Some(shared_path) = &binary.shared_library_path {
238                        paths.insert(0, shared_path.to_path_buf());
239                    }
240                    std::env::join_paths(paths).unwrap()
241                },
242            ),
243        );
244
245        command
246            .stdin(Stdio::piped())
247            .stdout(Stdio::piped())
248            .stderr(Stdio::piped());
249
250        #[cfg(not(target_family = "unix"))]
251        command.kill_on_drop(true);
252
253        ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
254
255        let child = command.spawn().map_err(|e| {
256            let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
257                "Tracing executable not found, ensure it is installed"
258            } else {
259                "Failed to execute command"
260            };
261            anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
262        })?;
263
264        Ok(Box::new(LaunchedLocalhostBinary::new(
265            child,
266            id,
267            tracing,
268            maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
269        )))
270    }
271
272    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
273        Ok(*addr)
274    }
275}