hydro_lang/rewrites/
analyze_perf.rs

1use std::collections::HashMap;
2
3use regex::Regex;
4
5use crate::ir::*;
6pub use crate::runtime_support::resource_measurement::CPU_USAGE_PREFIX;
7
8pub fn parse_cpu_usage(measurement: String) -> f64 {
9    let regex = Regex::new(r"Total (\d+\.\d+)%").unwrap();
10    regex
11        .captures_iter(&measurement)
12        .last()
13        .map(|cap| cap[1].parse::<f64>().unwrap())
14        .unwrap_or(0f64)
15}
16
17/// Returns a map from operator ID to a map of (DFIR operator name, percentage of total samples) pairs.
18/// The DFIR operator name is returned because a single Hydro operator can map to multiple DFIR operators
19fn parse_perf(file: String) -> HashMap<usize, HashMap<String, f64>> {
20    let mut total_samples = 0f64;
21    let mut samples_per_operator = HashMap::new();
22    let operator_regex = Regex::new(r"::op_\d+v\d+__(.*?)__(\d+)::").unwrap();
23    let sink_feed_regex = Regex::new(r"sink_feed_flush_(\d+)").unwrap();
24
25    for line in file.lines() {
26        let n_samples_index = line.rfind(' ').unwrap() + 1;
27        let n_samples = &line[n_samples_index..].parse::<f64>().unwrap();
28
29        let mut new_samples = vec![];
30        if let Some(cap) = operator_regex.captures_iter(line).last() {
31            let operator_name = &cap[1];
32            let id = cap[2].parse::<usize>().unwrap();
33            new_samples.push((id, operator_name.to_string()));
34        }
35        // Note: Although we do a regex check twice per line (potentially adding samples twice), there will never be an operator and sink_feed in the same line, so it's ok
36        if let Some(cap) = sink_feed_regex.captures_iter(line).last() {
37            let id = cap[1].parse::<usize>().unwrap();
38            new_samples.push((id, "sink_feed_flush".to_string()));
39        }
40
41        for (id, operator_name) in new_samples {
42            let dfir_operator_and_samples =
43                samples_per_operator.entry(id).or_insert(HashMap::new());
44            let prev_samples = dfir_operator_and_samples
45                .entry(operator_name)
46                .or_insert(0f64);
47            *prev_samples += n_samples;
48        }
49
50        total_samples += n_samples;
51    }
52
53    samples_per_operator.iter_mut().for_each(|(_, v)| {
54        v.iter_mut()
55            .for_each(|(_, samples)| *samples /= total_samples)
56    });
57    samples_per_operator
58}
59
60fn inject_perf_leaf(
61    leaf: &mut HydroLeaf,
62    id_to_usage: &HashMap<usize, HashMap<String, f64>>,
63    next_stmt_id: &mut usize,
64) {
65    if let Some(dfir_operator_and_samples) = id_to_usage.get(next_stmt_id) {
66        leaf.metadata_mut().cpu_usage = Some(dfir_operator_and_samples.values().sum());
67    }
68}
69
70fn inject_perf_node(
71    node: &mut HydroNode,
72    id_to_usage: &HashMap<usize, HashMap<String, f64>>,
73    next_stmt_id: &mut usize,
74) {
75    if let Some(dfir_operator_and_samples) = id_to_usage.get(next_stmt_id) {
76        node.metadata_mut().cpu_usage = Some(dfir_operator_and_samples.values().sum());
77    }
78}
79
80pub fn analyze_perf(ir: &mut [HydroLeaf], folded_data: Vec<u8>) {
81    let id_to_usage = parse_perf(String::from_utf8(folded_data).unwrap());
82    traverse_dfir(
83        ir,
84        |leaf, next_stmt_id| {
85            inject_perf_leaf(leaf, &id_to_usage, next_stmt_id);
86        },
87        |node, next_stmt_id| {
88            inject_perf_node(node, &id_to_usage, next_stmt_id);
89        },
90    );
91}