hydro_test/cluster/
two_pc.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3
4use hydro_lang::*;
5use hydro_std::quorum::collect_quorum;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8
/// Marker type for the participant role.
///
/// It carries no data; [`two_pc`] uses it only as the type tag of its
/// `Cluster<'a, Participant>` argument.
#[derive(Debug)]
pub struct Participant {}
10
/// Marker type for the coordinator role.
///
/// It carries no data; [`two_pc`] uses it only as the type tag of its
/// `Process<'a, Coordinator>` argument and of the streams located there.
#[derive(Debug)]
pub struct Coordinator {}
12
13pub fn two_pc<'a, Payload>(
14    coordinator: &Process<'a, Coordinator>,
15    participants: &Cluster<'a, Participant>,
16    num_participants: usize,
17    payloads: Stream<Payload, Process<'a, Coordinator>, Unbounded, NoOrder>,
18) -> Stream<Payload, Process<'a, Coordinator>, Unbounded, NoOrder>
19where
20    Payload: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug + Send,
21{
22    // TODO: Coordinator logs
23    // broadcast prepare message to participants
24    let p_prepare = payloads
25        .ir_node_named("c_prepare")
26        .broadcast_bincode(participants, nondet!(/** TODO */));
27
28    // participant 1 aborts transaction 1
29    // TODO: Participants log
30    let c_votes = p_prepare
31        .ir_node_named("p_prepare")
32        .send_bincode(coordinator)
33        .ir_node_named("c_votes")
34        .values();
35
36    // collect votes from participant.
37    let coordinator_tick = coordinator.tick();
38    let (c_all_vote_yes, _) = collect_quorum(
39        c_votes
40            .map(q!(|kv| (kv, Ok::<(), ()>(()))))
41            .atomic(&coordinator_tick),
42        num_participants,
43        num_participants,
44    );
45
46    // TODO: Coordinator log
47
48    // broadcast commit transactions to participants.
49    let p_commit = c_all_vote_yes
50        .end_atomic()
51        .broadcast_bincode(participants, nondet!(/** TODO */));
52    // TODO: Participants log
53
54    let c_commits = p_commit
55        .ir_node_named("p_commits")
56        .send_bincode(coordinator)
57        .ir_node_named("c_commits")
58        .values();
59    let (c_all_commit, _) = collect_quorum(
60        c_commits
61            .map(q!(|kv| (kv, Ok::<(), ()>(()))))
62            .atomic(&coordinator_tick),
63        num_participants,
64        num_participants,
65    );
66    // TODO: Coordinator log
67
68    c_all_commit.end_atomic()
69}