// dfir_rs/compiled/pull/half_join_state/multiset.rs

1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::collections::hash_map::Entry;
4
5use super::HalfJoinState;
6
7type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;
8
9use smallvec::{SmallVec, smallvec};
10
/// [`HalfJoinState`] with multiset semantics: duplicate values per key are all retained.
#[derive(Debug)]
pub struct HalfMultisetJoinState<Key, ValBuild, ValProbe> {
    // A `SmallVec` with inline storage of 1 is chosen here.
    // Rationale (speculative): joins likely have a bimodal distribution of key contention —
    // many joins have exactly 1 value per key on the LHS/RHS, while another large group has
    // multiple values per key with no predictable count (2, 3, 4, ... N), so there is no good
    // larger inline size to pick. Inline storage of 1 benefits the single-value group without
    // hurting the multi-value group too much with excessive memory usage.
    /// Build-side table to probe; the vec value contains all matches for that key.
    table: HashMap<Key, SmallVec<[ValBuild; 1]>>,
    /// Matches found by `probe` but not yet emitted via `pop_match`.
    current_matches: VecDeque<(Key, ValProbe, ValBuild)>,
    /// Total number of build-side values stored across all keys (counting duplicates).
    len: usize,
}
25
26impl<Key, ValBuild, ValProbe> Default for HalfMultisetJoinState<Key, ValBuild, ValProbe> {
27    fn default() -> Self {
28        Self {
29            table: HashMap::default(),
30            current_matches: VecDeque::default(),
31            len: 0,
32        }
33    }
34}
35
36impl<Key, ValBuild, ValProbe> HalfJoinState<Key, ValBuild, ValProbe>
37    for HalfMultisetJoinState<Key, ValBuild, ValProbe>
38where
39    Key: Clone + Eq + std::hash::Hash,
40    ValBuild: Clone,
41    ValProbe: Clone,
42{
43    fn build(&mut self, k: Key, v: Cow<'_, ValBuild>) -> bool {
44        let entry = self.table.entry(k);
45
46        match entry {
47            Entry::Occupied(mut e) => {
48                let vec = e.get_mut();
49
50                vec.push(v.into_owned());
51                self.len += 1;
52            }
53            Entry::Vacant(e) => {
54                e.insert(smallvec![v.into_owned()]);
55                self.len += 1;
56            }
57        };
58
59        true
60    }
61
62    fn probe(&mut self, k: &Key, v: &ValProbe) -> Option<(Key, ValProbe, ValBuild)> {
63        // TODO: We currently don't free/shrink the self.current_matches vecdeque to save time.
64        // This mean it will grow to eventually become the largest number of matches in a single probe call.
65        // Maybe we should clear this memory at the beginning of every tick/periodically?
66        let mut iter = self
67            .table
68            .get(k)?
69            .iter()
70            .map(|valbuild| (k.clone(), v.clone(), valbuild.clone()));
71
72        let first = iter.next();
73
74        self.current_matches.extend(iter);
75
76        first
77    }
78
79    fn full_probe(&self, k: &Key) -> std::slice::Iter<'_, ValBuild> {
80        let Some(sv) = self.table.get(k) else {
81            return [].iter();
82        };
83
84        sv.iter()
85    }
86
87    fn pop_match(&mut self) -> Option<(Key, ValProbe, ValBuild)> {
88        self.current_matches.pop_front()
89    }
90
91    fn len(&self) -> usize {
92        self.len
93    }
94
95    fn iter(&self) -> std::collections::hash_map::Iter<'_, Key, SmallVec<[ValBuild; 1]>> {
96        #[expect(clippy::disallowed_methods, reason = "FxHasher is deterministic")]
97        self.table.iter()
98    }
99
100    fn clear(&mut self) {
101        self.table.clear();
102        self.current_matches.clear();
103        self.len = 0;
104    }
105}