hydro_test/cluster/
kv_replica.rs1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4
5use hydro_lang::*;
6use serde::de::DeserializeOwned;
7use serde::{Deserialize, Serialize};
8
9pub struct Replica {}
10
11pub trait KvKey: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug {}
12impl<K: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug> KvKey for K {}
13
14pub trait KvValue: Serialize + DeserializeOwned + Eq + Clone + Debug {}
15impl<V: Serialize + DeserializeOwned + Eq + Clone + Debug> KvValue for V {}
16
17#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
18pub struct KvPayload<K, V> {
19 pub key: K,
20 pub value: V,
21}
22
23#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
24pub struct SequencedKv<K, V> {
25 pub seq: usize,
27 pub kv: Option<KvPayload<K, V>>,
28}
29
30impl<K: KvKey, V: KvValue> Ord for SequencedKv<K, V> {
31 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
32 self.seq.cmp(&other.seq)
33 }
34}
35
36impl<K: KvKey, V: KvValue> PartialOrd for SequencedKv<K, V> {
37 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
38 Some(self.cmp(other))
39 }
40}
41
42#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
44pub fn kv_replica<'a, K: KvKey, V: KvValue>(
45 replicas: &Cluster<'a, Replica>,
46 p_to_replicas: impl Into<
47 Stream<(usize, Option<KvPayload<K, V>>), Cluster<'a, Replica>, Unbounded, NoOrder>,
48 >,
49 checkpoint_frequency: usize,
50) -> (
51 Stream<usize, Cluster<'a, Replica>, Unbounded>,
52 Stream<KvPayload<K, V>, Cluster<'a, Replica>, Unbounded>,
53) {
54 let p_to_replicas: Stream<SequencedKv<K, V>, Cluster<'a, Replica>, Unbounded, NoOrder> =
55 p_to_replicas
56 .into()
57 .map(q!(|(slot, kv)| SequencedKv { seq: slot, kv }));
58
59 let replica_tick = replicas.tick();
60
61 let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replica_tick.cycle();
62 let r_sorted_payloads = p_to_replicas.batch(&replica_tick, nondet!(
64 ))
67 .chain(r_buffered_payloads) .sort();
69 let (r_next_slot_complete_cycle, r_next_slot) =
71 replica_tick.cycle_with_initial(replica_tick.singleton(q!(0)));
72 let r_next_slot_after_processing_payloads = r_sorted_payloads
74 .clone()
75 .cross_singleton(r_next_slot.clone())
76 .fold(
77 q!(|| 0),
78 q!(|new_next_slot, (sorted_payload, next_slot)| {
79 if sorted_payload.seq == std::cmp::max(*new_next_slot, next_slot) {
80 *new_next_slot = sorted_payload.seq + 1;
81 }
82 }),
83 );
84 let r_processable_payloads = r_sorted_payloads
86 .clone()
87 .cross_singleton(r_next_slot_after_processing_payloads.clone())
88 .filter(q!(
89 |(sorted_payload, highest_seq)| sorted_payload.seq < *highest_seq
90 ))
91 .map(q!(|(sorted_payload, _)| { sorted_payload }));
92 let r_new_non_processable_payloads = r_sorted_payloads
93 .clone()
94 .cross_singleton(r_next_slot_after_processing_payloads.clone())
95 .filter(q!(
96 |(sorted_payload, highest_seq)| sorted_payload.seq > *highest_seq
97 ))
98 .map(q!(|(sorted_payload, _)| { sorted_payload }));
99 r_buffered_payloads_complete_cycle.complete_next_tick(r_new_non_processable_payloads);
101
102 let r_kv_store = r_processable_payloads
103 .clone()
104 .persist() .fold(q!(|| (HashMap::new(), 0)), q!(|(kv_store, next_slot), payload| {
106 if let Some(kv) = payload.kv {
107 kv_store.insert(kv.key, kv.value);
108 }
109 *next_slot = payload.seq + 1;
110 }));
111 r_next_slot_complete_cycle
113 .complete_next_tick(r_kv_store.map(q!(|(_kv_store, next_slot)| next_slot)));
114
115 let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) =
117 replica_tick.cycle::<Optional<usize, _, _>>();
118 let r_max_checkpointed_seq = r_checkpointed_seqs.persist().max().into_singleton();
119 let r_checkpoint_seq_new = r_max_checkpointed_seq
120 .zip(r_next_slot)
121 .filter_map(q!(
122 move |(max_checkpointed_seq, next_slot)| if max_checkpointed_seq
123 .map(|m| next_slot - m >= checkpoint_frequency)
124 .unwrap_or(true)
125 {
126 Some(next_slot)
127 } else {
128 None
129 }
130 ));
131 r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone());
132
133 let r_to_clients = r_processable_payloads
135 .filter_map(q!(|payload| payload.kv))
136 .all_ticks();
137 (r_checkpoint_seq_new.all_ticks(), r_to_clients)
138}