hydro_deploy/localhost/
launched_binary.rs

1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::{ExitStatus, Stdio};
4use std::sync::Mutex;
5
6use anyhow::{Result, bail};
7use async_process::Command;
8use async_trait::async_trait;
9use futures::io::BufReader as FuturesBufReader;
10use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
11use inferno::collapse::Collapse;
12use inferno::collapse::dtrace::Folder as DtraceFolder;
13use inferno::collapse::perf::Folder as PerfFolder;
14use tempfile::NamedTempFile;
15use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::compat::FuturesAsyncReadCompatExt;
18use tokio_util::io::SyncIoBridge;
19
20use super::samply::{FxProfile, samply_to_folded};
21use crate::progress::ProgressTracker;
22use crate::rust_crate::flamegraph::handle_fold_data;
23use crate::rust_crate::tracing_options::TracingOptions;
24use crate::util::{PriorityBroadcast, prioritized_broadcast};
25use crate::{LaunchedBinary, TracingResults};
26
/// Raw tracing output captured for a locally launched binary.
pub(super) struct TracingDataLocal {
    /// Temporary file holding the tracing output. `LaunchedLocalhostBinary::stop` copies it
    /// to the configured output path (if any) and folds its contents.
    pub(super) outfile: NamedTempFile,
}
30
/// A binary running as a child process, together with the channels used to talk to it and
/// the state needed to post-process its tracing output.
pub struct LaunchedLocalhostBinary {
    /// The child process. Wrapped in a `Mutex` so that `exit_code` can poll it through `&self`.
    child: Mutex<async_process::Child>,
    /// Tracing options; when present, `stop` post-processes the captured tracing output.
    tracing_config: Option<TracingOptions>,
    /// Raw tracing output; taken (left as `None`) by `stop` when it post-processes.
    tracing_data_local: Option<TracingDataLocal>,
    /// Folded tracing data, populated by `stop` once post-processing has succeeded.
    tracing_results: Option<TracingResults>,
    /// Sender whose messages a background task writes to the child's stdin.
    stdin_sender: mpsc::UnboundedSender<String>,
    /// Broadcast fed by the lines of the child's stdout.
    stdout_broadcast: PriorityBroadcast,
    /// Broadcast fed by the lines of the child's stderr.
    stderr_broadcast: PriorityBroadcast,
}
40
41#[cfg(unix)]
42impl Drop for LaunchedLocalhostBinary {
43    fn drop(&mut self) {
44        let mut child = self.child.lock().unwrap();
45
46        if let Ok(Some(_)) = child.try_status() {
47            return;
48        }
49
50        let pid = child.id();
51        if let Err(e) = nix::sys::signal::kill(
52            nix::unistd::Pid::from_raw(pid as i32),
53            nix::sys::signal::SIGTERM,
54        ) {
55            ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
56        }
57    }
58}
59
60impl LaunchedLocalhostBinary {
61    pub(super) fn new(
62        mut child: async_process::Child,
63        id: String,
64        tracing_config: Option<TracingOptions>,
65        tracing_data_local: Option<TracingDataLocal>,
66    ) -> Self {
67        let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
68        let mut stdin = child.stdin.take().unwrap();
69        tokio::spawn(async move {
70            while let Some(line) = stdin_receiver.recv().await {
71                if stdin.write_all(line.as_bytes()).await.is_err() {
72                    break;
73                }
74
75                stdin.flush().await.ok();
76            }
77        });
78
79        let id_clone = id.clone();
80        let stdout_broadcast = prioritized_broadcast(
81            FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
82            move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
83        );
84        let stderr_broadcast = prioritized_broadcast(
85            FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
86            move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
87        );
88
89        Self {
90            child: Mutex::new(child),
91            tracing_config,
92            tracing_data_local,
93            tracing_results: None,
94            stdin_sender,
95            stdout_broadcast,
96            stderr_broadcast,
97        }
98    }
99}
100
101#[async_trait]
102impl LaunchedBinary for LaunchedLocalhostBinary {
103    fn stdin(&self) -> mpsc::UnboundedSender<String> {
104        self.stdin_sender.clone()
105    }
106
107    fn deploy_stdout(&self) -> oneshot::Receiver<String> {
108        self.stdout_broadcast.receive_priority()
109    }
110
111    fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
112        self.stdout_broadcast.receive(None)
113    }
114
115    fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
116        self.stderr_broadcast.receive(None)
117    }
118
119    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
120        self.stdout_broadcast.receive(Some(prefix))
121    }
122
123    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
124        self.stderr_broadcast.receive(Some(prefix))
125    }
126
127    fn tracing_results(&self) -> Option<&TracingResults> {
128        self.tracing_results.as_ref()
129    }
130
131    fn exit_code(&self) -> Option<i32> {
132        self.child
133            .lock()
134            .unwrap()
135            .try_status()
136            .ok()
137            .flatten()
138            .map(exit_code)
139    }
140
141    async fn wait(&mut self) -> Result<i32> {
142        Ok(exit_code(self.child.get_mut().unwrap().status().await?))
143    }
144
145    async fn stop(&mut self) -> Result<()> {
146        if let Err(err) = self.child.get_mut().unwrap().kill()
147            && !matches!(err.kind(), std::io::ErrorKind::InvalidInput)
148        {
149            Err(err)?;
150        }
151
152        // Run perf post-processing and download perf output.
153        if let Some(tracing_config) = self.tracing_config.as_ref()
154            && self.tracing_results.is_none()
155        {
156            let tracing_data = self.tracing_data_local.take().unwrap();
157
158            if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
159                if let Some(samply_outfile) = tracing_config.samply_outfile.as_ref() {
160                    std::fs::copy(&tracing_data.outfile, samply_outfile)?;
161                }
162            } else if cfg!(target_family = "unix")
163                && let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref()
164            {
165                std::fs::copy(&tracing_data.outfile, perf_outfile)?;
166            }
167
168            let fold_data = if cfg!(target_os = "macos") {
169                let deserializer = &mut serde_json::Deserializer::from_reader(std::fs::File::open(
170                    tracing_data.outfile.path(),
171                )?);
172                let loaded = serde_path_to_error::deserialize::<_, FxProfile>(deserializer)?;
173
174                ProgressTracker::leaf("processing samply", samply_to_folded(loaded))
175                    .await
176                    .into()
177            } else if cfg!(target_family = "windows") {
178                let mut fold_er = DtraceFolder::from(
179                    tracing_config
180                        .fold_dtrace_options
181                        .clone()
182                        .unwrap_or_default(),
183                );
184
185                let fold_data =
186                    ProgressTracker::leaf("fold dtrace output".to_owned(), async move {
187                        let mut fold_data = Vec::new();
188                        fold_er.collapse_file(Some(tracing_data.outfile), &mut fold_data)?;
189                        Result::<_>::Ok(fold_data)
190                    })
191                    .await?;
192                fold_data
193            } else if cfg!(target_family = "unix") {
194                // Run perf script.
195                let mut perf_script = Command::new("perf")
196                    .args(["script", "--symfs=/", "-i"])
197                    .arg(tracing_data.outfile.path())
198                    .stdout(Stdio::piped())
199                    .stderr(Stdio::piped())
200                    .spawn()?;
201
202                let stdout = perf_script.stdout.take().unwrap().compat();
203                let mut stderr_lines =
204                    TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
205
206                let mut fold_er =
207                    PerfFolder::from(tracing_config.fold_perf_options.clone().unwrap_or_default());
208
209                // Pattern on `()` to make sure no `Result`s are ignored.
210                let ((), fold_data, ()) = tokio::try_join!(
211                    async move {
212                        // Log stderr.
213                        while let Ok(Some(s)) = stderr_lines.next_line().await {
214                            ProgressTracker::println(format!("[perf script stderr] {s}"));
215                        }
216                        Result::<_>::Ok(())
217                    },
218                    async move {
219                        // Stream `perf script` stdout and fold.
220                        tokio::task::spawn_blocking(move || {
221                            let mut fold_data = Vec::new();
222                            fold_er.collapse(
223                                SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
224                                &mut fold_data,
225                            )?;
226                            Ok(fold_data)
227                        })
228                        .await?
229                    },
230                    async move {
231                        // Close stdin and wait for command exit.
232                        perf_script.status().await?;
233                        Ok(())
234                    },
235                )?;
236                fold_data
237            } else {
238                bail!(
239                    "Unknown OS for perf/dtrace tracing: {}",
240                    std::env::consts::OS
241                );
242            };
243
244            handle_fold_data(tracing_config, fold_data.clone()).await?;
245
246            self.tracing_results = Some(TracingResults {
247                folded_data: fold_data,
248            });
249        };
250
251        Ok(())
252    }
253}
254
/// Collapses an [`ExitStatus`] into a single integer.
///
/// Returns the process exit code when the status has one. On Unix, a status without an exit
/// code falls back to the signal number reported by `ExitStatusExt::signal`.
///
/// # Panics
///
/// Panics if the status has neither an exit code nor (on Unix) a signal number.
fn exit_code(c: ExitStatus) -> i32 {
    #[cfg(unix)]
    let signal_fallback = c.signal();
    #[cfg(not(unix))]
    let signal_fallback: Option<i32> = None;

    match c.code() {
        Some(code) => code,
        None => signal_fallback.unwrap(),
    }
}