Skip to main content

hydro_test/cluster/kv_replica/
mod.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4
5use hydro_lang::live_collections::stream::NoOrder;
6use hydro_lang::prelude::*;
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9
10mod sequence_payloads;
11
/// Marker type for the replica cluster.
///
/// It carries no data; in this file it is only used as the tag parameter in
/// `Cluster<'a, Replica>`.
pub struct Replica {}
13
14pub trait KvKey: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug {}
15impl<K: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug> KvKey for K {}
16
17pub trait KvValue: Serialize + DeserializeOwned + Eq + Clone + Debug {}
18impl<V: Serialize + DeserializeOwned + Eq + Clone + Debug> KvValue for V {}
19
20#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
21pub struct KvPayload<K, V> {
22    pub key: K,
23    pub value: V,
24}
25
26#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
27pub struct SequencedKv<K, V> {
28    // Note: Important that seq is the first member of the struct for sorting
29    pub seq: usize,
30    pub kv: Option<KvPayload<K, V>>,
31}
32
33impl<K: KvKey, V: KvValue> Ord for SequencedKv<K, V> {
34    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
35        self.seq.cmp(&other.seq)
36    }
37}
38
39impl<K: KvKey, V: KvValue> PartialOrd for SequencedKv<K, V> {
40    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
41        Some(self.cmp(other))
42    }
43}
44
45// Replicas. All relations for replicas will be prefixed with r. Expects ReplicaPayload on p_to_replicas, outputs a stream of (client address, ReplicaPayload) after processing.
46#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
47pub fn kv_replica<'a, K: KvKey, V: KvValue>(
48    replicas: &Cluster<'a, Replica>,
49    p_to_replicas: impl Into<
50        Stream<(usize, Option<KvPayload<K, V>>), Cluster<'a, Replica>, Unbounded, NoOrder>,
51    >,
52    checkpoint_frequency: usize,
53) -> (
54    Stream<usize, Cluster<'a, Replica>, Unbounded>,
55    Stream<KvPayload<K, V>, Cluster<'a, Replica>, Unbounded>,
56) {
57    let p_to_replicas: Stream<SequencedKv<K, V>, Cluster<'a, Replica>, Unbounded, NoOrder> =
58        p_to_replicas
59            .into()
60            .map(q!(|(slot, kv)| SequencedKv { seq: slot, kv }));
61
62    let replica_tick = replicas.tick();
63
64    let (r_processable_payloads, r_next_slot_complete_cycle) =
65        sequence_payloads::sequence_payloads(&replica_tick, p_to_replicas);
66
67    let r_kv_store = r_processable_payloads.clone().across_ticks(|s| {
68        s.fold(
69            q!(|| (HashMap::new(), 0)),
70            q!(|(kv_store, next_slot), payload| {
71                if let Some(kv) = payload.kv {
72                    kv_store.insert(kv.key, kv.value);
73                }
74                *next_slot = payload.seq + 1;
75            }),
76        )
77    });
78    // Update the highest seq for the next tick
79    let r_next_slot = r_kv_store.map(q!(|(_kv_store, next_slot)| next_slot));
80    r_next_slot_complete_cycle.complete_next_tick(r_next_slot.clone());
81
82    // Send checkpoints to the acceptors when we've processed enough payloads
83    let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) =
84        replica_tick.cycle::<Optional<usize, _, _>>();
85    let r_max_checkpointed_seq = r_checkpointed_seqs
86        .into_stream()
87        .across_ticks(|s| s.max())
88        .into_singleton();
89    let r_checkpoint_seq_new = r_max_checkpointed_seq
90        .zip(
91            Optional::from(r_next_slot)
92                .defer_tick()
93                .unwrap_or(replica_tick.singleton(q!(0))),
94        )
95        .filter_map(q!(
96            move |(max_checkpointed_seq, next_slot)| if max_checkpointed_seq
97                .map(|m| next_slot - m >= checkpoint_frequency)
98                .unwrap_or(true)
99            {
100                Some(next_slot)
101            } else {
102                None
103            }
104        ));
105    r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone());
106
107    // Tell clients that the payload has been committed. All ReplicaPayloads contain the client's machine ID (to string) as value.
108    let r_to_clients = r_processable_payloads
109        .filter_map(q!(|payload| payload.kv))
110        .all_ticks();
111    (r_checkpoint_seq_new.all_ticks(), r_to_clients)
112}