hydro_test/cluster/paxos_bench.rs

use hydro_lang::*;
use hydro_std::bench_client::{bench_client, print_bench_results};
use hydro_std::quorum::collect_quorum;

use super::kv_replica::{KvPayload, Replica, kv_replica};
use super::paxos_with_client::PaxosLike;

pub struct Client;

pub fn paxos_bench<'a>(
    num_clients_per_node: usize,
    checkpoint_frequency: usize, // How many sequence numbers to commit before checkpointing
    f: usize, /* Maximum number of faulty nodes. A payload has been processed once f+1 replicas have processed it. */
    num_replicas: usize,
    paxos: impl PaxosLike<'a>,
    clients: &Cluster<'a, Client>,
    replicas: &Cluster<'a, Replica>,
) {
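    // The workload "processor" handed to bench_client: it takes each client's stream of
    // (key, value) requests, sequences them through Paxos, applies them on the replicas,
    // and returns an acknowledgment stream back to the issuing clients.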
    let paxos_processor = |c_to_proposers: Stream<(u32, u32), Cluster<'a, Client>, Unbounded>| {
        let payloads = c_to_proposers.map(q!(move |(key, value)| KvPayload {
            key,
            // we include our ID in the value so that the replica only notifies us
            value: (CLUSTER_SELF_ID, value)
        }));

        let acceptors = paxos.log_stores().clone();
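        // The acceptors need the replicas' checkpoint sequence numbers, but those are only
        // produced further below; forward_ref gives us a placeholder stream to wire in now,
        // which is filled in later via `acceptor_checkpoint_complete.complete(..)`.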
        let (acceptor_checkpoint_complete, acceptor_checkpoint) =
            acceptors.forward_ref::<Optional<_, _, _>>();

        let sequenced_payloads = unsafe {
            // SAFETY: clients "own" certain keys, so interleaving elements from clients will not affect
            // the order of writes to the same key

            // TODO(shadaj): we should retry when a payload is dropped due to stale leader
            paxos.with_client(clients, payloads, acceptor_checkpoint)
        };

        let sequenced_to_replicas = sequenced_payloads.broadcast_bincode_anonymous(replicas);

        // Replicas
        let (replica_checkpoint, processed_payloads) =
            kv_replica(replicas, sequenced_to_replicas, checkpoint_frequency);

        // Get the latest checkpoint sequence per replica
        let checkpoint_tick = acceptors.tick();
        let a_checkpoint = unsafe {
            // SAFETY: even though we batch the checkpoint messages, because we reduce over the entire history,
            // the final min checkpoint is deterministic
            // TODO(shadaj): once we can reduce keyed over unbounded streams, this should be safe

            let a_checkpoint_largest_seqs = replica_checkpoint
                .broadcast_bincode(&acceptors)
                .tick_batch(&checkpoint_tick)
                .persist()
                .reduce_keyed_commutative(q!(|curr_seq, seq| {
                    if seq > *curr_seq {
                        *curr_seq = seq;
                    }
                }));

            let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs
                .clone()
                .count()
                .filter_map(q!(move |num_received| if num_received == f + 1 {
                    Some(true)
                } else {
                    None
                }));

            // Find the smallest checkpoint seq that everyone agrees to
            a_checkpoint_largest_seqs
                .continue_if(a_checkpoints_quorum_reached)
                .map(q!(|(_sender, seq)| seq))
                .min()
                .latest()
        };

        acceptor_checkpoint_complete.complete(a_checkpoint);

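        // Route each processed payload back to the client that issued it, using the
        // client ID we embedded in the value above as the destination.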
        let c_received_payloads = processed_payloads
            .map(q!(|payload| (
                payload.value.0,
                ((payload.key, payload.value.1), Ok(()))
            )))
            .send_bincode_anonymous(clients);

        // we only mark a transaction as committed once f + 1 replicas have acknowledged it
        collect_quorum::<_, _, _, ()>(
            c_received_payloads.atomic(&clients.tick()),
            f + 1,
            num_replicas,
        )
        .0
        .end_atomic()
    };

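    // Drive the benchmark clients against the processor above and print the results.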
    let bench_results = unsafe { bench_client(clients, paxos_processor, num_clients_per_node) };

    print_bench_results(bench_results);
}

#[cfg(test)]
mod tests {
    use dfir_rs::lang::graph::WriteConfig;
    use hydro_lang::deploy::DeployRuntime;
    use stageleft::RuntimeData;

    use crate::cluster::paxos::{CorePaxos, PaxosConfig};

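    // Snapshots the optimized Hydro IR and the proposers' DFIR graph (rendered as
    // Mermaid) for a small Paxos deployment, then checks that the flow compiles.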
    #[test]
    fn paxos_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let proposers = builder.cluster();
        let acceptors = builder.cluster();
        let clients = builder.cluster();
        let replicas = builder.cluster();

        super::paxos_bench(
            1, // num_clients_per_node
            1, // checkpoint_frequency
            1, // f
            2, // num_replicas
            CorePaxos {
                proposers: proposers.clone(),
                acceptors: acceptors.clone(),
                paxos_config: PaxosConfig {
                    f: 1,
                    i_am_leader_send_timeout: 1,
                    i_am_leader_check_timeout: 1,
                    i_am_leader_check_timeout_delay_multiplier: 1,
                },
            },
            &clients,
            &replicas,
        );
        let built = builder.with_default_optimize::<DeployRuntime>();

        hydro_lang::ir::dbg_dedup_tee(|| {
            insta::assert_debug_snapshot!(built.ir());
        });

        let preview = built.preview_compile();
        insta::with_settings!({snapshot_suffix => "proposer_mermaid"}, {
            insta::assert_snapshot!(
                preview.dfir_for(&proposers).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_varnames: false,
                    no_pull_push: true,
                    no_handoffs: true,
                    no_references: false,
                    op_short_text: false,
                    op_text_no_imports: true,
                })
            );
        });

        let _ = built.compile(&RuntimeData::new("FAKE"));
    }
}