hydro_deploy/localhost/
launched_binary.rs
1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::{ExitStatus, Stdio};
4use std::sync::{Arc, Mutex};
5
6use anyhow::{Result, bail};
7use async_process::Command;
8use async_trait::async_trait;
9use futures::io::BufReader as FuturesBufReader;
10use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
11use inferno::collapse::Collapse;
12use inferno::collapse::dtrace::Folder as DtraceFolder;
13use inferno::collapse::perf::Folder as PerfFolder;
14use tempfile::NamedTempFile;
15use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::compat::FuturesAsyncReadCompatExt;
18use tokio_util::io::SyncIoBridge;
19
20use crate::progress::ProgressTracker;
21use crate::rust_crate::flamegraph::handle_fold_data;
22use crate::rust_crate::tracing_options::TracingOptions;
23use crate::ssh::PrefixFilteredChannel;
24use crate::util::prioritized_broadcast;
25use crate::{LaunchedBinary, TracingResults};
26
/// Local tracing state handed to a [`LaunchedLocalhostBinary`].
///
/// `LaunchedLocalhostBinary::stop` consumes this value: it copies `outfile` to the
/// configured raw-output path (if any) and folds its contents into flamegraph input.
pub(super) struct TracingDataLocal {
    /// Temporary file holding the raw tracing output that `stop` reads.
    pub(super) outfile: NamedTempFile,
}
30
/// Handle to a binary running as a child process on the local machine.
///
/// Implements [`LaunchedBinary`], exposing the child's stdin/stdout/stderr as channels
/// and, when tracing is configured, post-processing the tracing output in `stop`.
pub struct LaunchedLocalhostBinary {
    /// The spawned child. Wrapped in a `Mutex` so that `exit_code(&self)` can poll the
    /// status, which needs mutable access to the child.
    child: Mutex<async_process::Child>,
    /// Tracing options; when `Some`, `stop` folds the collected tracing data.
    tracing_config: Option<TracingOptions>,
    /// Raw tracing output; taken (left as `None`) by `stop`.
    tracing_data_local: Option<TracingDataLocal>,
    /// Folded tracing data, set by `stop` and exposed through `tracing_results`.
    tracing_results: Option<TracingResults>,
    /// Sender whose messages are written to the child's stdin by a task spawned in `new`.
    stdin_sender: mpsc::UnboundedSender<String>,
    /// Slot for the single oneshot sender registered through `deploy_stdout`.
    // NOTE(review): when and how this slot is consumed is decided inside
    // `prioritized_broadcast`, which is not visible in this file.
    stdout_deploy_receivers: Arc<Mutex<Option<oneshot::Sender<String>>>>,
    /// Subscribers registered through `stdout` / `stdout_filter`.
    stdout_receivers: Arc<Mutex<Vec<PrefixFilteredChannel>>>,
    /// Subscribers registered through `stderr` / `stderr_filter`.
    stderr_receivers: Arc<Mutex<Vec<PrefixFilteredChannel>>>,
}
41
42#[cfg(unix)]
43impl Drop for LaunchedLocalhostBinary {
44 fn drop(&mut self) {
45 let mut child = self.child.lock().unwrap();
46
47 if let Ok(Some(_)) = child.try_status() {
48 return;
49 }
50
51 let pid = child.id();
52 if let Err(e) = nix::sys::signal::kill(
53 nix::unistd::Pid::from_raw(pid as i32),
54 nix::sys::signal::SIGTERM,
55 ) {
56 ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
57 }
58 }
59}
60
61impl LaunchedLocalhostBinary {
62 pub(super) fn new(
63 mut child: async_process::Child,
64 id: String,
65 tracing_config: Option<TracingOptions>,
66 tracing_data_local: Option<TracingDataLocal>,
67 ) -> Self {
68 let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
69 let mut stdin = child.stdin.take().unwrap();
70 tokio::spawn(async move {
71 while let Some(line) = stdin_receiver.recv().await {
72 if stdin.write_all(line.as_bytes()).await.is_err() {
73 break;
74 }
75
76 stdin.flush().await.ok();
77 }
78 });
79
80 let id_clone = id.clone();
81 let (stdout_deploy_receivers, stdout_receivers) = prioritized_broadcast(
82 FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
83 move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
84 );
85 let (_, stderr_receivers) = prioritized_broadcast(
86 FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
87 move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
88 );
89
90 Self {
91 child: Mutex::new(child),
92 tracing_config,
93 tracing_data_local,
94 tracing_results: None,
95 stdin_sender,
96 stdout_deploy_receivers,
97 stdout_receivers,
98 stderr_receivers,
99 }
100 }
101}
102
103#[async_trait]
104impl LaunchedBinary for LaunchedLocalhostBinary {
105 fn stdin(&self) -> mpsc::UnboundedSender<String> {
106 self.stdin_sender.clone()
107 }
108
109 fn deploy_stdout(&self) -> oneshot::Receiver<String> {
110 let mut receivers = self.stdout_deploy_receivers.lock().unwrap();
111
112 if receivers.is_some() {
113 panic!("Only one deploy stdout receiver is allowed at a time");
114 }
115
116 let (sender, receiver) = oneshot::channel::<String>();
117 *receivers = Some(sender);
118 receiver
119 }
120
121 fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
122 let mut receivers = self.stdout_receivers.lock().unwrap();
123 let (sender, receiver) = mpsc::unbounded_channel::<String>();
124 receivers.push((None, sender));
125 receiver
126 }
127
128 fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
129 let mut receivers = self.stderr_receivers.lock().unwrap();
130 let (sender, receiver) = mpsc::unbounded_channel::<String>();
131 receivers.push((None, sender));
132 receiver
133 }
134
135 fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
136 let mut receivers = self.stdout_receivers.lock().unwrap();
137 let (sender, receiver) = mpsc::unbounded_channel::<String>();
138 receivers.push((Some(prefix), sender));
139 receiver
140 }
141
142 fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
143 let mut receivers = self.stderr_receivers.lock().unwrap();
144 let (sender, receiver) = mpsc::unbounded_channel::<String>();
145 receivers.push((Some(prefix), sender));
146 receiver
147 }
148
149 fn tracing_results(&self) -> Option<&TracingResults> {
150 self.tracing_results.as_ref()
151 }
152
153 fn exit_code(&self) -> Option<i32> {
154 self.child
155 .lock()
156 .unwrap()
157 .try_status()
158 .ok()
159 .flatten()
160 .map(exit_code)
161 }
162
163 async fn wait(&mut self) -> Result<i32> {
164 Ok(exit_code(self.child.get_mut().unwrap().status().await?))
165 }
166
167 async fn stop(&mut self) -> Result<()> {
168 if let Err(err) = self.child.get_mut().unwrap().kill() {
169 if !matches!(err.kind(), std::io::ErrorKind::InvalidInput) {
170 Err(err)?;
171 }
172 }
173
174 if let Some(tracing_config) = self.tracing_config.as_ref() {
176 let tracing_data = self.tracing_data_local.take().unwrap();
177
178 if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
179 if let Some(dtrace_outfile) = tracing_config.dtrace_outfile.as_ref() {
180 std::fs::copy(&tracing_data.outfile, dtrace_outfile)?;
181 }
182 } else if cfg!(target_family = "unix") {
183 if let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref() {
184 std::fs::copy(&tracing_data.outfile, perf_outfile)?;
185 }
186 }
187
188 let fold_data = if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
189 let mut fold_er = DtraceFolder::from(
190 tracing_config
191 .fold_dtrace_options
192 .clone()
193 .unwrap_or_default(),
194 );
195
196 let fold_data =
197 ProgressTracker::leaf("fold dtrace output".to_owned(), async move {
198 let mut fold_data = Vec::new();
199 fold_er.collapse_file(Some(tracing_data.outfile), &mut fold_data)?;
200 Result::<_>::Ok(fold_data)
201 })
202 .await?;
203 fold_data
204 } else if cfg!(target_family = "unix") {
205 let mut perf_script = Command::new("perf")
207 .args(["script", "--symfs=/", "-i"])
208 .arg(tracing_data.outfile.path())
209 .stdout(Stdio::piped())
210 .stderr(Stdio::piped())
211 .spawn()?;
212
213 let stdout = perf_script.stdout.take().unwrap().compat();
214 let mut stderr_lines =
215 TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
216
217 let mut fold_er =
218 PerfFolder::from(tracing_config.fold_perf_options.clone().unwrap_or_default());
219
220 let ((), fold_data, ()) = tokio::try_join!(
222 async move {
223 while let Ok(Some(s)) = stderr_lines.next_line().await {
225 ProgressTracker::println(format!("[perf script stderr] {s}"));
226 }
227 Result::<_>::Ok(())
228 },
229 async move {
230 tokio::task::spawn_blocking(move || {
232 let mut fold_data = Vec::new();
233 fold_er.collapse(
234 SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
235 &mut fold_data,
236 )?;
237 Ok(fold_data)
238 })
239 .await?
240 },
241 async move {
242 perf_script.status().await?;
244 Ok(())
245 },
246 )?;
247 fold_data
248 } else {
249 bail!(
250 "Unknown OS for perf/dtrace tracing: {}",
251 std::env::consts::OS
252 );
253 };
254
255 self.tracing_results = Some(TracingResults {
256 folded_data: fold_data.clone(),
257 });
258
259 handle_fold_data(tracing_config, fold_data).await?;
260 };
261
262 Ok(())
263 }
264}
265
/// Collapses an [`ExitStatus`] into a single integer.
///
/// Returns the process exit code when there is one. On Unix, a process that has no
/// exit code reports the number of the signal that terminated it instead.
///
/// # Panics
///
/// Panics if the status carries neither an exit code nor (on Unix) a signal.
fn exit_code(c: ExitStatus) -> i32 {
    #[cfg(unix)]
    let resolved = match c.code() {
        Some(code) => Some(code),
        None => c.signal(),
    };
    #[cfg(not(unix))]
    let resolved = c.code();

    resolved.unwrap()
}