// hydro_test/cluster/paxos_log_bench.rs

use hydro_lang::live_collections::stream::NoOrder;
use hydro_lang::location::cluster::CLUSTER_SELF_ID;
use hydro_lang::prelude::*;
use hydro_std::bench_client::{
    BenchResult, aggregate_bench_results, bench_client, compute_throughput_latency,
};

use super::paxos_with_client::PaxosLike;
use crate::cluster::paxos_bench::inc_i32_workload_generator;

/// Marker type for the cluster of benchmark client nodes that generate load.
pub struct Client;
/// Marker type for the process that aggregates benchmark results from all clients.
pub struct Aggregator;

/// Runs a closed-loop benchmark against a Paxos-like replicated log, with the
/// leader periodically computing checkpoints so acceptors can truncate state.
///
/// Each client node drives `num_clients_per_node` virtual clients using the
/// `inc_i32` workload, submits payloads through `paxos`, and measures
/// per-request latency. Latencies are turned into throughput/latency samples
/// every `client_interval_millis`, aggregated on `client_aggregator` every
/// `aggregate_interval_millis`, and finally handed to `print_results`.
///
/// * `checkpoint_frequency` — how many sequence numbers must be committed past
///   the previous checkpoint before a new checkpoint is emitted.
/// * `paxos` — the consensus implementation under test.
/// * `clients` / `client_aggregator` — locations where load generation and
///   result aggregation run.
/// * `print_results` — consumer for the final aggregated [`BenchResult`].
#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")]
pub fn paxos_log_bench<'a>(
    checkpoint_frequency: usize, // How many sequence numbers to commit before checkpointing
    paxos: impl PaxosLike<'a>,
    clients: &Cluster<'a, Client>,
    num_clients_per_node: Singleton<usize, Cluster<'a, Client>, Bounded>,
    client_aggregator: &Process<'a, Aggregator>,
    client_interval_millis: u64,
    aggregate_interval_millis: u64,
    print_results: impl FnOnce(BenchResult<Process<'a, Aggregator>>),
) {
    let latencies = bench_client(
        clients,
        num_clients_per_node,
        inc_i32_workload_generator,
        |input| {
            let acceptors = paxos.log_stores().clone();
            // Forward reference: the checkpoint stream is consumed by `with_client`
            // below but only produced (completed) after sequencing, at the end of
            // this closure.
            let (acceptor_checkpoint_complete, acceptor_checkpoint) =
                acceptors.forward_ref::<Optional<_, _, _>>();

            let sequenced_payloads = paxos.with_client(
                clients,
                input
                    .entries()
                    .map(q!(move |(virtual_id, payload)| {
                        // Append Client ID so replicas know who to contact later
                        (virtual_id, (CLUSTER_SELF_ID.clone(), payload))
                    }))
                    .assume_ordering(nondet!(/** benchmarking, order actually doesn't matter */)),
                acceptor_checkpoint,
                // TODO(shadaj): we should retry when a payload is dropped due to stale leader
                nondet!(/** benchmarking, assuming no re-election */),
                nondet!(
                    /// clients 'own' certain keys, so interleaving elements from clients will not affect
                    /// the order of writes to the same key
                ),
            );

            // Leader sends committed payloads directly to client
            let sequenced_to_clients = sequenced_payloads
                .clone()
                .map(q!(|(_seq, payload)| {
                    let (virtual_id, (client_location, value)) = payload.unwrap();
                    (client_location, (virtual_id, value))
                }))
                .demux(clients, TCP.fail_stop().bincode())
                .values()
                .into_keyed();

            // Compute checkpoints on the leader
            let nondet_log_holes = nondet!(/** Current max log sequence number depends on when the commit is confirmed */);
            let p_checkpoint = sliced! {
                let new_slots = use(sequenced_payloads.into_keyed().keys(), nondet_log_holes);
                // Slots not yet observed as committed ("holes" in the log).
                let mut log_holes = use::state_null::<Stream<usize, Tick<_>, Bounded, NoOrder>>();
                // Highest slot covered by the last checkpoint we emitted.
                let mut prev_checkpoint_slot = use::state::<Singleton<usize, Tick<_>, Bounded>>(|l| l.singleton(q!(0)));

                // Min log hole = max contiguous slot
                let max_contiguous_slot = log_holes.clone().min().unwrap_or_default();
                // Only emit a checkpoint once we have advanced at least
                // `checkpoint_frequency` slots past the previous one.
                let new_checkpoint = max_contiguous_slot
                    .zip(prev_checkpoint_slot.clone())
                    .filter_map(q!(move |(max_contiguous, prev_checkpoint)|
                        (max_contiguous - prev_checkpoint >= checkpoint_frequency).then_some(max_contiguous)));
                prev_checkpoint_slot = new_checkpoint.clone().unwrap_or(prev_checkpoint_slot);

                // Calculate the new log holes
                let max_log_hole = log_holes.clone().max().unwrap_or_default();
                let max_new_slot = new_slots.clone().max();
                // max_new_slot+2 because we want the next hole to be the largest slot + 1. The other +1 is because the range is exclusive
                let new_potential_holes = max_log_hole
                    .zip(max_new_slot)
                    .flat_map_unordered(q!(|(max_hole, max_new_slot)| max_hole+1..max_new_slot+2));
                // Holes = (candidate holes ∪ old holes) minus slots now committed.
                let new_holes = new_potential_holes.chain(log_holes.clone())
                    .filter_not_in(new_slots);
                log_holes = new_holes;

                new_checkpoint.into_stream()
            };
            // Broadcast each checkpoint to all acceptors; each acceptor keeps the
            // max it has seen, closing the forward reference declared above.
            let a_checkpoint = p_checkpoint
                .broadcast(&acceptors, TCP.fail_stop().bincode(), nondet!(/** Acceptor membership is static */))
                .values()
                .max();
            acceptor_checkpoint_complete.complete(a_checkpoint);

            sequenced_to_clients
        },
    )
    // bench_client yields (virtual_client_id, (output, latency)); only latency matters here.
    .entries()
    .map(q!(|(_virtual_client_id, (_output, latency))| latency));

    // Create throughput/latency graphs
    let bench_results = compute_throughput_latency(
        clients,
        latencies,
        client_interval_millis,
        nondet!(/** bench */),
    );
    let aggregate_results =
        aggregate_bench_results(bench_results, client_aggregator, aggregate_interval_millis);
    print_results(aggregate_results);
}

