1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
use super::HalfJoinState;
use crate::util::clear::Clear;
type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;
use smallvec::{smallvec, SmallVec};
#[derive(Debug)]
pub struct HalfMultisetJoinState<Key, ValBuild, ValProbe> {
// Here a smallvec with inline storage of 1 is chosen.
// The rationale for this decision is that, I speculate, that joins possibly have a bimodal distribution with regards to how much key contention they have.
// That is, I think that there are many joins that have 1 value per key on LHS/RHS, and there are also a large category of joins that have multiple values per key.
// For the category of joins that have multiple values per key, it's not clear why they would only have 2, 3, 4, or N specific number of values per key. So there's no good number to set the smallvec storage to.
// Instead we can just focus on the first group of joins that have 1 value per key and get benefit there without hurting the other group too much with excessive memory usage.
/// Table to probe, vec val contains all matches.
table: HashMap<Key, SmallVec<[ValBuild; 1]>>,
/// Not-yet emitted matches.
current_matches: VecDeque<(Key, ValProbe, ValBuild)>,
len: usize,
}
impl<Key, ValBuild, ValProbe> Default for HalfMultisetJoinState<Key, ValBuild, ValProbe> {
fn default() -> Self {
Self {
table: HashMap::default(),
current_matches: VecDeque::default(),
len: 0,
}
}
}
impl<Key, ValBuild, ValProbe> Clear for HalfMultisetJoinState<Key, ValBuild, ValProbe> {
fn clear(&mut self) {
self.table.clear();
self.current_matches.clear();
self.len = 0;
}
}
impl<Key, ValBuild, ValProbe> HalfJoinState<Key, ValBuild, ValProbe>
for HalfMultisetJoinState<Key, ValBuild, ValProbe>
where
Key: Clone + Eq + std::hash::Hash,
ValBuild: Clone,
ValProbe: Clone,
{
fn build(&mut self, k: Key, v: &ValBuild) -> bool {
let entry = self.table.entry(k);
match entry {
Entry::Occupied(mut e) => {
let vec = e.get_mut();
vec.push(v.clone());
self.len += 1;
}
Entry::Vacant(e) => {
e.insert(smallvec![v.clone()]);
self.len += 1;
}
};
true
}
fn probe(&mut self, k: &Key, v: &ValProbe) -> Option<(Key, ValProbe, ValBuild)> {
// TODO: We currently don't free/shrink the self.current_matches vecdeque to save time.
// This mean it will grow to eventually become the largest number of matches in a single probe call.
// Maybe we should clear this memory at the beginning of every tick/periodically?
let mut iter = self
.table
.get(k)?
.iter()
.map(|valbuild| (k.clone(), v.clone(), valbuild.clone()));
let first = iter.next();
self.current_matches.extend(iter);
first
}
fn full_probe(&self, k: &Key) -> std::slice::Iter<'_, ValBuild> {
let Some(sv) = self.table.get(k) else {
return [].iter();
};
sv.iter()
}
fn pop_match(&mut self) -> Option<(Key, ValProbe, ValBuild)> {
self.current_matches.pop_front()
}
fn len(&self) -> usize {
self.len
}
fn iter(&self) -> std::collections::hash_map::Iter<'_, Key, SmallVec<[ValBuild; 1]>> {
self.table.iter()
}
}