hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::hash::Hash;
4
5use stageleft::{IntoQuotedMut, QuotedWithContext, q};
6
7use super::boundedness::{Bounded, Boundedness, Unbounded};
8use super::keyed_stream::KeyedStream;
9use super::optional::Optional;
10use super::singleton::Singleton;
11use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
12use crate::forward_handle::ForwardRef;
13#[cfg(stageleft_runtime)]
14use crate::forward_handle::{CycleCollection, ReceiverComplete};
15use crate::live_collections::stream::{Ordering, Retries};
16use crate::location::dynamic::LocationId;
17use crate::location::tick::NoAtomic;
18use crate::location::{Atomic, Location, NoTick, Tick};
19use crate::manual_expr::ManualExpr;
20use crate::nondet::{NonDet, nondet};
21
22/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
23///
24/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
25/// removed / changed), this also includes an additional variant [`BoundedValue`], which indicates
26/// that entries may be added over time, but once an entry is added it will never be removed and
27/// its value will never change.
28pub trait KeyedSingletonBound {
29    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
30    type UnderlyingBound: Boundedness;
31    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
32    type ValueBound: Boundedness;
33}
34
35impl KeyedSingletonBound for Unbounded {
36    type UnderlyingBound = Unbounded;
37    type ValueBound = Unbounded;
38}
39
40impl KeyedSingletonBound for Bounded {
41    type UnderlyingBound = Bounded;
42    type ValueBound = Bounded;
43}
44
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change. The set of keys itself is still
/// unbounded: new entries may appear asynchronously, but existing entries are never removed.
pub struct BoundedValue;
49
50impl KeyedSingletonBound for BoundedValue {
51    type UnderlyingBound = Unbounded;
52    type ValueBound = Bounded;
53}
54
55/// Mapping from keys of type `K` to values of type `V`.
56///
57/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
58/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
59/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
60/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
61/// keys cannot be removed and the value for each key is immutable.
62///
63/// Type Parameters:
64/// - `K`: the type of the key for each entry
65/// - `V`: the type of the value for each entry
66/// - `Loc`: the [`Location`] where the keyed singleton is materialized
67/// - `Bound`: tracks whether the entries are:
68///     - [`Bounded`] (local and finite)
69///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
70///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
71pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
72    pub(crate) underlying: Stream<(K, V), Loc, Bound::UnderlyingBound, NoOrder, ExactlyOnce>,
73}
74
75impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
76    for KeyedSingleton<K, V, Loc, Bound>
77{
78    fn clone(&self) -> Self {
79        KeyedSingleton {
80            underlying: self.underlying.clone(),
81        }
82    }
83}
84
85impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
86    for KeyedSingleton<K, V, L, B>
87where
88    L: Location<'a> + NoTick,
89{
90    type Location = L;
91
92    fn create_source(ident: syn::Ident, location: L) -> Self {
93        KeyedSingleton {
94            underlying: Stream::create_source(ident, location),
95        }
96    }
97}
98
99impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
100    for KeyedSingleton<K, V, L, B>
101where
102    L: Location<'a> + NoTick,
103{
104    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
105        self.underlying.complete(ident, expected_location);
106    }
107}
108
109impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
110    KeyedSingleton<K, V, L, B>
111{
112    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
113    ///
114    /// The value for each key must be bounded, otherwise the resulting stream elements would be
    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
116    /// into the output.
117    ///
118    /// # Example
119    /// ```rust
120    /// # use hydro_lang::prelude::*;
121    /// # use futures::StreamExt;
122    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
123    /// let keyed_singleton = // { 1: 2, 2: 4 }
124    /// # process
125    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
126    /// #     .into_keyed()
127    /// #     .first();
128    /// keyed_singleton.entries()
129    /// # }, |mut stream| async move {
130    /// // (1, 2), (2, 4) in any order
131    /// # let mut results = Vec::new();
132    /// # for _ in 0..2 {
133    /// #     results.push(stream.next().await.unwrap());
134    /// # }
135    /// # results.sort();
136    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
137    /// # }));
138    /// ```
139    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
140        self.underlying
141    }
142
143    /// Flattens the keyed singleton into an unordered stream of just the values.
144    ///
145    /// The value for each key must be bounded, otherwise the resulting stream elements would be
    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
