hydro_std/
bench_client.rs
1use std::cell::RefCell;
2use std::rc::Rc;
3use std::time::{Duration, Instant};
4
5use hdrhistogram::Histogram;
6use hydro_lang::*;
7use stats_ci::mean::Arithmetic;
8use stats_ci::{Confidence, StatisticsOps};
9
10pub struct BenchResult<'a, Client> {
11 pub latency_histogram: Singleton<Rc<RefCell<Histogram<u64>>>, Cluster<'a, Client>, Unbounded>,
12 pub throughput: Singleton<Arithmetic<f64>, Cluster<'a, Client>, Unbounded>,
13}
14
15pub unsafe fn bench_client<'a, Client>(
23 clients: &Cluster<'a, Client>,
24 transaction_cycle: impl FnOnce(
25 Stream<(u32, u32), Cluster<'a, Client>, Unbounded>,
26 ) -> Stream<(u32, u32), Cluster<'a, Client>, Unbounded, NoOrder>,
27 num_clients_per_node: usize,
28) -> BenchResult<'a, Client> {
29 let client_tick = clients.tick();
30
31 let start_this_tick = client_tick.optional_first_tick(q!(()));
33
34 let c_new_payloads_on_start = start_this_tick.clone().flat_map_ordered(q!(move |_| (0
35 ..num_clients_per_node)
36 .map(move |i| (
37 (CLUSTER_SELF_ID.raw_id * (num_clients_per_node as u32)) + i as u32,
38 0
39 ))));
40
41 let (c_to_proposers_complete_cycle, c_to_proposers) =
42 clients.forward_ref::<Stream<_, _, _, TotalOrder>>();
43 let c_received_quorum_payloads = unsafe {
44 transaction_cycle(c_to_proposers).tick_batch(&client_tick)
49 };
50
51 let c_new_payloads_when_committed = c_received_quorum_payloads
53 .clone()
54 .map(q!(|payload| (payload.0, payload.1 + 1)));
55 c_to_proposers_complete_cycle.complete(
56 c_new_payloads_on_start
57 .chain(unsafe {
58 c_new_payloads_when_committed.assume_ordering::<TotalOrder>()
62 })
63 .all_ticks(),
64 );
65
66 let (c_timers_complete_cycle, c_timers) =
68 client_tick.cycle::<Stream<(usize, Instant), _, _, NoOrder>>();
69 let c_new_timers_when_leader_elected = start_this_tick
70 .map(q!(|_| Instant::now()))
71 .flat_map_ordered(q!(
72 move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now))
73 ));
74 let c_updated_timers = c_received_quorum_payloads
75 .clone()
76 .map(q!(|(key, _prev_count)| (key as usize, Instant::now())));
77 let c_new_timers = c_timers
78 .clone() .chain(c_new_timers_when_leader_elected)
80 .chain(c_updated_timers.clone())
81 .reduce_keyed_commutative(q!(|curr_time, new_time| {
82 if new_time > *curr_time {
83 *curr_time = new_time;
84 }
85 }));
86 c_timers_complete_cycle.complete_next_tick(c_new_timers);
87
88 let c_stats_output_timer = unsafe {
89 clients
91 .source_interval(q!(Duration::from_secs(1)))
92 .tick_batch(&client_tick)
93 }
94 .first();
95
96 let c_latencies = c_timers
97 .join(c_updated_timers)
98 .map(q!(
99 |(_virtual_id, (prev_time, curr_time))| curr_time.duration_since(prev_time)
100 ))
101 .all_ticks()
102 .fold_commutative(
103 q!(move || Rc::new(RefCell::new(Histogram::<u64>::new(3).unwrap()))),
104 q!(move |latencies, latency| {
105 latencies
106 .borrow_mut()
107 .record(latency.as_nanos() as u64)
108 .unwrap();
109 }),
110 );
111
112 let c_throughput_new_batch = c_received_quorum_payloads
113 .clone()
114 .count()
115 .continue_unless(c_stats_output_timer.clone())
116 .map(q!(|batch_size| (batch_size, false)));
117
118 let c_throughput_reset = c_stats_output_timer
119 .clone()
120 .map(q!(|_| (0, true)))
121 .defer_tick();
122
123 let c_throughput = c_throughput_new_batch
124 .union(c_throughput_reset)
125 .all_ticks()
126 .fold(
127 q!(|| (0, { stats_ci::mean::Arithmetic::new() })),
128 q!(|(total, stats), (batch_size, reset)| {
129 if reset {
130 if *total > 0 {
131 stats.extend(&[*total as f64]).unwrap();
132 }
133
134 *total = 0;
135 } else {
136 *total += batch_size;
137 }
138 }),
139 )
140 .map(q!(|(_, stats)| { stats }));
141
142 BenchResult {
143 latency_histogram: c_latencies,
144 throughput: c_throughput,
145 }
146}
147
148pub fn print_bench_results<Client>(results: BenchResult<Client>) {
151 unsafe {
152 results
154 .latency_histogram
155 .sample_every(q!(Duration::from_millis(1000)))
156 }
157 .for_each(q!(move |latencies| {
158 let latencies = latencies.borrow();
159 println!(
160 "Latency p50: {:.3} | p99 {:.3} ms | p999 {:.3} ms ({:} samples)",
161 Duration::from_nanos(latencies.value_at_quantile(0.5)).as_micros() as f64 / 1000.0,
162 Duration::from_nanos(latencies.value_at_quantile(0.99)).as_micros() as f64 / 1000.0,
163 Duration::from_nanos(latencies.value_at_quantile(0.999)).as_micros() as f64 / 1000.0,
164 latencies.len()
165 );
166 }));
167
168 unsafe {
169 results
171 .throughput
172 .sample_every(q!(Duration::from_millis(1000)))
173 }
174 .for_each(q!(move |throughputs| {
175 let confidence = Confidence::new(0.99);
176
177 if throughputs.sample_count() >= 2 {
178 if let Ok(interval) = throughputs.ci_mean(confidence) {
180 if let Some(lower) = interval.left() {
181 if let Some(upper) = interval.right() {
182 println!(
183 "Throughput 99% interval: {:.2} - {:.2} requests/s",
184 lower, upper
185 );
186 }
187 }
188 }
189 }
190 }));
191}