hydro_deploy/localhost/
mod.rs1use std::net::SocketAddr;
2use std::sync::{Arc, OnceLock};
3
4use anyhow::{Result, bail};
5use async_process::{Command, Stdio};
6use async_trait::async_trait;
7use hydro_deploy_integration::ServerBindConfig;
8
9use crate::progress::ProgressTracker;
10use crate::rust_crate::build::BuildOutput;
11use crate::rust_crate::tracing_options::TracingOptions;
12use crate::{
13 BaseServerStrategy, ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary,
14 LaunchedHost, PortNetworkHint, ResourceBatch, ResourceResult,
15};
16
17pub mod launched_binary;
18pub use launched_binary::*;
19
20#[cfg(any(target_os = "macos", target_family = "windows"))]
21mod samply;
22
23static LOCAL_LIBDIR: OnceLock<String> = OnceLock::new();
24
25#[derive(Debug)]
26pub struct LocalhostHost {
27 pub id: usize,
28 client_only: bool,
29}
30
31impl LocalhostHost {
32 pub fn new(id: usize) -> LocalhostHost {
33 LocalhostHost {
34 id,
35 client_only: false,
36 }
37 }
38
39 pub fn client_only(&self) -> LocalhostHost {
40 LocalhostHost {
41 id: self.id,
42 client_only: true,
43 }
44 }
45}
46
47#[async_trait]
48impl Host for LocalhostHost {
49 fn target_type(&self) -> HostTargetType {
50 HostTargetType::Local
51 }
52
53 fn request_port_base(&self, _bind_type: &BaseServerStrategy) {}
54 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
55 fn request_custom_binary(&self) {}
56
57 fn id(&self) -> usize {
58 self.id
59 }
60
61 fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
62 Some(Arc::new(LaunchedLocalhost))
63 }
64
65 fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
66 Arc::new(LaunchedLocalhost)
67 }
68
69 fn strategy_as_server<'a>(
70 &'a self,
71 connection_from: &dyn Host,
72 network_hint: PortNetworkHint,
73 ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
74 if self.client_only {
75 anyhow::bail!("Localhost cannot be a server if it is client only")
76 }
77
78 if matches!(network_hint, PortNetworkHint::Auto)
79 && connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id))
80 {
81 Ok((
82 ClientStrategy::UnixSocket(self.id),
83 Box::new(|_| BaseServerStrategy::UnixSocket),
84 ))
85 } else if matches!(
86 network_hint,
87 PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
88 ) && connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self))
89 {
90 Ok((
91 ClientStrategy::InternalTcpPort(self),
92 Box::new(move |_| {
93 BaseServerStrategy::InternalTcpPort(match network_hint {
94 PortNetworkHint::Auto => None,
95 PortNetworkHint::TcpPort(port) => port,
96 })
97 }),
98 ))
99 } else {
100 anyhow::bail!("Could not find a strategy to connect to localhost")
101 }
102 }
103
104 fn can_connect_to(&self, typ: ClientStrategy) -> bool {
105 match typ {
106 ClientStrategy::UnixSocket(id) => {
107 #[cfg(unix)]
108 {
109 self.id == id
110 }
111
112 #[cfg(not(unix))]
113 {
114 let _ = id;
115 false
116 }
117 }
118 ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
119 ClientStrategy::ForwardedTcpPort(_) => true,
120 }
121 }
122}
123
124struct LaunchedLocalhost;
125
126#[async_trait]
127impl LaunchedHost for LaunchedLocalhost {
128 fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
129 match bind_type {
130 BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
131 BaseServerStrategy::InternalTcpPort(port) => {
132 ServerBindConfig::TcpPort("127.0.0.1".to_string(), *port)
133 }
134 BaseServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
135 }
136 }
137
138 async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
139 Ok(())
140 }
141
142 async fn launch_binary(
143 &self,
144 id: String,
145 binary: &BuildOutput,
146 args: &[String],
147 tracing: Option<TracingOptions>,
148 ) -> Result<Box<dyn LaunchedBinary>> {
149 let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
150 if cfg!(any(target_os = "macos", target_family = "windows")) {
151 ProgressTracker::println(
153 format!("[{id} tracing] Profiling binary with `samply`.",),
154 );
155 let samply_outfile = tempfile::NamedTempFile::new()?;
156
157 let mut command = Command::new("samply");
158 command
159 .arg("record")
160 .arg("--save-only")
161 .arg("--output")
162 .arg(samply_outfile.as_ref())
163 .arg(&binary.bin_path)
164 .args(args);
165 (Some(samply_outfile), command)
166 } else if cfg!(target_family = "unix") {
167 ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
169 let perf_outfile = tempfile::NamedTempFile::new()?;
170
171 let mut command = Command::new("perf");
172 command
173 .args([
174 "record",
175 "-F",
176 &tracing.frequency.to_string(),
177 "-e",
178 "cycles:u",
179 "--call-graph",
180 "dwarf,65528",
181 "-o",
182 ])
183 .arg(perf_outfile.as_ref())
184 .arg(&binary.bin_path)
185 .args(args);
186
187 (Some(perf_outfile), command)
188 } else {
189 bail!(
190 "Unknown OS for samply/perf tracing: {}",
191 std::env::consts::OS
192 );
193 }
194 } else {
195 let mut command = Command::new(&binary.bin_path);
196 command.args(args);
197 (None, command)
198 };
199
200 let dylib_path_var = if cfg!(windows) {
202 "PATH"
203 } else if cfg!(target_os = "macos") {
204 "DYLD_FALLBACK_LIBRARY_PATH"
205 } else if cfg!(target_os = "aix") {
206 "LIBPATH"
207 } else {
208 "LD_LIBRARY_PATH"
209 };
210
211 let local_libdir = LOCAL_LIBDIR.get_or_init(|| {
212 std::process::Command::new("rustc")
213 .arg("--print")
214 .arg("target-libdir")
215 .output()
216 .map(|output| String::from_utf8(output.stdout).unwrap().trim().to_string())
217 .unwrap()
218 });
219
220 command.env(
221 dylib_path_var,
222 std::env::var_os(dylib_path_var).map_or_else(
223 || {
224 std::env::join_paths(
225 [
226 binary.shared_library_path.as_ref(),
227 Some(&std::path::PathBuf::from(local_libdir)),
228 ]
229 .into_iter()
230 .flatten(),
231 )
232 .unwrap()
233 },
234 |paths| {
235 let mut paths = std::env::split_paths(&paths).collect::<Vec<_>>();
236 paths.insert(0, std::path::PathBuf::from(local_libdir));
237 if let Some(shared_path) = &binary.shared_library_path {
238 paths.insert(0, shared_path.to_path_buf());
239 }
240 std::env::join_paths(paths).unwrap()
241 },
242 ),
243 );
244
245 command
246 .stdin(Stdio::piped())
247 .stdout(Stdio::piped())
248 .stderr(Stdio::piped());
249
250 #[cfg(not(target_family = "unix"))]
251 command.kill_on_drop(true);
252
253 ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
254
255 let child = command.spawn().map_err(|e| {
256 let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
257 "Tracing executable not found, ensure it is installed"
258 } else {
259 "Failed to execute command"
260 };
261 anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
262 })?;
263
264 Ok(Box::new(LaunchedLocalhostBinary::new(
265 child,
266 id,
267 tracing,
268 maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
269 )))
270 }
271
272 async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
273 Ok(*addr)
274 }
275}