hydro_deploy/localhost/
launched_binary.rs

1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::{ExitStatus, Stdio};
4use std::sync::{Arc, Mutex};
5
6use anyhow::{Result, bail};
7use async_process::Command;
8use async_trait::async_trait;
9use futures::io::BufReader as FuturesBufReader;
10use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
11use inferno::collapse::Collapse;
12use inferno::collapse::dtrace::Folder as DtraceFolder;
13use inferno::collapse::perf::Folder as PerfFolder;
14use tempfile::NamedTempFile;
15use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::compat::FuturesAsyncReadCompatExt;
18use tokio_util::io::SyncIoBridge;
19
20use super::samply::{FxProfile, samply_to_folded};
21use crate::progress::ProgressTracker;
22use crate::rust_crate::flamegraph::handle_fold_data;
23use crate::rust_crate::tracing_options::TracingOptions;
24use crate::ssh::PrefixFilteredChannel;
25use crate::util::prioritized_broadcast;
26use crate::{LaunchedBinary, TracingResults};
27
28pub(super) struct TracingDataLocal {
29    pub(super) outfile: NamedTempFile,
30}
31
32pub struct LaunchedLocalhostBinary {
33    child: Mutex<async_process::Child>,
34    tracing_config: Option<TracingOptions>,
35    tracing_data_local: Option<TracingDataLocal>,
36    tracing_results: Option<TracingResults>,
37    stdin_sender: mpsc::UnboundedSender<String>,
38    stdout_deploy_receivers: Arc<Mutex<Option<oneshot::Sender<String>>>>,
39    stdout_receivers: Arc<Mutex<Vec<PrefixFilteredChannel>>>,
40    stderr_receivers: Arc<Mutex<Vec<PrefixFilteredChannel>>>,
41}
42
43#[cfg(unix)]
44impl Drop for LaunchedLocalhostBinary {
45    fn drop(&mut self) {
46        let mut child = self.child.lock().unwrap();
47
48        if let Ok(Some(_)) = child.try_status() {
49            return;
50        }
51
52        let pid = child.id();
53        if let Err(e) = nix::sys::signal::kill(
54            nix::unistd::Pid::from_raw(pid as i32),
55            nix::sys::signal::SIGTERM,
56        ) {
57            ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
58        }
59    }
60}
61
62impl LaunchedLocalhostBinary {
63    pub(super) fn new(
64        mut child: async_process::Child,
65        id: String,
66        tracing_config: Option<TracingOptions>,
67        tracing_data_local: Option<TracingDataLocal>,
68    ) -> Self {
69        let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
70        let mut stdin = child.stdin.take().unwrap();
71        tokio::spawn(async move {
72            while let Some(line) = stdin_receiver.recv().await {
73                if stdin.write_all(line.as_bytes()).await.is_err() {
74                    break;
75                }
76
77                stdin.flush().await.ok();
78            }
79        });
80
81        let id_clone = id.clone();
82        let (stdout_deploy_receivers, stdout_receivers) = prioritized_broadcast(
83            FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
84            move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
85        );
86        let (_, stderr_receivers) = prioritized_broadcast(
87            FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
88            move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
89        );
90
91        Self {
92            child: Mutex::new(child),
93            tracing_config,
94            tracing_data_local,
95            tracing_results: None,
96            stdin_sender,
97            stdout_deploy_receivers,
98            stdout_receivers,
99            stderr_receivers,
100        }
101    }
102}
103
104#[async_trait]
105impl LaunchedBinary for LaunchedLocalhostBinary {
106    fn stdin(&self) -> mpsc::UnboundedSender<String> {
107        self.stdin_sender.clone()
108    }
109
110    fn deploy_stdout(&self) -> oneshot::Receiver<String> {
111        let mut receivers = self.stdout_deploy_receivers.lock().unwrap();
112
113        if receivers.is_some() {
114            panic!("Only one deploy stdout receiver is allowed at a time");
115        }
116
117        let (sender, receiver) = oneshot::channel::<String>();
118        *receivers = Some(sender);
119        receiver
120    }
121
122    fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
123        let mut receivers = self.stdout_receivers.lock().unwrap();
124        let (sender, receiver) = mpsc::unbounded_channel::<String>();
125        receivers.push((None, sender));
126        receiver
127    }
128
129    fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
130        let mut receivers = self.stderr_receivers.lock().unwrap();
131        let (sender, receiver) = mpsc::unbounded_channel::<String>();
132        receivers.push((None, sender));
133        receiver
134    }
135
136    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
137        let mut receivers = self.stdout_receivers.lock().unwrap();
138        let (sender, receiver) = mpsc::unbounded_channel::<String>();
139        receivers.push((Some(prefix), sender));
140        receiver
141    }
142
143    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
144        let mut receivers = self.stderr_receivers.lock().unwrap();
145        let (sender, receiver) = mpsc::unbounded_channel::<String>();
146        receivers.push((Some(prefix), sender));
147        receiver
148    }
149
150    fn tracing_results(&self) -> Option<&TracingResults> {
151        self.tracing_results.as_ref()
152    }
153
154    fn exit_code(&self) -> Option<i32> {
155        self.child
156            .lock()
157            .unwrap()
158            .try_status()
159            .ok()
160            .flatten()
161            .map(exit_code)
162    }
163
164    async fn wait(&mut self) -> Result<i32> {
165        Ok(exit_code(self.child.get_mut().unwrap().status().await?))
166    }
167
168    async fn stop(&mut self) -> Result<()> {
169        if let Err(err) = self.child.get_mut().unwrap().kill() {
170            if !matches!(err.kind(), std::io::ErrorKind::InvalidInput) {
171                Err(err)?;
172            }
173        }
174
175        // Run perf post-processing and download perf output.
176        if let Some(tracing_config) = self.tracing_config.as_ref() {
177            if self.tracing_results.is_none() {
178                let tracing_data = self.tracing_data_local.take().unwrap();
179
180                if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
181                    if let Some(samply_outfile) = tracing_config.samply_outfile.as_ref() {
182                        std::fs::copy(&tracing_data.outfile, samply_outfile)?;
183                    }
184                } else if cfg!(target_family = "unix") {
185                    if let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref() {
186                        std::fs::copy(&tracing_data.outfile, perf_outfile)?;
187                    }
188                }
189
190                let fold_data = if cfg!(target_os = "macos") {
191                    let deserializer = &mut serde_json::Deserializer::from_reader(
192                        std::fs::File::open(tracing_data.outfile.path())?,
193                    );
194                    let loaded = serde_path_to_error::deserialize::<_, FxProfile>(deserializer)?;
195
196                    samply_to_folded(loaded).await.into()
197                } else if cfg!(target_family = "windows") {
198                    let mut fold_er = DtraceFolder::from(
199                        tracing_config
200                            .fold_dtrace_options
201                            .clone()
202                            .unwrap_or_default(),
203                    );
204
205                    let fold_data =
206                        ProgressTracker::leaf("fold dtrace output".to_owned(), async move {
207                            let mut fold_data = Vec::new();
208                            fold_er.collapse_file(Some(tracing_data.outfile), &mut fold_data)?;
209                            Result::<_>::Ok(fold_data)
210                        })
211                        .await?;
212                    fold_data
213                } else if cfg!(target_family = "unix") {
214                    // Run perf script.
215                    let mut perf_script = Command::new("perf")
216                        .args(["script", "--symfs=/", "-i"])
217                        .arg(tracing_data.outfile.path())
218                        .stdout(Stdio::piped())
219                        .stderr(Stdio::piped())
220                        .spawn()?;
221
222                    let stdout = perf_script.stdout.take().unwrap().compat();
223                    let mut stderr_lines =
224                        TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
225
226                    let mut fold_er = PerfFolder::from(
227                        tracing_config.fold_perf_options.clone().unwrap_or_default(),
228                    );
229
230                    // Pattern on `()` to make sure no `Result`s are ignored.
231                    let ((), fold_data, ()) = tokio::try_join!(
232                        async move {
233                            // Log stderr.
234                            while let Ok(Some(s)) = stderr_lines.next_line().await {
235                                ProgressTracker::println(format!("[perf script stderr] {s}"));
236                            }
237                            Result::<_>::Ok(())
238                        },
239                        async move {
240                            // Stream `perf script` stdout and fold.
241                            tokio::task::spawn_blocking(move || {
242                                let mut fold_data = Vec::new();
243                                fold_er.collapse(
244                                    SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
245                                    &mut fold_data,
246                                )?;
247                                Ok(fold_data)
248                            })
249                            .await?
250                        },
251                        async move {
252                            // Close stdin and wait for command exit.
253                            perf_script.status().await?;
254                            Ok(())
255                        },
256                    )?;
257                    fold_data
258                } else {
259                    bail!(
260                        "Unknown OS for perf/dtrace tracing: {}",
261                        std::env::consts::OS
262                    );
263                };
264
265                handle_fold_data(tracing_config, fold_data.clone()).await?;
266
267                self.tracing_results = Some(TracingResults {
268                    folded_data: fold_data,
269                });
270            }
271        };
272
273        Ok(())
274    }
275}
276
277fn exit_code(c: ExitStatus) -> i32 {
278    #[cfg(unix)]
279    return c.code().or(c.signal()).unwrap();
280    #[cfg(not(unix))]
281    return c.code().unwrap();
282}