hydro_lang/rewrites/
analyze_perf.rs
1use std::collections::HashMap;
2
3use regex::Regex;
4
5use crate::ir::*;
6pub use crate::runtime_support::resource_measurement::CPU_USAGE_PREFIX;
7
8pub fn parse_cpu_usage(measurement: String) -> f64 {
9 let regex = Regex::new(r"Total (\d+\.\d+)%").unwrap();
10 regex
11 .captures_iter(&measurement)
12 .last()
13 .map(|cap| cap[1].parse::<f64>().unwrap())
14 .unwrap_or(0f64)
15}
16
17fn parse_perf(file: String) -> HashMap<usize, HashMap<String, f64>> {
20 let mut total_samples = 0f64;
21 let mut samples_per_operator = HashMap::new();
22 let operator_regex = Regex::new(r"::op_\d+v\d+__(.*?)__(\d+)::").unwrap();
23 let sink_feed_regex = Regex::new(r"sink_feed_flush_(\d+)").unwrap();
24
25 for line in file.lines() {
26 let n_samples_index = line.rfind(' ').unwrap() + 1;
27 let n_samples = &line[n_samples_index..].parse::<f64>().unwrap();
28
29 let mut new_samples = vec![];
30 if let Some(cap) = operator_regex.captures_iter(line).last() {
31 let operator_name = &cap[1];
32 let id = cap[2].parse::<usize>().unwrap();
33 new_samples.push((id, operator_name.to_string()));
34 }
35 if let Some(cap) = sink_feed_regex.captures_iter(line).last() {
37 let id = cap[1].parse::<usize>().unwrap();
38 new_samples.push((id, "sink_feed_flush".to_string()));
39 }
40
41 for (id, operator_name) in new_samples {
42 let dfir_operator_and_samples =
43 samples_per_operator.entry(id).or_insert(HashMap::new());
44 let prev_samples = dfir_operator_and_samples
45 .entry(operator_name)
46 .or_insert(0f64);
47 *prev_samples += n_samples;
48 }
49
50 total_samples += n_samples;
51 }
52
53 samples_per_operator.iter_mut().for_each(|(_, v)| {
54 v.iter_mut()
55 .for_each(|(_, samples)| *samples /= total_samples)
56 });
57 samples_per_operator
58}
59
60fn inject_perf_leaf(
61 leaf: &mut HydroLeaf,
62 id_to_usage: &HashMap<usize, HashMap<String, f64>>,
63 next_stmt_id: &mut usize,
64) {
65 if let Some(dfir_operator_and_samples) = id_to_usage.get(next_stmt_id) {
66 leaf.metadata_mut().cpu_usage = Some(dfir_operator_and_samples.values().sum());
67 }
68}
69
70fn inject_perf_node(
71 node: &mut HydroNode,
72 id_to_usage: &HashMap<usize, HashMap<String, f64>>,
73 next_stmt_id: &mut usize,
74) {
75 if let Some(dfir_operator_and_samples) = id_to_usage.get(next_stmt_id) {
76 node.metadata_mut().cpu_usage = Some(dfir_operator_and_samples.values().sum());
77 }
78}
79
80pub fn analyze_perf(ir: &mut [HydroLeaf], folded_data: Vec<u8>) {
81 let id_to_usage = parse_perf(String::from_utf8(folded_data).unwrap());
82 traverse_dfir(
83 ir,
84 |leaf, next_stmt_id| {
85 inject_perf_leaf(leaf, &id_to_usage, next_stmt_id);
86 },
87 |node, next_stmt_id| {
88 inject_perf_node(node, &id_to_usage, next_stmt_id);
89 },
90 );
91}