hydro_deploy/localhost/
mod.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use anyhow::{Result, bail};
6use async_process::{Command, Stdio};
7use async_trait::async_trait;
8use hydro_deploy_integration::ServerBindConfig;
9
10use crate::progress::ProgressTracker;
11use crate::rust_crate::build::BuildOutput;
12use crate::rust_crate::tracing_options::TracingOptions;
13use crate::{
14    ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary, LaunchedHost,
15    ResourceBatch, ResourceResult, ServerStrategy,
16};
17
18pub mod launched_binary;
19pub use launched_binary::*;
20mod samply;
21
22#[derive(Debug)]
23pub struct LocalhostHost {
24    pub id: usize,
25    client_only: bool,
26}
27
28impl LocalhostHost {
29    pub fn new(id: usize) -> LocalhostHost {
30        LocalhostHost {
31            id,
32            client_only: false,
33        }
34    }
35
36    pub fn client_only(&self) -> LocalhostHost {
37        LocalhostHost {
38            id: self.id,
39            client_only: true,
40        }
41    }
42}
43
44#[async_trait]
45impl Host for LocalhostHost {
46    fn target_type(&self) -> HostTargetType {
47        HostTargetType::Local
48    }
49
50    fn request_port(&self, _bind_type: &ServerStrategy) {}
51    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
52    fn request_custom_binary(&self) {}
53
54    fn id(&self) -> usize {
55        self.id
56    }
57
58    fn as_any(&self) -> &dyn std::any::Any {
59        self
60    }
61
62    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
63        Some(Arc::new(LaunchedLocalhost))
64    }
65
66    fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
67        Arc::new(LaunchedLocalhost)
68    }
69
70    fn strategy_as_server<'a>(
71        &'a self,
72        connection_from: &dyn Host,
73    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
74        if self.client_only {
75            anyhow::bail!("Localhost cannot be a server if it is client only")
76        }
77
78        if connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id)) {
79            Ok((
80                ClientStrategy::UnixSocket(self.id),
81                Box::new(|_| ServerStrategy::UnixSocket),
82            ))
83        } else if connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self)) {
84            Ok((
85                ClientStrategy::InternalTcpPort(self),
86                Box::new(|_| ServerStrategy::InternalTcpPort),
87            ))
88        } else {
89            anyhow::bail!("Could not find a strategy to connect to localhost")
90        }
91    }
92
93    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
94        match typ {
95            ClientStrategy::UnixSocket(id) => {
96                #[cfg(unix)]
97                {
98                    self.id == id
99                }
100
101                #[cfg(not(unix))]
102                {
103                    let _ = id;
104                    false
105                }
106            }
107            ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
108            ClientStrategy::ForwardedTcpPort(_) => true,
109        }
110    }
111}
112
113struct LaunchedLocalhost;
114
115#[async_trait]
116impl LaunchedHost for LaunchedLocalhost {
117    fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig {
118        match bind_type {
119            ServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
120            ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort("127.0.0.1".to_string()),
121            ServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
122            ServerStrategy::Demux(demux) => {
123                let mut config_map = HashMap::new();
124                for (key, underlying) in demux {
125                    config_map.insert(*key, self.server_config(underlying));
126                }
127
128                ServerBindConfig::Demux(config_map)
129            }
130            ServerStrategy::Merge(merge) => {
131                let mut configs = vec![];
132                for underlying in merge {
133                    configs.push(self.server_config(underlying));
134                }
135
136                ServerBindConfig::Merge(configs)
137            }
138            ServerStrategy::Tagged(underlying, id) => {
139                ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
140            }
141            ServerStrategy::Null => ServerBindConfig::Null,
142        }
143    }
144
145    async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
146        Ok(())
147    }
148
149    async fn launch_binary(
150        &self,
151        id: String,
152        binary: &BuildOutput,
153        args: &[String],
154        tracing: Option<TracingOptions>,
155    ) -> Result<Box<dyn LaunchedBinary>> {
156        let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
157            if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
158                // samply
159                ProgressTracker::println(
160                    format!("[{id} tracing] Profiling binary with `samply`.",),
161                );
162                let samply_outfile = tempfile::NamedTempFile::new()?;
163
164                let mut command = Command::new("samply");
165                command
166                    .arg("record")
167                    .arg("--save-only")
168                    .arg("--output")
169                    .arg(samply_outfile.as_ref())
170                    .arg(&binary.bin_path)
171                    .args(args);
172                (Some(samply_outfile), command)
173            } else if cfg!(target_family = "unix") {
174                // perf
175                ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
176                let perf_outfile = tempfile::NamedTempFile::new()?;
177
178                let mut command = Command::new("perf");
179                command
180                    .args([
181                        "record",
182                        "-F",
183                        &tracing.frequency.to_string(),
184                        "-e",
185                        "cycles:u",
186                        "--call-graph",
187                        "dwarf,65528",
188                        "-o",
189                    ])
190                    .arg(perf_outfile.as_ref())
191                    .arg(&binary.bin_path)
192                    .args(args);
193
194                (Some(perf_outfile), command)
195            } else {
196                bail!(
197                    "Unknown OS for perf/dtrace tracing: {}",
198                    std::env::consts::OS
199                );
200            }
201        } else {
202            let mut command = Command::new(&binary.bin_path);
203            command.args(args);
204            (None, command)
205        };
206
207        command
208            .stdin(Stdio::piped())
209            .stdout(Stdio::piped())
210            .stderr(Stdio::piped());
211
212        #[cfg(not(target_family = "unix"))]
213        command.kill_on_drop(true);
214
215        ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
216
217        let child = command.spawn().map_err(|e| {
218            let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
219                "Tracing executable not found, ensure it is installed"
220            } else {
221                "Failed to execute command"
222            };
223            anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
224        })?;
225
226        Ok(Box::new(LaunchedLocalhostBinary::new(
227            child,
228            id,
229            tracing,
230            maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
231        )))
232    }
233
234    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
235        Ok(*addr)
236    }
237}