hydro_deploy/localhost/
mod.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use anyhow::{Result, bail};
5use async_process::{Command, Stdio};
6use async_trait::async_trait;
7use hydro_deploy_integration::ServerBindConfig;
8
9use crate::progress::ProgressTracker;
10use crate::rust_crate::build::BuildOutput;
11use crate::rust_crate::tracing_options::TracingOptions;
12use crate::{
13 BaseServerStrategy, ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary,
14 LaunchedHost, PortNetworkHint, ResourceBatch, ResourceResult,
15};
16
17pub mod launched_binary;
18pub use launched_binary::*;
19mod samply;
20
/// Describes the local machine as a deployment host.
///
/// The `Host` implementation for this type reports `HostTargetType::Local`.
#[derive(Debug)]
pub struct LocalhostHost {
    /// Identifier of this host; it is what `Host::id` returns.
    pub id: usize,
    /// When `true`, `strategy_as_server` refuses to let this host act as a server.
    client_only: bool,
}

impl LocalhostHost {
    /// Creates a localhost host with the given `id` that is not client-only.
    pub fn new(id: usize) -> Self {
        Self {
            id,
            client_only: false,
        }
    }

    /// Returns a new host that keeps this host's `id` but is flagged as client-only.
    ///
    /// `self` is left untouched.
    pub fn client_only(&self) -> Self {
        Self {
            id: self.id,
            client_only: true,
        }
    }
}
42
43#[async_trait]
44impl Host for LocalhostHost {
45 fn target_type(&self) -> HostTargetType {
46 HostTargetType::Local
47 }
48
49 fn request_port_base(&self, _bind_type: &BaseServerStrategy) {}
50 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
51 fn request_custom_binary(&self) {}
52
53 fn id(&self) -> usize {
54 self.id
55 }
56
57 fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
58 Some(Arc::new(LaunchedLocalhost))
59 }
60
61 fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
62 Arc::new(LaunchedLocalhost)
63 }
64
65 fn strategy_as_server<'a>(
66 &'a self,
67 connection_from: &dyn Host,
68 network_hint: PortNetworkHint,
69 ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
70 if self.client_only {
71 anyhow::bail!("Localhost cannot be a server if it is client only")
72 }
73
74 if matches!(network_hint, PortNetworkHint::Auto)
75 && connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id))
76 {
77 Ok((
78 ClientStrategy::UnixSocket(self.id),
79 Box::new(|_| BaseServerStrategy::UnixSocket),
80 ))
81 } else if matches!(
82 network_hint,
83 PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
84 ) && connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self))
85 {
86 Ok((
87 ClientStrategy::InternalTcpPort(self),
88 Box::new(move |_| {
89 BaseServerStrategy::InternalTcpPort(match network_hint {
90 PortNetworkHint::Auto => None,
91 PortNetworkHint::TcpPort(port) => port,
92 })
93 }),
94 ))
95 } else {
96 anyhow::bail!("Could not find a strategy to connect to localhost")
97 }
98 }
99
100 fn can_connect_to(&self, typ: ClientStrategy) -> bool {
101 match typ {
102 ClientStrategy::UnixSocket(id) => {
103 #[cfg(unix)]
104 {
105 self.id == id
106 }
107
108 #[cfg(not(unix))]
109 {
110 let _ = id;
111 false
112 }
113 }
114 ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
115 ClientStrategy::ForwardedTcpPort(_) => true,
116 }
117 }
118}
119
120struct LaunchedLocalhost;
121
122#[async_trait]
123impl LaunchedHost for LaunchedLocalhost {
124 fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
125 match bind_type {
126 BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
127 BaseServerStrategy::InternalTcpPort(port) => {
128 ServerBindConfig::TcpPort("127.0.0.1".to_string(), *port)
129 }
130 BaseServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
131 }
132 }
133
134 async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
135 Ok(())
136 }
137
138 async fn launch_binary(
139 &self,
140 id: String,
141 binary: &BuildOutput,
142 args: &[String],
143 tracing: Option<TracingOptions>,
144 ) -> Result<Box<dyn LaunchedBinary>> {
145 let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
146 if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
147 ProgressTracker::println(
149 format!("[{id} tracing] Profiling binary with `samply`.",),
150 );
151 let samply_outfile = tempfile::NamedTempFile::new()?;
152
153 let mut command = Command::new("samply");
154 command
155 .arg("record")
156 .arg("--save-only")
157 .arg("--output")
158 .arg(samply_outfile.as_ref())
159 .arg(&binary.bin_path)
160 .args(args);
161 (Some(samply_outfile), command)
162 } else if cfg!(target_family = "unix") {
163 ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
165 let perf_outfile = tempfile::NamedTempFile::new()?;
166
167 let mut command = Command::new("perf");
168 command
169 .args([
170 "record",
171 "-F",
172 &tracing.frequency.to_string(),
173 "-e",
174 "cycles:u",
175 "--call-graph",
176 "dwarf,65528",
177 "-o",
178 ])
179 .arg(perf_outfile.as_ref())
180 .arg(&binary.bin_path)
181 .args(args);
182
183 (Some(perf_outfile), command)
184 } else {
185 bail!(
186 "Unknown OS for perf/dtrace tracing: {}",
187 std::env::consts::OS
188 );
189 }
190 } else {
191 let mut command = Command::new(&binary.bin_path);
192 command.args(args);
193 (None, command)
194 };
195
196 command
197 .stdin(Stdio::piped())
198 .stdout(Stdio::piped())
199 .stderr(Stdio::piped());
200
201 #[cfg(not(target_family = "unix"))]
202 command.kill_on_drop(true);
203
204 ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
205
206 let child = command.spawn().map_err(|e| {
207 let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
208 "Tracing executable not found, ensure it is installed"
209 } else {
210 "Failed to execute command"
211 };
212 anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
213 })?;
214
215 Ok(Box::new(LaunchedLocalhostBinary::new(
216 child,
217 id,
218 tracing,
219 maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
220 )))
221 }
222
223 async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
224 Ok(*addr)
225 }
226}