Skip to main content

hydro_test/cluster/
two_pc.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3
4use hydro_lang::live_collections::stream::NoOrder;
5use hydro_lang::prelude::*;
6use hydro_std::quorum::collect_quorum;
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
/// Zero-sized tag type naming the participant role.
///
/// It is used purely at the type level: `two_pc` takes its participants as
/// `Cluster<'a, Participant>`.
pub struct Participant {}
11
/// Zero-sized tag type naming the coordinator role.
///
/// It is used purely at the type level: `two_pc` takes its coordinator as
/// `Process<'a, Coordinator>` and both its input and output streams are located there.
pub struct Coordinator {}
13
14pub fn two_pc<'a, Payload>(
15    coordinator: &Process<'a, Coordinator>,
16    participants: &Cluster<'a, Participant>,
17    num_participants: usize,
18    payloads: Stream<Payload, Process<'a, Coordinator>, Unbounded, NoOrder>,
19) -> Stream<Payload, Process<'a, Coordinator>, Unbounded, NoOrder>
20where
21    Payload: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug + Send,
22{
23    // TODO: Coordinator logs
24    // broadcast prepare message to participants
25    let p_prepare = payloads.ir_node_named("c_prepare").broadcast(
26        participants,
27        TCP.fail_stop().bincode(),
28        nondet!(/** TODO */),
29    );
30
31    // participant 1 aborts transaction 1
32    // TODO: Participants log
33    let c_votes = p_prepare
34        .ir_node_named("p_prepare")
35        .send(coordinator, TCP.fail_stop().bincode())
36        .ir_node_named("c_votes")
37        .values();
38
39    // collect votes from participant.
40    let (c_all_vote_yes, _) = collect_quorum(
41        c_votes.map(q!(|kv| (kv, Ok::<(), ()>(())))),
42        num_participants,
43        num_participants,
44    );
45
46    // TODO: Coordinator log
47
48    // broadcast commit transactions to participants.
49    let p_commit = c_all_vote_yes.broadcast(
50        participants,
51        TCP.fail_stop().bincode(),
52        nondet!(/** TODO */),
53    );
54    // TODO: Participants log
55
56    let c_commits = p_commit
57        .ir_node_named("p_commits")
58        .send(coordinator, TCP.fail_stop().bincode())
59        .ir_node_named("c_commits")
60        .values();
61    let (c_all_commit, _) = collect_quorum(
62        c_commits.map(q!(|kv| (kv, Ok::<(), ()>(())))),
63        num_participants,
64        num_participants,
65    );
66    // TODO: Coordinator log
67
68    c_all_commit
69}