115#[cfg(test)]
116mod tests {
117    use hydro_deploy::Deployment;
118    use hydro_lang::deploy::{DeployCrateWrapper, TrybuildHost};
119
120    #[cfg(stageleft_runtime)]
121    use crate::cluster::paxos::{CorePaxos, PaxosConfig};
122
123    const PAXOS_F: usize = 1;
124
125    #[cfg(stageleft_runtime)]
126    fn create_paxos<'a>(
127        proposers: &hydro_lang::location::Cluster<'a, crate::cluster::paxos::Proposer>,
128        acceptors: &hydro_lang::location::Cluster<'a, crate::cluster::paxos::Acceptor>,
129        clients: &hydro_lang::location::Cluster<'a, super::Client>,
130        client_aggregator: &hydro_lang::location::Process<'a, super::Aggregator>,
131    ) {
132        use hydro_lang::location::Location;
133        use hydro_std::bench_client::pretty_print_bench_results;
134        use stageleft::q;
135
136        super::paxos_log_bench(
137            1000,
138            CorePaxos {
139                proposers: proposers.clone(),
140                acceptors: acceptors.clone(),
141                paxos_config: PaxosConfig {
142                    f: 1,
143                    i_am_leader_send_timeout: 5,
144                    i_am_leader_check_timeout: 10,
145                    i_am_leader_check_timeout_delay_multiplier: 15,
146                },
147            },
148            clients,
149            clients.singleton(q!(100usize)),
150            client_aggregator,
151            100,
152            1000,
153            pretty_print_bench_results,
154        );
155    }
156
157    #[tokio::test]
158    async fn paxos_log_some_throughput() {
159        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
160        let proposers = builder.cluster();
161        let acceptors = builder.cluster();
162        let clients = builder.cluster();
163        let client_aggregator = builder.process();
164
165        create_paxos(&proposers, &acceptors, &clients, &client_aggregator);
166        let mut deployment = Deployment::new();
167
168        let nodes = builder
169            .with_cluster(
170                &proposers,
171                (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
172            )
173            .with_cluster(
174                &acceptors,
175                (0..2 * PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
176            )
177            .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
178            .with_process(
179                &client_aggregator,
180                TrybuildHost::new(deployment.Localhost()),
181            )
182            .deploy(&mut deployment);
183
184        deployment.deploy().await.unwrap();
185
186        let client_node = &nodes.get_process(&client_aggregator);
187        let client_out = client_node.stdout_filter("Throughput:");
188
189        deployment.start().await.unwrap();
190
191        use std::str::FromStr;
192
193        use regex::Regex;
194
195        let re = Regex::new(r"Throughput: ([^ ]+) requests/s").unwrap();
196        let mut found = 0;
197        let mut client_out = client_out;
198        while let Some(line) = client_out.recv().await {
199            if let Some(caps) = re.captures(&line)
200                && let Ok(lower) = f64::from_str(&caps[1])
201                && 0.0 < lower
202            {
203                println!("Found throughput lower-bound: {}", lower);
204                found += 1;
205                if found == 2 {
206                    break;
207                }
208            }
209        }
210    }
211}