1use std::cell::RefCell;
2use std::rc::Rc;
3use std::time::{Duration, Instant};
4
5use hdrhistogram::Histogram;
6use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer};
7use hydro_lang::*;
8use serde::{Deserialize, Serialize};
9
10pub mod rolling_average;
11use rolling_average::RollingAverage;
12
13use crate::membership::track_membership;
14
15pub struct SerializableHistogramWrapper {
16 pub histogram: Rc<RefCell<Histogram<u64>>>,
17}
18
19impl Serialize for SerializableHistogramWrapper {
20 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
21 let mut vec = Vec::new();
22 V2Serializer::new()
23 .serialize(&self.histogram.borrow(), &mut vec)
24 .unwrap();
25 serializer.serialize_bytes(&vec)
26 }
27}
28impl<'a> Deserialize<'a> for SerializableHistogramWrapper {
29 fn deserialize<D: serde::Deserializer<'a>>(deserializer: D) -> Result<Self, D::Error> {
30 let mut bytes: &[u8] = Deserialize::deserialize(deserializer)?;
31 let mut histogram = Deserializer::new().deserialize(&mut bytes).unwrap();
32 histogram.auto(true);
34 Ok(SerializableHistogramWrapper {
35 histogram: Rc::new(RefCell::new(histogram)),
36 })
37 }
38}
/// Per-client-member benchmark metrics, as produced by [`bench_client`].
///
/// Both fields are singletons on the client cluster, i.e. one value per member.
pub struct BenchResult<'a, Client> {
    /// Histogram of request latencies, recorded in nanoseconds.
    pub latency_histogram: Singleton<Rc<RefCell<Histogram<u64>>>, Cluster<'a, Client>, Unbounded>,
    /// Rolling average of the number of completed requests per throughput window.
    pub throughput: Singleton<RollingAverage, Cluster<'a, Client>, Unbounded>,
}
43
/// Sets up a closed-loop benchmark on `clients` and returns its latency and
/// throughput metrics.
///
/// Each cluster member simulates `num_clients_per_node` virtual clients with ids
/// `0..num_clients_per_node`. Requests are issued as follows:
/// - On the first tick every virtual client is passed to `workload_generator`
///   as `(virtual_id, None)`.
/// - Afterwards, each completion `(virtual_id, payload)` emitted by
///   `transaction_cycle` is passed back as `(virtual_id, Some(payload))`, so that
///   virtual client issues its next request.
/// - The generated requests are fed (as a totally ordered stream) into
///   `transaction_cycle`, which closes the loop.
///
/// # Parameters
/// - `clients`: cluster on which the benchmark clients run.
/// - `workload_generator`: maps `(virtual_id, previous payload or None)` to the next
///   `(virtual_id, payload)` request.
/// - `transaction_cycle`: processes requests and emits `(virtual_id, payload)` when a
///   request completes.
/// - `num_clients_per_node`: number of virtual clients per cluster member.
/// - `nondet_throughput_window`: non-determinism marker for the 1-second interval
///   timer that delimits throughput windows.
///
/// # Returns
/// A [`BenchResult`] holding, per cluster member:
/// - a latency histogram, in nanoseconds, of the time between a virtual client's
///   previous start/completion and its current completion;
/// - a rolling average of the number of completions per throughput window.
pub fn bench_client<'a, Client, Payload>(
    clients: &Cluster<'a, Client>,
    workload_generator: impl FnOnce(
        &Cluster<'a, Client>,
        Stream<(u32, Option<Payload>), Cluster<'a, Client>, Unbounded, NoOrder>,
    )
        -> Stream<(u32, Payload), Cluster<'a, Client>, Unbounded, NoOrder>,
    transaction_cycle: impl FnOnce(
        Stream<(u32, Payload), Cluster<'a, Client>, Unbounded>,
    )
        -> Stream<(u32, Payload), Cluster<'a, Client>, Unbounded, NoOrder>,
    num_clients_per_node: usize,
    nondet_throughput_window: NonDet,
) -> BenchResult<'a, Client>
where
    Payload: Clone,
{
    // All per-client batching and per-tick state below lives in this tick domain.
    let client_tick = clients.tick();

    // Emits `()` only on the first tick; used to bootstrap the virtual clients.
    let start_this_tick = client_tick.optional_first_tick(q!(()));

    // On the first tick, seed every virtual client id with `None` so the workload
    // generator produces that client's first request.
    let c_new_payloads_on_start = start_this_tick.clone().flat_map_ordered(q!(move |_| (0
        ..num_clients_per_node as u32)
        .map(move |virtual_id| { (virtual_id, None) })));

    // Forward reference: the outgoing requests are defined further down (from
    // `workload_generator`) but must be handed to `transaction_cycle` first.
    let (c_to_proposers_complete_cycle, c_to_proposers) =
        clients.forward_ref::<Stream<_, _, _, TotalOrder>>();

    // Completed transactions, batched into the client tick and wrapped in `Some`
    // so that each completion drives its virtual client's next request.
    let c_received_quorum_payloads = transaction_cycle(c_to_proposers)
        .batch(
            &client_tick,
            nondet!(
                // NOTE(review): no justification is recorded here for why arbitrary
                // tick batching of completions is acceptable; consider stating it.
            ),
        )
        .map(q!(|(virtual_id, payload)| (virtual_id, Some(payload))));

    // Feed both the first-tick seeds and the completion-driven follow-ups to the
    // workload generator; `all_ticks` moves them back out of the tick domain.
    let c_new_payloads = workload_generator(
        clients,
        c_new_payloads_on_start
            .chain(c_received_quorum_payloads.clone())
            .all_ticks(),
    );
    // Close the forward reference. The generator's output is `NoOrder`, so it is
    // asserted to be `TotalOrder`, as the forward-ref stream type requires.
    c_to_proposers_complete_cycle.complete(c_new_payloads.assume_ordering::<TotalOrder>(nondet!(
        // NOTE(review): no justification is recorded for assuming a total order here.
    )));

    // Per-virtual-client timestamp of its latest start/completion, carried from
    // one tick to the next through a tick-scoped cycle.
    let (c_timers_complete_cycle, c_timers) =
        client_tick.cycle::<Stream<(usize, Instant), _, _, NoOrder>>();
    // On the first tick, start a timer for every virtual client.
    let c_new_timers_when_leader_elected = start_this_tick
        .map(q!(|_| Instant::now()))
        .flat_map_ordered(q!(
            move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now))
        ));
    // Completion time of every virtual client that completed a request this tick.
    let c_updated_timers = c_received_quorum_payloads
        .clone()
        .map(q!(|(key, _payload)| (key as usize, Instant::now())));
    // Merge existing, initial and updated timers, keeping the latest `Instant` per
    // virtual client; the result becomes `c_timers` on the next tick.
    let c_new_timers = c_timers
        .clone()
        .chain(c_new_timers_when_leader_elected)
        .chain(c_updated_timers.clone())
        .into_keyed()
        .reduce_commutative(q!(|curr_time, new_time| {
            if new_time > *curr_time {
                *curr_time = new_time;
            }
        }))
        .entries();
    c_timers_complete_cycle.complete_next_tick(c_new_timers);

    // Latency of each completion = completion time minus that virtual client's
    // timer from the previous tick. Latencies from all ticks are accumulated into
    // one histogram (3 significant figures), recorded in nanoseconds.
    let c_latencies = c_timers
        .join(c_updated_timers)
        .map(q!(
            |(_virtual_id, (prev_time, curr_time))| curr_time.duration_since(prev_time)
        ))
        .all_ticks()
        .fold_commutative(
            q!(move || Rc::new(RefCell::new(Histogram::<u64>::new(3).unwrap()))),
            q!(move |latencies, latency| {
                latencies
                    .borrow_mut()
                    .record(latency.as_nanos() as u64)
                    .unwrap();
            }),
        );

    // Present (at most once per tick, via `first`) in ticks where the 1-second
    // interval timer fired; these ticks delimit throughput windows.
    let c_stats_output_timer = clients
        .source_interval(q!(Duration::from_secs(1)), nondet_throughput_window)
        .batch(&client_tick, nondet_throughput_window)
        .first();

    // Number of completions in this tick, tagged as a non-reset update.
    // NOTE(review): `continue_unless` suppresses this count in the tick where the
    // timer fires, so those completions appear not to be counted toward any
    // window — confirm this is intended.
    let c_throughput_new_batch = c_received_quorum_payloads
        .count()
        .continue_unless(c_stats_output_timer.clone())
        .map(q!(|batch_size| (batch_size, false)));

    // One tick after the timer fires, emit a reset marker.
    let c_throughput_reset = c_stats_output_timer.map(q!(|_| (0, true))).defer_tick();

    // Fold state: (running total for the current window, rolling statistics).
    // - On a reset, the window total is recorded as a sample (empty windows are
    //   skipped) and the total is zeroed.
    // - Otherwise, the batch size is added to the total.
    let c_throughput = c_throughput_new_batch
        .union(c_throughput_reset)
        .all_ticks()
        .fold(
            q!(|| (0, { RollingAverage::new() })),
            q!(|(total, stats), (batch_size, reset)| {
                if reset {
                    if *total > 0 {
                        stats.add_sample(*total as f64);
                    }

                    *total = 0;
                } else {
                    *total += batch_size;
                }
            }),
        )
        .map(q!(|(_, stats)| { stats }));

    BenchResult {
        latency_histogram: c_latencies,
        throughput: c_throughput,
    }
}
181
/// Collects benchmark results from every member of `clients` at `aggregator` and
/// prints them there.
///
/// Roughly once per second (sampled non-deterministically), the aggregator prints
/// `Awaiting N clients` while fewer client members have reported a positive mean
/// throughput than are currently known. Once all have reported, it prints:
/// - the combined throughput as a 99% confidence interval and mean, each scaled by
///   the number of client machines (only once at least two samples exist);
/// - the p50/p99/p999 latency of the merged latency histograms, in milliseconds.
///
/// # Panics
/// The generated aggregator code panics if merging two latency histograms fails.
pub fn print_bench_results<'a, Client: 'a, Aggregator>(
    results: BenchResult<'a, Client>,
    aggregator: &Process<'a, Aggregator>,
    clients: &Cluster<'a, Client>,
) {
    // Markers for the non-determinism accepted below: membership snapshots and
    // periodic sampling. This function's only outputs are the printed lines.
    let nondet_client_count = nondet!();
    let nondet_sampling = nondet!();
    let print_tick = aggregator.tick();
    // Number of currently known client cluster members, snapshotted per print tick.
    let client_members = aggregator.source_cluster_members(clients);
    let client_count = track_membership(client_members)
        .key_count()
        .snapshot(&print_tick, nondet_client_count);

    // Each client's rolling throughput, sampled once per second and sent to the
    // aggregator, which sees it keyed by client member.
    let keyed_throughputs = results
        .throughput
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .send_bincode(aggregator);

    // Keep only the latest report from each client, and drop clients whose mean
    // throughput is not yet positive.
    let latest_throughputs = keyed_throughputs
        .reduce_idempotent(q!(|combined, new| {
            *combined = new;
        }))
        .filter(q!(|throughputs| throughputs.sample_mean() > 0.0));

    // How many clients have reported a positive throughput so far.
    let clients_with_throughputs_count = latest_throughputs
        .clone()
        .key_count()
        .snapshot(&print_tick, nondet_client_count);

    // Holds the number of clients still missing while some known client has not
    // yet reported a positive throughput; empty otherwise.
    let waiting_for_clients = client_count
        .clone()
        .zip(clients_with_throughputs_count)
        .filter_map(q!(|(num_clients, num_clients_with_throughput)| {
            if num_clients > num_clients_with_throughput {
                Some(num_clients - num_clients_with_throughput)
            } else {
                None
            }
        }));

    // Periodically report how many clients are still awaited.
    waiting_for_clients
        .clone()
        .all_ticks()
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .for_each(q!(|num_clients_not_responded| println!(
            "Awaiting {} clients",
            num_clients_not_responded
        )));

    // Merge every client's latest rolling average into one. The snapshot uses its
    // own tick (not `print_tick`), and the result is exposed via `latest`.
    let combined_throughputs = latest_throughputs
        .snapshot(&aggregator.tick(), nondet_sampling)
        .values()
        .reduce_commutative(q!(|combined, new| {
            combined.add(new);
        }))
        .latest();

    // Once per second, print the combined throughput, unless some clients are
    // still awaited.
    combined_throughputs
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .batch(&print_tick, nondet_client_count)
        .cross_singleton(client_count.clone())
        .continue_unless(waiting_for_clients.clone())
        .all_ticks()
        .for_each(q!(move |(throughputs, num_client_machines)| {
            // Require at least two samples before reporting a confidence interval.
            if throughputs.sample_count() >= 2 {
                // NOTE(review): scaling by the machine count assumes the merged
                // `sample_mean` is a per-client average — confirm against
                // `RollingAverage::add`.
                let mean = throughputs.sample_mean() * num_client_machines as f64;

                if let Some((lower, upper)) = throughputs.confidence_interval_99() {
                    println!(
                        "Throughput: {:.2} - {:.2} - {:.2} requests/s",
                        lower * num_client_machines as f64,
                        mean,
                        upper * num_client_machines as f64
                    );
                }
            }
        }));

    // Each client's latency histogram, sampled once per second and wrapped so it
    // can be serialized for the trip to the aggregator.
    let keyed_latencies = results
        .latency_histogram
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .map(q!(|latencies| {
            SerializableHistogramWrapper {
                histogram: latencies,
            }
        }))
        .send_bincode(aggregator);

    // Keep each client's latest histogram (cloned out of its `RefCell`), then merge
    // all of them into a single histogram.
    let combined_latencies = keyed_latencies
        .map(q!(|histogram| histogram.histogram.borrow().clone()))
        .reduce_idempotent(q!(|combined, new| {
            *combined = new;
        }))
        .snapshot(&aggregator.tick(), nondet_sampling)
        .values()
        .reduce_commutative(q!(|combined, new| {
            combined.add(new).unwrap();
        }))
        .latest();

    // Once per second, print merged latency percentiles, unless some clients are
    // still awaited. Histogram values are treated as nanoseconds, truncated to
    // whole microseconds, and then shown in milliseconds.
    combined_latencies
        .sample_every(q!(Duration::from_millis(1000)), nondet_sampling)
        .batch(&print_tick, nondet_client_count)
        .continue_unless(waiting_for_clients)
        .all_ticks()
        .for_each(q!(move |latencies| {
            println!(
                "Latency p50: {:.3} | p99 {:.3} | p999 {:.3} ms ({:} samples)",
                Duration::from_nanos(latencies.value_at_quantile(0.5)).as_micros() as f64 / 1000.0,
                Duration::from_nanos(latencies.value_at_quantile(0.99)).as_micros() as f64 / 1000.0,
                Duration::from_nanos(latencies.value_at_quantile(0.999)).as_micros() as f64
                    / 1000.0,
                latencies.len()
            );
        }));
}