hydro_deploy/localhost/
launched_binary.rs1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::ExitStatus;
4#[cfg(feature = "profile-folding")]
5use std::process::Stdio;
6#[cfg(feature = "profile-folding")]
7use std::sync::OnceLock;
8
9use anyhow::Result;
10#[cfg(feature = "profile-folding")]
11use anyhow::bail;
12#[cfg(feature = "profile-folding")]
13use async_process::Command;
14use async_trait::async_trait;
15use futures::io::BufReader as FuturesBufReader;
16use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
17#[cfg(feature = "profile-folding")]
18use inferno::collapse::Collapse;
19#[cfg(feature = "profile-folding")]
20use inferno::collapse::perf::Folder as PerfFolder;
21use tempfile::NamedTempFile;
22#[cfg(feature = "profile-folding")]
23use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
24use tokio::sync::{mpsc, oneshot};
25#[cfg(feature = "profile-folding")]
26use tokio_util::compat::FuturesAsyncReadCompatExt;
27#[cfg(feature = "profile-folding")]
28use tokio_util::io::SyncIoBridge;
29
30#[cfg(feature = "profile-folding")]
31#[cfg(any(target_os = "macos", target_family = "windows"))]
32use super::samply::samply_to_folded;
33use crate::LaunchedBinary;
34#[cfg(feature = "profile-folding")]
35use crate::TracingResults;
36use crate::progress::ProgressTracker;
37#[cfg(feature = "profile-folding")]
38use crate::rust_crate::flamegraph::handle_fold_data;
39use crate::rust_crate::tracing_options::TracingOptions;
40use crate::util::{PriorityBroadcast, prioritized_broadcast};
41
/// Local tracing output associated with a launched binary.
pub(super) struct TracingDataLocal {
    /// Temporary file holding the raw trace output. `stop()` copies it to the
    /// configured outfile and (with `profile-folding`) reads it back for folding.
    // NOTE(review): presumably written by the profiler wrapping the child — confirm at the launch site.
    pub(super) outfile: NamedTempFile,
}
45
/// A binary running as a local child process, together with its stdio
/// plumbing and optional tracing state.
pub struct LaunchedLocalhostBinary {
    /// Handle to the child process. Guarded by an async mutex because `wait()`
    /// and `stop()` hold the lock across `.await` points.
    child: tokio::sync::Mutex<async_process::Child>,
    /// Tracing options; when `Some`, `stop()` post-processes the trace output.
    tracing_config: Option<TracingOptions>,
    /// Trace output for this process. `stop()` takes the value out (leaving
    /// `None`) when tracing is configured.
    tracing_data_local: std::sync::Mutex<Option<TracingDataLocal>>,
    /// Folded tracing results, set at most once by `stop()`.
    #[cfg(feature = "profile-folding")]
    tracing_results: OnceLock<TracingResults>,
    /// Sending half of the channel whose messages are written to the child's stdin.
    stdin_sender: mpsc::UnboundedSender<String>,
    /// Broadcaster built over the lines of the child's stdout.
    stdout_broadcast: PriorityBroadcast,
    /// Broadcaster built over the lines of the child's stderr.
    stderr_broadcast: PriorityBroadcast,
}
57
58#[cfg(unix)]
59impl Drop for LaunchedLocalhostBinary {
60 fn drop(&mut self) {
61 let child = self.child.get_mut();
62
63 if let Ok(Some(_)) = child.try_status() {
64 return;
65 }
66
67 let pid = child.id();
68 if let Err(e) = nix::sys::signal::kill(
69 nix::unistd::Pid::from_raw(pid as i32),
70 nix::sys::signal::SIGTERM,
71 ) {
72 ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
73 }
74 }
75}
76
77impl LaunchedLocalhostBinary {
78 pub(super) fn new(
79 mut child: async_process::Child,
80 id: String,
81 tracing_config: Option<TracingOptions>,
82 tracing_data_local: Option<TracingDataLocal>,
83 ) -> Self {
84 let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
85 let mut stdin = child.stdin.take().unwrap();
86 tokio::spawn(async move {
87 while let Some(line) = stdin_receiver.recv().await {
88 if stdin.write_all(line.as_bytes()).await.is_err() {
89 break;
90 }
91
92 stdin.flush().await.ok();
93 }
94 });
95
96 let id_clone = id.clone();
97 let stdout_broadcast = prioritized_broadcast(
98 FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
99 move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
100 );
101 let stderr_broadcast = prioritized_broadcast(
102 FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
103 move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
104 );
105
106 Self {
107 child: tokio::sync::Mutex::new(child),
108 tracing_config,
109 tracing_data_local: std::sync::Mutex::new(tracing_data_local),
110 #[cfg(feature = "profile-folding")]
111 tracing_results: OnceLock::new(),
112 stdin_sender,
113 stdout_broadcast,
114 stderr_broadcast,
115 }
116 }
117}
118
119#[async_trait]
120impl LaunchedBinary for LaunchedLocalhostBinary {
121 fn stdin(&self) -> mpsc::UnboundedSender<String> {
122 self.stdin_sender.clone()
123 }
124
125 fn deploy_stdout(&self) -> oneshot::Receiver<String> {
126 self.stdout_broadcast.receive_priority()
127 }
128
129 fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
130 self.stdout_broadcast.receive(None)
131 }
132
133 fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
134 self.stderr_broadcast.receive(None)
135 }
136
137 fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
138 self.stdout_broadcast.receive(Some(prefix))
139 }
140
141 fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
142 self.stderr_broadcast.receive(Some(prefix))
143 }
144
145 #[cfg(feature = "profile-folding")]
146 fn tracing_results(&self) -> Option<&TracingResults> {
147 self.tracing_results.get()
148 }
149
150 fn exit_code(&self) -> Option<i32> {
151 self.child
152 .try_lock()
153 .ok()
154 .and_then(|mut child| child.try_status().ok())
155 .flatten()
156 .map(exit_code)
157 }
158
159 async fn wait(&self) -> Result<i32> {
160 Ok(exit_code(self.child.lock().await.status().await?))
161 }
162
163 async fn stop(&self) -> Result<()> {
164 if let Err(err) = { self.child.lock().await.kill() }
165 && !matches!(err.kind(), std::io::ErrorKind::InvalidInput)
166 {
167 Err(err)?;
168 }
169
170 if let Some(tracing_config) = self.tracing_config.as_ref() {
172 #[cfg(feature = "profile-folding")]
173 assert!(
174 self.tracing_results.get().is_none(),
175 "`tracing_results` already set! Was `stop()` called twice? This is a bug."
176 );
177 let tracing_data =
178 {
179 self.tracing_data_local.lock().unwrap().take().expect(
180 "`tracing_data_local` empty, was `stop()` called twice? This is a bug.",
181 )
182 };
183
184 if cfg!(any(target_os = "macos", target_family = "windows")) {
185 if let Some(samply_outfile) = tracing_config.samply_outfile.as_ref() {
186 std::fs::copy(&tracing_data.outfile, samply_outfile)?;
187 }
188 } else if cfg!(target_family = "unix")
189 && let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref()
190 {
191 std::fs::copy(&tracing_data.outfile, perf_outfile)?;
192 }
193
194 #[cfg(feature = "profile-folding")]
195 let fold_data = if cfg!(any(target_os = "macos", target_family = "windows")) {
196 #[cfg(any(target_os = "macos", target_family = "windows"))]
197 {
198 let loaded =
199 serde_json::from_reader(std::fs::File::open(tracing_data.outfile.path())?)?;
200
201 ProgressTracker::leaf("processing samply", samply_to_folded(loaded))
202 .await
203 .into()
204 }
205
206 #[cfg(not(any(target_os = "macos", target_family = "windows")))]
207 {
208 unreachable!()
209 }
210 } else if cfg!(target_family = "unix") {
211 let mut perf_script = Command::new("perf")
213 .args(["script", "--symfs=/", "-i"])
214 .arg(tracing_data.outfile.path())
215 .stdout(Stdio::piped())
216 .stderr(Stdio::piped())
217 .spawn()?;
218
219 let stdout = perf_script.stdout.take().unwrap().compat();
220 let mut stderr_lines =
221 TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
222
223 let mut fold_er =
224 PerfFolder::from(tracing_config.fold_perf_options.clone().unwrap_or_default());
225
226 let ((), fold_data, ()) = tokio::try_join!(
228 async move {
229 while let Ok(Some(s)) = stderr_lines.next_line().await {
231 ProgressTracker::println(format!("[perf script stderr] {s}"));
232 }
233 Result::<_>::Ok(())
234 },
235 async move {
236 tokio::task::spawn_blocking(move || {
238 let mut fold_data = Vec::new();
239 fold_er.collapse(
240 SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
241 &mut fold_data,
242 )?;
243 Ok(fold_data)
244 })
245 .await?
246 },
247 async move {
248 perf_script.status().await?;
250 Ok(())
251 },
252 )?;
253 fold_data
254 } else {
255 bail!(
256 "Unknown OS for samply/perf tracing: {}",
257 std::env::consts::OS
258 );
259 };
260
261 #[cfg(feature = "profile-folding")]
262 handle_fold_data(tracing_config, fold_data.clone()).await?;
263
264 #[cfg(feature = "profile-folding")]
265 self.tracing_results
266 .set(TracingResults {
267 folded_data: fold_data,
268 })
269 .expect("`tracing_results` already set! This is a bug.");
270 };
271
272 Ok(())
273 }
274}
275
/// Converts an [`ExitStatus`] into a single integer.
///
/// Returns the process exit code when there is one. On Unix, a process that
/// ended without an exit code yields the number of the terminating signal
/// instead.
///
/// # Panics
///
/// Panics if the status carries neither an exit code nor (on Unix) a signal.
fn exit_code(c: ExitStatus) -> i32 {
    // Unix: prefer the exit code, fall back to the signal number.
    #[cfg(unix)]
    let resolved = match c.code() {
        Some(code) => Some(code),
        None => c.signal(),
    };
    // Elsewhere only the exit code is available.
    #[cfg(not(unix))]
    let resolved = c.code();

    resolved.unwrap()
}