147    /// into the output.
148    ///
149    /// # Example
150    /// ```rust
151    /// # use hydro_lang::prelude::*;
152    /// # use futures::StreamExt;
153    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
154    /// let keyed_singleton = // { 1: 2, 2: 4 }
155    /// # process
156    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
157    /// #     .into_keyed()
158    /// #     .first();
159    /// keyed_singleton.values()
160    /// # }, |mut stream| async move {
161    /// // 2, 4 in any order
162    /// # let mut results = Vec::new();
163    /// # for _ in 0..2 {
164    /// #     results.push(stream.next().await.unwrap());
165    /// # }
166    /// # results.sort();
167    /// # assert_eq!(results, vec![2, 4]);
168    /// # }));
169    /// ```
170    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
171        self.entries().map(q!(|(_, v)| v))
172    }
173
174    /// Flattens the keyed singleton into an unordered stream of just the keys.
175    ///
176    /// The value for each key must be bounded, otherwise the removal of keys would result in
177    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
178    /// into the output.
179    ///
180    /// # Example
181    /// ```rust
182    /// # use hydro_lang::prelude::*;
183    /// # use futures::StreamExt;
184    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
185    /// let keyed_singleton = // { 1: 2, 2: 4 }
186    /// # process
187    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
188    /// #     .into_keyed()
189    /// #     .first();
190    /// keyed_singleton.keys()
191    /// # }, |mut stream| async move {
192    /// // 1, 2 in any order
193    /// # let mut results = Vec::new();
194    /// # for _ in 0..2 {
195    /// #     results.push(stream.next().await.unwrap());
196    /// # }
197    /// # results.sort();
198    /// # assert_eq!(results, vec![1, 2]);
199    /// # }));
200    /// ```
201    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
202        self.entries().map(q!(|(k, _)| k))
203    }
204
205    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
206    /// entries whose keys are not in the provided stream.
207    ///
208    /// # Example
209    /// ```rust
210    /// # use hydro_lang::prelude::*;
211    /// # use futures::StreamExt;
212    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
213    /// let tick = process.tick();
214    /// let keyed_singleton = // { 1: 2, 2: 4 }
215    /// # process
216    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
217    /// #     .into_keyed()
218    /// #     .first()
219    /// #     .snapshot(&tick, nondet!(/** test */));
220    /// let keys_to_remove = process
221    ///     .source_iter(q!(vec![1]))
222    ///     .batch(&tick, nondet!(/** test */));
223    /// keyed_singleton.filter_key_not_in(keys_to_remove)
224    /// #   .entries().all_ticks()
225    /// # }, |mut stream| async move {
226    /// // { 2: 4 }
227    /// # for w in vec![(2, 4)] {
228    /// #     assert_eq!(stream.next().await.unwrap(), w);
229    /// # }
230    /// # }));
231    /// ```
232    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
233        self,
234        other: Stream<K, L, Bounded, O2, R2>,
235    ) -> Self
236    where
237        K: Hash + Eq,
238    {
239        KeyedSingleton {
240            underlying: self.entries().anti_join(other),
241        }
242    }
243
244    /// An operator which allows you to "inspect" each value of a keyed singleton without
245    /// modifying it. The closure `f` is called on a reference to each value. This is
246    /// mainly useful for debugging, and should not be used to generate side-effects.
247    ///
248    /// # Example
249    /// ```rust
250    /// # use hydro_lang::prelude::*;
251    /// # use futures::StreamExt;
252    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
253    /// let keyed_singleton = // { 1: 2, 2: 4 }
254    /// # process
255    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
256    /// #     .into_keyed()
257    /// #     .first();
258    /// keyed_singleton
259    ///     .inspect(q!(|v| println!("{}", v)))
260    /// #   .entries()
261    /// # }, |mut stream| async move {
262    /// // { 1: 2, 2: 4 }
263    /// # for w in vec![(1, 2), (2, 4)] {
264    /// #     assert_eq!(stream.next().await.unwrap(), w);
265    /// # }
266    /// # }));
267    /// ```
268    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
269    where
270        F: Fn(&V) + 'a,
271    {
272        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
273        KeyedSingleton {
274            underlying: self.underlying.inspect(q!({
275                let orig = f;
276                move |(_k, v)| orig(v)
277            })),
278        }
279    }
280
281    /// An operator which allows you to "inspect" each entry of a keyed singleton without
282    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
283    /// mainly useful for debugging, and should not be used to generate side-effects.
284    ///
285    /// # Example
286    /// ```rust
287    /// # use hydro_lang::prelude::*;
288    /// # use futures::StreamExt;
289    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
290    /// let keyed_singleton = // { 1: 2, 2: 4 }
291    /// # process
292    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
293    /// #     .into_keyed()
294    /// #     .first();
295    /// keyed_singleton
296    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
297    /// #   .entries()
298    /// # }, |mut stream| async move {
299    /// // { 1: 2, 2: 4 }
300    /// # for w in vec![(1, 2), (2, 4)] {
301    /// #     assert_eq!(stream.next().await.unwrap(), w);
302    /// # }
303    /// # }));
304    /// ```
305    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> KeyedSingleton<K, V, L, B>
306    where
307        F: Fn(&(K, V)) + 'a,
308    {
309        KeyedSingleton {
310            underlying: self.underlying.inspect(f),
311        }
312    }
313
314    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
315    /// element, the value.
316    ///
317    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
318    ///
319    /// # Example
320    /// ```rust
321    /// # use hydro_lang::prelude::*;
322    /// # use futures::StreamExt;
323    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
324    /// let keyed_singleton = // { 1: 2, 2: 4 }
325    /// # process
326    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
327    /// #     .into_keyed()
328    /// #     .first();
329    /// keyed_singleton
330    ///     .clone()
331    ///     .into_keyed_stream()
332    ///     .interleave(
333    ///         keyed_singleton.into_keyed_stream()
334    ///     )
335    /// #   .entries()
336    /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
338    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
339    /// #     assert_eq!(stream.next().await.unwrap(), w);
340    /// # }
341    /// # }));
342    /// ```
343    pub fn into_keyed_stream(
344        self,
345    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
346        self.underlying
347            .into_keyed()
348            .assume_ordering(nondet!(/** only one element per key */))
349    }
350}
351
352impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
    /// Transforms each value by invoking `f` on each element, with keys staying the same
    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
    ///
    /// If you do not want to modify the keyed singleton and instead only want to view
    /// each value use [`KeyedSingleton::inspect`] instead.
