hydro_deploy/localhost/
mod.rs
1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::ops::Deref;
5use std::sync::Arc;
6
7use anyhow::{Context, Result, bail};
8use async_process::{Command, Stdio};
9use async_trait::async_trait;
10use hydro_deploy_integration::ServerBindConfig;
11
12use crate::progress::ProgressTracker;
13use crate::rust_crate::build::BuildOutput;
14use crate::rust_crate::tracing_options::TracingOptions;
15use crate::{
16 ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary, LaunchedHost,
17 ResourceBatch, ResourceResult, ServerStrategy,
18};
19
20pub mod launched_binary;
21pub use launched_binary::*;
22
/// A deployment host that represents the local machine.
#[derive(Debug)]
pub struct LocalhostHost {
    /// Identifier of this host; compared against other hosts' ids when
    /// deciding whether a connection strategy is usable.
    pub id: usize,
    /// When `true`, this host refuses to act as the server side of a connection.
    client_only: bool,
}

impl LocalhostHost {
    /// Creates a localhost host with the given `id` that may act as a server.
    pub fn new(id: usize) -> Self {
        Self {
            client_only: false,
            id,
        }
    }

    /// Returns a new host with the same `id` that is marked as client-only.
    ///
    /// The receiver is left untouched.
    pub fn client_only(&self) -> Self {
        Self {
            client_only: true,
            id: self.id,
        }
    }
}
44
45#[async_trait]
46impl Host for LocalhostHost {
47 fn target_type(&self) -> HostTargetType {
48 HostTargetType::Local
49 }
50
51 fn request_port(&self, _bind_type: &ServerStrategy) {}
52 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
53 fn request_custom_binary(&self) {}
54
55 fn id(&self) -> usize {
56 self.id
57 }
58
59 fn as_any(&self) -> &dyn std::any::Any {
60 self
61 }
62
63 fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
64 Some(Arc::new(LaunchedLocalhost))
65 }
66
67 fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
68 Arc::new(LaunchedLocalhost)
69 }
70
71 fn strategy_as_server<'a>(
72 &'a self,
73 connection_from: &dyn Host,
74 ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
75 if self.client_only {
76 anyhow::bail!("Localhost cannot be a server if it is client only")
77 }
78
79 if connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id)) {
80 Ok((
81 ClientStrategy::UnixSocket(self.id),
82 Box::new(|_| ServerStrategy::UnixSocket),
83 ))
84 } else if connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self)) {
85 Ok((
86 ClientStrategy::InternalTcpPort(self),
87 Box::new(|_| ServerStrategy::InternalTcpPort),
88 ))
89 } else {
90 anyhow::bail!("Could not find a strategy to connect to localhost")
91 }
92 }
93
94 fn can_connect_to(&self, typ: ClientStrategy) -> bool {
95 match typ {
96 ClientStrategy::UnixSocket(id) => {
97 #[cfg(unix)]
98 {
99 self.id == id
100 }
101
102 #[cfg(not(unix))]
103 {
104 let _ = id;
105 false
106 }
107 }
108 ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
109 ClientStrategy::ForwardedTcpPort(_) => true,
110 }
111 }
112}
113
114struct LaunchedLocalhost;
115
116#[async_trait]
117impl LaunchedHost for LaunchedLocalhost {
118 fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig {
119 match bind_type {
120 ServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
121 ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort("127.0.0.1".to_string()),
122 ServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
123 ServerStrategy::Demux(demux) => {
124 let mut config_map = HashMap::new();
125 for (key, underlying) in demux {
126 config_map.insert(*key, self.server_config(underlying));
127 }
128
129 ServerBindConfig::Demux(config_map)
130 }
131 ServerStrategy::Merge(merge) => {
132 let mut configs = vec![];
133 for underlying in merge {
134 configs.push(self.server_config(underlying));
135 }
136
137 ServerBindConfig::Merge(configs)
138 }
139 ServerStrategy::Tagged(underlying, id) => {
140 ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
141 }
142 ServerStrategy::Null => ServerBindConfig::Null,
143 }
144 }
145
146 async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
147 Ok(())
148 }
149
150 async fn launch_binary(
151 &self,
152 id: String,
153 binary: &BuildOutput,
154 args: &[String],
155 tracing: Option<TracingOptions>,
156 ) -> Result<Box<dyn LaunchedBinary>> {
157 let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
158 if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
159 ProgressTracker::println(
161 format!("[{id} tracing] Profiling binary with `dtrace`.",),
162 );
163 let dtrace_outfile = tempfile::NamedTempFile::new()?;
164
165 let inner_command = itertools::Itertools::intersperse(
167 std::iter::once(binary.bin_path.to_str().unwrap())
168 .chain(args.iter().map(Deref::deref))
169 .map(|s| shell_escape::unix::escape(s.into())),
170 Cow::Borrowed(" "),
171 )
172 .collect::<String>();
173
174 let mut command = Command::new("dtrace");
175 command
176 .arg("-o")
177 .arg(dtrace_outfile.as_ref())
178 .arg("-n")
179 .arg(format!(
180 "profile-{} /pid == $target/ {{ @[ustack()] = count(); }}",
181 tracing.frequency
182 ))
183 .arg("-c")
184 .arg(&*shell_escape::unix::escape(inner_command.into()));
185 (Some(dtrace_outfile), command)
186 }
187 else if cfg!(target_family = "unix") {
209 ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
211 let perf_outfile = tempfile::NamedTempFile::new()?;
212
213 let mut command = Command::new("perf");
214 command
215 .args([
216 "record",
217 "-F",
218 &tracing.frequency.to_string(),
219 "-e",
220 "cycles:u",
221 "--call-graph",
222 "dwarf,65528",
223 "-o",
224 ])
225 .arg(perf_outfile.as_ref())
226 .arg(&binary.bin_path)
227 .args(args);
228
229 (Some(perf_outfile), command)
230 } else {
231 bail!(
232 "Unknown OS for perf/dtrace tracing: {}",
233 std::env::consts::OS
234 );
235 }
236 } else {
237 let mut command = Command::new(&binary.bin_path);
238 command.args(args);
239 (None, command)
240 };
241
242 command
243 .stdin(Stdio::piped())
244 .stdout(Stdio::piped())
245 .stderr(Stdio::piped());
246
247 #[cfg(not(target_family = "unix"))]
248 command.kill_on_drop(true);
249
250 ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
251
252 let child = command
253 .spawn()
254 .with_context(|| format!("Failed to execute command: {:?}", command))?;
255
256 Ok(Box::new(LaunchedLocalhostBinary::new(
257 child,
258 id,
259 tracing,
260 maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
261 )))
262 }
263
264 async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
265 Ok(*addr)
266 }
267}