hydro_deploy/rust_crate/
flamegraph.rs
1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use anyhow::{Error, Result};
6use futures::stream::FuturesUnordered;
7use tokio_stream::StreamExt;
8
9use super::tracing_options::TracingOptions;
10
11pub async fn handle_fold_data(
12 tracing: &TracingOptions,
13 fold_data: impl Into<Arc<[u8]>>,
14) -> Result<()> {
15 let fold_data = &fold_data.into();
17 let output_tasks =
18 FuturesUnordered::<Pin<Box<dyn Future<Output = Result<()>> + Send + Sync>>>::new();
19
20 if let Some(fold_outfile) = tracing.fold_outfile.clone() {
22 let fold_data = Arc::clone(fold_data);
23 output_tasks.push(Box::pin(async move {
24 let mut reader = &*fold_data;
25 let mut writer = tokio::fs::File::create(fold_outfile).await?;
26 tokio::io::copy_buf(&mut reader, &mut writer).await?;
27 Ok(())
28 }));
29 };
30
31 if let Some(flamegraph_outfile) = tracing.flamegraph_outfile.clone() {
33 let mut options = tracing
34 .flamegraph_options
35 .map(|f| (f)())
36 .unwrap_or_default();
37 output_tasks.push(Box::pin(async move {
38 let writer = tokio::fs::File::create(flamegraph_outfile)
39 .await?
40 .into_std()
41 .await;
42 let fold_data = Arc::clone(fold_data);
43 tokio::task::spawn_blocking(move || {
44 inferno::flamegraph::from_lines(
45 &mut options,
46 fold_data
47 .split(|&b| b == b'\n')
48 .map(std::str::from_utf8)
49 .map(Result::unwrap),
50 writer,
51 )
52 })
53 .await??;
54 Ok(())
55 }));
56 };
57
58 let errors = output_tasks
59 .filter_map(Result::err)
60 .collect::<Vec<_>>()
61 .await;
62 if !errors.is_empty() {
63 Err(MultipleErrors { errors })?;
64 };
65
66 Ok(())
67}
68
69#[derive(Debug)]
70struct MultipleErrors {
71 errors: Vec<Error>,
72}
73impl std::fmt::Display for MultipleErrors {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 if 1 == self.errors.len() {
76 self.errors.first().unwrap().fmt(f)
77 } else {
78 writeln!(f, "({}) errors occured:", self.errors.len())?;
79 writeln!(f)?;
80 for (i, error) in self.errors.iter().enumerate() {
81 write!(f, "({}/{}):", i + 1, self.errors.len())?;
82 error.fmt(f)?;
83 writeln!(f)?;
84 }
85 Ok(())
86 }
87 }
88}
89impl std::error::Error for MultipleErrors {}