358    ///
359    /// # Example
360    /// ```rust
361    /// # use hydro_lang::prelude::*;
362    /// # use futures::StreamExt;
363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
364    /// let keyed_singleton = // { 1: 2, 2: 4 }
365    /// # process
366    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
367    /// #     .into_keyed()
368    /// #     .first();
369    /// keyed_singleton.map(q!(|v| v + 1))
370    /// #   .entries()
371    /// # }, |mut stream| async move {
372    /// // { 1: 3, 2: 5 }
373    /// # let mut results = Vec::new();
374    /// # for _ in 0..2 {
375    /// #     results.push(stream.next().await.unwrap());
376    /// # }
377    /// # results.sort();
378    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
379    /// # }));
380    /// ```
381    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
382    where
383        F: Fn(V) -> U + 'a,
384    {
385        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
386        KeyedSingleton {
387            underlying: self.underlying.map(q!({
388                let orig = f;
389                move |(k, v)| (k, orig(v))
390            })),
391        }
392    }
393
394    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
395    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
396    ///
397    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
398    /// the new value `U`. The key remains unchanged in the output.
399    ///
400    /// # Example
401    /// ```rust
402    /// # use hydro_lang::prelude::*;
403    /// # use futures::StreamExt;
404    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405    /// let keyed_singleton = // { 1: 2, 2: 4 }
406    /// # process
407    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
408    /// #     .into_keyed()
409    /// #     .first();
410    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
411    /// #   .entries()
412    /// # }, |mut stream| async move {
413    /// // { 1: 3, 2: 6 }
414    /// # let mut results = Vec::new();
415    /// # for _ in 0..2 {
416    /// #     results.push(stream.next().await.unwrap());
417    /// # }
418    /// # results.sort();
419    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
420    /// # }));
421    /// ```
422    pub fn map_with_key<U, F>(
423        self,
424        f: impl IntoQuotedMut<'a, F, L> + Copy,
425    ) -> KeyedSingleton<K, U, L, B>
426    where
427        F: Fn((K, V)) -> U + 'a,
428        K: Clone,
429    {
430        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
431        KeyedSingleton {
432            underlying: self.underlying.map(q!({
433                let orig = f;
434                move |(k, v)| {
435                    let out = orig((k.clone(), v));
436                    (k, out)
437                }
438            })),
439        }
440    }
441
442    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
443    ///
444    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
445    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
446    /// is filtered out.
447    ///
448    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
449    /// not modify or take ownership of the values. If you need to modify the values while filtering
450    /// use [`KeyedSingleton::filter_map`] instead.
451    ///
452    /// # Example
453    /// ```rust
454    /// # use hydro_lang::prelude::*;
455    /// # use futures::StreamExt;
456    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
457    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
458    /// # process
459    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
460    /// #     .into_keyed()
461    /// #     .first();
462    /// keyed_singleton.filter(q!(|&v| v > 1))
463    /// #   .entries()
464    /// # }, |mut stream| async move {
465    /// // { 1: 2, 2: 4 }
466    /// # let mut results = Vec::new();
467    /// # for _ in 0..2 {
468    /// #     results.push(stream.next().await.unwrap());
469    /// # }
470    /// # results.sort();
471    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
472    /// # }));
473    /// ```
474    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
475    where
476        F: Fn(&V) -> bool + 'a,
477    {
478        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
479        KeyedSingleton {
480            underlying: self.underlying.filter(q!({
481                let orig = f;
482                move |(_k, v)| orig(v)
483            })),
484        }
485    }
486
487    /// An operator that both filters and maps values. It yields only the key-value pairs where
488    /// the supplied closure `f` returns `Some(value)`.
489    ///
490    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
491    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
492    /// If it returns `None`, the key-value pair is filtered out.
493    ///
494    /// # Example
495    /// ```rust
496    /// # use hydro_lang::prelude::*;
497    /// # use futures::StreamExt;
498    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
499    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
500    /// # process
501    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
502    /// #     .into_keyed()
503    /// #     .first();
504    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
505    /// #   .entries()
506    /// # }, |mut stream| async move {
507    /// // { 1: 42, 3: 100 }
508    /// # let mut results = Vec::new();
509    /// # for _ in 0..2 {
510    /// #     results.push(stream.next().await.unwrap());
511    /// # }
512    /// # results.sort();
513    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
514    /// # }));
515    /// ```
516    pub fn filter_map<F, U>(
517        self,
518        f: impl IntoQuotedMut<'a, F, L> + Copy,
519    ) -> KeyedSingleton<K, U, L, B>
520    where
521        F: Fn(V) -> Option<U> + 'a,
522    {
523        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
524        KeyedSingleton {
525            underlying: self.underlying.filter_map(q!({
526                let orig = f;
527                move |(k, v)| orig(v).map(|v| (k, v))
528            })),
529        }
530    }
531
532    /// Gets the number of keys in the keyed singleton.
533    ///
534    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
535    /// since keys may be added / removed over time. When the set of keys changes, the count will
536    /// be asynchronously updated.
537    ///
538    /// # Example
539    /// ```rust
540    /// # use hydro_lang::prelude::*;
541    /// # use futures::StreamExt;
542    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
543    /// # let tick = process.tick();
544    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
545    /// # process
546    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
547    /// #     .into_keyed()
548    /// #     .batch(&tick, nondet!(/** test */))
549    /// #     .first();
550    /// keyed_singleton.key_count()
551    /// # .all_ticks()
552    /// # }, |mut stream| async move {
553    /// // 3
554    /// # assert_eq!(stream.next().await.unwrap(), 3);
555    /// # }));
556    /// ```
557    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
558        self.underlying.count()
559    }
560
561    /// An operator which allows you to "name" a `HydroNode`.
562    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
563    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
564        {
565            let mut node = self.underlying.ir_node.borrow_mut();
566            let metadata = node.metadata_mut();
567            metadata.tag = Some(name.to_string());
568        }
569        self
570    }
571}
572
573impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
574    /// Gets the value associated with a specific key from the keyed singleton.
575    ///
576    /// # Example
577    /// ```rust
578    /// # use hydro_lang::prelude::*;
579    /// # use futures::StreamExt;
580    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
581    /// let tick = process.tick();
582    /// let keyed_data = process
583    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
584    ///     .into_keyed()
585    ///     .batch(&tick, nondet!(/** test */))
586    ///     .first();
587    /// let key = tick.singleton(q!(1));
588    /// keyed_data.get(key).all_ticks()
589    /// # }, |mut stream| async move {
590    /// // 2
591    /// # assert_eq!(stream.next().await.unwrap(), 2);
592    /// # }));
593    /// ```
594    pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
595        self.entries()
596            .join(key.into_stream().map(q!(|k| (k, ()))))
597            .map(q!(|(_, (v, _))| v))
598            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
599            .first()
600    }
601
602    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
603    /// is some additional metadata, emits a keyed stream of lookup results where the key
604    /// is the same as before, but the value is a tuple of the lookup result and the metadata
605    /// of the request. If the key is not found, no output will be produced.
606    ///
607    /// # Example
608    /// ```rust
609    /// # use hydro_lang::prelude::*;
610    /// # use futures::StreamExt;
611    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
612    /// let tick = process.tick();
613    /// let keyed_data = process
614    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
615    ///     .into_keyed()
616    ///     .batch(&tick, nondet!(/** test */))
617    ///     .first();
618    /// let other_data = process
619    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
620    ///     .into_keyed()
621    ///     .batch(&tick, nondet!(/** test */));
622    /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
623    /// # }, |mut stream| async move {
624    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
625    /// # let mut results = vec![];
626    /// # for _ in 0..3 {
627    /// #     results.push(stream.next().await.unwrap());
628    /// # }
629    /// # results.sort();
630    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
631    /// # }));
632    /// ```
633    pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
634        self,
635        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
636    ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
637        self.entries()
638            .weaker_retries::<R2>()
639            .join(requests.entries())
640            .into_keyed()
641    }
642
643    /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
644    /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
645    /// containing the value from `self` and an option of the value from `from`. If the key is not
646    /// present in `from`, the option will be [`None`].
647    ///
648    /// # Example
649    /// ```rust
650    /// # use hydro_lang::prelude::*;
651    /// # use futures::StreamExt;
652    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
653    /// # let tick = process.tick();
654    /// let requests = // { 1: 10, 2: 20 }
655    /// # process
656    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
657    /// #     .into_keyed()
658    /// #     .batch(&tick, nondet!(/** test */))
659    /// #     .first();
660    /// let other_data = // { 10: 100, 11: 101 }
661    /// # process
662    /// #     .source_iter(q!(vec![(10, 100), (11, 101)]))
663    /// #     .into_keyed()
664    /// #     .batch(&tick, nondet!(/** test */))
665    /// #     .first();
666    /// requests.get_from(other_data)
667    /// # .entries().all_ticks()
668    /// # }, |mut stream| async move {
669    /// // { 1: (10, Some(100)), 2: (20, None) }
670    /// # let mut results = vec![];
671    /// # for _ in 0..2 {
672    /// #     results.push(stream.next().await.unwrap());
673    /// # }
674    /// # results.sort();
675    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
676    /// # }));
677    /// ```
678    pub fn get_from<V2: Clone>(
679        self,
680        from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
681    ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
682    where
683        K: Clone,
684        V: Hash + Eq + Clone,
685    {
686        let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
687        let lookup_result = from.get_many_if_present(to_lookup.clone());
688        let missing_values =
689            to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
690        KeyedSingleton {
691            underlying: lookup_result
692                .entries()
693                .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
694                .chain(missing_values.entries().map(q!(|(v, k)| (k, (v, None))))),
695        }
696    }
697}
698
699impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
700where
701    L: Location<'a> + NoTick + NoAtomic,
702{
703    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
704    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
705    ///
706    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
707    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
708    /// argument that declares where the keyed singleton will be atomically processed. Batching a
709    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
710    /// batching into a different [`Tick`] will introduce asynchrony.
711    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
712        KeyedSingleton {
713            underlying: self.underlying.atomic(tick),
714        }
715    }
716
717    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
718    /// point in time.
719    ///
720    /// # Non-Determinism
721    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
722    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
723    pub fn snapshot(
724        self,
725        tick: &Tick<L>,
726        nondet: NonDet,
727    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
728        self.atomic(tick).snapshot(nondet)
729    }
730}
731
732impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
733where
734    L: Location<'a> + NoTick + NoAtomic,
735{
736    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
737    /// state of the keyed singleton being atomically processed.
738    ///
739    /// # Non-Determinism
740    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
741    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
742    pub fn snapshot(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
743        KeyedSingleton {
744            underlying: Stream::new(
745                self.underlying.location.tick,
746                // no need to unpersist due to top-level replay
747                self.underlying.ir_node.into_inner(),
748            ),
749        }
750    }
751
752    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
753    /// See [`KeyedSingleton::atomic`] for more details.
754    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
755        KeyedSingleton {
756            underlying: self.underlying.end_atomic(),
757        }
758    }
759}
760
761impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
762    /// Asynchronously yields this keyed singleton outside the tick, which will
763    /// be asynchronously updated with the latest set of entries inside the tick.
764    ///
765    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
766    /// tick that tracks the inner value. This is useful for getting the value as of the
767    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
768    ///
769    /// The entire set of entries are propagated on each tick, which means that if a tick
770    /// does not have a key "XYZ" that was present in the previous tick, the entry for "XYZ" will
771    /// also be removed from the output.
772    ///
773    /// # Example
774    /// ```rust
775    /// # use hydro_lang::prelude::*;
776    /// # use futures::StreamExt;
777    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778    /// let tick = process.tick();
779    /// # // ticks are lazy by default, forces the second tick to run
780    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
781    /// # let batch_first_tick = process
782    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
783    /// #   .batch(&tick, nondet!(/** test */))
784    /// #   .into_keyed();
785    /// # let batch_second_tick = process
786    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
787    /// #   .batch(&tick, nondet!(/** test */))
788    /// #   .into_keyed()
789    /// #   .defer_tick(); // appears on the second tick
790    /// # let input_batch = batch_first_tick.chain(batch_second_tick).first();
791    /// input_batch // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
792    ///     .latest()
793    /// # .snapshot(&tick, nondet!(/** test */))
794    /// # .entries()
795    /// # .all_ticks()
796    /// # }, |mut stream| async move {
797    /// // asynchronously changes from { 1: 2, 2: 3 } ~> { 2: 4, 3: 5 }
798    /// # for w in vec![(1, 2), (2, 3), (2, 4), (3, 5)] {
799    /// #     assert_eq!(stream.next().await.unwrap(), w);
800    /// # }
801    /// # }));
802    /// ```
803    pub fn latest(self) -> KeyedSingleton<K, V, L, Unbounded> {
804        KeyedSingleton {
805            underlying: Stream::new(
806                self.underlying.location.outer().clone(),
807                // no need to persist due to top-level replay
808                self.underlying.ir_node.into_inner(),
809            ),
810        }
811    }
812
813    /// Synchronously yields this keyed singleton outside the tick as an unbounded keyed singleton,
814    /// which will be updated with the latest set of entries inside the tick.
815    ///
816    /// Unlike [`KeyedSingleton::latest`], this preserves synchronous execution, as the output
817    /// keyed singleton is emitted in an [`Atomic`] context that will process elements synchronously
818    /// with the input keyed singleton's [`Tick`] context.
819    pub fn latest_atomic(self) -> KeyedSingleton<K, V, Atomic<L>, Unbounded> {
820        KeyedSingleton {
821            underlying: Stream::new(
822                Atomic {
823                    tick: self.underlying.location,
824                },
825                // no need to persist due to top-level replay
826                self.underlying.ir_node.into_inner(),
827            ),
828        }
829    }
830
831    #[expect(missing_docs, reason = "TODO")]
832    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
833        KeyedSingleton {
834            underlying: self.underlying.defer_tick(),
835        }
836    }
837}
838
839impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
840where
841    L: Location<'a> + NoTick + NoAtomic,
842{
843    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
844    /// arrived since the previous batch was released.
845    ///
846    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
847    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
848    ///
849    /// # Non-Determinism
850    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
851    /// has a non-deterministic set of key-value pairs.
852    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
853        self.atomic(tick).batch(nondet)
854    }
855}
856
857impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
858where
859    L: Location<'a> + NoTick + NoAtomic,
860{
861    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
862    /// atomically processed.
863    ///
864    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
865    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
866    ///
867    /// # Non-Determinism
868    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
869    /// has a non-deterministic set of key-value pairs.
870    pub fn batch(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
871        KeyedSingleton {
872            underlying: self.underlying.batch(nondet),
873        }
874    }
875}