// hydro_test/cluster/two_pc.rs
use std::fmt::Debug;
use std::hash::Hash;

use hydro_lang::*;
use hydro_std::quorum::collect_quorum;
use serde::Serialize;
use serde::de::DeserializeOwned;

/// Marker type for the participant side of the two-phase-commit protocol.
///
/// The struct has no fields; in this file it is only used as the type tag of
/// the `Cluster<'a, Participant>` argument taken by [`two_pc`].
pub struct Participant {}

/// Marker type for the coordinator side of the two-phase-commit protocol.
///
/// The struct has no fields; in this file it is only used as the type tag of
/// the `Process<'a, Coordinator>` locations that appear in [`two_pc`]'s
/// signature.
pub struct Coordinator {}

13pub fn two_pc<'a, Payload>(
14 coordinator: &Process<'a, Coordinator>,
15 participants: &Cluster<'a, Participant>,
16 num_participants: usize,
17 payloads: Stream<Payload, Process<'a, Coordinator>, Unbounded, NoOrder>,
18) -> Stream<Payload, Process<'a, Coordinator>, Unbounded, NoOrder>
19where
20 Payload: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug + Send,
21{
22 let p_prepare = payloads
25 .ir_node_named("c_prepare")
26 .broadcast_bincode(participants, nondet!());
27
28 let c_votes = p_prepare
31 .ir_node_named("p_prepare")
32 .send_bincode(coordinator)
33 .ir_node_named("c_votes")
34 .values();
35
36 let coordinator_tick = coordinator.tick();
38 let (c_all_vote_yes, _) = collect_quorum(
39 c_votes
40 .map(q!(|kv| (kv, Ok::<(), ()>(()))))
41 .atomic(&coordinator_tick),
42 num_participants,
43 num_participants,
44 );
45
46 let p_commit = c_all_vote_yes
50 .end_atomic()
51 .broadcast_bincode(participants, nondet!());
52 let c_commits = p_commit
55 .ir_node_named("p_commits")
56 .send_bincode(coordinator)
57 .ir_node_named("c_commits")
58 .values();
59 let (c_all_commit, _) = collect_quorum(
60 c_commits
61 .map(q!(|kv| (kv, Ok::<(), ()>(()))))
62 .atomic(&coordinator_tick),
63 num_participants,
64 num_participants,
65 );
66 c_all_commit.end_atomic()
69}