hydro_deploy/localhost/
launched_binary.rs

1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::ExitStatus;
4#[cfg(feature = "profile-folding")]
5use std::process::Stdio;
6#[cfg(feature = "profile-folding")]
7use std::sync::OnceLock;
8
9use anyhow::Result;
10#[cfg(feature = "profile-folding")]
11use anyhow::bail;
12#[cfg(feature = "profile-folding")]
13use async_process::Command;
14use async_trait::async_trait;
15use futures::io::BufReader as FuturesBufReader;
16use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
17#[cfg(feature = "profile-folding")]
18use inferno::collapse::Collapse;
19#[cfg(feature = "profile-folding")]
20use inferno::collapse::perf::Folder as PerfFolder;
21use tempfile::NamedTempFile;
22#[cfg(feature = "profile-folding")]
23use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
24use tokio::sync::{mpsc, oneshot};
25#[cfg(feature = "profile-folding")]
26use tokio_util::compat::FuturesAsyncReadCompatExt;
27#[cfg(feature = "profile-folding")]
28use tokio_util::io::SyncIoBridge;
29
30#[cfg(feature = "profile-folding")]
31#[cfg(any(target_os = "macos", target_family = "windows"))]
32use super::samply::samply_to_folded;
33use crate::LaunchedBinary;
34#[cfg(feature = "profile-folding")]
35use crate::TracingResults;
36use crate::progress::ProgressTracker;
37#[cfg(feature = "profile-folding")]
38use crate::rust_crate::flamegraph::handle_fold_data;
39use crate::rust_crate::tracing_options::TracingOptions;
40use crate::util::{PriorityBroadcast, prioritized_broadcast};
41
/// Tracing output associated with a locally launched binary.
pub(super) struct TracingDataLocal {
    /// Temporary file holding the raw trace data. `LaunchedLocalhostBinary::stop` copies it to
    /// the configured samply/perf output path and, with the `profile-folding` feature, reads it
    /// back for folding. Presumably written by the profiler that wraps the child -- the writer
    /// is not visible in this file.
    pub(super) outfile: NamedTempFile,
}
45
/// Handle to a binary running as a child process on the local machine.
///
/// Owns the child process, the channels used to feed its stdin and observe its stdout/stderr,
/// and any tracing state that `stop()` post-processes.
pub struct LaunchedLocalhostBinary {
    /// Must use async mutex -- we will .await methods within the child (while holding lock).
    child: tokio::sync::Mutex<async_process::Child>,
    /// Tracing options; when `Some`, `stop()` collects and post-processes the trace output.
    tracing_config: Option<TracingOptions>,
    /// Trace data handed over at construction; `stop()` takes it out, leaving `None`.
    tracing_data_local: std::sync::Mutex<Option<TracingDataLocal>>,
    /// Folded tracing results, set at most once by `stop()`.
    #[cfg(feature = "profile-folding")]
    tracing_results: OnceLock<TracingResults>,
    /// Sending half of the channel drained by the background task that writes to the child's stdin.
    stdin_sender: mpsc::UnboundedSender<String>,
    /// Broadcast built over the lines of the child's stdout.
    stdout_broadcast: PriorityBroadcast,
    /// Broadcast built over the lines of the child's stderr.
    stderr_broadcast: PriorityBroadcast,
}
57
58#[cfg(unix)]
59impl Drop for LaunchedLocalhostBinary {
60    fn drop(&mut self) {
61        let child = self.child.get_mut();
62
63        if let Ok(Some(_)) = child.try_status() {
64            return;
65        }
66
67        let pid = child.id();
68        if let Err(e) = nix::sys::signal::kill(
69            nix::unistd::Pid::from_raw(pid as i32),
70            nix::sys::signal::SIGTERM,
71        ) {
72            ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
73        }
74    }
75}
76
77impl LaunchedLocalhostBinary {
78    pub(super) fn new(
79        mut child: async_process::Child,
80        id: String,
81        tracing_config: Option<TracingOptions>,
82        tracing_data_local: Option<TracingDataLocal>,
83    ) -> Self {
84        let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
85        let mut stdin = child.stdin.take().unwrap();
86        tokio::spawn(async move {
87            while let Some(line) = stdin_receiver.recv().await {
88                if stdin.write_all(line.as_bytes()).await.is_err() {
89                    break;
90                }
91
92                stdin.flush().await.ok();
93            }
94        });
95
96        let id_clone = id.clone();
97        let stdout_broadcast = prioritized_broadcast(
98            FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
99            move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
100        );
101        let stderr_broadcast = prioritized_broadcast(
102            FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
103            move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
104        );
105
106        Self {
107            child: tokio::sync::Mutex::new(child),
108            tracing_config,
109            tracing_data_local: std::sync::Mutex::new(tracing_data_local),
110            #[cfg(feature = "profile-folding")]
111            tracing_results: OnceLock::new(),
112            stdin_sender,
113            stdout_broadcast,
114            stderr_broadcast,
115        }
116    }
117}
118
119#[async_trait]
120impl LaunchedBinary for LaunchedLocalhostBinary {
121    fn stdin(&self) -> mpsc::UnboundedSender<String> {
122        self.stdin_sender.clone()
123    }
124
125    fn deploy_stdout(&self) -> oneshot::Receiver<String> {
126        self.stdout_broadcast.receive_priority()
127    }
128
129    fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
130        self.stdout_broadcast.receive(None)
131    }
132
133    fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
134        self.stderr_broadcast.receive(None)
135    }
136
137    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
138        self.stdout_broadcast.receive(Some(prefix))
139    }
140
141    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
142        self.stderr_broadcast.receive(Some(prefix))
143    }
144
145    #[cfg(feature = "profile-folding")]
146    fn tracing_results(&self) -> Option<&TracingResults> {
147        self.tracing_results.get()
148    }
149
150    fn exit_code(&self) -> Option<i32> {
151        self.child
152            .try_lock()
153            .ok()
154            .and_then(|mut child| child.try_status().ok())
155            .flatten()
156            .map(exit_code)
157    }
158
159    async fn wait(&self) -> Result<i32> {
160        Ok(exit_code(self.child.lock().await.status().await?))
161    }
162
163    async fn stop(&self) -> Result<()> {
164        if let Err(err) = { self.child.lock().await.kill() }
165            && !matches!(err.kind(), std::io::ErrorKind::InvalidInput)
166        {
167            Err(err)?;
168        }
169
170        // Run perf post-processing and download perf output.
171        if let Some(tracing_config) = self.tracing_config.as_ref() {
172            #[cfg(feature = "profile-folding")]
173            assert!(
174                self.tracing_results.get().is_none(),
175                "`tracing_results` already set! Was `stop()` called twice? This is a bug."
176            );
177            let tracing_data =
178                {
179                    self.tracing_data_local.lock().unwrap().take().expect(
180                        "`tracing_data_local` empty, was `stop()` called twice? This is a bug.",
181                    )
182                };
183
184            if cfg!(any(target_os = "macos", target_family = "windows")) {
185                if let Some(samply_outfile) = tracing_config.samply_outfile.as_ref() {
186                    std::fs::copy(&tracing_data.outfile, samply_outfile)?;
187                }
188            } else if cfg!(target_family = "unix")
189                && let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref()
190            {
191                std::fs::copy(&tracing_data.outfile, perf_outfile)?;
192            }
193
194            #[cfg(feature = "profile-folding")]
195            let fold_data = if cfg!(any(target_os = "macos", target_family = "windows")) {
196                #[cfg(any(target_os = "macos", target_family = "windows"))]
197                {
198                    let loaded =
199                        serde_json::from_reader(std::fs::File::open(tracing_data.outfile.path())?)?;
200
201                    ProgressTracker::leaf("processing samply", samply_to_folded(loaded))
202                        .await
203                        .into()
204                }
205
206                #[cfg(not(any(target_os = "macos", target_family = "windows")))]
207                {
208                    unreachable!()
209                }
210            } else if cfg!(target_family = "unix") {
211                // Run perf script.
212                let mut perf_script = Command::new("perf")
213                    .args(["script", "--symfs=/", "-i"])
214                    .arg(tracing_data.outfile.path())
215                    .stdout(Stdio::piped())
216                    .stderr(Stdio::piped())
217                    .spawn()?;
218
219                let stdout = perf_script.stdout.take().unwrap().compat();
220                let mut stderr_lines =
221                    TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
222
223                let mut fold_er =
224                    PerfFolder::from(tracing_config.fold_perf_options.clone().unwrap_or_default());
225
226                // Pattern on `()` to make sure no `Result`s are ignored.
227                let ((), fold_data, ()) = tokio::try_join!(
228                    async move {
229                        // Log stderr.
230                        while let Ok(Some(s)) = stderr_lines.next_line().await {
231                            ProgressTracker::println(format!("[perf script stderr] {s}"));
232                        }
233                        Result::<_>::Ok(())
234                    },
235                    async move {
236                        // Stream `perf script` stdout and fold.
237                        tokio::task::spawn_blocking(move || {
238                            let mut fold_data = Vec::new();
239                            fold_er.collapse(
240                                SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
241                                &mut fold_data,
242                            )?;
243                            Ok(fold_data)
244                        })
245                        .await?
246                    },
247                    async move {
248                        // Close stdin and wait for command exit.
249                        perf_script.status().await?;
250                        Ok(())
251                    },
252                )?;
253                fold_data
254            } else {
255                bail!(
256                    "Unknown OS for samply/perf tracing: {}",
257                    std::env::consts::OS
258                );
259            };
260
261            #[cfg(feature = "profile-folding")]
262            handle_fold_data(tracing_config, fold_data.clone()).await?;
263
264            #[cfg(feature = "profile-folding")]
265            self.tracing_results
266                .set(TracingResults {
267                    folded_data: fold_data,
268                })
269                .expect("`tracing_results` already set! This is a bug.");
270        };
271
272        Ok(())
273    }
274}
275
/// Converts an [`ExitStatus`] into a single integer.
///
/// Returns the process exit code when there is one. On unix, a process without an exit code
/// yields the number of the signal that terminated it instead.
///
/// # Panics
///
/// Panics if the status carries neither an exit code nor (on unix) a terminating signal.
fn exit_code(c: ExitStatus) -> i32 {
    #[cfg(unix)]
    let resolved = match c.code() {
        Some(code) => Some(code),
        None => c.signal(),
    };
    #[cfg(not(unix))]
    let resolved = c.code();

    resolved.unwrap()
}