1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use std::collections::hash_map::Entry;
use std::collections::VecDeque;

use super::HalfJoinState;
use crate::util::clear::Clear;

type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;

use smallvec::{smallvec, SmallVec};
#[derive(Debug)]
pub struct HalfMultisetJoinState<Key, ValBuild, ValProbe> {
    // Here a smallvec with inline storage of 1 is chosen.
    // The rationale for this decision is that, I speculate, that joins possibly have a bimodal distribution with regards to how much key contention they have.
    // That is, I think that there are many joins that have 1 value per key on LHS/RHS, and there are also a large category of joins that have multiple values per key.
    // For the category of joins that have multiple values per key, it's not clear why they would only have 2, 3, 4, or N specific number of values per key. So there's no good number to set the smallvec storage to.
    // Instead we can just focus on the first group of joins that have 1 value per key and get benefit there without hurting the other group too much with excessive memory usage.
    /// Table to probe, vec val contains all matches.
    table: HashMap<Key, SmallVec<[ValBuild; 1]>>,
    /// Not-yet emitted matches.
    current_matches: VecDeque<(Key, ValProbe, ValBuild)>,
    len: usize,
}
impl<Key, ValBuild, ValProbe> Default for HalfMultisetJoinState<Key, ValBuild, ValProbe> {
    fn default() -> Self {
        Self {
            table: HashMap::default(),
            current_matches: VecDeque::default(),
            len: 0,
        }
    }
}
impl<Key, ValBuild, ValProbe> Clear for HalfMultisetJoinState<Key, ValBuild, ValProbe> {
    fn clear(&mut self) {
        self.table.clear();
        self.current_matches.clear();
        self.len = 0;
    }
}
impl<Key, ValBuild, ValProbe> HalfJoinState<Key, ValBuild, ValProbe>
    for HalfMultisetJoinState<Key, ValBuild, ValProbe>
where
    Key: Clone + Eq + std::hash::Hash,
    ValBuild: Clone,
    ValProbe: Clone,
{
    fn build(&mut self, k: Key, v: &ValBuild) -> bool {
        let entry = self.table.entry(k);

        match entry {
            Entry::Occupied(mut e) => {
                let vec = e.get_mut();

                vec.push(v.clone());
                self.len += 1;
            }
            Entry::Vacant(e) => {
                e.insert(smallvec![v.clone()]);
                self.len += 1;
            }
        };

        true
    }

    fn probe(&mut self, k: &Key, v: &ValProbe) -> Option<(Key, ValProbe, ValBuild)> {
        // TODO: We currently don't free/shrink the self.current_matches vecdeque to save time.
        // This mean it will grow to eventually become the largest number of matches in a single probe call.
        // Maybe we should clear this memory at the beginning of every tick/periodically?
        let mut iter = self
            .table
            .get(k)?
            .iter()
            .map(|valbuild| (k.clone(), v.clone(), valbuild.clone()));

        let first = iter.next();

        self.current_matches.extend(iter);

        first
    }

    fn full_probe(&self, k: &Key) -> std::slice::Iter<'_, ValBuild> {
        let Some(sv) = self.table.get(k) else {
            return [].iter();
        };

        sv.iter()
    }

    fn pop_match(&mut self) -> Option<(Key, ValProbe, ValBuild)> {
        self.current_matches.pop_front()
    }

    fn len(&self) -> usize {
        self.len
    }
    fn iter(&self) -> std::collections::hash_map::Iter<'_, Key, SmallVec<[ValBuild; 1]>> {
        self.table.iter()
    }
}