hydro_test/cluster/
kv_replica.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4
5use hydro_lang::*;
6use serde::de::DeserializeOwned;
7use serde::{Deserialize, Serialize};
8
/// Marker type that names the cluster of key-value replicas.
///
/// It carries no data; it is only used as the type-level tag in
/// `Cluster<'a, Replica>` so replica locations cannot be confused with other clusters.
pub struct Replica {}
10
11pub trait KvKey: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug {}
12impl<K: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug> KvKey for K {}
13
14pub trait KvValue: Serialize + DeserializeOwned + Eq + Clone + Debug {}
15impl<V: Serialize + DeserializeOwned + Eq + Clone + Debug> KvValue for V {}
16
17#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
18pub struct KvPayload<K, V> {
19    pub key: K,
20    pub value: V,
21}
22
23#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
24pub struct SequencedKv<K, V> {
25    // Note: Important that seq is the first member of the struct for sorting
26    pub seq: usize,
27    pub kv: Option<KvPayload<K, V>>,
28}
29
30impl<K: KvKey, V: KvValue> Ord for SequencedKv<K, V> {
31    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
32        self.seq.cmp(&other.seq)
33    }
34}
35
36impl<K: KvKey, V: KvValue> PartialOrd for SequencedKv<K, V> {
37    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
38        Some(self.cmp(other))
39    }
40}
41
42// Replicas. All relations for replicas will be prefixed with r. Expects ReplicaPayload on p_to_replicas, outputs a stream of (client address, ReplicaPayload) after processing.
43#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
44pub fn kv_replica<'a, K: KvKey, V: KvValue>(
45    replicas: &Cluster<'a, Replica>,
46    p_to_replicas: impl Into<
47        Stream<(usize, Option<KvPayload<K, V>>), Cluster<'a, Replica>, Unbounded, NoOrder>,
48    >,
49    checkpoint_frequency: usize,
50) -> (
51    Stream<usize, Cluster<'a, Replica>, Unbounded>,
52    Stream<KvPayload<K, V>, Cluster<'a, Replica>, Unbounded>,
53) {
54    let p_to_replicas: Stream<SequencedKv<K, V>, Cluster<'a, Replica>, Unbounded, NoOrder> =
55        p_to_replicas
56            .into()
57            .map(q!(|(slot, kv)| SequencedKv { seq: slot, kv }));
58
59    let replica_tick = replicas.tick();
60
61    let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replica_tick.cycle();
62    // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload)));
63    let r_sorted_payloads = p_to_replicas.batch(&replica_tick, nondet!(
64            /// because we fill slots one-by-one, we can safely batch
65            /// because non-determinism is resolved when we sort by slots
66        ))
67        .chain(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet
68        .sort();
69    // Create a cycle since we'll use this seq before we define it
70    let (r_next_slot_complete_cycle, r_next_slot) =
71        replica_tick.cycle_with_initial(replica_tick.singleton(q!(0)));
72    // Find highest the sequence number of any payload that can be processed in this tick. This is the payload right before a hole.
73    let r_next_slot_after_processing_payloads = r_sorted_payloads
74        .clone()
75        .cross_singleton(r_next_slot.clone())
76        .fold(
77            q!(|| 0),
78            q!(|new_next_slot, (sorted_payload, next_slot)| {
79                if sorted_payload.seq == std::cmp::max(*new_next_slot, next_slot) {
80                    *new_next_slot = sorted_payload.seq + 1;
81                }
82            }),
83        );
84    // Find all payloads that can and cannot be processed in this tick.
85    let r_processable_payloads = r_sorted_payloads
86        .clone()
87        .cross_singleton(r_next_slot_after_processing_payloads.clone())
88        .filter(q!(
89            |(sorted_payload, highest_seq)| sorted_payload.seq < *highest_seq
90        ))
91        .map(q!(|(sorted_payload, _)| { sorted_payload }));
92    let r_new_non_processable_payloads = r_sorted_payloads
93        .clone()
94        .cross_singleton(r_next_slot_after_processing_payloads.clone())
95        .filter(q!(
96            |(sorted_payload, highest_seq)| sorted_payload.seq > *highest_seq
97        ))
98        .map(q!(|(sorted_payload, _)| { sorted_payload }));
99    // Save these, we can process them once the hole has been filled
100    r_buffered_payloads_complete_cycle.complete_next_tick(r_new_non_processable_payloads);
101
102    let r_kv_store = r_processable_payloads
103        .clone()
104        .persist() // Optimization: all_ticks() + fold() = fold<static>, where the state of the previous fold is saved and persisted values are deleted.
105        .fold(q!(|| (HashMap::new(), 0)), q!(|(kv_store, next_slot), payload| {
106            if let Some(kv) = payload.kv {
107                kv_store.insert(kv.key, kv.value);
108            }
109            *next_slot = payload.seq + 1;
110        }));
111    // Update the highest seq for the next tick
112    r_next_slot_complete_cycle
113        .complete_next_tick(r_kv_store.map(q!(|(_kv_store, next_slot)| next_slot)));
114
115    // Send checkpoints to the acceptors when we've processed enough payloads
116    let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) =
117        replica_tick.cycle::<Optional<usize, _, _>>();
118    let r_max_checkpointed_seq = r_checkpointed_seqs.persist().max().into_singleton();
119    let r_checkpoint_seq_new = r_max_checkpointed_seq
120        .zip(r_next_slot)
121        .filter_map(q!(
122            move |(max_checkpointed_seq, next_slot)| if max_checkpointed_seq
123                .map(|m| next_slot - m >= checkpoint_frequency)
124                .unwrap_or(true)
125            {
126                Some(next_slot)
127            } else {
128                None
129            }
130        ));
131    r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone());
132
133    // Tell clients that the payload has been committed. All ReplicaPayloads contain the client's machine ID (to string) as value.
134    let r_to_clients = r_processable_payloads
135        .filter_map(q!(|payload| payload.kv))
136        .all_ticks();
137    (r_checkpoint_seq_new.all_ticks(), r_to_clients)
138}