hydro_test/cluster/
two_pc.rs

1use hydro_lang::*;
2use hydro_std::quorum::collect_quorum;
3
4// if the variable start with p, that means current work is at the participant side. if start with c, at coordinator side.
5//
6
7pub struct Participants {}
8
9pub struct Coordinator {}
10
11pub struct Client {}
12
13pub fn two_pc<'a>(
14    flow: &FlowBuilder<'a>,
15    num_participants: u32,
16) -> (
17    Process<'a, Coordinator>,
18    Cluster<'a, Participants>,
19    Process<'a, Client>,
20) {
21    // Assume single client.
22    let client = flow.process::<Client>();
23
24    // Assume single coordinator.
25    let coordinator = flow.process::<Coordinator>();
26
27    // Assume 3 participants.
28    let participants = flow.cluster::<Participants>();
29
30    // assume 3 transactions are generated from 0 to 3
31    let client_transaction = client.source_iter(q!(0..3));
32
33    let c_receive_client_transactions = client_transaction.send_bincode(&coordinator);
34    c_receive_client_transactions
35        .clone()
36        .for_each(q!(|t| println!(
37            "receive transaction {}, ready to broadcast",
38            t
39        )));
40
41    // broadcast prepare message to participants.
42    let p_receive_prepare = c_receive_client_transactions.broadcast_bincode(&participants);
43
44    // participant 1 aborts transaction 1
45    let p_ready_to_commit = p_receive_prepare.map(q!(move |t| (
46        t,
47        if t == 1 && CLUSTER_SELF_ID.raw_id == 1 {
48            "abort".to_string()
49        } else {
50            "commit".to_string()
51        }
52    )));
53    let c_received_reply = p_ready_to_commit.send_bincode(&coordinator);
54    // c_received_reply.clone().inspect(q!(|(id, (t, reply))| println!("participant {id} said {reply} for transaction {t}")));
55
56    // collect votes from participant.
57    let coordinator_tick = coordinator.tick();
58    let (c_all_commit, c_participant_voted_abort) = collect_quorum(
59        c_received_reply
60            .map(q!(|(id, (t, reply))| (
61                t,
62                if reply == "commit" { Ok(()) } else { Err(id) }
63            )))
64            .atomic(&coordinator_tick),
65        num_participants as usize,
66        num_participants as usize,
67    );
68
69    let p_receive_abort = c_participant_voted_abort
70        // TODO(shadaj): if multiple participants vote abort we should deduplicate
71        .inspect(q!(|(t, id)| println!(
72            "{} vote abort for transaction {}",
73            id, t
74        )))
75        .broadcast_bincode(&participants);
76    let c_receive_ack = p_receive_abort.send_bincode(&coordinator);
77    c_receive_ack.for_each(q!(|(id, (t, _))| println!(
78        "Coordinator receive participant {} abort for transaction {}",
79        id, t
80    )));
81
82    // broadcast commit transactions to participants.
83    let p_receive_commit = c_all_commit.broadcast_bincode(&participants);
84    // p_receive_commit.clone().for_each(q!(|t| println!("commit for transaction {}", t)));
85
86    let c_receive_ack = p_receive_commit.send_bincode(&coordinator);
87    c_receive_ack.for_each(q!(|(id, t)| println!(
88        "receive participant {} commit for transaction {}",
89        id, t
90    )));
91    (coordinator, participants, client)
92}