hydro_deploy/localhost/
mod.rs
1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use anyhow::{Result, bail};
6use async_process::{Command, Stdio};
7use async_trait::async_trait;
8use hydro_deploy_integration::ServerBindConfig;
9
10use crate::progress::ProgressTracker;
11use crate::rust_crate::build::BuildOutput;
12use crate::rust_crate::tracing_options::TracingOptions;
13use crate::{
14 ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary, LaunchedHost,
15 ResourceBatch, ResourceResult, ServerStrategy,
16};
17
18pub mod launched_binary;
19pub use launched_binary::*;
20mod samply;
21
/// A deployment host identified by `id`; its `Host` implementation in this
/// file reports `HostTargetType::Local`.
#[derive(Debug)]
pub struct LocalhostHost {
    /// Identifier returned by `Host::id` and compared when checking whether
    /// two hosts can reach each other.
    pub id: usize,
    /// When `true`, `strategy_as_server` rejects any request to use this host
    /// as the server side of a connection.
    client_only: bool,
}

impl LocalhostHost {
    /// Builds a host with the given `id` that may act as a server
    /// (`client_only` starts out `false`).
    pub fn new(id: usize) -> LocalhostHost {
        Self {
            id,
            client_only: false,
        }
    }

    /// Returns a fresh `LocalhostHost` carrying the same `id` as `self` but
    /// with the client-only flag set. `self` is left untouched.
    pub fn client_only(&self) -> LocalhostHost {
        let id = self.id;
        Self {
            id,
            client_only: true,
        }
    }
}
43
44#[async_trait]
45impl Host for LocalhostHost {
46 fn target_type(&self) -> HostTargetType {
47 HostTargetType::Local
48 }
49
50 fn request_port(&self, _bind_type: &ServerStrategy) {}
51 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
52 fn request_custom_binary(&self) {}
53
54 fn id(&self) -> usize {
55 self.id
56 }
57
58 fn as_any(&self) -> &dyn std::any::Any {
59 self
60 }
61
62 fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
63 Some(Arc::new(LaunchedLocalhost))
64 }
65
66 fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
67 Arc::new(LaunchedLocalhost)
68 }
69
70 fn strategy_as_server<'a>(
71 &'a self,
72 connection_from: &dyn Host,
73 ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
74 if self.client_only {
75 anyhow::bail!("Localhost cannot be a server if it is client only")
76 }
77
78 if connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id)) {
79 Ok((
80 ClientStrategy::UnixSocket(self.id),
81 Box::new(|_| ServerStrategy::UnixSocket),
82 ))
83 } else if connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self)) {
84 Ok((
85 ClientStrategy::InternalTcpPort(self),
86 Box::new(|_| ServerStrategy::InternalTcpPort),
87 ))
88 } else {
89 anyhow::bail!("Could not find a strategy to connect to localhost")
90 }
91 }
92
93 fn can_connect_to(&self, typ: ClientStrategy) -> bool {
94 match typ {
95 ClientStrategy::UnixSocket(id) => {
96 #[cfg(unix)]
97 {
98 self.id == id
99 }
100
101 #[cfg(not(unix))]
102 {
103 let _ = id;
104 false
105 }
106 }
107 ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
108 ClientStrategy::ForwardedTcpPort(_) => true,
109 }
110 }
111}
112
113struct LaunchedLocalhost;
114
115#[async_trait]
116impl LaunchedHost for LaunchedLocalhost {
117 fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig {
118 match bind_type {
119 ServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
120 ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort("127.0.0.1".to_string()),
121 ServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
122 ServerStrategy::Demux(demux) => {
123 let mut config_map = HashMap::new();
124 for (key, underlying) in demux {
125 config_map.insert(*key, self.server_config(underlying));
126 }
127
128 ServerBindConfig::Demux(config_map)
129 }
130 ServerStrategy::Merge(merge) => {
131 let mut configs = vec![];
132 for underlying in merge {
133 configs.push(self.server_config(underlying));
134 }
135
136 ServerBindConfig::Merge(configs)
137 }
138 ServerStrategy::Tagged(underlying, id) => {
139 ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
140 }
141 ServerStrategy::Null => ServerBindConfig::Null,
142 }
143 }
144
145 async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
146 Ok(())
147 }
148
149 async fn launch_binary(
150 &self,
151 id: String,
152 binary: &BuildOutput,
153 args: &[String],
154 tracing: Option<TracingOptions>,
155 ) -> Result<Box<dyn LaunchedBinary>> {
156 let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
157 if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
158 ProgressTracker::println(
160 format!("[{id} tracing] Profiling binary with `samply`.",),
161 );
162 let samply_outfile = tempfile::NamedTempFile::new()?;
163
164 let mut command = Command::new("samply");
165 command
166 .arg("record")
167 .arg("--save-only")
168 .arg("--output")
169 .arg(samply_outfile.as_ref())
170 .arg(&binary.bin_path)
171 .args(args);
172 (Some(samply_outfile), command)
173 } else if cfg!(target_family = "unix") {
174 ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
176 let perf_outfile = tempfile::NamedTempFile::new()?;
177
178 let mut command = Command::new("perf");
179 command
180 .args([
181 "record",
182 "-F",
183 &tracing.frequency.to_string(),
184 "-e",
185 "cycles:u",
186 "--call-graph",
187 "dwarf,65528",
188 "-o",
189 ])
190 .arg(perf_outfile.as_ref())
191 .arg(&binary.bin_path)
192 .args(args);
193
194 (Some(perf_outfile), command)
195 } else {
196 bail!(
197 "Unknown OS for perf/dtrace tracing: {}",
198 std::env::consts::OS
199 );
200 }
201 } else {
202 let mut command = Command::new(&binary.bin_path);
203 command.args(args);
204 (None, command)
205 };
206
207 command
208 .stdin(Stdio::piped())
209 .stdout(Stdio::piped())
210 .stderr(Stdio::piped());
211
212 #[cfg(not(target_family = "unix"))]
213 command.kill_on_drop(true);
214
215 ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
216
217 let child = command.spawn().map_err(|e| {
218 let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
219 "Tracing executable not found, ensure it is installed"
220 } else {
221 "Failed to execute command"
222 };
223 anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
224 })?;
225
226 Ok(Box::new(LaunchedLocalhostBinary::new(
227 child,
228 id,
229 tracing,
230 maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
231 )))
232 }
233
234 async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
235 Ok(*addr)
236 }
237}