hydro_lang/
keyed_singleton.rs

1use std::hash::Hash;
2
3use stageleft::{IntoQuotedMut, QuotedWithContext, q};
4
5use crate::boundedness::Boundedness;
6use crate::cycle::{CycleCollection, CycleComplete, ForwardRefMarker};
7use crate::location::tick::NoAtomic;
8use crate::location::{LocationId, NoTick};
9use crate::manual_expr::ManualExpr;
10use crate::stream::ExactlyOnce;
11use crate::unsafety::NonDet;
12use crate::{
13    Atomic, Bounded, KeyedStream, Location, NoOrder, Optional, Singleton, Stream, Tick, TotalOrder,
14    Unbounded, nondet,
15};
16
/// Describes the boundedness of a [`KeyedSingleton`] through two separate bounds: one for the
/// stream of entries that backs it and one for the value held by each key.
pub trait KeyedSingletonBound {
    /// Boundedness of the underlying stream of `(key, value)` entries.
    type UnderlyingBound: Boundedness;
    /// Boundedness of each key's value. Operations such as `entries`, `values` and `keys` are
    /// only offered when this is [`Bounded`].
    type ValueBound: Boundedness;
}
21
22impl KeyedSingletonBound for Unbounded {
23    type UnderlyingBound = Unbounded;
24    type ValueBound = Unbounded;
25}
26
27impl KeyedSingletonBound for Bounded {
28    type UnderlyingBound = Bounded;
29    type ValueBound = Bounded;
30}
31
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change. The set of entries itself is still
/// backed by an [`Unbounded`] stream (see the [`KeyedSingletonBound`] implementation for this
/// type), so new entries may appear asynchronously.
pub struct BoundedValue;
36
37impl KeyedSingletonBound for BoundedValue {
38    type UnderlyingBound = Unbounded;
39    type ValueBound = Bounded;
40}
41
/// A keyed collection of values, parameterized by a key type `K`, a value type `V`, a location
/// type `Loc` and a [`KeyedSingletonBound`] describing its boundedness.
///
/// It is represented as an unordered, exactly-once stream of `(K, V)` entries whose boundedness
/// is `Bound::UnderlyingBound`.
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    /// The backing stream of `(key, value)` entries.
    pub(crate) underlying: Stream<(K, V), Loc, Bound::UnderlyingBound, NoOrder, ExactlyOnce>,
}
45
46impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
47    for KeyedSingleton<K, V, Loc, Bound>
48{
49    fn clone(&self) -> Self {
50        KeyedSingleton {
51            underlying: self.underlying.clone(),
52        }
53    }
54}
55
56impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRefMarker>
57    for KeyedSingleton<K, V, L, B>
58where
59    L: Location<'a> + NoTick,
60{
61    type Location = L;
62
63    fn create_source(ident: syn::Ident, location: L) -> Self {
64        KeyedSingleton {
65            underlying: Stream::create_source(ident, location),
66        }
67    }
68}
69
70impl<'a, K, V, L, B: KeyedSingletonBound> CycleComplete<'a, ForwardRefMarker>
71    for KeyedSingleton<K, V, L, B>
72where
73    L: Location<'a> + NoTick,
74{
75    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
76        self.underlying.complete(ident, expected_location);
77    }
78}
79
80impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
81    KeyedSingleton<K, V, L, B>
82{
83    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
84        self.underlying
85    }
86
87    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
88        self.entries().map(q!(|(_, v)| v))
89    }
90
91    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
92        self.entries().map(q!(|(k, _)| k))
93    }
94
95    pub fn filter_key_not_in<O2, R2>(self, other: Stream<K, L, Bounded, O2, R2>) -> Self
96    where
97        K: Hash + Eq,
98    {
99        KeyedSingleton {
100            underlying: self.entries().anti_join(other),
101        }
102    }
103
104    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
105    where
106        F: Fn(&V) + 'a,
107    {
108        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
109        KeyedSingleton {
110            underlying: self.underlying.inspect(q!({
111                let orig = f;
112                move |(_k, v)| orig(v)
113            })),
114        }
115    }
116
117    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> KeyedSingleton<K, V, L, B>
118    where
119        F: Fn(&(K, V)) + 'a,
120    {
121        KeyedSingleton {
122            underlying: self.underlying.inspect(f),
123        }
124    }
125
126    pub fn into_keyed_stream(
127        self,
128    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
129        self.underlying
130            .into_keyed()
131            .assume_ordering(nondet!(/** only one element per key */))
132    }
133}
134
135impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
136    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
137    where
138        F: Fn(V) -> U + 'a,
139    {
140        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
141        KeyedSingleton {
142            underlying: self.underlying.map(q!({
143                let orig = f;
144                move |(k, v)| (k, orig(v))
145            })),
146        }
147    }
148
149    pub fn map_with_key<U, F>(
150        self,
151        f: impl IntoQuotedMut<'a, F, L> + Copy,
152    ) -> KeyedSingleton<K, U, L, B>
153    where
154        F: Fn((K, V)) -> U + 'a,
155        K: Clone,
156    {
157        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
158        KeyedSingleton {
159            underlying: self.underlying.map(q!({
160                let orig = f;
161                move |(k, v)| {
162                    let out = orig((k.clone(), v));
163                    (k, out)
164                }
165            })),
166        }
167    }
168
169    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
170    where
171        F: Fn(&V) -> bool + 'a,
172    {
173        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
174        KeyedSingleton {
175            underlying: self.underlying.filter(q!({
176                let orig = f;
177                move |(_k, v)| orig(v)
178            })),
179        }
180    }
181
182    pub fn filter_map<F, U>(
183        self,
184        f: impl IntoQuotedMut<'a, F, L> + Copy,
185    ) -> KeyedSingleton<K, U, L, B>
186    where
187        F: Fn(V) -> Option<U> + 'a,
188    {
189        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
190        KeyedSingleton {
191            underlying: self.underlying.filter_map(q!({
192                let orig = f;
193                move |(k, v)| orig(v).map(|v| (k, v))
194            })),
195        }
196    }
197
198    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
199        self.underlying.count()
200    }
201
202    /// An operator which allows you to "name" a `HydroNode`.
203    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
204    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
205        {
206            let mut node = self.underlying.ir_node.borrow_mut();
207            let metadata = node.metadata_mut();
208            metadata.tag = Some(name.to_string());
209        }
210        self
211    }
212}
213
214impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
215    pub fn latest(self) -> KeyedSingleton<K, V, L, Unbounded> {
216        KeyedSingleton {
217            underlying: Stream::new(
218                self.underlying.location.outer().clone(),
219                // no need to persist due to top-level replay
220                self.underlying.ir_node.into_inner(),
221            ),
222        }
223    }
224}
225
226impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
227    /// Gets the value associated with a specific key from the keyed singleton.
228    ///
229    /// # Example
230    /// ```rust
231    /// # use hydro_lang::*;
232    /// # use futures::StreamExt;
233    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
234    /// let tick = process.tick();
235    /// let keyed_data = process
236    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
237    ///     .into_keyed()
238    ///     .batch(&tick, nondet!(/** test */))
239    ///     .fold(q!(|| 0), q!(|acc, x| *acc = x));
240    /// let key = tick.singleton(q!(1));
241    /// keyed_data.get(key).all_ticks()
242    /// # }, |mut stream| async move {
243    /// // 2
244    /// # assert_eq!(stream.next().await.unwrap(), 2);
245    /// # }));
246    /// ```
247    pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
248        self.entries()
249            .join(key.into_stream().map(q!(|k| (k, ()))))
250            .map(q!(|(_, (v, _))| v))
251            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
252            .first()
253    }
254
255    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
256    /// is some additional metadata, emits a keyed stream of lookup results where the key
257    /// is the same as before, but the value is a tuple of the lookup result and the metadata
258    /// of the request. If the key is not found, no output will be produced.
259    ///
260    /// # Example
261    /// ```rust
262    /// # use hydro_lang::*;
263    /// # use futures::StreamExt;
264    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
265    /// let tick = process.tick();
266    /// let keyed_data = process
267    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
268    ///     .into_keyed()
269    ///     .batch(&tick, nondet!(/** test */))
270    ///     .fold(q!(|| 0), q!(|acc, x| *acc = x));
271    /// let other_data = process
272    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
273    ///     .into_keyed()
274    ///     .batch(&tick, nondet!(/** test */));
275    /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
276    /// # }, |mut stream| async move {
277    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
278    /// # let mut results = vec![];
279    /// # for _ in 0..3 {
280    /// #     results.push(stream.next().await.unwrap());
281    /// # }
282    /// # results.sort();
283    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
284    /// # }));
285    /// ```
286    pub fn get_many_if_present<O2, R2, V2>(
287        self,
288        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
289    ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
290        self.entries()
291            .weaker_retries()
292            .join(requests.entries())
293            .into_keyed()
294    }
295
296    pub fn get_from<V2: Clone>(
297        self,
298        from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
299    ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
300    where
301        K: Clone,
302        V: Hash + Eq + Clone,
303    {
304        let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
305        let lookup_result = from.get_many_if_present(to_lookup.clone());
306        let missing_values =
307            to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
308        KeyedSingleton {
309            underlying: lookup_result
310                .entries()
311                .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
312                .chain(missing_values.entries().map(q!(|(v, k)| (k, (v, None))))),
313        }
314    }
315}
316
317impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
318where
319    L: Location<'a> + NoTick + NoAtomic,
320{
321    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
322        KeyedSingleton {
323            underlying: self.underlying.atomic(tick),
324        }
325    }
326
327    /// Given a tick, returns a keyed singleton with a entries consisting of keys with
328    /// snapshots of the value singleton.
329    ///
330    /// # Non-Determinism
331    /// Because this picks a snapshot of each singleton whose value is continuously changing,
332    /// the output singleton has a non-deterministic value since each snapshot can be at an
333    /// arbitrary point in time.
334    pub fn snapshot(
335        self,
336        tick: &Tick<L>,
337        nondet: NonDet,
338    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
339        self.atomic(tick).snapshot(nondet)
340    }
341}
342
343impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
344where
345    L: Location<'a> + NoTick + NoAtomic,
346{
347    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
348    /// arrived since the previous batch was released.
349    ///
350    /// # Non-Determinism
351    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
352    /// has a non-deterministic set of key-value pairs.
353    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
354        self.atomic(tick).batch(nondet)
355    }
356}
357
358impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
359where
360    L: Location<'a> + NoTick + NoAtomic,
361{
362    /// Returns a keyed singleton with a entries consisting of keys with snapshots of the value
363    /// singleton being atomically processed.
364    ///
365    /// # Non-Determinism
366    /// Because this picks a snapshot of each singleton whose value is continuously changing,
367    /// each output singleton has a non-deterministic value since each snapshot can be at an
368    /// arbitrary point in time.
369    pub fn snapshot(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
370        KeyedSingleton {
371            underlying: Stream::new(
372                self.underlying.location.tick,
373                // no need to unpersist due to top-level replay
374                self.underlying.ir_node.into_inner(),
375            ),
376        }
377    }
378
379    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
380        KeyedSingleton {
381            underlying: self.underlying.end_atomic(),
382        }
383    }
384}
385
386impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
387where
388    L: Location<'a> + NoTick + NoAtomic,
389{
390    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
391    /// arrived since the previous batch was released.
392    ///
393    /// # Non-Determinism
394    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
395    /// has a non-deterministic set of key-value pairs.
396    pub fn batch(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
397        KeyedSingleton {
398            underlying: self.underlying.batch(nondet),
399        }
400    }
401}