1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
use hydro_lang::*;
use super::paxos::{paxos_core, Acceptor, Ballot, PaxosConfig, PaxosPayload, Proposer};
/// Wraps the core Paxos algorithm with logic to send payloads from clients to the current
/// leader.
///
/// # Safety
/// Clients may send payloads to a stale leader if the leader changes between the time the
/// payload is sent and the time it is processed. This will result in the payload being dropped.
/// Payloads sent from multiple clients may be interleaved in a non-deterministic order.
pub unsafe fn paxos_with_client<'a, C: 'a, R, P: PaxosPayload>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
clients: &Cluster<'a, C>,
payloads: Stream<P, Cluster<'a, C>, Unbounded>,
replica_checkpoint: Stream<(ClusterId<R>, usize), Cluster<'a, Acceptor>, Unbounded, NoOrder>,
paxos_config: PaxosConfig,
) -> Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder> {
unsafe {
// SAFETY: Non-deterministic leader notifications are handled in `cur_leader_id`. We do not
// care about the order in which key writes are processed, which is the non-determinism in
// `sequenced_payloads`.
paxos_core(
proposers,
acceptors,
replica_checkpoint,
|new_leader_elected| {
let cur_leader_id = new_leader_elected
.broadcast_bincode_interleaved(clients)
.inspect(q!(|ballot| println!(
"Client notified that leader was elected: {:?}",
ballot
)))
.max()
.map(q!(|ballot: Ballot| ballot.proposer_id));
let payloads_at_proposer = {
// SAFETY: the risk here is that we send a batch of requests
// with a stale leader ID, but because the leader ID comes from the
// network there is no way to guarantee that it is up to date. This
// is documented non-determinism.
let client_tick = clients.tick();
let payload_batch = payloads.timestamped(&client_tick).tick_batch();
let latest_leader = cur_leader_id.timestamped(&client_tick).latest_tick();
let (unsent_payloads_complete, unsent_payloads) =
client_tick.cycle::<Stream<_, _, _, TotalOrder>>();
let all_payloads = unsent_payloads.chain(payload_batch);
unsent_payloads_complete.complete_next_tick(
all_payloads.clone().continue_unless(latest_leader.clone()),
);
all_payloads.cross_singleton(latest_leader).all_ticks()
}
.map(q!(move |(payload, leader_id)| (leader_id, payload)))
.send_bincode_interleaved(proposers);
let payloads_at_proposer = {
// SAFETY: documented non-determinism in interleaving of client payloads
payloads_at_proposer.assume_ordering()
};
payloads_at_proposer
},
paxos_config,
)
.1
}
}