1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;
use hydro_lang::*;
use hydro_std::quorum::collect_quorum;
use tokio::time::Instant;
use super::paxos::{Acceptor, Ballot, Proposer};
use super::paxos_kv::{paxos_kv, KvPayload, Replica};
pub struct Client {}
// Important: By convention, all relations that represent booleans either have a single "true" value or nothing.
// This allows us to use the continue_if_exists() and continue_if_empty() operators as if they were if (true) and if (false) statements.
#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")]
pub fn paxos_bench<'a>(
flow: &FlowBuilder<'a>,
f: usize,
num_clients_per_node: usize,
median_latency_window_size: usize, /* How many latencies to keep in the window for calculating the median */
checkpoint_frequency: usize, // How many sequence numbers to commit before checkpointing
i_am_leader_send_timeout: u64, // How often to heartbeat
i_am_leader_check_timeout: u64, // How often to check if heartbeat expired
i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */
) -> (
Cluster<'a, Proposer>,
Cluster<'a, Acceptor>,
Cluster<'a, Client>,
Cluster<'a, Replica>,
) {
let proposers = flow.cluster::<Proposer>();
let acceptors = flow.cluster::<Acceptor>();
let clients = flow.cluster::<Client>();
let replicas = flow.cluster::<Replica>();
let (new_leader_elected_complete, new_leader_elected) =
clients.forward_ref::<Stream<_, _, _, NoOrder>>();
let client_tick = clients.tick();
let cur_leader_id = new_leader_elected
.inspect(q!(|ballot| println!(
"Client notified that leader was elected: {:?}",
.map(q!(|ballot: Ballot| ballot.proposer_id));
let leader_changed = unsafe {
// SAFETY: we are okay if we miss a transient leader ID, because we
// will eventually get the latest one and can restart requests then
.map(q!(|_| ()))
|c_to_proposers| {
let to_proposers = unsafe {
// SAFETY: the risk here is that we send a batch of requests
// with a stale leader ID, but because the leader ID comes from the
// network there is no way to guarantee that it is up to date
// TODO(shadaj): we should retry if we get an error due to sending
// to a stale leader
.map(q!(move |((key, value), leader_id)| (
KvPayload {
// we use our ID as part of the value and use that so the replica only notifies us
value: (CLUSTER_SELF_ID, value)
let to_proposers = unsafe {
// SAFETY: clients "own" certain keys, so interleaving elements from clients will not affect
// the order of writes to the same key
let (new_leader_elected, processed_payloads) = unsafe {
// SAFETY: Non-deterministic leader notifications are handled in `to_proposers`. We do not
// care about the order in which key writes are processed, which is the non-determinism in
// `processed_payloads`.
let c_received_payloads = processed_payloads
.map(q!(|payload| (
((payload.key, payload.value.1), Ok(()))
// we only mark a transaction as committed when all replicas have applied it
let (c_quorum_payloads, _) = collect_quorum::<_, _, _, ()>(
f + 1,
f + 1,
(proposers, acceptors, clients, replicas)
fn bench_client<'a>(
clients: &Cluster<'a, Client>,
trigger_restart: Stream<(), Cluster<'a, Client>, Unbounded>,
transaction_cycle: impl FnOnce(
Stream<(u32, u32), Cluster<'a, Client>, Unbounded>,
) -> Stream<(u32, u32), Cluster<'a, Client>, Unbounded, NoOrder>,
num_clients_per_node: usize,
median_latency_window_size: usize,
) {
let client_tick = clients.tick();
// r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload)));
// Whenever the leader changes, make all clients send a message
let restart_this_tick = unsafe {
// SAFETY: non-deterministic delay in restarting requests
// is okay because once it is restarted statistics should reach
// steady state regardless of when the restart happes
let c_new_payloads_when_restart = restart_this_tick.clone().flat_map_ordered(q!(move |_| (0
.map(move |i| (
(CLUSTER_SELF_ID.raw_id * (num_clients_per_node as u32)) + i as u32,
let (c_to_proposers_complete_cycle, c_to_proposers) =
clients.forward_ref::<Stream<_, _, _, TotalOrder>>();
let c_received_quorum_payloads = unsafe {
// SAFETY: because the transaction processor is required to handle arbitrary reordering
// across *different* keys, we are safe because delaying a transaction result for a key
// will only affect when the next request for that key is emitted with respect to other
// keys
// Whenever all replicas confirm that a payload was committed, send another payload
let c_new_payloads_when_committed = c_received_quorum_payloads
.map(q!(|payload| (payload.0, payload.1 + 1)));
.chain(unsafe {
// SAFETY: we don't send a new write for the same key until the previous one is committed,
// so this contains only a single write per key, and we don't care about order
// across keys
// Track statistics
let (c_timers_complete_cycle, c_timers) =
client_tick.cycle::<Stream<(usize, Instant), _, _, NoOrder>>();
let c_new_timers_when_leader_elected = restart_this_tick
.map(q!(|_| Instant::now()))
move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now))
let c_updated_timers = c_received_quorum_payloads
.map(q!(|(key, _prev_count)| (key as usize, Instant::now())));
let c_new_timers = c_timers
.clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency)
.reduce_keyed_commutative(q!(|curr_time, new_time| {
if new_time > *curr_time {
*curr_time = new_time;
let c_stats_output_timer = unsafe {
// SAFETY: intentionally sampling statistics
let c_latency_reset = c_stats_output_timer.clone().map(q!(|_| None)).defer_tick();
let c_latencies = c_timers
.map(q!(|(_virtual_id, (prev_time, curr_time))| Some(
// Create window with ring buffer using vec + wraparound index
// TODO: Would be nice if I could use vec![] instead, but that doesn't work in Hydro with RuntimeData *median_latency_window_size
q!(move || (
q!(move |(latencies, write_index), latency| {
let mut latencies_mut = latencies.borrow_mut();
if *write_index < latencies_mut.len() {
latencies_mut[*write_index] = latency;
} else {
// Increment write index and wrap around
*write_index = (*write_index + 1) % median_latency_window_size;
.map(q!(|(latencies, _)| latencies));
let c_throughput_new_batch = c_received_quorum_payloads
.map(q!(|batch_size| (batch_size, false)));
let c_throughput_reset = c_stats_output_timer
.map(q!(|_| (0, true)))
let c_throughput = c_throughput_new_batch
q!(|| 0),
q!(|total, (batch_size, reset)| {
if reset {
*total = 0;
} else {
*total += batch_size;
unsafe {
// SAFETY: intentionally sampling statistics
.for_each(q!(move |(latencies, throughput)| {
let mut latencies_mut = latencies.borrow_mut();
if latencies_mut.len() > 0 {
let middle_idx = latencies_mut.len() / 2;
let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx);
println!("Median latency: {}ms", median.as_micros() as f64 / 1000.0);
println!("Throughput: {} requests/s", throughput);
// End track statistics
mod tests {
use hydro_lang::deploy::DeployRuntime;
use stageleft::RuntimeData;
fn paxos_ir() {
let builder = hydro_lang::FlowBuilder::new();
let _ = super::paxos_bench(&builder, 1, 1, 1, 1, 1, 1, 1);
let built = builder.with_default_optimize::<DeployRuntime>();
hydro_lang::ir::dbg_dedup_tee(|| {
let _ = built.compile(&RuntimeData::new("FAKE"));