hydro_test/cluster/kv_replica/
mod.rs1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4
5use hydro_lang::live_collections::stream::NoOrder;
6use hydro_lang::prelude::*;
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9
10mod sequence_payloads;
11
12pub struct Replica {}
13
14pub trait KvKey: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug {}
15impl<K: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug> KvKey for K {}
16
17pub trait KvValue: Serialize + DeserializeOwned + Eq + Clone + Debug {}
18impl<V: Serialize + DeserializeOwned + Eq + Clone + Debug> KvValue for V {}
19
20#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
21pub struct KvPayload<K, V> {
22 pub key: K,
23 pub value: V,
24}
25
26#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
27pub struct SequencedKv<K, V> {
28 pub seq: usize,
30 pub kv: Option<KvPayload<K, V>>,
31}
32
33impl<K: KvKey, V: KvValue> Ord for SequencedKv<K, V> {
34 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
35 self.seq.cmp(&other.seq)
36 }
37}
38
39impl<K: KvKey, V: KvValue> PartialOrd for SequencedKv<K, V> {
40 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
41 Some(self.cmp(other))
42 }
43}
44
45#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
47pub fn kv_replica<'a, K: KvKey, V: KvValue>(
48 replicas: &Cluster<'a, Replica>,
49 p_to_replicas: impl Into<
50 Stream<(usize, Option<KvPayload<K, V>>), Cluster<'a, Replica>, Unbounded, NoOrder>,
51 >,
52 checkpoint_frequency: usize,
53) -> (
54 Stream<usize, Cluster<'a, Replica>, Unbounded>,
55 Stream<KvPayload<K, V>, Cluster<'a, Replica>, Unbounded>,
56) {
57 let p_to_replicas: Stream<SequencedKv<K, V>, Cluster<'a, Replica>, Unbounded, NoOrder> =
58 p_to_replicas
59 .into()
60 .map(q!(|(slot, kv)| SequencedKv { seq: slot, kv }));
61
62 let replica_tick = replicas.tick();
63
64 let (r_processable_payloads, r_next_slot_complete_cycle) =
65 sequence_payloads::sequence_payloads(&replica_tick, p_to_replicas);
66
67 let r_kv_store = r_processable_payloads.clone().across_ticks(|s| {
68 s.fold(
69 q!(|| (HashMap::new(), 0)),
70 q!(|(kv_store, next_slot), payload| {
71 if let Some(kv) = payload.kv {
72 kv_store.insert(kv.key, kv.value);
73 }
74 *next_slot = payload.seq + 1;
75 }),
76 )
77 });
78 let r_next_slot = r_kv_store.map(q!(|(_kv_store, next_slot)| next_slot));
80 r_next_slot_complete_cycle.complete_next_tick(r_next_slot.clone());
81
82 let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) =
84 replica_tick.cycle::<Optional<usize, _, _>>();
85 let r_max_checkpointed_seq = r_checkpointed_seqs
86 .into_stream()
87 .across_ticks(|s| s.max())
88 .into_singleton();
89 let r_checkpoint_seq_new = r_max_checkpointed_seq
90 .zip(
91 Optional::from(r_next_slot)
92 .defer_tick()
93 .unwrap_or(replica_tick.singleton(q!(0))),
94 )
95 .filter_map(q!(
96 move |(max_checkpointed_seq, next_slot)| if max_checkpointed_seq
97 .map(|m| next_slot - m >= checkpoint_frequency)
98 .unwrap_or(true)
99 {
100 Some(next_slot)
101 } else {
102 None
103 }
104 ));
105 r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone());
106
107 let r_to_clients = r_processable_payloads
109 .filter_map(q!(|payload| payload.kv))
110 .all_ticks();
111 (r_checkpoint_seq_new.all_ticks(), r_to_clients)
112}