gossip_kv/
model.rs

1use dfir_rs::lattices::map_union::MapUnionHashMap;
2use dfir_rs::lattices::set_union::SetUnionHashSet;
3use dfir_rs::lattices::{DomPair, Max};
4
5use crate::Namespace;
6
7/// Primary key for entries in a table.
8pub type RowKey = String;
9
10/// Value stored in a table. Modelled as a timestamped set of strings.
11///
12/// Each value is timestamped with the time at which it was last updated. Concurrent updates at
13/// the same timestamp are stored as a set.
14pub type RowValue<C> = DomPair<C, SetUnionHashSet<String>>;
15
16/// A map from row keys to values in a table.
17pub type Table<V> = MapUnionHashMap<RowKey, V>;
18
19/// Name of a table in the data store.
20pub type TableName = String;
21
22/// A map from table names to tables.
23pub type TableMap<V> = MapUnionHashMap<TableName, Table<V>>;
24
25pub type NamespaceMap<V> = MapUnionHashMap<Namespace, TableMap<V>>;
26
27pub type Namespaces<C> = NamespaceMap<RowValue<C>>;
28
29/// Timestamps used in the model.
30// TODO: This will be updated to use a more sophisticated clock type with https://github.com/hydro-project/hydro/issues/1207.
31pub type Clock = Max<u64>;
32
33/// TableMap element to upsert a row in an existing TableMap.
34///
35/// Merge this into an existing TableMap to upsert a row in a table. If the table does not exist,
36/// it gets created. There's no explicit "create table" operation.
37///
38/// Parameters:
39/// - `row_ts`: New timestamp of the row being upserted.
40/// - `table_name`: Name of the table.
41/// - `key`: Primary key of the row.
42/// - `val`: Row value.
43pub fn upsert_row<C>(
44    row_ts: C,
45    ns: Namespace,
46    table_name: TableName,
47    key: RowKey,
48    val: String,
49) -> Namespaces<C> {
50    let value: RowValue<C> = RowValue::new_from(row_ts, SetUnionHashSet::new_from([val]));
51    let row: Table<RowValue<C>> = Table::new_from([(key, value)]);
52    let table: TableMap<RowValue<C>> = TableMap::new_from([(table_name, row)]);
53    Namespaces::new_from([(ns, table)])
54}
55
56/// TableMap element to delete a row from an existing TableMap.
57///
58/// Merge this into an existing TableMap to delete a row from a table.
59///
60/// Parameters:
61/// - `row_ts`: New timestamp of the row being deleted.
62/// - `table_name`: Name of the table.
63/// - `key`: Primary key of the row.
64pub fn delete_row<C>(
65    row_ts: C,
66    ns: Namespace,
67    table_name: TableName,
68    key: RowKey,
69) -> Namespaces<C> {
70    let value: RowValue<C> = RowValue::new_from(row_ts, SetUnionHashSet::new_from([]));
71    let row: Table<RowValue<C>> = Table::new_from([(key, value)]);
72    let table = TableMap::new_from([(table_name, row)]);
73    Namespaces::new_from([(ns, table)])
74}
75
76#[cfg(test)]
77mod tests {
78    use std::collections::HashSet;
79
80    use dfir_rs::lattices::Merge;
81
82    use crate::Namespace::System;
83    use crate::model::{Clock, Namespaces, RowKey, TableName, delete_row, upsert_row};
84
85    #[test]
86    fn test_table_map() {
87        let mut namespaces: Namespaces<Clock> = Namespaces::default();
88
89        let first_tick: Clock = Clock::new(0);
90        let second_tick: Clock = Clock::new(1);
91
92        let members_table = TableName::from("members");
93        let key_1 = RowKey::from("key1");
94        let value_1: String = "value1".to_string();
95
96        // Table starts out empty.
97        assert_eq!(
98            namespaces.as_reveal_ref().len(),
99            0,
100            "Expected no namespaces."
101        );
102
103        let insert = upsert_row(
104            first_tick,
105            System,
106            members_table.clone(),
107            key_1.clone(),
108            value_1.clone(),
109        );
110        Merge::merge(&mut namespaces, insert);
111        {
112            let table = namespaces
113                .as_reveal_ref()
114                .get(&System)
115                .unwrap()
116                .as_reveal_ref()
117                .get(&members_table)
118                .unwrap();
119
120            let row = table.as_reveal_ref().get(&key_1);
121            assert!(row.is_some(), "Row should exist");
122            assert_eq!(
123                *row.unwrap().as_reveal_ref().0,
124                first_tick,
125                "Unexpected row timestamp"
126            );
127
128            let value = row.unwrap().as_reveal_ref().1.as_reveal_ref();
129            assert_eq!(
130                value,
131                &HashSet::from([value_1.to_string()]),
132                "Unexpected row value"
133            );
134        }
135
136        let delete_row = delete_row(
137            second_tick,
138            System,
139            members_table.clone(),
140            key_1.to_string(),
141        );
142        Merge::merge(&mut namespaces, delete_row);
143        {
144            let table = namespaces
145                .as_reveal_ref()
146                .get(&System)
147                .unwrap()
148                .as_reveal_ref()
149                .get(&members_table)
150                .unwrap();
151
152            // Deletion in this case leaves a "tombstone"
153            let row = table.as_reveal_ref().get(&key_1);
154
155            assert!(row.is_some(), "Row should exist");
156            assert_eq!(
157                *row.unwrap().as_reveal_ref().0,
158                second_tick,
159                "Unexpected row timestamp"
160            );
161
162            let value = row.unwrap().as_reveal_ref().1.as_reveal_ref();
163            assert_eq!(value, &HashSet::from([]), "Row should be empty");
164        }
165    }
166}