hydro_deploy/rust_crate/
flamegraph.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use anyhow::{Error, Result};
6use futures::stream::FuturesUnordered;
7use tokio_stream::StreamExt;
8
9use super::tracing_options::TracingOptions;
10
11pub async fn handle_fold_data(
12    tracing: &TracingOptions,
13    fold_data: impl Into<Arc<[u8]>>,
14) -> Result<()> {
15    // Wrap in Arc to allow sharing data across multiple outputs.
16    let fold_data = &fold_data.into();
17    let output_tasks =
18        FuturesUnordered::<Pin<Box<dyn Future<Output = Result<()>> + Send + Sync>>>::new();
19
20    // fold_outfile
21    if let Some(fold_outfile) = tracing.fold_outfile.clone() {
22        let fold_data = Arc::clone(fold_data);
23        output_tasks.push(Box::pin(async move {
24            let mut reader = &*fold_data;
25            let mut writer = tokio::fs::File::create(fold_outfile).await?;
26            tokio::io::copy_buf(&mut reader, &mut writer).await?;
27            Ok(())
28        }));
29    };
30
31    // flamegraph_outfile
32    if let Some(flamegraph_outfile) = tracing.flamegraph_outfile.clone() {
33        let mut options = tracing
34            .flamegraph_options
35            .map(|f| (f)())
36            .unwrap_or_default();
37        output_tasks.push(Box::pin(async move {
38            let writer = tokio::fs::File::create(flamegraph_outfile)
39                .await?
40                .into_std()
41                .await;
42            let fold_data = Arc::clone(fold_data);
43            tokio::task::spawn_blocking(move || {
44                inferno::flamegraph::from_lines(
45                    &mut options,
46                    fold_data
47                        .split(|&b| b == b'\n')
48                        .map(std::str::from_utf8)
49                        .map(Result::unwrap),
50                    writer,
51                )
52            })
53            .await??;
54            Ok(())
55        }));
56    };
57
58    let errors = output_tasks
59        .filter_map(Result::err)
60        .collect::<Vec<_>>()
61        .await;
62    if !errors.is_empty() {
63        Err(MultipleErrors { errors })?;
64    };
65
66    Ok(())
67}
68
69#[derive(Debug)]
70struct MultipleErrors {
71    errors: Vec<Error>,
72}
73impl std::fmt::Display for MultipleErrors {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        if 1 == self.errors.len() {
76            self.errors.first().unwrap().fmt(f)
77        } else {
78            writeln!(f, "({}) errors occured:", self.errors.len())?;
79            writeln!(f)?;
80            for (i, error) in self.errors.iter().enumerate() {
81                write!(f, "({}/{}):", i + 1, self.errors.len())?;
82                error.fmt(f)?;
83                writeln!(f)?;
84            }
85            Ok(())
86        }
87    }
88}
89impl std::error::Error for MultipleErrors {}