hydro_deploy/localhost/
launched_binary.rs

1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::{ExitStatus, Stdio};
4use std::sync::{Arc, Mutex};
5
6use anyhow::{Result, bail};
7use async_process::Command;
8use async_trait::async_trait;
9use futures::io::BufReader as FuturesBufReader;
10use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
11use inferno::collapse::Collapse;
12use inferno::collapse::dtrace::Folder as DtraceFolder;
13use inferno::collapse::perf::Folder as PerfFolder;
14use tempfile::NamedTempFile;
15use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::compat::FuturesAsyncReadCompatExt;
18use tokio_util::io::SyncIoBridge;
19
20use crate::progress::ProgressTracker;
21use crate::rust_crate::flamegraph::handle_fold_data;
22use crate::rust_crate::tracing_options::TracingOptions;
23use crate::ssh::PrefixFilteredChannel;
24use crate::util::prioritized_broadcast;
25use crate::{LaunchedBinary, TracingResults};
26
27pub(super) struct TracingDataLocal {
28    pub(super) outfile: NamedTempFile,
29}
30
31pub struct LaunchedLocalhostBinary {
32    child: Mutex<async_process::Child>,
33    tracing_config: Option<TracingOptions>,
34    tracing_data_local: Option<TracingDataLocal>,
35    tracing_results: Option<TracingResults>,
36    stdin_sender: mpsc::UnboundedSender<String>,
37    stdout_deploy_receivers: Arc<Mutex<Option<oneshot::Sender<String>>>>,
38    stdout_receivers: Arc<Mutex<Vec<PrefixFilteredChannel>>>,
39    stderr_receivers: Arc<Mutex<Vec<PrefixFilteredChannel>>>,
40}
41
42#[cfg(unix)]
43impl Drop for LaunchedLocalhostBinary {
44    fn drop(&mut self) {
45        let mut child = self.child.lock().unwrap();
46
47        if let Ok(Some(_)) = child.try_status() {
48            return;
49        }
50
51        let pid = child.id();
52        if let Err(e) = nix::sys::signal::kill(
53            nix::unistd::Pid::from_raw(pid as i32),
54            nix::sys::signal::SIGTERM,
55        ) {
56            ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
57        }
58    }
59}
60
61impl LaunchedLocalhostBinary {
62    pub(super) fn new(
63        mut child: async_process::Child,
64        id: String,
65        tracing_config: Option<TracingOptions>,
66        tracing_data_local: Option<TracingDataLocal>,
67    ) -> Self {
68        let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
69        let mut stdin = child.stdin.take().unwrap();
70        tokio::spawn(async move {
71            while let Some(line) = stdin_receiver.recv().await {
72                if stdin.write_all(line.as_bytes()).await.is_err() {
73                    break;
74                }
75
76                stdin.flush().await.ok();
77            }
78        });
79
80        let id_clone = id.clone();
81        let (stdout_deploy_receivers, stdout_receivers) = prioritized_broadcast(
82            FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
83            move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
84        );
85        let (_, stderr_receivers) = prioritized_broadcast(
86            FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
87            move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
88        );
89
90        Self {
91            child: Mutex::new(child),
92            tracing_config,
93            tracing_data_local,
94            tracing_results: None,
95            stdin_sender,
96            stdout_deploy_receivers,
97            stdout_receivers,
98            stderr_receivers,
99        }
100    }
101}
102
103#[async_trait]
104impl LaunchedBinary for LaunchedLocalhostBinary {
105    fn stdin(&self) -> mpsc::UnboundedSender<String> {
106        self.stdin_sender.clone()
107    }
108
109    fn deploy_stdout(&self) -> oneshot::Receiver<String> {
110        let mut receivers = self.stdout_deploy_receivers.lock().unwrap();
111
112        if receivers.is_some() {
113            panic!("Only one deploy stdout receiver is allowed at a time");
114        }
115
116        let (sender, receiver) = oneshot::channel::<String>();
117        *receivers = Some(sender);
118        receiver
119    }
120
121    fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
122        let mut receivers = self.stdout_receivers.lock().unwrap();
123        let (sender, receiver) = mpsc::unbounded_channel::<String>();
124        receivers.push((None, sender));
125        receiver
126    }
127
128    fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
129        let mut receivers = self.stderr_receivers.lock().unwrap();
130        let (sender, receiver) = mpsc::unbounded_channel::<String>();
131        receivers.push((None, sender));
132        receiver
133    }
134
135    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
136        let mut receivers = self.stdout_receivers.lock().unwrap();
137        let (sender, receiver) = mpsc::unbounded_channel::<String>();
138        receivers.push((Some(prefix), sender));
139        receiver
140    }
141
142    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
143        let mut receivers = self.stderr_receivers.lock().unwrap();
144        let (sender, receiver) = mpsc::unbounded_channel::<String>();
145        receivers.push((Some(prefix), sender));
146        receiver
147    }
148
149    fn tracing_results(&self) -> Option<&TracingResults> {
150        self.tracing_results.as_ref()
151    }
152
153    fn exit_code(&self) -> Option<i32> {
154        self.child
155            .lock()
156            .unwrap()
157            .try_status()
158            .ok()
159            .flatten()
160            .map(exit_code)
161    }
162
163    async fn wait(&mut self) -> Result<i32> {
164        Ok(exit_code(self.child.get_mut().unwrap().status().await?))
165    }
166
167    async fn stop(&mut self) -> Result<()> {
168        if let Err(err) = self.child.get_mut().unwrap().kill() {
169            if !matches!(err.kind(), std::io::ErrorKind::InvalidInput) {
170                Err(err)?;
171            }
172        }
173
174        // Run perf post-processing and download perf output.
175        if let Some(tracing_config) = self.tracing_config.as_ref() {
176            let tracing_data = self.tracing_data_local.take().unwrap();
177
178            if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
179                if let Some(dtrace_outfile) = tracing_config.dtrace_outfile.as_ref() {
180                    std::fs::copy(&tracing_data.outfile, dtrace_outfile)?;
181                }
182            } else if cfg!(target_family = "unix") {
183                if let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref() {
184                    std::fs::copy(&tracing_data.outfile, perf_outfile)?;
185                }
186            }
187
188            let fold_data = if cfg!(target_os = "macos") || cfg!(target_family = "windows") {
189                let mut fold_er = DtraceFolder::from(
190                    tracing_config
191                        .fold_dtrace_options
192                        .clone()
193                        .unwrap_or_default(),
194                );
195
196                let fold_data =
197                    ProgressTracker::leaf("fold dtrace output".to_owned(), async move {
198                        let mut fold_data = Vec::new();
199                        fold_er.collapse_file(Some(tracing_data.outfile), &mut fold_data)?;
200                        Result::<_>::Ok(fold_data)
201                    })
202                    .await?;
203                fold_data
204            } else if cfg!(target_family = "unix") {
205                // Run perf script.
206                let mut perf_script = Command::new("perf")
207                    .args(["script", "--symfs=/", "-i"])
208                    .arg(tracing_data.outfile.path())
209                    .stdout(Stdio::piped())
210                    .stderr(Stdio::piped())
211                    .spawn()?;
212
213                let stdout = perf_script.stdout.take().unwrap().compat();
214                let mut stderr_lines =
215                    TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
216
217                let mut fold_er =
218                    PerfFolder::from(tracing_config.fold_perf_options.clone().unwrap_or_default());
219
220                // Pattern on `()` to make sure no `Result`s are ignored.
221                let ((), fold_data, ()) = tokio::try_join!(
222                    async move {
223                        // Log stderr.
224                        while let Ok(Some(s)) = stderr_lines.next_line().await {
225                            ProgressTracker::println(format!("[perf script stderr] {s}"));
226                        }
227                        Result::<_>::Ok(())
228                    },
229                    async move {
230                        // Stream `perf script` stdout and fold.
231                        tokio::task::spawn_blocking(move || {
232                            let mut fold_data = Vec::new();
233                            fold_er.collapse(
234                                SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
235                                &mut fold_data,
236                            )?;
237                            Ok(fold_data)
238                        })
239                        .await?
240                    },
241                    async move {
242                        // Close stdin and wait for command exit.
243                        perf_script.status().await?;
244                        Ok(())
245                    },
246                )?;
247                fold_data
248            } else {
249                bail!(
250                    "Unknown OS for perf/dtrace tracing: {}",
251                    std::env::consts::OS
252                );
253            };
254
255            self.tracing_results = Some(TracingResults {
256                folded_data: fold_data.clone(),
257            });
258
259            handle_fold_data(tracing_config, fold_data).await?;
260        };
261
262        Ok(())
263    }
264}
265
266fn exit_code(c: ExitStatus) -> i32 {
267    #[cfg(unix)]
268    return c.code().or(c.signal()).unwrap();
269    #[cfg(not(unix))]
270    return c.code().unwrap();
271}