// hydro_test/cluster/two_pc.rs
use std::fmt::Debug;
use std::hash::Hash;

use hydro_lang::live_collections::stream::NoOrder;
use hydro_lang::prelude::*;
use hydro_std::quorum::collect_quorum;
use serde::Serialize;
use serde::de::DeserializeOwned;

/// Zero-sized location tag for the participant cluster.
///
/// Used purely as a type parameter (`Cluster<'a, Participant>` in [`two_pc`]);
/// it carries no data.
pub struct Participant {}

/// Zero-sized location tag for the coordinator process.
///
/// Used purely as a type parameter (`Process<'a, Coordinator>` in [`two_pc`]);
/// it carries no data.
pub struct Coordinator {}

14pub fn two_pc<'a, Payload>(
15 coordinator: &Process<'a, Coordinator>,
16 participants: &Cluster<'a, Participant>,
17 num_participants: usize,
18 payloads: Stream<Payload, Process<'a, Coordinator>, Unbounded, NoOrder>,
19) -> Stream<Payload, Process<'a, Coordinator>, Unbounded, NoOrder>
20where
21 Payload: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug + Send,
22{
23 let p_prepare = payloads.ir_node_named("c_prepare").broadcast(
26 participants,
27 TCP.fail_stop().bincode(),
28 nondet!(),
29 );
30
31 let c_votes = p_prepare
34 .ir_node_named("p_prepare")
35 .send(coordinator, TCP.fail_stop().bincode())
36 .ir_node_named("c_votes")
37 .values();
38
39 let (c_all_vote_yes, _) = collect_quorum(
41 c_votes.map(q!(|kv| (kv, Ok::<(), ()>(())))),
42 num_participants,
43 num_participants,
44 );
45
46 let p_commit = c_all_vote_yes.broadcast(
50 participants,
51 TCP.fail_stop().bincode(),
52 nondet!(),
53 );
54 let c_commits = p_commit
57 .ir_node_named("p_commits")
58 .send(coordinator, TCP.fail_stop().bincode())
59 .ir_node_named("c_commits")
60 .values();
61 let (c_all_commit, _) = collect_quorum(
62 c_commits.map(q!(|kv| (kv, Ok::<(), ()>(())))),
63 num_participants,
64 num_participants,
65 );
66 c_all_commit
69}