//! Benchmark-client utilities (`hydro_std::bench_client`): drive transactional
//! workloads against a protocol and measure per-request latency and throughput.
use std::cell::RefCell;
use std::rc::Rc;
use std::time::{Duration, Instant};

use hdrhistogram::Histogram;
use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer};
use hydro_lang::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
use hydro_lang::prelude::*;
use serde::{Deserialize, Serialize};

pub mod rolling_average;

/// Wrapper that makes a shared (`Rc<RefCell<..>>`) HDR histogram serializable
/// with serde, using hdrhistogram's V2 binary wire format (see the
/// `Serialize`/`Deserialize` impls below). Used to ship per-client latency
/// histograms over the network to the aggregator.
pub struct SerializableHistogramWrapper {
    // Shared handle so the same histogram can be read by multiple stream stages.
    pub histogram: Rc<RefCell<Histogram<u64>>>,
}

17impl Serialize for SerializableHistogramWrapper {
18    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
19        let mut vec = Vec::new();
20        V2Serializer::new()
21            .serialize(&self.histogram.borrow(), &mut vec)
22            .unwrap();
23        serializer.serialize_bytes(&vec)
24    }
25}
26impl<'a> Deserialize<'a> for SerializableHistogramWrapper {
27    fn deserialize<D: serde::Deserializer<'a>>(deserializer: D) -> Result<Self, D::Error> {
28        let mut bytes: &[u8] = Deserialize::deserialize(deserializer)?;
29        let mut histogram = Deserializer::new().deserialize(&mut bytes).unwrap();
30        // Allow auto-resizing to prevent error when combining
31        histogram.auto(true);
32        Ok(SerializableHistogramWrapper {
33            histogram: Rc::new(RefCell::new(histogram)),
34        })
35    }
36}
37
38pub struct BenchResult<L> {
39    pub latency_histogram:
40        Stream<Rc<RefCell<Histogram<u64>>>, L, Unbounded, TotalOrder, ExactlyOnce>,
41    pub throughput: Stream<usize, L, Unbounded, TotalOrder, ExactlyOnce>,
42}
43
/// Benchmarks transactional workloads by concurrently submitting workloads
/// (up to `num_clients_per_node` per machine).
///
/// Each virtual client keeps at most one request in flight: when an output
/// arrives, `workload_generator` turns it into that client's next input.
///
/// * `num_clients_per_node`: Number of virtual clients per machine
/// * `workload_generator`: Converts previous output (or None, for new virtual clients) into the next input payload
/// * `protocol`: The protocol to benchmark
///
/// ## Returns
/// A stream of latencies per completed client request
pub fn bench_client<'a, Client, Input, Output>(
    clients: &Cluster<'a, Client>,
    num_clients_per_node: Singleton<usize, Cluster<'a, Client>, Bounded>,
    workload_generator: impl FnOnce(
        KeyedStream<u32, Option<Output>, Cluster<'a, Client>, Unbounded, NoOrder>,
    )
        -> KeyedStream<u32, Input, Cluster<'a, Client>, Unbounded, NoOrder>,
    protocol: impl FnOnce(
        KeyedStream<u32, Input, Cluster<'a, Client>, Unbounded, NoOrder>,
    ) -> KeyedStream<u32, Output, Cluster<'a, Client>, Unbounded, NoOrder>,
) -> KeyedStream<u32, (Output, Duration), Cluster<'a, Client>, Unbounded, NoOrder>
where
    Input: Clone,
    Output: Clone,
{
    // Keyed stream of freshly spawned virtual-client IDs, each paired with
    // `None` (no previous output yet) for the workload generator.
    let new_payload_ids = sliced! {
        let num_clients_per_node = use(num_clients_per_node, nondet!(/** This is a constant */));
        // State: the next (virtual_id, None) pair to spawn, starting at ID 0.
        let mut next_virtual_client = use::state(|l| Optional::from(l.singleton(q!((0u32, None)))));

        // Set up virtual clients - spawn one new one each tick until we reach the limit
        let new_virtual_client = next_virtual_client.clone();
        next_virtual_client = new_virtual_client
            .clone()
            .zip(num_clients_per_node)
            .filter_map(q!(move |((virtual_id, _), num_clients_per_node)| {
                assert!(num_clients_per_node > 0, "Must have at least 1 virtual client per node");
                if virtual_id + 1 < num_clients_per_node as u32 {
                    Some((virtual_id + 1, None))
                } else {
                    // All clients spawned: state becomes empty and spawning stops.
                    None
                }
            }),
        );

        new_virtual_client.into_stream().into_keyed()
    };

    // Forward reference so protocol outputs can be fed back into the
    // workload generator (closing the request/response loop).
    let (protocol_outputs_complete, protocol_outputs) =
        clients.forward_ref::<KeyedStream<u32, Output, Cluster<'a, Client>, Unbounded, NoOrder>>();
    // Use new payload IDs and previous outputs to generate new payloads
    let protocol_inputs = workload_generator(
        new_payload_ids.merge_unordered(protocol_outputs.map(q!(|payload| Some(payload)))),
    );
    // Feed new payloads to the protocol
    let protocol_outputs = protocol(protocol_inputs.clone());
    protocol_outputs_complete.complete(protocol_outputs.clone());

    // Persist start latency, overwrite on new value. Memory footprint = O(num_clients_per_node)
    let start_times = protocol_inputs.fold(
        q!(|| Instant::now()),
        q!(
            |curr, _new| {
                *curr = Instant::now();
            },
            commutative = manual_proof!(/** The value will be thrown away */)
        ),
    );

    sliced! {
        let start_times = use(start_times, nondet!(/** Only one in-flight message per virtual client at any time, and outputs happen-after inputs, so if an output is received the start_times must contain its input time. */));
        let current_outputs = use(protocol_outputs, nondet!(/** Batching is required to compare output to input time, but does not actually affect the result. */));

        // Collapse to the latest output per client in this batch, then stamp
        // it with the time the output was observed.
        let end_times_and_output = current_outputs
            .reduce(q!(|curr, new| {
                    *curr = new;
                },
                commutative = manual_proof!(/** Only one in-flight message per virtual client at any time, and they are causally dependent, so this just casts to KeyedSingleton */)),
            )
            .map(q!(|output| (Instant::now(), output)));

        // Latency = end time minus the matching client's recorded start time.
        start_times
            .defer_tick() // Get the start_times before they were overwritten with the newly generated input
            .join_keyed_singleton(end_times_and_output)
            .map(q!(|(start_time, (end_time, output))| (output, end_time.duration_since(start_time))))
            .into_keyed_stream()
            .weaken_ordering()
    }
}

