// hydro_test/cluster/paxos_bench.rs
use hydro_lang::*;
use hydro_std::bench_client::{bench_client, print_bench_results};
use hydro_std::quorum::collect_quorum;

use super::kv_replica::{KvPayload, Replica, kv_replica};
use super::paxos_with_client::PaxosLike;

/// Zero-sized marker type naming the benchmark-client cluster.
///
/// It carries no data; in this file it is only ever used as the type parameter
/// of `Cluster<'a, Client>`.
pub struct Client;

10pub fn paxos_bench<'a>(
11 num_clients_per_node: usize,
12 checkpoint_frequency: usize, f: usize, num_replicas: usize,
15 paxos: impl PaxosLike<'a>,
16 clients: &Cluster<'a, Client>,
17 replicas: &Cluster<'a, Replica>,
18) {
19 let paxos_processor = |c_to_proposers: Stream<(u32, u32), Cluster<'a, Client>, Unbounded>| {
20 let payloads = c_to_proposers.map(q!(move |(key, value)| KvPayload {
21 key,
22 value: (CLUSTER_SELF_ID, value)
24 }));
25
26 let acceptors = paxos.log_stores().clone();
27 let (acceptor_checkpoint_complete, acceptor_checkpoint) =
28 acceptors.forward_ref::<Optional<_, _, _>>();
29
30 let sequenced_payloads = unsafe {
31 paxos.with_client(clients, payloads, acceptor_checkpoint)
36 };
37
38 let sequenced_to_replicas = sequenced_payloads.broadcast_bincode_anonymous(replicas);
39
40 let (replica_checkpoint, processed_payloads) =
42 kv_replica(replicas, sequenced_to_replicas, checkpoint_frequency);
43
44 let checkpoint_tick = acceptors.tick();
46 let a_checkpoint = unsafe {
47 let a_checkpoint_largest_seqs = replica_checkpoint
52 .broadcast_bincode(&acceptors)
53 .tick_batch(&checkpoint_tick)
54 .persist()
55 .reduce_keyed_commutative(q!(|curr_seq, seq| {
56 if seq > *curr_seq {
57 *curr_seq = seq;
58 }
59 }));
60
61 let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs
62 .clone()
63 .count()
64 .filter_map(q!(move |num_received| if num_received == f + 1 {
65 Some(true)
66 } else {
67 None
68 }));
69
70 a_checkpoint_largest_seqs
72 .continue_if(a_checkpoints_quorum_reached)
73 .map(q!(|(_sender, seq)| seq))
74 .min()
75 .latest()
76 };
77
78 acceptor_checkpoint_complete.complete(a_checkpoint);
79
80 let c_received_payloads = processed_payloads
81 .map(q!(|payload| (
82 payload.value.0,
83 ((payload.key, payload.value.1), Ok(()))
84 )))
85 .send_bincode_anonymous(clients);
86
87 collect_quorum::<_, _, _, ()>(
89 c_received_payloads.atomic(&clients.tick()),
90 f + 1,
91 num_replicas,
92 )
93 .0
94 .end_atomic()
95 };
96
97 let bench_results = unsafe { bench_client(clients, paxos_processor, num_clients_per_node) };
98
99 print_bench_results(bench_results);
100}
101
#[cfg(test)]
mod tests {
    use dfir_rs::lang::graph::WriteConfig;
    use hydro_lang::deploy::DeployRuntime;
    use stageleft::RuntimeData;

    use crate::cluster::paxos::{CorePaxos, PaxosConfig};

    /// Builds the benchmark with `CorePaxos` and snapshot-tests the result.
    ///
    /// Two snapshots are taken: the debug rendering of the optimized IR, and
    /// the Mermaid rendering of the proposer cluster's DFIR graph (stored under
    /// the `proposer_mermaid` suffix). The flow is then compiled once; the
    /// compile output itself is discarded.
    #[test]
    fn paxos_ir() {
        let builder = hydro_lang::FlowBuilder::new();
        let proposers = builder.cluster();
        let acceptors = builder.cluster();
        let clients = builder.cluster();
        let replicas = builder.cluster();

        // Positional arguments, in order: num_clients_per_node,
        // checkpoint_frequency, f, num_replicas.
        super::paxos_bench(
            1,
            1,
            1,
            2,
            CorePaxos {
                proposers: proposers.clone(),
                acceptors: acceptors.clone(),
                paxos_config: PaxosConfig {
                    f: 1,
                    i_am_leader_send_timeout: 1,
                    i_am_leader_check_timeout: 1,
                    i_am_leader_check_timeout_delay_multiplier: 1,
                },
            },
            &clients,
            &replicas,
        );
        let built = builder.with_default_optimize::<DeployRuntime>();

        // Snapshot of the optimized IR, taken inside `dbg_dedup_tee`.
        hydro_lang::ir::dbg_dedup_tee(|| {
            insta::assert_debug_snapshot!(built.ir());
        });

        // Snapshot of the proposer's DFIR graph rendered as Mermaid.
        let preview = built.preview_compile();
        insta::with_settings!({snapshot_suffix => "proposer_mermaid"}, {
            insta::assert_snapshot!(
                preview.dfir_for(&proposers).to_mermaid(&WriteConfig {
                    no_subgraphs: true,
                    no_varnames: false,
                    no_pull_push: true,
                    no_handoffs: true,
                    no_references: false,
                    op_short_text: false,
                    op_text_no_imports: true,
                })
            );
        });

        // Run the full compile step as well; the output is intentionally
        // ignored.
        let _ = built.compile(&RuntimeData::new("FAKE"));
    }
}