hydro_test/cluster/
two_pc.rs
1use hydro_lang::*;
2use hydro_std::quorum::collect_quorum;
3
4pub struct Participants {}
8
9pub struct Coordinator {}
10
11pub struct Client {}
12
13pub fn two_pc<'a>(
14 flow: &FlowBuilder<'a>,
15 num_participants: u32,
16) -> (
17 Process<'a, Coordinator>,
18 Cluster<'a, Participants>,
19 Process<'a, Client>,
20) {
21 let client = flow.process::<Client>();
23
24 let coordinator = flow.process::<Coordinator>();
26
27 let participants = flow.cluster::<Participants>();
29
30 let client_transaction = client.source_iter(q!(0..3));
32
33 let c_receive_client_transactions = client_transaction.send_bincode(&coordinator);
34 c_receive_client_transactions
35 .clone()
36 .for_each(q!(|t| println!(
37 "receive transaction {}, ready to broadcast",
38 t
39 )));
40
41 let p_receive_prepare = c_receive_client_transactions.broadcast_bincode(&participants);
43
44 let p_ready_to_commit = p_receive_prepare.map(q!(move |t| (
46 t,
47 if t == 1 && CLUSTER_SELF_ID.raw_id == 1 {
48 "abort".to_string()
49 } else {
50 "commit".to_string()
51 }
52 )));
53 let c_received_reply = p_ready_to_commit.send_bincode(&coordinator);
54 let coordinator_tick = coordinator.tick();
58 let (c_all_commit, c_participant_voted_abort) = collect_quorum(
59 c_received_reply
60 .map(q!(|(id, (t, reply))| (
61 t,
62 if reply == "commit" { Ok(()) } else { Err(id) }
63 )))
64 .atomic(&coordinator_tick),
65 num_participants as usize,
66 num_participants as usize,
67 );
68
69 let p_receive_abort = c_participant_voted_abort
70 .inspect(q!(|(t, id)| println!(
72 "{} vote abort for transaction {}",
73 id, t
74 )))
75 .broadcast_bincode(&participants);
76 let c_receive_ack = p_receive_abort.send_bincode(&coordinator);
77 c_receive_ack.for_each(q!(|(id, (t, _))| println!(
78 "Coordinator receive participant {} abort for transaction {}",
79 id, t
80 )));
81
82 let p_receive_commit = c_all_commit.broadcast_bincode(&participants);
84 let c_receive_ack = p_receive_commit.send_bincode(&coordinator);
87 c_receive_ack.for_each(q!(|(id, t)| println!(
88 "receive participant {} commit for transaction {}",
89 id, t
90 )));
91 (coordinator, participants, client)
92}