hydro_deploy/localhost/
launched_binary.rs1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::{ExitStatus, Stdio};
4use std::sync::Mutex;
5
6use anyhow::{Result, bail};
7use async_process::Command;
8use async_trait::async_trait;
9use futures::io::BufReader as FuturesBufReader;
10use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
11use inferno::collapse::Collapse;
12use inferno::collapse::dtrace::Folder as DtraceFolder;
13use inferno::collapse::perf::Folder as PerfFolder;
14use tempfile::NamedTempFile;
15use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::compat::FuturesAsyncReadCompatExt;
18use tokio_util::io::SyncIoBridge;
19
20use super::samply::{FxProfile, samply_to_folded};
21use crate::progress::ProgressTracker;
22use crate::rust_crate::flamegraph::handle_fold_data;
23use crate::rust_crate::tracing_options::TracingOptions;
24use crate::util::{PriorityBroadcast, prioritized_broadcast};
25use crate::{LaunchedBinary, TracingResults};
26
27pub(super) struct TracingDataLocal {
28 pub(super) outfile: NamedTempFile,
29}
30
31pub struct LaunchedLocalhostBinary {
32 child: Mutex<async_process::Child>,
33 tracing_config: Option<TracingOptions>,
34 tracing_data_local: Option<TracingDataLocal>,
35 tracing_results: Option<TracingResults>,
36 stdin_sender: mpsc::UnboundedSender<String>,
37 stdout_broadcast: PriorityBroadcast,
38 stderr_broadcast: PriorityBroadcast,
39}
40
41#[cfg(unix)]
42impl Drop for LaunchedLocalhostBinary {
43 fn drop(&mut self) {
44 let mut child = self.child.lock().unwrap();
45
46 if let Ok(Some(_)) = child.try_status() {
47 return;
48 }
49
50 let pid = child.id();
51 if let Err(e) = nix::sys::signal::kill(
52 nix::unistd::Pid::from_raw(pid as i32),
53 nix::sys::signal::SIGTERM,
54 ) {
55 ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
56 }
57 }
58}
59
60impl LaunchedLocalhostBinary {
61 pub(super) fn new(
62 mut child: async_process::Child,
63 id: String,
64 tracing_config: Option<TracingOptions>,
65 tracing_data_local: Option<TracingDataLocal>,
66 ) -> Self {
67 let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
68 let mut stdin = child.stdin.take().unwrap();
69 tokio::spawn(async move {
70 while let Some(line) = stdin_receiver.recv().await {
71 if stdin.write_all(line.as_bytes()).await.is_err() {
72 break;
73 }
74
75 stdin.flush().await.ok();
76 }
77 });
78
79 let id_clone = id.clone();
80 let stdout_broadcast = prioritized_broadcast(
81 FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
82 move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
83 );
84 let stderr_broadcast = prioritized_broadcast(
85 FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
86 move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
87 );
88
89 Self {
90 child: Mutex::new(child),
91 tracing_config,
92 tracing_data_local,
93 tracing_results: None,
94 stdin_sender,
95 stdout_broadcast,
96 stderr_broadcast,
97 }
98 }
99}
100
101#[async_trait]
102impl LaunchedBinary for LaunchedLocalhostBinary {
103 fn stdin(&self) -> mpsc::UnboundedSender<String> {
104 self.stdin_sender.clone()
105 }
106
107 fn deploy_stdout(&self) -> oneshot::Receiver<String> {
108 self.stdout_broadcast.receive_priority()
109 }
110
111 fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
112 self.stdout_broadcast.receive(None)
113 }
114
115 fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
116 self.stderr_broadcast.receive(None)
117 }
118
119 fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
120 self.stdout_broadcast.receive(Some(prefix))
121 }
122
123 fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
124 self.stderr_broadcast.receive(Some(prefix))
125 }
126
127 fn tracing_results(&self) -> Option<&TracingResults> {
128 self.tracing_results.as_ref()
129 }
130
131 fn exit_code(&self) -> Option<i32> {
132 self.child
133 .lock()
134 .unwrap()
135 .try_status()
136 .ok()
137 .flatten()
138 .map(exit_code)
139 }
140
141 async fn wait(&mut self) -> Result<i32> {
142 Ok(exit_code(self.child.get_mut().unwrap().status().await?))
143 }
144
145 async fn stop(&mut self) -> Result<()> {
146 if let Err(err) = self.child.get_mut().unwrap().kill()
147 && !matches!(err.kind(), std::io::ErrorKind::InvalidInput)
148 {
149 Err(err)?;
150 }
151
152 if let Some(tracing_config) = self.tracing_config.as_ref()
154 && self.tracing_results.is_none()
155 {
156 let tracing_data = self.tracing_data_local.take().unwrap();
157
158 if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
159 if let Some(samply_outfile) = tracing_config.samply_outfile.as_ref() {
160 std::fs::copy(&tracing_data.outfile, samply_outfile)?;
161 }
162 } else if cfg!(target_family = "unix")
163 && let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref()
164 {
165 std::fs::copy(&tracing_data.outfile, perf_outfile)?;
166 }
167
168 let fold_data = if cfg!(target_os = "macos") {
169 let deserializer = &mut serde_json::Deserializer::from_reader(std::fs::File::open(
170 tracing_data.outfile.path(),
171 )?);
172 let loaded = serde_path_to_error::deserialize::<_, FxProfile>(deserializer)?;
173
174 ProgressTracker::leaf("processing samply", samply_to_folded(loaded))
175 .await
176 .into()
177 } else if cfg!(target_family = "windows") {
178 let mut fold_er = DtraceFolder::from(
179 tracing_config
180 .fold_dtrace_options
181 .clone()
182 .unwrap_or_default(),
183 );
184
185 let fold_data =
186 ProgressTracker::leaf("fold dtrace output".to_owned(), async move {
187 let mut fold_data = Vec::new();
188 fold_er.collapse_file(Some(tracing_data.outfile), &mut fold_data)?;
189 Result::<_>::Ok(fold_data)
190 })
191 .await?;
192 fold_data
193 } else if cfg!(target_family = "unix") {
194 let mut perf_script = Command::new("perf")
196 .args(["script", "--symfs=/", "-i"])
197 .arg(tracing_data.outfile.path())
198 .stdout(Stdio::piped())
199 .stderr(Stdio::piped())
200 .spawn()?;
201
202 let stdout = perf_script.stdout.take().unwrap().compat();
203 let mut stderr_lines =
204 TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
205
206 let mut fold_er =
207 PerfFolder::from(tracing_config.fold_perf_options.clone().unwrap_or_default());
208
209 let ((), fold_data, ()) = tokio::try_join!(
211 async move {
212 while let Ok(Some(s)) = stderr_lines.next_line().await {
214 ProgressTracker::println(format!("[perf script stderr] {s}"));
215 }
216 Result::<_>::Ok(())
217 },
218 async move {
219 tokio::task::spawn_blocking(move || {
221 let mut fold_data = Vec::new();
222 fold_er.collapse(
223 SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
224 &mut fold_data,
225 )?;
226 Ok(fold_data)
227 })
228 .await?
229 },
230 async move {
231 perf_script.status().await?;
233 Ok(())
234 },
235 )?;
236 fold_data
237 } else {
238 bail!(
239 "Unknown OS for perf/dtrace tracing: {}",
240 std::env::consts::OS
241 );
242 };
243
244 handle_fold_data(tracing_config, fold_data.clone()).await?;
245
246 self.tracing_results = Some(TracingResults {
247 folded_data: fold_data,
248 });
249 };
250
251 Ok(())
252 }
253}
254
/// Converts an [`ExitStatus`] into a single integer.
///
/// The process exit code is used when there is one. On Unix, a process without an exit code
/// yields the number of the signal reported by the status instead.
///
/// # Panics
///
/// Panics if the status carries neither an exit code nor (on Unix) a signal.
fn exit_code(c: ExitStatus) -> i32 {
    #[cfg(unix)]
    let resolved = c.code().or_else(|| c.signal());
    #[cfg(not(unix))]
    let resolved = c.code();

    resolved.unwrap()
}