hydro_lang/rewrites/
analyze_perf_and_counters.rs

1use std::collections::HashMap;
2
3use tokio::sync::mpsc::UnboundedReceiver;
4
5use crate::builder::deploy::DeployResult;
6use crate::deploy::HydroDeploy;
7use crate::deploy::deploy_graph::DeployCrateWrapper;
8use crate::ir::HydroLeaf;
9use crate::location::LocationId;
10use crate::rewrites::analyze_counter::{inject_count, parse_counter_usage};
11use crate::rewrites::analyze_perf::{analyze_perf, parse_cpu_usage};
12
13pub async fn analyze_results(
14    nodes: DeployResult<'static, HydroDeploy>,
15    ir: &mut [HydroLeaf],
16    usage_out: &mut HashMap<(LocationId, String, usize), UnboundedReceiver<String>>,
17    cardinality_out: &mut HashMap<(LocationId, String, usize), UnboundedReceiver<String>>,
18) {
19    for (id, name, cluster) in nodes.get_all_clusters() {
20        // Iterate through nodes' usages and keep the max usage one
21        let mut max_usage = None;
22        for (idx, _) in cluster.members().iter().enumerate() {
23            let measurement = usage_out
24                .get_mut(&(id.clone(), name.clone(), idx))
25                .unwrap()
26                .recv()
27                .await
28                .unwrap();
29            let usage = parse_cpu_usage(measurement);
30            if let Some((prev_usage, _)) = max_usage {
31                if usage > prev_usage {
32                    max_usage = Some((usage, idx));
33                }
34            } else {
35                max_usage = Some((usage, idx));
36            }
37        }
38
39        if let Some((usage, idx)) = max_usage {
40            if let Some(perf_results) = cluster.members().get(idx).unwrap().tracing_results().await
41            {
42                println!("{}: {}", &name, usage);
43
44                // Inject perf usages into metadata
45                analyze_perf(ir, perf_results.folded_data);
46
47                // Get cardinality data. Allow later values to overwrite earlier ones
48                let node_cardinality = cardinality_out
49                    .get_mut(&(id.clone(), name.clone(), idx))
50                    .unwrap();
51                let mut op_to_counter = HashMap::new();
52                while let Some(measurement) = node_cardinality.recv().await {
53                    let (op_id, count) = parse_counter_usage(measurement);
54                    op_to_counter.insert(op_id, count);
55                }
56
57                inject_count(ir, &op_to_counter);
58            }
59        }
60    }
61}