1use std::cell::RefCell;
2use std::rc::Rc;
3use std::time::{Duration, Instant};
4
5use hdrhistogram::Histogram;
6use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer};
7use hydro_lang::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
8use hydro_lang::prelude::*;
9use serde::{Deserialize, Serialize};
10
11pub mod rolling_average;
12
/// Wrapper around a shared HDR histogram that adds `Serialize`/`Deserialize`
/// implementations (via the histogram V2 binary wire format), so histograms
/// can be shipped between nodes with bincode.
pub struct SerializableHistogramWrapper {
    // Shared, mutable histogram handle; `Rc<RefCell<...>>` because the
    // dataflow below mutates the same histogram it has already emitted.
    pub histogram: Rc<RefCell<Histogram<u64>>>,
}
16
17impl Serialize for SerializableHistogramWrapper {
18 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
19 let mut vec = Vec::new();
20 V2Serializer::new()
21 .serialize(&self.histogram.borrow(), &mut vec)
22 .unwrap();
23 serializer.serialize_bytes(&vec)
24 }
25}
26impl<'a> Deserialize<'a> for SerializableHistogramWrapper {
27 fn deserialize<D: serde::Deserializer<'a>>(deserializer: D) -> Result<Self, D::Error> {
28 let mut bytes: &[u8] = Deserialize::deserialize(deserializer)?;
29 let mut histogram = Deserializer::new().deserialize(&mut bytes).unwrap();
30 histogram.auto(true);
32 Ok(SerializableHistogramWrapper {
33 histogram: Rc::new(RefCell::new(histogram)),
34 })
35 }
36}
37
/// Streams of benchmark measurements emitted once per reporting interval,
/// located at `L` (a client cluster or the aggregator process).
pub struct BenchResult<L> {
    // One latency histogram per interval (values recorded in nanoseconds
    // elsewhere in this module).
    pub latency_histogram:
        Stream<Rc<RefCell<Histogram<u64>>>, L, Unbounded, TotalOrder, ExactlyOnce>,
    // Number of completed requests observed in each interval.
    pub throughput: Stream<usize, L, Unbounded, TotalOrder, ExactlyOnce>,
}
43
/// Drives a closed-loop benchmark workload on each member of `clients`.
///
/// Each cluster member runs `num_clients_per_node` "virtual clients", keyed
/// by a `u32` id. A virtual client is seeded with `None` (no prior response);
/// thereafter each response from `protocol` is fed back through
/// `workload_generator` as `Some(output)` to produce the next request, so
/// each virtual client keeps one request in flight at a time.
///
/// Returns a keyed stream of `(output, latency)` pairs, where latency is the
/// wall-clock time from request generation to response arrival for that
/// virtual client.
pub fn bench_client<'a, Client, Input, Output>(
    clients: &Cluster<'a, Client>,
    num_clients_per_node: Singleton<usize, Cluster<'a, Client>, Bounded>,
    workload_generator: impl FnOnce(
        KeyedStream<u32, Option<Output>, Cluster<'a, Client>, Unbounded, NoOrder>,
    )
        -> KeyedStream<u32, Input, Cluster<'a, Client>, Unbounded, NoOrder>,
    protocol: impl FnOnce(
        KeyedStream<u32, Input, Cluster<'a, Client>, Unbounded, NoOrder>,
    ) -> KeyedStream<u32, Output, Cluster<'a, Client>, Unbounded, NoOrder>,
) -> KeyedStream<u32, (Output, Duration), Cluster<'a, Client>, Unbounded, NoOrder>
where
    Input: Clone,
    Output: Clone,
{
    // Incrementally spawn virtual client ids 0..num_clients_per_node: each
    // emitted id carries `None` ("no prior response"), and the counter state
    // advances until every id has been produced, after which the
    // `filter_map` returns `None` and spawning stops.
    let new_payload_ids = sliced! {
        let num_clients_per_node = use(num_clients_per_node, nondet!());
        let mut next_virtual_client = use::state(|l| Optional::from(l.singleton(q!((0u32, None)))));

        let new_virtual_client = next_virtual_client.clone();
        next_virtual_client = new_virtual_client
            .clone()
            .zip(num_clients_per_node)
            .filter_map(q!(move |((virtual_id, _), num_clients_per_node)| {
                assert!(num_clients_per_node > 0, "Must have at least 1 virtual client per node");
                if virtual_id + 1 < num_clients_per_node as u32 {
                    Some((virtual_id + 1, None))
                } else {
                    None
                }
            }),
        );

        new_virtual_client.into_stream().into_keyed()
    };

    // Close the loop: requests come either from a freshly spawned virtual
    // client (`None`) or from the response to that client's previous request
    // (`Some(output)`). `forward_ref` breaks the cycle so `protocol_outputs`
    // can be referenced before `protocol` is applied.
    let (protocol_outputs_complete, protocol_outputs) =
        clients.forward_ref::<KeyedStream<u32, Output, Cluster<'a, Client>, Unbounded, NoOrder>>();
    let protocol_inputs = workload_generator(
        new_payload_ids.merge_unordered(protocol_outputs.map(q!(|payload| Some(payload)))),
    );
    let protocol_outputs = protocol(protocol_inputs.clone());
    protocol_outputs_complete.complete(protocol_outputs.clone());

    // Per virtual client, track the send time of the most recent request
    // (the fold overwrites the timestamp on every new input).
    let start_times = protocol_inputs.fold(
        q!(|| Instant::now()),
        q!(
            |curr, _new| {
                *curr = Instant::now();
            },
            commutative = manual_proof!()
        ),
    );

    sliced! {
        let start_times = use(start_times, nondet!());
        let current_outputs = use(protocol_outputs, nondet!());

        // Per virtual client, keep only the latest response and stamp it
        // with its arrival time.
        let end_times_and_output = current_outputs
            .reduce(q!(|curr, new| {
                *curr = new;
            },
            commutative = manual_proof!()),
            )
            .map(q!(|output| (Instant::now(), output)));

        // Pair each response with its request's recorded start time
        // (`defer_tick` matches against the start time from the previous
        // tick) and compute the round-trip latency.
        start_times
            .defer_tick() .join_keyed_singleton(end_times_and_output)
            .map(q!(|(start_time, (end_time, output))| (output, end_time.duration_since(start_time))))
            .into_keyed_stream()
            .weaken_ordering()
    }
}
130
/// Converts a raw stream of per-request latencies into periodic throughput
/// and latency-histogram samples, emitted roughly every `interval_millis`
/// milliseconds on each client.
///
/// Between timer fires, the request count and the latency histogram
/// accumulate; when the timer fires the accumulated values are emitted and
/// the accumulators restart from the next batch alone.
pub fn compute_throughput_latency<'a, Client: 'a>(
    clients: &Cluster<'a, Client>,
    latencies: Stream<Duration, Cluster<'a, Client>, Unbounded, NoOrder>,
    interval_millis: u64,
    nondet_measurement_window: NonDet,
) -> BenchResult<Cluster<'a, Client>> {
    // Timer stream that delimits measurement windows.
    let punctuation = clients.source_interval(
        q!(Duration::from_millis(interval_millis)),
        nondet_measurement_window,
    );

    let (interval_throughput, interval_latency) = sliced! {
        let punctuation = use(punctuation, nondet_measurement_window);
        let latencies = use(latencies, nondet_measurement_window);
        // Long-lived accumulators: running latency histogram and request count.
        // `new(3)` = 3 significant decimal digits of precision.
        let mut latency_histogram = use::state(|l| l.singleton(q!(Rc::new(RefCell::new(Histogram::<u64>::new(3).unwrap())))));
        let mut throughput = use::state(|l| l.singleton(q!(0usize)));

        let punctuation_option = punctuation.first();
        // Histogram of just this batch's latencies, recorded in nanoseconds.
        let batched_latency_histogram = latencies.clone().fold(
            q!(move || Histogram::<u64>::new(3).unwrap()),
            q!(move |latencies, latency| {
                latencies
                    .record(latency.as_nanos() as u64)
                    .unwrap();
            },
            commutative = manual_proof!()
            ),
        );

        // When the timer fires, emit the values accumulated so far for the
        // window that just ended.
        let interval_throughput = throughput.clone().filter_if(punctuation_option.clone().is_some());
        let interval_latency = latency_histogram.clone().filter_if(punctuation_option.clone().is_some());

        // Add this batch onto the previous total. Right after a timer fire
        // the previous total is filtered out, so `unwrap_or` restarts the
        // count from this batch alone.
        let batched_throughput = latencies.count();
        let prev_throughput = throughput.filter_if(punctuation_option.clone().is_none());
        throughput = batched_throughput
            .clone()
            .zip(prev_throughput.clone())
            .map(q!(|(new, old)| new + old))
            .unwrap_or(batched_throughput.clone());

        // Same restart-on-fire scheme for the histogram: merge the batch
        // into the running histogram, or start a fresh `Rc` from the batch.
        let prev_histogram = latency_histogram.filter_if(punctuation_option.is_none());
        latency_histogram = batched_latency_histogram
            .clone()
            .zip(prev_histogram.clone())
            .map(q!(|(new, old)| {
                old.borrow_mut().add(new).expect("Error adding value to histogram");
                old
            }))
            .unwrap_or(batched_latency_histogram.map(q!(|histogram| Rc::new(RefCell::new(histogram)))));

        (interval_throughput.into_stream(), interval_latency.into_stream())
    };

    BenchResult {
        latency_histogram: interval_latency,
        throughput: interval_throughput,
    }
}
199
/// Gathers per-client benchmark samples onto a single aggregator process and
/// re-emits combined samples roughly every `output_interval_millis`
/// milliseconds.
///
/// Throughput counts are summed and latency histograms merged across all
/// clients; both accumulators restart after each emission.
pub fn aggregate_bench_results<'a, Client: 'a, Aggregator>(
    results: BenchResult<Cluster<'a, Client>>,
    aggregator: &Process<'a, Aggregator>,
    output_interval_millis: u64,
) -> BenchResult<Process<'a, Aggregator>> {
    let nondet_sampling = nondet!();
    // Timer that delimits the aggregator's reporting windows.
    let punctuation = aggregator.source_interval(
        q!(Duration::from_millis(output_interval_millis)),
        nondet_sampling,
    );

    // Ship every client's throughput counts to the aggregator.
    let a_throughputs = results
        .throughput
        .send(aggregator, TCP.fail_stop().bincode())
        .values();

    // Ship latency histograms, wrapped so they can cross the wire via serde,
    // then unwrap back to the shared-histogram handle.
    let a_latencies = results
        .latency_histogram
        .map(q!(|latencies| {
            SerializableHistogramWrapper {
                histogram: latencies,
            }
        }))
        .send(aggregator, TCP.fail_stop().bincode())
        .values()
        .map(q!(|wrapper| wrapper.histogram));

    let (combined_throughputs, combined_latencies) = sliced! {
        let punctuation = use(punctuation, nondet_sampling);
        let a_throughputs = use(a_throughputs, nondet_sampling);
        let a_latencies = use(a_latencies, nondet_sampling);
        // Long-lived accumulators mirroring compute_throughput_latency.
        let mut latency_histogram = use::state(|l| l.singleton(q!(Rc::new(RefCell::new(Histogram::<u64>::new(3).unwrap())))));
        let mut throughput = use::state(|l| l.singleton(q!(0usize)));

        let punctuation_option = punctuation.first();

        // When the timer fires, emit the totals accumulated so far.
        let interval_throughput = throughput.clone().filter_if(punctuation_option.clone().is_some());
        let interval_latency = latency_histogram.clone().filter_if(punctuation_option.clone().is_some());

        // Sum incoming counts onto the running total; the previous total is
        // filtered out when the timer fires, restarting the sum.
        let prev_throughput = throughput.filter_if(punctuation_option.clone().is_none()).into_stream();
        throughput = a_throughputs
            .chain(prev_throughput)
            .fold(q!(|| 0usize), q!(|curr, new| {
                *curr += new;
            },
            commutative = manual_proof!()
            ));

        // Merge all of this batch's incoming histograms into one.
        let merged_new_histograms = a_latencies
            .reduce(
                q!(|curr, new| {
                    curr.borrow_mut().add(&*new.borrow_mut()).expect("Error adding value to histogram");
                },
                commutative = manual_proof!()
                ));
        // Fold the merged batch into the long-lived histogram, clearing it
        // first if an emission happened on the previous tick (the timer
        // signal is `defer_tick`-ed so the emitted snapshot is not wiped
        // before downstream sees it).
        latency_histogram = latency_histogram
            .zip(merged_new_histograms.into_singleton())
            .zip(punctuation_option.defer_tick().into_singleton())
            .map(q!(|((old, new), reset)| {
                if reset.is_some() {
                    old.replace(Histogram::<u64>::new(3).unwrap());
                }
                if let Some(new) = new {
                    old.borrow_mut().add(&*new.borrow_mut()).expect("Error adding value to histogram");
                }
                old
            }));

        (interval_throughput.into_stream(), interval_latency.into_stream())
    };

    BenchResult {
        throughput: combined_throughputs,
        latency_histogram: combined_latencies,
    }
}
286
/// Pretty-prints aggregated benchmark results to stdout.
///
/// Each throughput sample is printed as-is; each latency histogram is
/// reduced to its p50/p99/p999 quantiles (converted from the recorded
/// nanosecond values to milliseconds) plus the total sample count.
pub fn pretty_print_bench_results<'a, Aggregator>(
    aggregate_results: BenchResult<Process<'a, Aggregator>>,
) {
    aggregate_results.throughput.for_each(q!(|throughput| {
        println!("Throughput: {:.2} requests/s", throughput);
    }));
    aggregate_results
        .latency_histogram
        // Quantile values are nanoseconds; convert to fractional
        // milliseconds via microseconds / 1000.
        .map(q!(move |latencies| (
            Duration::from_nanos(latencies.borrow().value_at_quantile(0.5)).as_micros() as f64
                / 1000.0,
            Duration::from_nanos(latencies.borrow().value_at_quantile(0.99)).as_micros() as f64
                / 1000.0,
            Duration::from_nanos(latencies.borrow().value_at_quantile(0.999)).as_micros() as f64
                / 1000.0,
            latencies.borrow().len(),
        )))
        .for_each(q!(move |(p50, p99, p999, num_samples)| {
            println!(
                "Latency p50: {:.3} | p99 {:.3} | p999 {:.3} ms ({:} samples)",
                p50, p99, p999, num_samples
            );
        }));
}