//! hydro_std/bench_client.rs

1use std::cell::RefCell;
2use std::rc::Rc;
3use std::time::{Duration, Instant};
4
5use hdrhistogram::Histogram;
6use hydro_lang::*;
7use stats_ci::mean::Arithmetic;
8use stats_ci::{Confidence, StatisticsOps};
9
/// Live-updating benchmark statistics produced at each client machine.
pub struct BenchResult<'a, Client> {
    // Histogram of per-transaction latencies, recorded in nanoseconds.
    // Wrapped in `Rc<RefCell<...>>` so the fold that builds it can update
    // the same histogram incrementally as results arrive.
    pub latency_histogram: Singleton<Rc<RefCell<Histogram<u64>>>, Cluster<'a, Client>, Unbounded>,
    // Running arithmetic-mean statistics over throughput samples, where each
    // sample is the number of transactions committed in one stats window.
    pub throughput: Singleton<Arithmetic<f64>, Cluster<'a, Client>, Unbounded>,
}
14
/// Benchmarks transactional workloads by concurrently submitting workloads
/// (up to `num_clients_per_node` per machine), measuring the latency
/// of each transaction and throughput over the entire workload.
///
/// Each machine runs `num_clients_per_node` "virtual clients", each owning a
/// globally-unique key. A virtual client sends `(key, count)`, waits for
/// `transaction_cycle` to confirm it, then immediately sends `(key, count + 1)`.
///
/// # Safety
/// This function uses non-deterministic time-based samples, and also updates results
/// at non-deterministic points in time.
pub unsafe fn bench_client<'a, Client>(
    clients: &Cluster<'a, Client>,
    transaction_cycle: impl FnOnce(
        Stream<(u32, u32), Cluster<'a, Client>, Unbounded>,
    ) -> Stream<(u32, u32), Cluster<'a, Client>, Unbounded, NoOrder>,
    num_clients_per_node: usize,
) -> BenchResult<'a, Client> {
    let client_tick = clients.tick();

    // Set up an initial set of payloads on the first tick
    let start_this_tick = client_tick.optional_first_tick(q!(()));

    // Keys are disjoint across machines: machine `m` owns keys
    // [m * num_clients_per_node, (m + 1) * num_clients_per_node). All counters
    // start at 0.
    let c_new_payloads_on_start = start_this_tick.clone().flat_map_ordered(q!(move |_| (0
        ..num_clients_per_node)
        .map(move |i| (
            (CLUSTER_SELF_ID.raw_id * (num_clients_per_node as u32)) + i as u32,
            0
        ))));

    // Forward reference so committed results can feed back into the request
    // stream, closing the request -> commit -> next-request loop.
    let (c_to_proposers_complete_cycle, c_to_proposers) =
        clients.forward_ref::<Stream<_, _, _, TotalOrder>>();
    let c_received_quorum_payloads = unsafe {
        // SAFETY: because the transaction processor is required to handle arbitrary reordering
        // across *different* keys, we are safe because delaying a transaction result for a key
        // will only affect when the next request for that key is emitted with respect to other
        // keys
        transaction_cycle(c_to_proposers).tick_batch(&client_tick)
    };

    // Whenever all replicas confirm that a payload was committed, send another payload
    // for the same key with its counter incremented.
    let c_new_payloads_when_committed = c_received_quorum_payloads
        .clone()
        .map(q!(|payload| (payload.0, payload.1 + 1)));
    c_to_proposers_complete_cycle.complete(
        c_new_payloads_on_start
            .chain(unsafe {
                // SAFETY: we don't send a new write for the same key until the previous one is committed,
                // so this contains only a single write per key, and we don't care about order
                // across keys
                c_new_payloads_when_committed.assume_ordering::<TotalOrder>()
            })
            .all_ticks(),
    );

    // Track statistics: per-virtual-client timestamps of the most recent event,
    // joined against commit times below to compute per-transaction latency.
    let (c_timers_complete_cycle, c_timers) =
        client_tick.cycle::<Stream<(usize, Instant), _, _, NoOrder>>();
    // Seed one timer per virtual client when the benchmark starts.
    let c_new_timers_when_leader_elected = start_this_tick
        .map(q!(|_| Instant::now()))
        .flat_map_ordered(q!(
            move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now))
        ));
    // Refresh a client's timer each time one of its transactions commits.
    let c_updated_timers = c_received_quorum_payloads
        .clone()
        .map(q!(|(key, _prev_count)| (key as usize, Instant::now())));
    let c_new_timers = c_timers
        .clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency)
        .chain(c_new_timers_when_leader_elected)
        .chain(c_updated_timers.clone())
        .reduce_keyed_commutative(q!(|curr_time, new_time| {
            // Keep only the most recent timestamp per virtual client.
            if new_time > *curr_time {
                *curr_time = new_time;
            }
        }));
    c_timers_complete_cycle.complete_next_tick(c_new_timers);

    // Roughly once per second, emit a marker delimiting a throughput window.
    let c_stats_output_timer = unsafe {
        // SAFETY: intentionally sampling statistics
        clients
            .source_interval(q!(Duration::from_secs(1)))
            .tick_batch(&client_tick)
    }
    .first();

    // Latency of each committed transaction: time elapsed between the previous
    // timer for its key and the commit-time timer, folded into a shared
    // histogram (recorded in nanoseconds).
    let c_latencies = c_timers
        .join(c_updated_timers)
        .map(q!(
            |(_virtual_id, (prev_time, curr_time))| curr_time.duration_since(prev_time)
        ))
        .all_ticks()
        .fold_commutative(
            q!(move || Rc::new(RefCell::new(Histogram::<u64>::new(3).unwrap()))),
            q!(move |latencies, latency| {
                latencies
                    .borrow_mut()
                    .record(latency.as_nanos() as u64)
                    .unwrap();
            }),
        );

    // Count this tick's commits, except on ticks where the stats timer fired
    // (those ticks contribute the reset marker below instead).
    let c_throughput_new_batch = c_received_quorum_payloads
        .clone()
        .count()
        .continue_unless(c_stats_output_timer.clone())
        .map(q!(|batch_size| (batch_size, false)));

    // Reset marker, deferred one tick so the window's final batch is counted first.
    let c_throughput_reset = c_stats_output_timer
        .clone()
        .map(q!(|_| (0, true)))
        .defer_tick();

    // Fold (batch_size, reset) events: accumulate commit counts within a window;
    // on reset, push the window total (if non-empty) as one throughput sample.
    let c_throughput = c_throughput_new_batch
        .union(c_throughput_reset)
        .all_ticks()
        .fold(
            q!(|| (0, { stats_ci::mean::Arithmetic::new() })),
            q!(|(total, stats), (batch_size, reset)| {
                if reset {
                    if *total > 0 {
                        stats.extend(&[*total as f64]).unwrap();
                    }

                    *total = 0;
                } else {
                    *total += batch_size;
                }
            }),
        )
        .map(q!(|(_, stats)| { stats }));

    BenchResult {
        latency_histogram: c_latencies,
        throughput: c_throughput,
    }
}
147
148/// Prints transaction latency and throughput results to stdout,
149/// with percentiles for latency and a confidence interval for throughput.
150pub fn print_bench_results<Client>(results: BenchResult<Client>) {
151    unsafe {
152        // SAFETY: intentional non-determinism
153        results
154            .latency_histogram
155            .sample_every(q!(Duration::from_millis(1000)))
156    }
157    .for_each(q!(move |latencies| {
158        let latencies = latencies.borrow();
159        println!(
160            "Latency p50: {:.3} | p99 {:.3} ms | p999 {:.3} ms ({:} samples)",
161            Duration::from_nanos(latencies.value_at_quantile(0.5)).as_micros() as f64 / 1000.0,
162            Duration::from_nanos(latencies.value_at_quantile(0.99)).as_micros() as f64 / 1000.0,
163            Duration::from_nanos(latencies.value_at_quantile(0.999)).as_micros() as f64 / 1000.0,
164            latencies.len()
165        );
166    }));
167
168    unsafe {
169        // SAFETY: intentional non-determinism
170        results
171            .throughput
172            .sample_every(q!(Duration::from_millis(1000)))
173    }
174    .for_each(q!(move |throughputs| {
175        let confidence = Confidence::new(0.99);
176
177        if throughputs.sample_count() >= 2 {
178            // ci_mean crashes if there are fewer than two samples
179            if let Ok(interval) = throughputs.ci_mean(confidence) {
180                if let Some(lower) = interval.left() {
181                    if let Some(upper) = interval.right() {
182                        println!(
183                            "Throughput 99% interval: {:.2} - {:.2} requests/s",
184                            lower, upper
185                        );
186                    }
187                }
188            }
189        }
190    }));
191}