hydro_std/bench_client/mod.rs

use std::cell::RefCell;
use std::rc::Rc;
use std::time::{Duration, Instant};

use hdrhistogram::Histogram;
use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer};
use hydro_lang::*;
use serde::{Deserialize, Serialize};

pub mod rolling_average;
use rolling_average::RollingAverage;

use crate::membership::track_membership;

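/// Wrapper that allows a shared [`Histogram`] to be serialized with serde, using
/// the HdrHistogram V2 serialization format.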
pub struct SerializableHistogramWrapper {
    pub histogram: Rc<RefCell<Histogram<u64>>>,
}

impl Serialize for SerializableHistogramWrapper {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let mut vec = Vec::new();
        V2Serializer::new()
            .serialize(&self.histogram.borrow(), &mut vec)
            .unwrap();
        serializer.serialize_bytes(&vec)
    }
}
impl<'a> Deserialize<'a> for SerializableHistogramWrapper {
    fn deserialize<D: serde::Deserializer<'a>>(deserializer: D) -> Result<Self, D::Error> {
        let mut bytes: &[u8] = Deserialize::deserialize(deserializer)?;
        let mut histogram = Deserializer::new().deserialize(&mut bytes).unwrap();
        // Allow auto-resizing to prevent error when combining
        histogram.auto(true);
        Ok(SerializableHistogramWrapper {
            histogram: Rc::new(RefCell::new(histogram)),
        })
    }
}
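
/// Per-client results of a benchmark run: a histogram of transaction latencies
/// (recorded in nanoseconds) and a rolling average of throughput in requests per
/// second, each produced on every member of the client cluster.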
pub struct BenchResult<'a, Client> {
    pub latency_histogram: Singleton<Rc<RefCell<Histogram<u64>>>, Cluster<'a, Client>, Unbounded>,
    pub throughput: Singleton<RollingAverage, Cluster<'a, Client>, Unbounded>,
}

/// Benchmarks transactional workloads by concurrently submitting transactions
/// from virtual clients (up to `num_clients_per_node` per machine), measuring
/// the latency of each transaction and the throughput over the entire workload.
/// * `workload_generator` - Generates the next `Payload` for each virtual client,
///   given the previously completed payload (`None` for the initial request)
/// * `transaction_cycle` - Processes payloads and emits each one back once it has
///   been committed
///
/// # Non-Determinism
/// This function uses non-deterministic wall-clock windows for measuring throughput.
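///
/// # Example
/// A minimal sketch of wiring up a benchmark (not compiled here); `flow`, the
/// `Client` tag, the `echo` transaction cycle, and the constant payload are
/// illustrative assumptions, not part of this module:
/// ```rust,ignore
/// let clients = flow.cluster::<Client>();
/// let results = bench_client(
///     &clients,
///     // Each virtual client simply re-issues a constant payload.
///     |_, completed| completed.map(q!(|(virtual_id, _prev)| (virtual_id, 0u64))),
///     // A trivial "transaction" that commits immediately.
///     |requests| echo(requests),
///     /* num_clients_per_node */ 100,
///     nondet!(/** wall-clock windows only affect reported throughput */),
/// );
/// ```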
pub fn bench_client<'a, Client, Payload>(
    clients: &Cluster<'a, Client>,
    workload_generator: impl FnOnce(
        &Cluster<'a, Client>,
        Stream<(u32, Option<Payload>), Cluster<'a, Client>, Unbounded, NoOrder>,
    )
        -> Stream<(u32, Payload), Cluster<'a, Client>, Unbounded, NoOrder>,
    transaction_cycle: impl FnOnce(
        Stream<(u32, Payload), Cluster<'a, Client>, Unbounded>,
    )
        -> Stream<(u32, Payload), Cluster<'a, Client>, Unbounded, NoOrder>,
    num_clients_per_node: usize,
    nondet_throughput_window: NonDet,
) -> BenchResult<'a, Client>
where
    Payload: Clone,
{
    let client_tick = clients.tick();

    // Set up an initial set of payloads on the first tick
    let start_this_tick = client_tick.optional_first_tick(q!(()));

    let c_new_payloads_on_start = start_this_tick.clone().flat_map_ordered(q!(move |_| (0
        ..num_clients_per_node as u32)
        .map(move |virtual_id| { (virtual_id, None) })));

    let (c_to_proposers_complete_cycle, c_to_proposers) =
        clients.forward_ref::<Stream<_, _, _, TotalOrder>>();

    // Whenever all replicas confirm that a payload was committed, send another payload
    let c_received_quorum_payloads = transaction_cycle(c_to_proposers)
        .batch(
            &client_tick,
            nondet!(
                /// Because the transaction processor is required to handle arbitrary reordering
                /// across *different* keys, batching here is safe: delaying a transaction result
                /// for a key only affects when the next request for that key is emitted relative
                /// to other keys.
            ),
        )
        .map(q!(|(virtual_id, payload)| (virtual_id, Some(payload))));

    let c_new_payloads = workload_generator(
        clients,
        c_new_payloads_on_start
            .chain(c_received_quorum_payloads.clone())
            .all_ticks(),
    );
    c_to_proposers_complete_cycle.complete(c_new_payloads.assume_ordering::<TotalOrder>(nondet!(
        /// We don't send a new write for the same key until the previous one is committed,
        /// so this contains only a single write per key, and we don't care about order
        /// across keys.
    )));

    // Track statistics: record, per virtual client, the timestamp of its most recent
    // request or acknowledgment so latency can be measured when the next response arrives.
    let (c_timers_complete_cycle, c_timers) =
        client_tick.cycle::<Stream<(usize, Instant), _, _, NoOrder>>();
    let c_new_timers_on_start = start_this_tick
        .map(q!(|_| Instant::now()))
        .flat_map_ordered(q!(
            move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now))
        ));
    let c_updated_timers = c_received_quorum_payloads
        .clone()
        .map(q!(|(key, _payload)| (key as usize, Instant::now())));
    let c_new_timers = c_timers
        .clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency)
        .chain(c_new_timers_on_start)
        .chain(c_updated_timers.clone())
        .into_keyed()
        .reduce_commutative(q!(|curr_time, new_time| {
            if new_time > *curr_time {
                *curr_time = new_time;
            }
        }))
        .entries();
    c_timers_complete_cycle.complete_next_tick(c_new_timers);

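    // Compute per-transaction latency by joining each acknowledgment's arrival time with
    // the previously recorded timestamp for that virtual client, then accumulate the
    // differences (in nanoseconds) into a shared histogram.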
    let c_latencies = c_timers
        .join(c_updated_timers)
        .map(q!(
            |(_virtual_id, (prev_time, curr_time))| curr_time.duration_since(prev_time)
        ))
        .all_ticks()
        .fold_commutative(
            q!(move || Rc::new(RefCell::new(Histogram::<u64>::new(3).unwrap()))),
            q!(move |latencies, latency| {
                latencies
                    .borrow_mut()
                    .record(latency.as_nanos() as u64)
                    .unwrap();
            }),
        );

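    // Compute throughput: count committed payloads within each tick and keep a running
    // total; roughly once per second the stats timer fires, at which point the total is
    // folded into a rolling average of requests per second and then reset.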
    let c_stats_output_timer = clients
        .source_interval(q!(Duration::from_secs(1)), nondet_throughput_window)
        .batch(&client_tick, nondet_throughput_window)
        .first();

    let c_throughput_new_batch = c_received_quorum_payloads
        .count()
        .continue_unless(c_stats_output_timer.clone())
        .map(q!(|batch_size| (batch_size, false)));

    let c_throughput_reset = c_stats_output_timer.map(q!(|_| (0, true))).defer_tick();

    let c_throughput = c_throughput_new_batch
        .union(c_throughput_reset)
        .all_ticks()
        .fold(
            q!(|| (0, { RollingAverage::new() })),
            q!(|(total, stats), (batch_size, reset)| {
                if reset {
                    if *total > 0 {
                        stats.add_sample(*total as f64);
                    }

                    *total = 0;
                } else {
                    *total += batch_size;
                }
            }),
        )
        .map(q!(|(_, stats)| { stats }));

    BenchResult {
        latency_histogram: c_latencies,
        throughput: c_throughput,
    }
}

/// Prints transaction latency and throughput results to stdout,
/// with percentiles for latency and a confidence interval for throughput.
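///
/// # Example
/// A minimal sketch (not compiled here); `flow`, the `Aggregator` tag, and
/// `bench_results` are illustrative assumptions:
/// ```rust,ignore
/// let aggregator = flow.process::<Aggregator>();
/// print_bench_results(bench_results, &aggregator, &clients);
/// ```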
pub fn print_bench_results<'a, Client: 'a, Aggregator>(
    results: BenchResult<'a, Client>,
    aggregator: &Process<'a, Aggregator>,
    clients: &Cluster<'a, Client>,
) {
    let nondet_client_count = nondet!(/** client count is stable in bench */);
    let nondet_sampling = nondet!(/** non-deterministic sampling only affects logging */);
    let print_tick = aggregator.tick();
    let client_members = aggregator.source_cluster_members(clients);
    let client_count = track_membership(client_members)
        .key_count()
        .snapshot(&print_tick, nondet_client_count);

    let keyed_throughputs = results
        .throughput
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .send_bincode(aggregator);

    let latest_throughputs = keyed_throughputs
        // Keep only the most recent throughput report from each client
        .reduce_idempotent(q!(|combined, new| {
            *combined = new;
        }))
        // Drop clients that have not yet recorded any throughput samples
        .filter(q!(|throughputs| throughputs.sample_mean() > 0.0));

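    // Wait to print aggregate results until every client machine has reported at least
    // one nonzero throughput sample.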
    let clients_with_throughputs_count = latest_throughputs
        .clone()
        .key_count()
        .snapshot(&print_tick, nondet_client_count);

    let waiting_for_clients = client_count
        .clone()
        .zip(clients_with_throughputs_count)
        .filter_map(q!(|(num_clients, num_clients_with_throughput)| {
            if num_clients > num_clients_with_throughput {
                Some(num_clients - num_clients_with_throughput)
            } else {
                None
            }
        }));

    waiting_for_clients
        .clone()
        .all_ticks()
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .for_each(q!(|num_clients_not_responded| println!(
            "Awaiting {} clients",
            num_clients_not_responded
        )));

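    // Merge the latest per-client rolling averages; the printed mean and confidence
    // interval are scaled by the number of client machines to estimate cluster-wide
    // throughput.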
    let combined_throughputs = latest_throughputs
        .snapshot(&aggregator.tick(), nondet_sampling)
        .values()
        .reduce_commutative(q!(|combined, new| {
            combined.add(new);
        }))
        .latest();

    combined_throughputs
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .batch(&print_tick, nondet_client_count)
        .cross_singleton(client_count.clone())
        .continue_unless(waiting_for_clients.clone())
        .all_ticks()
        .for_each(q!(move |(throughputs, num_client_machines)| {
            if throughputs.sample_count() >= 2 {
                let mean = throughputs.sample_mean() * num_client_machines as f64;

                if let Some((lower, upper)) = throughputs.confidence_interval_99() {
                    println!(
                        "Throughput: {:.2} - {:.2} - {:.2} requests/s",
                        lower * num_client_machines as f64,
                        mean,
                        upper * num_client_machines as f64
                    );
                }
            }
        }));

    let keyed_latencies = results
        .latency_histogram
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .map(q!(|latencies| {
            SerializableHistogramWrapper {
                histogram: latencies,
            }
        }))
        .send_bincode(aggregator);

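    // Merge the most recent latency histogram from each client into a single histogram
    // for percentile reporting.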
    let combined_latencies = keyed_latencies
        .map(q!(|histogram| histogram.histogram.borrow().clone()))
        .reduce_idempotent(q!(|combined, new| {
            // get the most recent histogram for each client
            *combined = new;
        }))
        .snapshot(&aggregator.tick(), nondet_sampling)
        .values()
        .reduce_commutative(q!(|combined, new| {
            combined.add(new).unwrap();
        }))
        .latest();

    combined_latencies
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .batch(&print_tick, nondet_client_count)
        .continue_unless(waiting_for_clients)
        .all_ticks()
        .for_each(q!(move |latencies| {
            println!(
                "Latency p50: {:.3} | p99 {:.3} | p999 {:.3} ms ({:} samples)",
                Duration::from_nanos(latencies.value_at_quantile(0.5)).as_micros() as f64 / 1000.0,
                Duration::from_nanos(latencies.value_at_quantile(0.99)).as_micros() as f64 / 1000.0,
                Duration::from_nanos(latencies.value_at_quantile(0.999)).as_micros() as f64
                    / 1000.0,
                latencies.len()
            );
        }));
}