hydro_deploy/localhost/
launched_binary.rs
1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::{ExitStatus, Stdio};
4use std::sync::{Arc, Mutex};
5
6use anyhow::{Result, bail};
7use async_process::Command;
8use async_trait::async_trait;
9use futures::io::BufReader as FuturesBufReader;
10use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
11use inferno::collapse::Collapse;
12use inferno::collapse::dtrace::Folder as DtraceFolder;
13use inferno::collapse::perf::Folder as PerfFolder;
14use tempfile::NamedTempFile;
15use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::compat::FuturesAsyncReadCompatExt;
18use tokio_util::io::SyncIoBridge;
19
20use super::samply::{FxProfile, samply_to_folded};
21use crate::progress::ProgressTracker;
22use crate::rust_crate::flamegraph::handle_fold_data;
23use crate::rust_crate::tracing_options::TracingOptions;
24use crate::ssh::PrefixFilteredChannel;
25use crate::util::prioritized_broadcast;
26use crate::{LaunchedBinary, TracingResults};
27
28pub(super) struct TracingDataLocal {
29 pub(super) outfile: NamedTempFile,
30}
31
32pub struct LaunchedLocalhostBinary {
33 child: Mutex<async_process::Child>,
34 tracing_config: Option<TracingOptions>,
35 tracing_data_local: Option<TracingDataLocal>,
36 tracing_results: Option<TracingResults>,
37 stdin_sender: mpsc::UnboundedSender<String>,
38 stdout_deploy_receivers: Arc<Mutex<Option<oneshot::Sender<String>>>>,
39 stdout_receivers: Arc<Mutex<Vec<PrefixFilteredChannel>>>,
40 stderr_receivers: Arc<Mutex<Vec<PrefixFilteredChannel>>>,
41}
42
43#[cfg(unix)]
44impl Drop for LaunchedLocalhostBinary {
45 fn drop(&mut self) {
46 let mut child = self.child.lock().unwrap();
47
48 if let Ok(Some(_)) = child.try_status() {
49 return;
50 }
51
52 let pid = child.id();
53 if let Err(e) = nix::sys::signal::kill(
54 nix::unistd::Pid::from_raw(pid as i32),
55 nix::sys::signal::SIGTERM,
56 ) {
57 ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
58 }
59 }
60}
61
62impl LaunchedLocalhostBinary {
63 pub(super) fn new(
64 mut child: async_process::Child,
65 id: String,
66 tracing_config: Option<TracingOptions>,
67 tracing_data_local: Option<TracingDataLocal>,
68 ) -> Self {
69 let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
70 let mut stdin = child.stdin.take().unwrap();
71 tokio::spawn(async move {
72 while let Some(line) = stdin_receiver.recv().await {
73 if stdin.write_all(line.as_bytes()).await.is_err() {
74 break;
75 }
76
77 stdin.flush().await.ok();
78 }
79 });
80
81 let id_clone = id.clone();
82 let (stdout_deploy_receivers, stdout_receivers) = prioritized_broadcast(
83 FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
84 move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
85 );
86 let (_, stderr_receivers) = prioritized_broadcast(
87 FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
88 move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
89 );
90
91 Self {
92 child: Mutex::new(child),
93 tracing_config,
94 tracing_data_local,
95 tracing_results: None,
96 stdin_sender,
97 stdout_deploy_receivers,
98 stdout_receivers,
99 stderr_receivers,
100 }
101 }
102}
103
104#[async_trait]
105impl LaunchedBinary for LaunchedLocalhostBinary {
106 fn stdin(&self) -> mpsc::UnboundedSender<String> {
107 self.stdin_sender.clone()
108 }
109
110 fn deploy_stdout(&self) -> oneshot::Receiver<String> {
111 let mut receivers = self.stdout_deploy_receivers.lock().unwrap();
112
113 if receivers.is_some() {
114 panic!("Only one deploy stdout receiver is allowed at a time");
115 }
116
117 let (sender, receiver) = oneshot::channel::<String>();
118 *receivers = Some(sender);
119 receiver
120 }
121
122 fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
123 let mut receivers = self.stdout_receivers.lock().unwrap();
124 let (sender, receiver) = mpsc::unbounded_channel::<String>();
125 receivers.push((None, sender));
126 receiver
127 }
128
129 fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
130 let mut receivers = self.stderr_receivers.lock().unwrap();
131 let (sender, receiver) = mpsc::unbounded_channel::<String>();
132 receivers.push((None, sender));
133 receiver
134 }
135
136 fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
137 let mut receivers = self.stdout_receivers.lock().unwrap();
138 let (sender, receiver) = mpsc::unbounded_channel::<String>();
139 receivers.push((Some(prefix), sender));
140 receiver
141 }
142
143 fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
144 let mut receivers = self.stderr_receivers.lock().unwrap();
145 let (sender, receiver) = mpsc::unbounded_channel::<String>();
146 receivers.push((Some(prefix), sender));
147 receiver
148 }
149
150 fn tracing_results(&self) -> Option<&TracingResults> {
151 self.tracing_results.as_ref()
152 }
153
154 fn exit_code(&self) -> Option<i32> {
155 self.child
156 .lock()
157 .unwrap()
158 .try_status()
159 .ok()
160 .flatten()
161 .map(exit_code)
162 }
163
164 async fn wait(&mut self) -> Result<i32> {
165 Ok(exit_code(self.child.get_mut().unwrap().status().await?))
166 }
167
168 async fn stop(&mut self) -> Result<()> {
169 if let Err(err) = self.child.get_mut().unwrap().kill() {
170 if !matches!(err.kind(), std::io::ErrorKind::InvalidInput) {
171 Err(err)?;
172 }
173 }
174
175 if let Some(tracing_config) = self.tracing_config.as_ref() {
177 if self.tracing_results.is_none() {
178 let tracing_data = self.tracing_data_local.take().unwrap();
179
180 if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
181 if let Some(samply_outfile) = tracing_config.samply_outfile.as_ref() {
182 std::fs::copy(&tracing_data.outfile, samply_outfile)?;
183 }
184 } else if cfg!(target_family = "unix") {
185 if let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref() {
186 std::fs::copy(&tracing_data.outfile, perf_outfile)?;
187 }
188 }
189
190 let fold_data = if cfg!(target_os = "macos") {
191 let deserializer = &mut serde_json::Deserializer::from_reader(
192 std::fs::File::open(tracing_data.outfile.path())?,
193 );
194 let loaded = serde_path_to_error::deserialize::<_, FxProfile>(deserializer)?;
195
196 samply_to_folded(loaded).await.into()
197 } else if cfg!(target_family = "windows") {
198 let mut fold_er = DtraceFolder::from(
199 tracing_config
200 .fold_dtrace_options
201 .clone()
202 .unwrap_or_default(),
203 );
204
205 let fold_data =
206 ProgressTracker::leaf("fold dtrace output".to_owned(), async move {
207 let mut fold_data = Vec::new();
208 fold_er.collapse_file(Some(tracing_data.outfile), &mut fold_data)?;
209 Result::<_>::Ok(fold_data)
210 })
211 .await?;
212 fold_data
213 } else if cfg!(target_family = "unix") {
214 let mut perf_script = Command::new("perf")
216 .args(["script", "--symfs=/", "-i"])
217 .arg(tracing_data.outfile.path())
218 .stdout(Stdio::piped())
219 .stderr(Stdio::piped())
220 .spawn()?;
221
222 let stdout = perf_script.stdout.take().unwrap().compat();
223 let mut stderr_lines =
224 TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
225
226 let mut fold_er = PerfFolder::from(
227 tracing_config.fold_perf_options.clone().unwrap_or_default(),
228 );
229
230 let ((), fold_data, ()) = tokio::try_join!(
232 async move {
233 while let Ok(Some(s)) = stderr_lines.next_line().await {
235 ProgressTracker::println(format!("[perf script stderr] {s}"));
236 }
237 Result::<_>::Ok(())
238 },
239 async move {
240 tokio::task::spawn_blocking(move || {
242 let mut fold_data = Vec::new();
243 fold_er.collapse(
244 SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
245 &mut fold_data,
246 )?;
247 Ok(fold_data)
248 })
249 .await?
250 },
251 async move {
252 perf_script.status().await?;
254 Ok(())
255 },
256 )?;
257 fold_data
258 } else {
259 bail!(
260 "Unknown OS for perf/dtrace tracing: {}",
261 std::env::consts::OS
262 );
263 };
264
265 handle_fold_data(tracing_config, fold_data.clone()).await?;
266
267 self.tracing_results = Some(TracingResults {
268 folded_data: fold_data,
269 });
270 }
271 };
272
273 Ok(())
274 }
275}
276
/// Converts an [`ExitStatus`] into a single integer code.
///
/// On Unix the process exit code is preferred, falling back to the number of
/// the terminating signal; on other platforms only the exit code is consulted.
///
/// # Panics
/// Panics if the status carries neither an exit code nor (on Unix) a signal.
fn exit_code(c: ExitStatus) -> i32 {
    #[cfg(unix)]
    let resolved = match c.code() {
        Some(code) => Some(code),
        None => c.signal(),
    };
    #[cfg(not(unix))]
    let resolved = c.code();

    resolved.unwrap()
}