/// Computes the throughput and latency of transactions and outputs it every `interval_millis`.
/// An output is produced even if there are no transactions.
///
/// # Non-Determinism
/// This function uses non-deterministic wall-clock windows for measuring throughput.
pub fn compute_throughput_latency<'a, Client: 'a>(
    clients: &Cluster<'a, Client>,
    latencies: Stream<Duration, Cluster<'a, Client>, Unbounded, NoOrder>,
    interval_millis: u64,
    nondet_measurement_window: NonDet,
) -> BenchResult<Cluster<'a, Client>> {
    // Wall-clock ticks that delimit the measurement windows.
    let punctuation = clients.source_interval(
        q!(Duration::from_millis(interval_millis)),
        nondet_measurement_window,
    );

    let (interval_throughput, interval_latency) = sliced! {
        let punctuation = use(punctuation, nondet_measurement_window);
        let latencies = use(latencies, nondet_measurement_window);
        // Running aggregates carried across ticks; emitted and then reset each
        // time a punctuation fires. Histogram precision: 3 significant digits.
        let mut latency_histogram = use::state(|l| l.singleton(q!(Rc::new(RefCell::new(Histogram::<u64>::new(3).unwrap())))));
        let mut throughput = use::state(|l| l.singleton(q!(0usize)));

        let punctuation_option = punctuation.first();
        // Histogram of just this batch's latencies, recorded in nanoseconds.
        let batched_latency_histogram = latencies.clone().fold(
            q!(move || Histogram::<u64>::new(3).unwrap()),
            q!(move |latencies, latency| {
                    latencies
                        .record(latency.as_nanos() as u64)
                        .unwrap();
                },
                commutative = manual_proof!(/** adding elements to histogram is commutative */)
            ),
        );

        // Output every punctuation
        let interval_throughput = throughput.clone().filter_if(punctuation_option.clone().is_some());
        let interval_latency = latency_histogram.clone().filter_if(punctuation_option.clone().is_some());

        let batched_throughput = latencies.count();
        // Clear every punctuation (carry the old count only when no punctuation fired)
        let prev_throughput = throughput.filter_if(punctuation_option.clone().is_none());
        // Merge new values; if there is no previous count, start from this batch alone
        throughput = batched_throughput
            .clone()
            .zip(prev_throughput.clone())
            .map(q!(|(new, old)| new + old))
            .unwrap_or(batched_throughput.clone());

        // Clear every punctuation
        let prev_histogram = latency_histogram.filter_if(punctuation_option.is_none());
        // Merge new values into the carried histogram; if there is no carried
        // histogram, wrap this batch's histogram as the new state
        latency_histogram = batched_latency_histogram
            .clone()
            .zip(prev_histogram.clone())
            .map(q!(|(new, old)| {
                old.borrow_mut().add(new).expect("Error adding value to histogram");
                old
            }))
            .unwrap_or(batched_latency_histogram.map(q!(|histogram| Rc::new(RefCell::new(histogram)))));

        (interval_throughput.into_stream(), interval_latency.into_stream())
    };

    BenchResult {
        latency_histogram: interval_latency,
        throughput: interval_throughput,
    }
}

