1use dfir_rs::lattices::map_union::MapUnionHashMap;
2use dfir_rs::lattices::set_union::SetUnionHashSet;
3use dfir_rs::lattices::{DomPair, Max};
45use crate::Namespace;
/// Primary key for entries in a table.
pub type RowKey = String;

/// Value stored in a table. Modelled as a timestamped set of strings.
///
/// Each value is timestamped with the time at which it was last updated. Concurrent updates at
/// the same timestamp are stored as a set.
pub type RowValue<C> = DomPair<C, SetUnionHashSet<String>>;

/// A map from row keys to values in a table.
pub type Table<V> = MapUnionHashMap<RowKey, V>;

/// Name of a table in the data store.
pub type TableName = String;

/// A map from table names to tables.
pub type TableMap<V> = MapUnionHashMap<TableName, Table<V>>;

/// A map from namespaces to the table maps they contain.
pub type NamespaceMap<V> = MapUnionHashMap<Namespace, TableMap<V>>;

/// A [`NamespaceMap`] whose rows are [`RowValue`]s timestamped with the clock type `C`.
pub type Namespaces<C> = NamespaceMap<RowValue<C>>;

/// Timestamps used in the model.
// TODO: This will be updated to use a more sophisticated clock type with https://github.com/hydro-project/hydro/issues/1207.
pub type Clock = Max<u64>;
3233/// TableMap element to upsert a row in an existing TableMap.
34///
35/// Merge this into an existing TableMap to upsert a row in a table. If the table does not exist,
36/// it gets created. There's no explicit "create table" operation.
37///
38/// Parameters:
39/// - `row_ts`: New timestamp of the row being upserted.
40/// - `table_name`: Name of the table.
41/// - `key`: Primary key of the row.
42/// - `val`: Row value.
43pub fn upsert_row<C>(
44 row_ts: C,
45 ns: Namespace,
46 table_name: TableName,
47 key: RowKey,
48 val: String,
49) -> Namespaces<C> {
50let value: RowValue<C> = RowValue::new_from(row_ts, SetUnionHashSet::new_from([val]));
51let row: Table<RowValue<C>> = Table::new_from([(key, value)]);
52let table: TableMap<RowValue<C>> = TableMap::new_from([(table_name, row)]);
53 Namespaces::new_from([(ns, table)])
54}
5556/// TableMap element to delete a row from an existing TableMap.
57///
58/// Merge this into an existing TableMap to delete a row from a table.
59///
60/// Parameters:
61/// - `row_ts`: New timestamp of the row being deleted.
62/// - `table_name`: Name of the table.
63/// - `key`: Primary key of the row.
64pub fn delete_row<C>(
65 row_ts: C,
66 ns: Namespace,
67 table_name: TableName,
68 key: RowKey,
69) -> Namespaces<C> {
70let value: RowValue<C> = RowValue::new_from(row_ts, SetUnionHashSet::new_from([]));
71let row: Table<RowValue<C>> = Table::new_from([(key, value)]);
72let table = TableMap::new_from([(table_name, row)]);
73 Namespaces::new_from([(ns, table)])
74}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use dfir_rs::lattices::Merge;

    use crate::Namespace::System;
    use crate::model::{Clock, Namespaces, RowKey, TableName, delete_row, upsert_row};

    /// Upserts a row and then deletes it, checking the stored timestamp and value set after each
    /// merge.
    #[test]
    fn test_table_map() {
        let tick_0: Clock = Clock::new(0);
        let tick_1: Clock = Clock::new(1);

        let members_table = TableName::from("members");
        let key_1 = RowKey::from("key1");
        let value_1: String = "value1".to_string();

        let mut namespaces: Namespaces<Clock> = Namespaces::default();

        // Table starts out empty.
        assert_eq!(
            namespaces.as_reveal_ref().len(),
            0,
            "Expected no namespaces."
        );

        // Step 1: upsert the row at the first tick and inspect what was stored.
        let upsert = upsert_row(
            tick_0,
            System,
            members_table.clone(),
            key_1.clone(),
            value_1.clone(),
        );
        Merge::merge(&mut namespaces, upsert);
        {
            let system_tables = namespaces.as_reveal_ref().get(&System).unwrap();
            let members = system_tables
                .as_reveal_ref()
                .get(&members_table)
                .unwrap();

            let row = members.as_reveal_ref().get(&key_1);
            assert!(row.is_some(), "Row should exist");

            let revealed = row.unwrap().as_reveal_ref();
            assert_eq!(*revealed.0, tick_0, "Unexpected row timestamp");
            assert_eq!(
                revealed.1.as_reveal_ref(),
                &HashSet::from([value_1.to_string()]),
                "Unexpected row value"
            );
        }

        // Step 2: delete the same row at the second tick.
        let tombstone = delete_row(tick_1, System, members_table.clone(), key_1.clone());
        Merge::merge(&mut namespaces, tombstone);
        {
            let system_tables = namespaces.as_reveal_ref().get(&System).unwrap();
            let members = system_tables
                .as_reveal_ref()
                .get(&members_table)
                .unwrap();

            // Deletion in this case leaves a "tombstone"
            let row = members.as_reveal_ref().get(&key_1);
            assert!(row.is_some(), "Row should exist");

            let revealed = row.unwrap().as_reveal_ref();
            assert_eq!(*revealed.0, tick_1, "Unexpected row timestamp");
            assert_eq!(
                revealed.1.as_reveal_ref(),
                &HashSet::from([]),
                "Row should be empty"
            );
        }
    }
}