hydro_test/cluster/paxos_bench.rs

1use hydro_lang::*;
2use hydro_std::bench_client::{bench_client, print_bench_results};
3use hydro_std::quorum::collect_quorum;
4
5use super::kv_replica::{KvPayload, Replica, kv_replica};
6use super::paxos_with_client::PaxosLike;
7
/// Marker type tagging the cluster of benchmark client nodes that generate
/// workload payloads and receive commit acknowledgements.
pub struct Client;
/// Marker type tagging the single process that aggregates and prints benchmark
/// results collected from all clients (see `print_bench_results`).
pub struct Aggregator;
10
/// Drives an end-to-end benchmark of a Paxos implementation.
///
/// Client payloads `(key, value)` are tagged with the sending client's ID,
/// sequenced through `paxos`, broadcast to `replicas` (applied via
/// `kv_replica`), and a write is acknowledged back to its originating client.
/// A client counts a write as committed once `f + 1` replicas have applied it
/// (`collect_quorum`). Replica checkpoints flow back to the acceptors (via a
/// forward reference) so the log can be truncated. Aggregated results are
/// printed at `client_aggregator`.
///
/// # Parameters
/// - `num_clients_per_node`: virtual clients launched on each node of `clients`.
/// - `checkpoint_frequency`: sequence numbers committed between replica checkpoints.
/// - `f`: maximum number of faulty nodes; quorum size is `f + 1`.
/// - `num_replicas`: total replica count (fan-in bound for `collect_quorum`).
/// - `paxos`: the sequencing protocol under test.
/// - `clients` / `client_aggregator` / `replicas`: locations the benchmark runs on.
#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")]
pub fn paxos_bench<'a>(
    num_clients_per_node: usize,
    checkpoint_frequency: usize, // How many sequence numbers to commit before checkpointing
    f: usize, /* Maximum number of faulty nodes. A payload has been processed once f+1 replicas have processed it. */
    num_replicas: usize,
    paxos: impl PaxosLike<'a>,
    clients: &Cluster<'a, Client>,
    client_aggregator: &Process<'a, Aggregator>,
    replicas: &Cluster<'a, Replica>,
) {
    // Maps raw client requests to per-client commit acknowledgements; handed to
    // `bench_client` below, which closes the request/response loop.
    let paxos_processor = |c_to_proposers: Stream<(u32, u32), Cluster<'a, Client>, Unbounded>| {
        let payloads = c_to_proposers.map(q!(move |(key, value)| KvPayload {
            key,
            // we use our ID as part of the value and use that so the replica only notifies us
            value: (CLUSTER_SELF_ID, value)
        }));

        // Forward reference: the checkpoint stream the acceptors consume is only
        // defined further down (from replica checkpoints); completed at the end.
        let acceptors = paxos.log_stores().clone();
        let (acceptor_checkpoint_complete, acceptor_checkpoint) =
            acceptors.forward_ref::<Optional<_, _, _>>();

        let sequenced_payloads = paxos.with_client(
            clients,
            payloads,
            acceptor_checkpoint,
            // TODO(shadaj): we should retry when a payload is dropped due to stale leader
            nondet!(/** benchmarking, assuming no re-election */),
            nondet!(
                /// clients 'own' certain keys, so interleaving elements from clients will not affect
                /// the order of writes to the same key
            ),
        );

        // Fan the totally-ordered log out to every replica.
        let sequenced_to_replicas = sequenced_payloads
            .broadcast_bincode(replicas, nondet!(/** TODO */))
            .values();

        // Replicas: apply the sequenced payloads; emit periodic checkpoints and
        // the stream of processed payloads for acknowledgement.
        let (replica_checkpoint, processed_payloads) =
            kv_replica(replicas, sequenced_to_replicas, checkpoint_frequency);

        // Get the latest checkpoint sequence per replica
        let checkpoint_tick = acceptors.tick();
        let a_checkpoint = {
            // TODO(shadaj): once we can reduce keyed over unbounded streams, this should be safe
            let a_checkpoint_largest_seqs = replica_checkpoint
                .broadcast_bincode(&acceptors, nondet!(/** TODO */))
                .entries()
                .into_keyed()
                // Running max of checkpointed sequence numbers, keyed by replica.
                .reduce_commutative(q!(|curr_seq, seq| {
                    if seq > *curr_seq {
                        *curr_seq = seq;
                    }
                }))
                .snapshot(
                    &checkpoint_tick,
                    nondet!(
                        /// even though we batch the checkpoint messages, because we reduce over the entire history,
                        /// the final min checkpoint is deterministic
                    ),
                );

            // Emits a token only once exactly `f + 1` replicas have reported a
            // checkpoint (key count over the per-replica maxima above).
            let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs
                .clone()
                .key_count()
                .filter_map(q!(move |num_received| if num_received == f + 1 {
                    Some(true)
                } else {
                    None
                }));

            // Find the smallest checkpoint seq that everyone agrees to
            a_checkpoint_largest_seqs
                .entries()
                .continue_if(a_checkpoints_quorum_reached)
                .map(q!(|(_sender, seq)| seq))
                .min()
                .latest()
        };

        // Close the forward reference declared above with the computed checkpoint.
        acceptor_checkpoint_complete.complete(a_checkpoint);

        // Route each applied payload back to the client that issued it — the
        // client's ID was embedded as `value.0` when the payload was created.
        let c_received_payloads = processed_payloads
            .map(q!(|payload| (
                payload.value.0,
                ((payload.key, payload.value.1), Ok(()))
            )))
            .demux_bincode(clients)
            .values();

        // we only mark a transaction as committed when all replicas have applied it
        collect_quorum::<_, _, _, ()>(
            c_received_payloads.atomic(&clients.tick()),
            f + 1,
            num_replicas,
        )
        .0
        .end_atomic()
    };

    // Run the benchmark workload through the processor and collect latency /
    // throughput measurements on each client.
    let bench_results = bench_client(
        clients,
        inc_u32_workload_generator,
        paxos_processor,
        num_clients_per_node,
        nondet!(/** bench */),
    );

    print_bench_results(bench_results, client_aggregator, clients);
}
122
123/// Generates an incrementing u32 for each virtual client ID, starting at 0
124pub fn inc_u32_workload_generator<'a, Client>(
125    _client: &Cluster<'a, Client>,
126    payload_request: Stream<(u32, Option<u32>), Cluster<'a, Client>, Unbounded, NoOrder>,
127) -> Stream<(u32, u32), Cluster<'a, Client>, Unbounded, NoOrder> {
128    payload_request.map(q!(move |(virtual_id, payload)| {
129        let value = if let Some(payload) = payload {
130            payload + 1
131        } else {
132            0
133        };
134        (virtual_id, value)
135    }))
136}
137
138#[cfg(test)]
139mod tests {
140    use dfir_lang::graph::WriteConfig;
141    use hydro_deploy::Deployment;
142    use hydro_lang::deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost};
143
144    use crate::cluster::paxos::{CorePaxos, PaxosConfig};
145
146    const PAXOS_F: usize = 1;
147
148    #[cfg(stageleft_runtime)]
149    fn create_paxos<'a>(
150        proposers: &hydro_lang::Cluster<'a, crate::cluster::paxos::Proposer>,
151        acceptors: &hydro_lang::Cluster<'a, crate::cluster::paxos::Acceptor>,
152        clients: &hydro_lang::Cluster<'a, super::Client>,
153        client_aggregator: &hydro_lang::Process<'a, super::Aggregator>,
154        replicas: &hydro_lang::Cluster<'a, crate::cluster::kv_replica::Replica>,
155    ) {
156        super::paxos_bench(
157            100,
158            1000,
159            PAXOS_F,
160            PAXOS_F + 1,
161            CorePaxos {
162                proposers: proposers.clone(),
163                acceptors: acceptors.clone(),
164                paxos_config: PaxosConfig {
165                    f: 1,
166                    i_am_leader_send_timeout: 5,
167                    i_am_leader_check_timeout: 10,
168                    i_am_leader_check_timeout_delay_multiplier: 15,
169                },
170            },
171            clients,
172            client_aggregator,
173            replicas,
174        );
175    }
176
177    #[test]
178    fn paxos_ir() {
179        let builder = hydro_lang::FlowBuilder::new();
180        let proposers = builder.cluster();
181        let acceptors = builder.cluster();
182        let clients = builder.cluster();
183        let client_aggregator = builder.process();
184        let replicas = builder.cluster();
185
186        create_paxos(
187            &proposers,
188            &acceptors,
189            &clients,
190            &client_aggregator,
191            &replicas,
192        );
193        let built = builder.with_default_optimize::<HydroDeploy>();
194
195        hydro_lang::ir::dbg_dedup_tee(|| {
196            hydro_build_utils::assert_debug_snapshot!(built.ir());
197        });
198
199        let preview = built.preview_compile();
200        hydro_build_utils::insta::with_settings!({
201            snapshot_suffix => "proposer_mermaid"
202        }, {
203            hydro_build_utils::assert_snapshot!(
204                preview.dfir_for(&proposers).to_mermaid(&WriteConfig {
205                    no_subgraphs: true,
206                    no_pull_push: true,
207                    no_handoffs: true,
208                    op_text_no_imports: true,
209                    ..WriteConfig::default()
210                })
211            );
212        });
213        hydro_build_utils::insta::with_settings!({
214            snapshot_suffix => "acceptor_mermaid"
215        }, {
216            hydro_build_utils::assert_snapshot!(
217                preview.dfir_for(&acceptors).to_mermaid(&WriteConfig {
218                    no_subgraphs: true,
219                    no_pull_push: true,
220                    no_handoffs: true,
221                    op_text_no_imports: true,
222                    ..WriteConfig::default()
223                })
224            );
225        });
226    }
227
228    #[tokio::test]
229    async fn paxos_some_throughput() {
230        let builder = hydro_lang::FlowBuilder::new();
231        let proposers = builder.cluster();
232        let acceptors = builder.cluster();
233        let clients = builder.cluster();
234        let client_aggregator = builder.process();
235        let replicas = builder.cluster();
236
237        create_paxos(
238            &proposers,
239            &acceptors,
240            &clients,
241            &client_aggregator,
242            &replicas,
243        );
244        let mut deployment = Deployment::new();
245
246        let nodes = builder
247            .with_cluster(
248                &proposers,
249                (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
250            )
251            .with_cluster(
252                &acceptors,
253                (0..2 * PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
254            )
255            .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())])
256            .with_process(
257                &client_aggregator,
258                TrybuildHost::new(deployment.Localhost()),
259            )
260            .with_cluster(
261                &replicas,
262                (0..PAXOS_F + 1).map(|_| TrybuildHost::new(deployment.Localhost())),
263            )
264            .deploy(&mut deployment);
265
266        deployment.deploy().await.unwrap();
267
268        let client_node = &nodes.get_process(&client_aggregator);
269        let client_out = client_node.stdout_filter("Throughput:").await;
270
271        deployment.start().await.unwrap();
272
273        use std::str::FromStr;
274
275        use regex::Regex;
276
277        let re = Regex::new(r"Throughput: ([^ ]+) - ([^ ]+) - ([^ ]+) requests/s").unwrap();
278        let mut found = 0;
279        let mut client_out = client_out;
280        while let Some(line) = client_out.recv().await {
281            if let Some(caps) = re.captures(&line)
282                && let Ok(lower) = f64::from_str(&caps[1])
283                && 0.0 < lower
284            {
285                println!("Found throughput lower-bound: {}", lower);
286                found += 1;
287                if found == 2 {
288                    break;
289                }
290            }
291        }
292    }
293}