use hydro_lang::*;
use hydro_std::quorum::collect_quorum;
/// Marker type naming the participant cluster that [`two_pc`] creates via
/// `flow.cluster::<Participants>()`. It carries no data.
pub struct Participants {}
/// Marker type naming the coordinator process that [`two_pc`] creates via
/// `flow.process::<Coordinator>()`. It carries no data.
pub struct Coordinator {}
/// Marker type naming the client process that [`two_pc`] creates via
/// `flow.process::<Client>()`. It carries no data.
pub struct Client {}
/// Wires a small two-phase-commit demo dataflow onto `flow`.
///
/// The graph that gets built:
///
/// 1. A client process emits the transaction ids `0..3` and sends them to a
///    coordinator process, which prints each one.
/// 2. The coordinator broadcasts every transaction to the participant
///    cluster. Each participant answers with a `"commit"` vote, except that
///    the member whose raw id is `1` answers `"abort"` for transaction `1`.
/// 3. Back on the coordinator, each vote is turned into `Ok(())` (commit) or
///    `Err(sender_id)` (anything else) and handed to [`collect_quorum`],
///    with `num_participants` supplied for both of its size arguments.
/// 4. Items on the second `collect_quorum` output (destructured here as
///    `(transaction, participant id)`) are printed, broadcast to the
///    participants and sent back to the coordinator, which prints the reply.
/// 5. Items on the first `collect_quorum` output are broadcast to the
///    participants and sent back to the coordinator, which prints the reply.
///
/// # Arguments
///
/// * `flow` - builder on which the processes, the cluster and all operators
///   are created.
/// * `num_participants` - number of votes passed to `collect_quorum` as its
///   two size arguments (presumably the size of the participant cluster;
///   the cluster size itself is not set here).
///
/// # Returns
///
/// The `(coordinator, participants, client)` location handles, in that order.
pub fn two_pc<'a>(
    flow: &FlowBuilder<'a>,
    num_participants: u32,
) -> (
    Process<'a, Coordinator>,
    Cluster<'a, Participants>,
    Process<'a, Client>,
) {
    // Used for both size arguments of `collect_quorum` below.
    let quorum_size = num_participants as usize;

    // Locations are created in a fixed order: client, coordinator, cluster.
    let client = flow.process::<Client>();
    let coordinator = flow.process::<Coordinator>();
    let participants = flow.cluster::<Participants>();

    // Client -> coordinator: the transaction ids 0, 1 and 2.
    let incoming_txns = client.source_iter(q!(0..3)).send_bincode(&coordinator);
    incoming_txns.clone().for_each(q!(|txn| println!(
        "receive transaction {}, ready to broadcast",
        txn
    )));

    // Coordinator -> participants -> coordinator: prepare round. Every
    // participant attaches its vote to the transaction id before replying.
    let votes_on_coordinator = incoming_txns
        .broadcast_bincode(&participants)
        .map(q!(move |txn| {
            let vote = if txn == 1 && CLUSTER_SELF_ID.raw_id == 1 {
                "abort"
            } else {
                "commit"
            };
            (txn, vote.to_string())
        }))
        .send_bincode(&coordinator);

    // Key each vote by transaction and encode it as a `Result` so that
    // `collect_quorum` can separate commits from aborts.
    let coordinator_tick = coordinator.tick();
    let tallied_votes = votes_on_coordinator
        .map(q!(|(voter, (txn, vote))| {
            let outcome = if vote == "commit" {
                Ok(())
            } else {
                Err(voter)
            };
            (txn, outcome)
        }))
        .timestamped(&coordinator_tick);
    let (committed_txns, abort_votes) = collect_quorum(tallied_votes, quorum_size, quorum_size);

    // Abort path: log the vote, tell the participants, log what comes back.
    abort_votes
        .inspect(q!(|(txn, voter)| println!(
            "{} vote abort for transaction {}",
            voter, txn
        )))
        .broadcast_bincode(&participants)
        .send_bincode(&coordinator)
        .for_each(q!(|(sender, (txn, _))| println!(
            "Coordinator receive participant {} abort for transaction {}",
            sender, txn
        )));

    // Commit path: tell the participants, log what comes back.
    committed_txns
        .broadcast_bincode(&participants)
        .send_bincode(&coordinator)
        .for_each(q!(|(sender, txn)| println!(
            "receive participant {} commit for transaction {}",
            sender, txn
        )));

    (coordinator, participants, client)
}