hydro_lang/rewrites/
analyze_perf_and_counters.rs
1use std::collections::HashMap;
2
3use tokio::sync::mpsc::UnboundedReceiver;
4
5use crate::builder::deploy::DeployResult;
6use crate::deploy::HydroDeploy;
7use crate::deploy::deploy_graph::DeployCrateWrapper;
8use crate::ir::HydroLeaf;
9use crate::location::LocationId;
10use crate::rewrites::analyze_counter::{inject_count, parse_counter_usage};
11use crate::rewrites::analyze_perf::{analyze_perf, parse_cpu_usage};
12
13pub async fn analyze_results(
14 nodes: DeployResult<'static, HydroDeploy>,
15 ir: &mut [HydroLeaf],
16 usage_out: &mut HashMap<(LocationId, String, usize), UnboundedReceiver<String>>,
17 cardinality_out: &mut HashMap<(LocationId, String, usize), UnboundedReceiver<String>>,
18) {
19 for (id, name, cluster) in nodes.get_all_clusters() {
20 let mut max_usage = None;
22 for (idx, _) in cluster.members().iter().enumerate() {
23 let measurement = usage_out
24 .get_mut(&(id.clone(), name.clone(), idx))
25 .unwrap()
26 .recv()
27 .await
28 .unwrap();
29 let usage = parse_cpu_usage(measurement);
30 if let Some((prev_usage, _)) = max_usage {
31 if usage > prev_usage {
32 max_usage = Some((usage, idx));
33 }
34 } else {
35 max_usage = Some((usage, idx));
36 }
37 }
38
39 if let Some((usage, idx)) = max_usage {
40 if let Some(perf_results) = cluster.members().get(idx).unwrap().tracing_results().await
41 {
42 println!("{}: {}", &name, usage);
43
44 analyze_perf(ir, perf_results.folded_data);
46
47 let node_cardinality = cardinality_out
49 .get_mut(&(id.clone(), name.clone(), idx))
50 .unwrap();
51 let mut op_to_counter = HashMap::new();
52 while let Some(measurement) = node_cardinality.recv().await {
53 let (op_id, count) = parse_counter_usage(measurement);
54 op_to_counter.insert(op_id, count);
55 }
56
57 inject_count(ir, &op_to_counter);
58 }
59 }
60 }
61}