/// Returns transaction throughput and latency results.
/// Aggregates results from clients and outputs every `output_interval_millis`.
///
/// Note: Inconsistent windowing may result in unexpected outputs unless `output_interval_millis` >> `interval_millis`.
pub fn aggregate_bench_results<'a, Client: 'a, Aggregator>(
    results: BenchResult<Cluster<'a, Client>>,
    aggregator: &Process<'a, Aggregator>,
    output_interval_millis: u64,
) -> BenchResult<Process<'a, Aggregator>> {
    let nondet_sampling = nondet!(/** non-deterministic sampling only affects logging */);
    // Aggregator-side ticks that delimit the output windows.
    let punctuation = aggregator.source_interval(
        q!(Duration::from_millis(output_interval_millis)),
        nondet_sampling,
    );

    // Ship per-client throughput counts to the aggregator over TCP/bincode.
    let a_throughputs = results
        .throughput
        .send(aggregator, TCP.fail_stop().bincode())
        .values();

    // Ship per-client histograms, wrapped so they can cross the network via
    // serde, then unwrap back to the shared-histogram handle on arrival.
    let a_latencies = results
        .latency_histogram
        .map(q!(|latencies| {
            SerializableHistogramWrapper {
                histogram: latencies,
            }
        }))
        .send(aggregator, TCP.fail_stop().bincode())
        .values()
        .map(q!(|wrapper| wrapper.histogram));

    let (combined_throughputs, combined_latencies) = sliced! {
        let punctuation = use(punctuation, nondet_sampling);
        let a_throughputs = use(a_throughputs, nondet_sampling);
        let a_latencies = use(a_latencies, nondet_sampling);
        // Running aggregates across all clients; emitted and reset per punctuation.
        let mut latency_histogram = use::state(|l| l.singleton(q!(Rc::new(RefCell::new(Histogram::<u64>::new(3).unwrap())))));
        let mut throughput = use::state(|l| l.singleton(q!(0usize)));

        let punctuation_option = punctuation.first();

        // Output every punctuation
        let interval_throughput = throughput.clone().filter_if(punctuation_option.clone().is_some());
        let interval_latency = latency_histogram.clone().filter_if(punctuation_option.clone().is_some());

        // Clear every punctuation (carry the old count only when no punctuation fired)
        let prev_throughput = throughput.filter_if(punctuation_option.clone().is_none()).into_stream();
        // Merge new values
        throughput = a_throughputs
            .chain(prev_throughput)
            .fold(q!(|| 0usize), q!(|curr, new| {
                    *curr += new;
                },
                commutative = manual_proof!(/** Addition is commutative */)
            ));

        // Merge new values: combine all histograms received this batch into one
        let merged_new_histograms = a_latencies
            .reduce(
                q!(|curr, new| {
                    curr.borrow_mut().add(&*new.borrow_mut()).expect("Error adding value to histogram");
                },
                commutative = manual_proof!(/** Merge is commutative */)
            ));
        // Clear every punctuation (reset is deferred one tick so the emitted
        // snapshot is taken before the shared histogram is wiped)
        latency_histogram = latency_histogram
            .zip(merged_new_histograms.into_singleton())
            .zip(punctuation_option.defer_tick().into_singleton())
            .map(q!(|((old, new), reset)| {
                if reset.is_some() {
                    // Use replace instead of clear, since interval_latency is pointing to the Histogram too
                    old.replace(Histogram::<u64>::new(3).unwrap());
                }
                if let Some(new) = new {
                    old.borrow_mut().add(&*new.borrow_mut()).expect("Error adding value to histogram");
                }
                old
            }));

        (interval_throughput.into_stream(), interval_latency.into_stream())
    };

    BenchResult {
        throughput: combined_throughputs,
        latency_histogram: combined_latencies,
    }
}

/// Pretty prints output of `aggregate_bench_results`.
///
/// Prints the throughput, and the 50th, 99th, and 99.9th percentile latencies.
pub fn pretty_print_bench_results<'a, Aggregator>(
    aggregate_results: BenchResult<Process<'a, Aggregator>>,
) {
    aggregate_results.throughput.for_each(q!(|throughput| {
        println!("Throughput: {:.2} requests/s", throughput);
    }));
    aggregate_results
        .latency_histogram
        // Histogram values were recorded in nanoseconds (see compute_throughput_latency).
        .map(q!(move |latencies| (
            // Convert to milliseconds but include floating point (as_millis is for whole numbers only)
            Duration::from_nanos(latencies.borrow().value_at_quantile(0.5)).as_micros() as f64
                / 1000.0,
            Duration::from_nanos(latencies.borrow().value_at_quantile(0.99)).as_micros() as f64
                / 1000.0,
            Duration::from_nanos(latencies.borrow().value_at_quantile(0.999)).as_micros() as f64
                / 1000.0,
            latencies.borrow().len(),
        )))
        .for_each(q!(move |(p50, p99, p999, num_samples)| {
            println!(
                "Latency p50: {:.3} | p99 {:.3} | p999 {:.3} ms ({:} samples)",
                p50, p99, p999, num_samples
            );
        }));
314}