hydro_deploy/localhost/
mod.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use anyhow::{Result, bail};
5use async_process::{Command, Stdio};
6use async_trait::async_trait;
7use hydro_deploy_integration::ServerBindConfig;
8
9use crate::progress::ProgressTracker;
10use crate::rust_crate::build::BuildOutput;
11use crate::rust_crate::tracing_options::TracingOptions;
12use crate::{
13    BaseServerStrategy, ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary,
14    LaunchedHost, PortNetworkHint, ResourceBatch, ResourceResult,
15};
16
17pub mod launched_binary;
18pub use launched_binary::*;
19mod samply;
20
/// A deployment host that stands for the local machine.
///
/// Its `Host` implementation reports `HostTargetType::Local` as the target type.
#[derive(Debug)]
pub struct LocalhostHost {
    /// Identifier of this host; returned unchanged by `Host::id`.
    pub id: usize,
    /// When `true`, `strategy_as_server` refuses to use this host as a server.
    client_only: bool,
}

impl LocalhostHost {
    /// Creates a localhost handle with the given `id` that is allowed to act as a server.
    pub fn new(id: usize) -> Self {
        let client_only = false;
        Self { id, client_only }
    }

    /// Returns a new handle with the same `id` as `self`, flagged as client-only.
    ///
    /// `self` is left untouched.
    pub fn client_only(&self) -> Self {
        let id = self.id;
        Self {
            id,
            client_only: true,
        }
    }
}
42
43#[async_trait]
44impl Host for LocalhostHost {
45    fn target_type(&self) -> HostTargetType {
46        HostTargetType::Local
47    }
48
49    fn request_port_base(&self, _bind_type: &BaseServerStrategy) {}
50    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
51    fn request_custom_binary(&self) {}
52
53    fn id(&self) -> usize {
54        self.id
55    }
56
57    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
58        Some(Arc::new(LaunchedLocalhost))
59    }
60
61    fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
62        Arc::new(LaunchedLocalhost)
63    }
64
65    fn strategy_as_server<'a>(
66        &'a self,
67        connection_from: &dyn Host,
68        network_hint: PortNetworkHint,
69    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
70        if self.client_only {
71            anyhow::bail!("Localhost cannot be a server if it is client only")
72        }
73
74        if matches!(network_hint, PortNetworkHint::Auto)
75            && connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id))
76        {
77            Ok((
78                ClientStrategy::UnixSocket(self.id),
79                Box::new(|_| BaseServerStrategy::UnixSocket),
80            ))
81        } else if matches!(
82            network_hint,
83            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
84        ) && connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self))
85        {
86            Ok((
87                ClientStrategy::InternalTcpPort(self),
88                Box::new(move |_| {
89                    BaseServerStrategy::InternalTcpPort(match network_hint {
90                        PortNetworkHint::Auto => None,
91                        PortNetworkHint::TcpPort(port) => port,
92                    })
93                }),
94            ))
95        } else {
96            anyhow::bail!("Could not find a strategy to connect to localhost")
97        }
98    }
99
100    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
101        match typ {
102            ClientStrategy::UnixSocket(id) => {
103                #[cfg(unix)]
104                {
105                    self.id == id
106                }
107
108                #[cfg(not(unix))]
109                {
110                    let _ = id;
111                    false
112                }
113            }
114            ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
115            ClientStrategy::ForwardedTcpPort(_) => true,
116        }
117    }
118}
119
120struct LaunchedLocalhost;
121
122#[async_trait]
123impl LaunchedHost for LaunchedLocalhost {
124    fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
125        match bind_type {
126            BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
127            BaseServerStrategy::InternalTcpPort(port) => {
128                ServerBindConfig::TcpPort("127.0.0.1".to_string(), *port)
129            }
130            BaseServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
131        }
132    }
133
134    async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
135        Ok(())
136    }
137
138    async fn launch_binary(
139        &self,
140        id: String,
141        binary: &BuildOutput,
142        args: &[String],
143        tracing: Option<TracingOptions>,
144    ) -> Result<Box<dyn LaunchedBinary>> {
145        let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
146            if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
147                // samply
148                ProgressTracker::println(
149                    format!("[{id} tracing] Profiling binary with `samply`.",),
150                );
151                let samply_outfile = tempfile::NamedTempFile::new()?;
152
153                let mut command = Command::new("samply");
154                command
155                    .arg("record")
156                    .arg("--save-only")
157                    .arg("--output")
158                    .arg(samply_outfile.as_ref())
159                    .arg(&binary.bin_path)
160                    .args(args);
161                (Some(samply_outfile), command)
162            } else if cfg!(target_family = "unix") {
163                // perf
164                ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
165                let perf_outfile = tempfile::NamedTempFile::new()?;
166
167                let mut command = Command::new("perf");
168                command
169                    .args([
170                        "record",
171                        "-F",
172                        &tracing.frequency.to_string(),
173                        "-e",
174                        "cycles:u",
175                        "--call-graph",
176                        "dwarf,65528",
177                        "-o",
178                    ])
179                    .arg(perf_outfile.as_ref())
180                    .arg(&binary.bin_path)
181                    .args(args);
182
183                (Some(perf_outfile), command)
184            } else {
185                bail!(
186                    "Unknown OS for perf/dtrace tracing: {}",
187                    std::env::consts::OS
188                );
189            }
190        } else {
191            let mut command = Command::new(&binary.bin_path);
192            command.args(args);
193            (None, command)
194        };
195
196        command
197            .stdin(Stdio::piped())
198            .stdout(Stdio::piped())
199            .stderr(Stdio::piped());
200
201        #[cfg(not(target_family = "unix"))]
202        command.kill_on_drop(true);
203
204        ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
205
206        let child = command.spawn().map_err(|e| {
207            let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
208                "Tracing executable not found, ensure it is installed"
209            } else {
210                "Failed to execute command"
211            };
212            anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
213        })?;
214
215        Ok(Box::new(LaunchedLocalhostBinary::new(
216            child,
217            id,
218            tracing,
219            maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
220        )))
221    }
222
223    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
224        Ok(*addr)
225    }
226}