hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20use crate::forward_handle::ForwardRef;
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
31///
32/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
33/// removed / changed), this also includes an additional variant [`BoundedValue`], which indicates
34/// that entries may be added over time, but once an entry is added it will never be removed and
35/// its value will never change.
36pub trait KeyedSingletonBound {
37    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
38    type UnderlyingBound: Boundedness;
39    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
40    type ValueBound: Boundedness;
41
42    /// The type of the keyed singleton if the value for each key is immutable.
43    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
44
45    /// The type of the keyed singleton if the value for each key may change asynchronously.
46    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
47
48    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
49    fn bound_kind() -> KeyedSingletonBoundKind;
50}
51
52impl KeyedSingletonBound for Unbounded {
53    type UnderlyingBound = Unbounded;
54    type ValueBound = Unbounded;
55    type WithBoundedValue = BoundedValue;
56    type WithUnboundedValue = Unbounded;
57
58    fn bound_kind() -> KeyedSingletonBoundKind {
59        KeyedSingletonBoundKind::Unbounded
60    }
61}
62
63impl KeyedSingletonBound for Bounded {
64    type UnderlyingBound = Bounded;
65    type ValueBound = Bounded;
66    type WithBoundedValue = Bounded;
67    type WithUnboundedValue = UnreachableBound;
68
69    fn bound_kind() -> KeyedSingletonBoundKind {
70        KeyedSingletonBoundKind::Bounded
71    }
72}
73
74/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
75/// its value is bounded and will never change. If the `KeyBound` is [`Bounded`], then the entire set of entries
76/// is bounded, but if it is [`Unbounded`], then new entries may appear asynchronously.
77pub struct BoundedValue;
78
79impl KeyedSingletonBound for BoundedValue {
80    type UnderlyingBound = Unbounded;
81    type ValueBound = Bounded;
82    type WithBoundedValue = BoundedValue;
83    type WithUnboundedValue = Unbounded;
84
85    fn bound_kind() -> KeyedSingletonBoundKind {
86        KeyedSingletonBoundKind::BoundedValue
87    }
88}
89
90#[doc(hidden)]
91pub struct UnreachableBound;
92
93impl KeyedSingletonBound for UnreachableBound {
94    type UnderlyingBound = Bounded;
95    type ValueBound = Unbounded;
96
97    type WithBoundedValue = Bounded;
98    type WithUnboundedValue = UnreachableBound;
99
100    fn bound_kind() -> KeyedSingletonBoundKind {
101        unreachable!("UnreachableBound cannot be instantiated")
102    }
103}
104
105/// Mapping from keys of type `K` to values of type `V`.
106///
107/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
108/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
109/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
110/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
111/// keys cannot be removed and the value for each key is immutable.
112///
113/// Type Parameters:
114/// - `K`: the type of the key for each entry
115/// - `V`: the type of the value for each entry
116/// - `Loc`: the [`Location`] where the keyed singleton is materialized
117/// - `Bound`: tracks whether the entries are:
118///     - [`Bounded`] (local and finite)
119///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
120///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
121pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
122    pub(crate) location: Loc,
123    pub(crate) ir_node: RefCell<HydroNode>,
124
125    _phantom: PhantomData<(K, V, Loc, Bound)>,
126}
127
128impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
129    for KeyedSingleton<K, V, Loc, Bound>
130{
131    fn clone(&self) -> Self {
132        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
133            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
134            *self.ir_node.borrow_mut() = HydroNode::Tee {
135                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
136                metadata: self.location.new_node_metadata(Self::collection_kind()),
137            };
138        }
139
140        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
141            KeyedSingleton {
142                location: self.location.clone(),
143                ir_node: HydroNode::Tee {
144                    inner: TeeNode(inner.0.clone()),
145                    metadata: metadata.clone(),
146                }
147                .into(),
148                _phantom: PhantomData,
149            }
150        } else {
151            unreachable!()
152        }
153    }
154}
155
156impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
157    for KeyedSingleton<K, V, L, B>
158where
159    L: Location<'a> + NoTick,
160{
161    type Location = L;
162
163    fn create_source(ident: syn::Ident, location: L) -> Self {
164        KeyedSingleton {
165            location: location.clone(),
166            ir_node: RefCell::new(HydroNode::CycleSource {
167                ident,
168                metadata: location.new_node_metadata(Self::collection_kind()),
169            }),
170            _phantom: PhantomData,
171        }
172    }
173}
174
175impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
176    for KeyedSingleton<K, V, L, B>
177where
178    L: Location<'a> + NoTick,
179{
180    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
181        assert_eq!(
182            Location::id(&self.location),
183            expected_location,
184            "locations do not match"
185        );
186        self.location
187            .flow_state()
188            .borrow_mut()
189            .push_root(HydroRoot::CycleSink {
190                ident,
191                input: Box::new(self.ir_node.into_inner()),
192                op_metadata: HydroIrOpMetadata::new(),
193            });
194    }
195}
196
197impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
198    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
199        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
200        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
201
202        KeyedSingleton {
203            location,
204            ir_node: RefCell::new(ir_node),
205            _phantom: PhantomData,
206        }
207    }
208
209    /// Returns the [`Location`] where this keyed singleton is being materialized.
210    pub fn location(&self) -> &L {
211        &self.location
212    }
213}
214
// Counts the entries of a fully bounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded>
where
    L: Location<'a>,
{
    let entries = me.entries();
    entries.count()
}
221
// Collects the entries of a fully bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
240
241impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
242    pub(crate) fn collection_kind() -> CollectionKind {
243        CollectionKind::KeyedSingleton {
244            bound: B::bound_kind(),
245            key_type: stageleft::quote_type::<K>().into(),
246            value_type: stageleft::quote_type::<V>().into(),
247        }
248    }
249
250    /// Transforms each value by invoking `f` on each element, with keys staying the same
251    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
252    ///
253    /// If you do not want to modify the stream and instead only want to view
254    /// each item use [`KeyedSingleton::inspect`] instead.
255    ///
256    /// # Example
257    /// ```rust
258    /// # use hydro_lang::prelude::*;
259    /// # use futures::StreamExt;
260    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
261    /// let keyed_singleton = // { 1: 2, 2: 4 }
262    /// # process
263    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
264    /// #     .into_keyed()
265    /// #     .first();
266    /// keyed_singleton.map(q!(|v| v + 1))
267    /// #   .entries()
268    /// # }, |mut stream| async move {
269    /// // { 1: 3, 2: 5 }
270    /// # let mut results = Vec::new();
271    /// # for _ in 0..2 {
272    /// #     results.push(stream.next().await.unwrap());
273    /// # }
274    /// # results.sort();
275    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
276    /// # }));
277    /// ```
278    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
279    where
280        F: Fn(V) -> U + 'a,
281    {
282        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
283        let map_f = q!({
284            let orig = f;
285            move |(k, v)| (k, orig(v))
286        })
287        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
288        .into();
289
290        KeyedSingleton::new(
291            self.location.clone(),
292            HydroNode::Map {
293                f: map_f,
294                input: Box::new(self.ir_node.into_inner()),
295                metadata: self
296                    .location
297                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
298            },
299        )
300    }
301
302    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
303    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
304    ///
305    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
306    /// the new value `U`. The key remains unchanged in the output.
307    ///
308    /// # Example
309    /// ```rust
310    /// # use hydro_lang::prelude::*;
311    /// # use futures::StreamExt;
312    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
313    /// let keyed_singleton = // { 1: 2, 2: 4 }
314    /// # process
315    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
316    /// #     .into_keyed()
317    /// #     .first();
318    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
319    /// #   .entries()
320    /// # }, |mut stream| async move {
321    /// // { 1: 3, 2: 6 }
322    /// # let mut results = Vec::new();
323    /// # for _ in 0..2 {
324    /// #     results.push(stream.next().await.unwrap());
325    /// # }
326    /// # results.sort();
327    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
328    /// # }));
329    /// ```
330    pub fn map_with_key<U, F>(
331        self,
332        f: impl IntoQuotedMut<'a, F, L> + Copy,
333    ) -> KeyedSingleton<K, U, L, B>
334    where
335        F: Fn((K, V)) -> U + 'a,
336        K: Clone,
337    {
338        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
339        let map_f = q!({
340            let orig = f;
341            move |(k, v)| {
342                let out = orig((Clone::clone(&k), v));
343                (k, out)
344            }
345        })
346        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
347        .into();
348
349        KeyedSingleton::new(
350            self.location.clone(),
351            HydroNode::Map {
352                f: map_f,
353                input: Box::new(self.ir_node.into_inner()),
354                metadata: self
355                    .location
356                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
357            },
358        )
359    }
360
361    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
362    ///
363    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
364    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
365    /// is filtered out.
366    ///
367    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
368    /// not modify or take ownership of the values. If you need to modify the values while filtering
369    /// use [`KeyedSingleton::filter_map`] instead.
370    ///
371    /// # Example
372    /// ```rust
373    /// # use hydro_lang::prelude::*;
374    /// # use futures::StreamExt;
375    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
376    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
377    /// # process
378    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
379    /// #     .into_keyed()
380    /// #     .first();
381    /// keyed_singleton.filter(q!(|&v| v > 1))
382    /// #   .entries()
383    /// # }, |mut stream| async move {
384    /// // { 1: 2, 2: 4 }
385    /// # let mut results = Vec::new();
386    /// # for _ in 0..2 {
387    /// #     results.push(stream.next().await.unwrap());
388    /// # }
389    /// # results.sort();
390    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
391    /// # }));
392    /// ```
393    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
394    where
395        F: Fn(&V) -> bool + 'a,
396    {
397        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
398        let filter_f = q!({
399            let orig = f;
400            move |t: &(_, _)| orig(&t.1)
401        })
402        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
403        .into();
404
405        KeyedSingleton::new(
406            self.location.clone(),
407            HydroNode::Filter {
408                f: filter_f,
409                input: Box::new(self.ir_node.into_inner()),
410                metadata: self
411                    .location
412                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
413            },
414        )
415    }
416
417    /// An operator that both filters and maps values. It yields only the key-value pairs where
418    /// the supplied closure `f` returns `Some(value)`.
419    ///
420    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
421    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
422    /// If it returns `None`, the key-value pair is filtered out.
423    ///
424    /// # Example
425    /// ```rust
426    /// # use hydro_lang::prelude::*;
427    /// # use futures::StreamExt;
428    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
429    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
430    /// # process
431    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
432    /// #     .into_keyed()
433    /// #     .first();
434    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
435    /// #   .entries()
436    /// # }, |mut stream| async move {
437    /// // { 1: 42, 3: 100 }
438    /// # let mut results = Vec::new();
439    /// # for _ in 0..2 {
440    /// #     results.push(stream.next().await.unwrap());
441    /// # }
442    /// # results.sort();
443    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
444    /// # }));
445    /// ```
446    pub fn filter_map<F, U>(
447        self,
448        f: impl IntoQuotedMut<'a, F, L> + Copy,
449    ) -> KeyedSingleton<K, U, L, B>
450    where
451        F: Fn(V) -> Option<U> + 'a,
452    {
453        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
454        let filter_map_f = q!({
455            let orig = f;
456            move |(k, v)| orig(v).map(|o| (k, o))
457        })
458        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
459        .into();
460
461        KeyedSingleton::new(
462            self.location.clone(),
463            HydroNode::FilterMap {
464                f: filter_map_f,
465                input: Box::new(self.ir_node.into_inner()),
466                metadata: self
467                    .location
468                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
469            },
470        )
471    }
472
473    /// Gets the number of keys in the keyed singleton.
474    ///
475    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
476    /// since keys may be added / removed over time. When the set of keys changes, the count will
477    /// be asynchronously updated.
478    ///
479    /// # Example
480    /// ```rust
481    /// # use hydro_lang::prelude::*;
482    /// # use futures::StreamExt;
483    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
484    /// # let tick = process.tick();
485    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
486    /// # process
487    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
488    /// #     .into_keyed()
489    /// #     .batch(&tick, nondet!(/** test */))
490    /// #     .first();
491    /// keyed_singleton.key_count()
492    /// # .all_ticks()
493    /// # }, |mut stream| async move {
494    /// // 3
495    /// # assert_eq!(stream.next().await.unwrap(), 3);
496    /// # }));
497    /// ```
498    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
499        if B::ValueBound::BOUNDED {
500            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
501                location: self.location,
502                ir_node: self.ir_node,
503                _phantom: PhantomData,
504            };
505
506            me.entries().count()
507        } else if L::is_top_level()
508            && let Some(tick) = self.location.try_tick()
509        {
510            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
511                location: self.location,
512                ir_node: self.ir_node,
513                _phantom: PhantomData,
514            };
515
516            let out =
517                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
518                    .latest();
519            Singleton::new(out.location, out.ir_node.into_inner())
520        } else {
521            panic!("Unbounded KeyedSingleton inside a tick");
522        }
523    }
524
525    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
526    ///
527    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
528    /// asynchronously as well.
529    ///
530    /// # Example
531    /// ```rust
532    /// # use hydro_lang::prelude::*;
533    /// # use futures::StreamExt;
534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
536    /// # process
537    /// #     .source_iter(q!(vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())]))
538    /// #     .into_keyed()
539    /// #     .batch(&process.tick(), nondet!(/** test */))
540    /// #     .first();
541    /// keyed_singleton.into_singleton()
542    /// # .all_ticks()
543    /// # }, |mut stream| async move {
544    /// // { 1: "a", 2: "b", 3: "c" }
545    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())].into_iter().collect());
546    /// # }));
547    /// ```
548    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
549    where
550        K: Eq + Hash,
551    {
552        if B::ValueBound::BOUNDED {
553            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
554                location: self.location,
555                ir_node: self.ir_node,
556                _phantom: PhantomData,
557            };
558
559            me.entries()
560                .assume_ordering(nondet!(
561                    /// Because this is a keyed singleton, there is only one value per key.
562                ))
563                .fold(
564                    q!(|| HashMap::new()),
565                    q!(|map, (k, v)| {
566                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
567                        map.insert(k, v);
568                    }),
569                )
570        } else if L::is_top_level()
571            && let Some(tick) = self.location.try_tick()
572        {
573            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
574                location: self.location,
575                ir_node: self.ir_node,
576                _phantom: PhantomData,
577            };
578
579            let out = into_singleton_inside_tick(
580                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
581            )
582            .latest();
583            Singleton::new(out.location, out.ir_node.into_inner())
584        } else {
585            panic!("Unbounded KeyedSingleton inside a tick");
586        }
587    }
588
589    /// An operator which allows you to "name" a `HydroNode`.
590    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
591    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
592        {
593            let mut node = self.ir_node.borrow_mut();
594            let metadata = node.metadata_mut();
595            metadata.tag = Some(name.to_string());
596        }
597        self
598    }
599}
600
601impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
602    KeyedSingleton<K, V, L, B>
603{
604    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
605    ///
606    /// The value for each key must be bounded, otherwise the resulting stream elements would be
607    /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
608    /// into the output.
609    ///
610    /// # Example
611    /// ```rust
612    /// # use hydro_lang::prelude::*;
613    /// # use futures::StreamExt;
614    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
615    /// let keyed_singleton = // { 1: 2, 2: 4 }
616    /// # process
617    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
618    /// #     .into_keyed()
619    /// #     .first();
620    /// keyed_singleton.entries()
621    /// # }, |mut stream| async move {
622    /// // (1, 2), (2, 4) in any order
623    /// # let mut results = Vec::new();
624    /// # for _ in 0..2 {
625    /// #     results.push(stream.next().await.unwrap());
626    /// # }
627    /// # results.sort();
628    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
629    /// # }));
630    /// ```
631    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
632        self.into_keyed_stream().entries()
633    }
634
635    /// Flattens the keyed singleton into an unordered stream of just the values.
636    ///
637    /// The value for each key must be bounded, otherwise the resulting stream elements would be
638    /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
639    /// into the output.
640    ///
641    /// # Example
642    /// ```rust
643    /// # use hydro_lang::prelude::*;
644    /// # use futures::StreamExt;
645    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
646    /// let keyed_singleton = // { 1: 2, 2: 4 }
647    /// # process
648    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
649    /// #     .into_keyed()
650    /// #     .first();
651    /// keyed_singleton.values()
652    /// # }, |mut stream| async move {
653    /// // 2, 4 in any order
654    /// # let mut results = Vec::new();
655    /// # for _ in 0..2 {
656    /// #     results.push(stream.next().await.unwrap());
657    /// # }
658    /// # results.sort();
659    /// # assert_eq!(results, vec![2, 4]);
660    /// # }));
661    /// ```
662    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
663        let map_f = q!(|(_, v)| v)
664            .splice_fn1_ctx::<(K, V), V>(&self.location)
665            .into();
666
667        Stream::new(
668            self.location.clone(),
669            HydroNode::Map {
670                f: map_f,
671                input: Box::new(self.ir_node.into_inner()),
672                metadata: self.location.new_node_metadata(Stream::<
673                    V,
674                    L,
675                    B::UnderlyingBound,
676                    NoOrder,
677                    ExactlyOnce,
678                >::collection_kind()),
679            },
680        )
681    }
682
683    /// Flattens the keyed singleton into an unordered stream of just the keys.
684    ///
685    /// The value for each key must be bounded, otherwise the removal of keys would result in
686    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
687    /// into the output.
688    ///
689    /// # Example
690    /// ```rust
691    /// # use hydro_lang::prelude::*;
692    /// # use futures::StreamExt;
693    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
694    /// let keyed_singleton = // { 1: 2, 2: 4 }
695    /// # process
696    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
697    /// #     .into_keyed()
698    /// #     .first();
699    /// keyed_singleton.keys()
700    /// # }, |mut stream| async move {
701    /// // 1, 2 in any order
702    /// # let mut results = Vec::new();
703    /// # for _ in 0..2 {
704    /// #     results.push(stream.next().await.unwrap());
705    /// # }
706    /// # results.sort();
707    /// # assert_eq!(results, vec![1, 2]);
708    /// # }));
709    /// ```
710    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
711        self.entries().map(q!(|(k, _)| k))
712    }
713
714    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
715    /// entries whose keys are not in the provided stream.
716    ///
717    /// # Example
718    /// ```rust
719    /// # use hydro_lang::prelude::*;
720    /// # use futures::StreamExt;
721    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
722    /// let tick = process.tick();
723    /// let keyed_singleton = // { 1: 2, 2: 4 }
724    /// # process
725    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
726    /// #     .into_keyed()
727    /// #     .first()
728    /// #     .batch(&tick, nondet!(/** test */));
729    /// let keys_to_remove = process
730    ///     .source_iter(q!(vec![1]))
731    ///     .batch(&tick, nondet!(/** test */));
732    /// keyed_singleton.filter_key_not_in(keys_to_remove)
733    /// #   .entries().all_ticks()
734    /// # }, |mut stream| async move {
735    /// // { 2: 4 }
736    /// # for w in vec![(2, 4)] {
737    /// #     assert_eq!(stream.next().await.unwrap(), w);
738    /// # }
739    /// # }));
740    /// ```
741    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
742        self,
743        other: Stream<K, L, Bounded, O2, R2>,
744    ) -> Self
745    where
746        K: Hash + Eq,
747    {
748        check_matching_location(&self.location, &other.location);
749
750        KeyedSingleton::new(
751            self.location.clone(),
752            HydroNode::AntiJoin {
753                pos: Box::new(self.ir_node.into_inner()),
754                neg: Box::new(other.ir_node.into_inner()),
755                metadata: self.location.new_node_metadata(Self::collection_kind()),
756            },
757        )
758    }
759
760    /// An operator which allows you to "inspect" each value of a keyed singleton without
761    /// modifying it. The closure `f` is called on a reference to each value. This is
762    /// mainly useful for debugging, and should not be used to generate side-effects.
763    ///
764    /// # Example
765    /// ```rust
766    /// # use hydro_lang::prelude::*;
767    /// # use futures::StreamExt;
768    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769    /// let keyed_singleton = // { 1: 2, 2: 4 }
770    /// # process
771    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
772    /// #     .into_keyed()
773    /// #     .first();
774    /// keyed_singleton
775    ///     .inspect(q!(|v| println!("{}", v)))
776    /// #   .entries()
777    /// # }, |mut stream| async move {
778    /// // { 1: 2, 2: 4 }
779    /// # for w in vec![(1, 2), (2, 4)] {
780    /// #     assert_eq!(stream.next().await.unwrap(), w);
781    /// # }
782    /// # }));
783    /// ```
784    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
785    where
786        F: Fn(&V) + 'a,
787    {
788        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
789        let inspect_f = q!({
790            let orig = f;
791            move |t: &(_, _)| orig(&t.1)
792        })
793        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
794        .into();
795
796        KeyedSingleton::new(
797            self.location.clone(),
798            HydroNode::Inspect {
799                f: inspect_f,
800                input: Box::new(self.ir_node.into_inner()),
801                metadata: self.location.new_node_metadata(Self::collection_kind()),
802            },
803        )
804    }
805
806    /// An operator which allows you to "inspect" each entry of a keyed singleton without
807    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
808    /// mainly useful for debugging, and should not be used to generate side-effects.
809    ///
810    /// # Example
811    /// ```rust
812    /// # use hydro_lang::prelude::*;
813    /// # use futures::StreamExt;
814    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
815    /// let keyed_singleton = // { 1: 2, 2: 4 }
816    /// # process
817    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
818    /// #     .into_keyed()
819    /// #     .first();
820    /// keyed_singleton
821    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
822    /// #   .entries()
823    /// # }, |mut stream| async move {
824    /// // { 1: 2, 2: 4 }
825    /// # for w in vec![(1, 2), (2, 4)] {
826    /// #     assert_eq!(stream.next().await.unwrap(), w);
827    /// # }
828    /// # }));
829    /// ```
830    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
831    where
832        F: Fn(&(K, V)) + 'a,
833    {
834        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
835
836        KeyedSingleton::new(
837            self.location.clone(),
838            HydroNode::Inspect {
839                f: inspect_f,
840                input: Box::new(self.ir_node.into_inner()),
841                metadata: self.location.new_node_metadata(Self::collection_kind()),
842            },
843        )
844    }
845
846    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
847    ///
848    /// Because this method requires values to be bounded, the output [`Optional`] will only be
849    /// asynchronously updated if a new key is added that is higher than the previous max key.
850    ///
851    /// # Example
852    /// ```rust
853    /// # use hydro_lang::prelude::*;
854    /// # use futures::StreamExt;
855    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
856    /// let tick = process.tick();
857    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
858    /// # process
859    /// #     .source_iter(q!(vec![(1, 123), (2, 456), (0, 789)]))
860    /// #     .into_keyed()
861    /// #     .first();
862    /// keyed_singleton.get_max_key()
863    /// # .sample_eager(nondet!(/** test */))
864    /// # }, |mut stream| async move {
865    /// // (2, 456)
866    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
867    /// # }));
868    /// ```
869    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
870    where
871        K: Ord,
872    {
873        self.entries()
874            .assume_ordering(nondet!(
875                /// There is only one element associated with each key, and the keys are totallly
876                /// ordered so we will produce a deterministic value. We can't call
877                /// `reduce_commutative_idempotent` because the closure technically isn't commutative
878                /// in the case where both passed entries have the same key but different values.
879                ///
880                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
881                /// the two inputs do not have the same key.
882            ))
883            .reduce_idempotent(q!({
884                move |curr, new| {
885                    if new.0 > curr.0 {
886                        *curr = new;
887                    }
888                }
889            }))
890    }
891
892    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
893    /// element, the value.
894    ///
895    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
896    ///
897    /// # Example
898    /// ```rust
899    /// # use hydro_lang::prelude::*;
900    /// # use futures::StreamExt;
901    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
902    /// let keyed_singleton = // { 1: 2, 2: 4 }
903    /// # process
904    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
905    /// #     .into_keyed()
906    /// #     .first();
907    /// keyed_singleton
908    ///     .clone()
909    ///     .into_keyed_stream()
910    ///     .interleave(
911    ///         keyed_singleton.into_keyed_stream()
912    ///     )
913    /// #   .entries()
914    /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
916    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
917    /// #     assert_eq!(stream.next().await.unwrap(), w);
918    /// # }
919    /// # }));
920    /// ```
921    pub fn into_keyed_stream(
922        self,
923    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
924        KeyedStream::new(
925            self.location.clone(),
926            HydroNode::Cast {
927                inner: Box::new(self.ir_node.into_inner()),
928                metadata: self.location.new_node_metadata(KeyedStream::<
929                    K,
930                    V,
931                    L,
932                    B::UnderlyingBound,
933                    TotalOrder,
934                    ExactlyOnce,
935                >::collection_kind()),
936            },
937        )
938    }
939}
940
941impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
942    /// Gets the value associated with a specific key from the keyed singleton.
943    ///
944    /// # Example
945    /// ```rust
946    /// # use hydro_lang::prelude::*;
947    /// # use futures::StreamExt;
948    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
949    /// let tick = process.tick();
950    /// let keyed_data = process
951    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
952    ///     .into_keyed()
953    ///     .batch(&tick, nondet!(/** test */))
954    ///     .first();
955    /// let key = tick.singleton(q!(1));
956    /// keyed_data.get(key).all_ticks()
957    /// # }, |mut stream| async move {
958    /// // 2
959    /// # assert_eq!(stream.next().await.unwrap(), 2);
960    /// # }));
961    /// ```
962    pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
963        self.entries()
964            .join(key.into_stream().map(q!(|k| (k, ()))))
965            .map(q!(|(_, (v, _))| v))
966            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
967            .first()
968    }
969
970    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
971    /// is some additional metadata, emits a keyed stream of lookup results where the key
972    /// is the same as before, but the value is a tuple of the lookup result and the metadata
973    /// of the request. If the key is not found, no output will be produced.
974    ///
975    /// # Example
976    /// ```rust
977    /// # use hydro_lang::prelude::*;
978    /// # use futures::StreamExt;
979    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980    /// let tick = process.tick();
981    /// let keyed_data = process
982    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
983    ///     .into_keyed()
984    ///     .batch(&tick, nondet!(/** test */))
985    ///     .first();
986    /// let other_data = process
987    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
988    ///     .into_keyed()
989    ///     .batch(&tick, nondet!(/** test */));
990    /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
991    /// # }, |mut stream| async move {
992    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
993    /// # let mut results = vec![];
994    /// # for _ in 0..3 {
995    /// #     results.push(stream.next().await.unwrap());
996    /// # }
997    /// # results.sort();
998    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
999    /// # }));
1000    /// ```
1001    pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
1002        self,
1003        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
1004    ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
1005        self.entries()
1006            .weaker_retries::<R2>()
1007            .join(requests.entries())
1008            .into_keyed()
1009    }
1010
1011    /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
1012    /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
1013    /// containing the value from `self` and an option of the value from `from`. If the key is not
1014    /// present in `from`, the option will be [`None`].
1015    ///
1016    /// # Example
1017    /// ```rust
1018    /// # use hydro_lang::prelude::*;
1019    /// # use futures::StreamExt;
1020    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1021    /// # let tick = process.tick();
1022    /// let requests = // { 1: 10, 2: 20 }
1023    /// # process
1024    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
1025    /// #     .into_keyed()
1026    /// #     .batch(&tick, nondet!(/** test */))
1027    /// #     .first();
1028    /// let other_data = // { 10: 100, 11: 101 }
1029    /// # process
1030    /// #     .source_iter(q!(vec![(10, 100), (11, 101)]))
1031    /// #     .into_keyed()
1032    /// #     .batch(&tick, nondet!(/** test */))
1033    /// #     .first();
1034    /// requests.get_from(other_data)
1035    /// # .entries().all_ticks()
1036    /// # }, |mut stream| async move {
1037    /// // { 1: (10, Some(100)), 2: (20, None) }
1038    /// # let mut results = vec![];
1039    /// # for _ in 0..2 {
1040    /// #     results.push(stream.next().await.unwrap());
1041    /// # }
1042    /// # results.sort();
1043    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
1044    /// # }));
1045    /// ```
1046    pub fn get_from<V2: Clone>(
1047        self,
1048        from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
1049    ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
1050    where
1051        K: Clone,
1052        V: Hash + Eq + Clone,
1053    {
1054        let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
1055        let lookup_result = from.get_many_if_present(to_lookup.clone());
1056        let missing_values =
1057            to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
1058        let result_stream = lookup_result
1059            .entries()
1060            .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
1061            .into_keyed()
1062            .chain(
1063                missing_values
1064                    .entries()
1065                    .map(q!(|(v, k)| (k, (v, None))))
1066                    .into_keyed(),
1067            );
1068
1069        KeyedSingleton::new(
1070            result_stream.location.clone(),
1071            HydroNode::Cast {
1072                inner: Box::new(result_stream.ir_node.into_inner()),
1073                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1074                    K,
1075                    (V, Option<V2>),
1076                    Tick<L>,
1077                    Bounded,
1078                >::collection_kind(
1079                )),
1080            },
1081        )
1082    }
1083}
1084
1085impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1086where
1087    L: Location<'a>,
1088{
1089    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1090    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1091    ///
1092    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1093    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1094    /// argument that declares where the keyed singleton will be atomically processed. Batching a
1095    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1096    /// batching into a different [`Tick`] will introduce asynchrony.
1097    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1098        let out_location = Atomic { tick: tick.clone() };
1099        KeyedSingleton::new(
1100            out_location.clone(),
1101            HydroNode::BeginAtomic {
1102                inner: Box::new(self.ir_node.into_inner()),
1103                metadata: out_location
1104                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1105            },
1106        )
1107    }
1108}
1109
1110impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1111where
1112    L: Location<'a> + NoTick,
1113{
1114    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1115    /// See [`KeyedSingleton::atomic`] for more details.
1116    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1117        KeyedSingleton::new(
1118            self.location.tick.l.clone(),
1119            HydroNode::EndAtomic {
1120                inner: Box::new(self.ir_node.into_inner()),
1121                metadata: self
1122                    .location
1123                    .tick
1124                    .l
1125                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1126            },
1127        )
1128    }
1129}
1130
1131impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1132    /// Asynchronously yields this keyed singleton outside the tick, which will
1133    /// be asynchronously updated with the latest set of entries inside the tick.
1134    ///
1135    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1136    /// tick that tracks the inner value. This is useful for getting the value as of the
1137    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1138    ///
1139    /// The entire set of entries are propagated on each tick, which means that if a tick
1140    /// does not have a key "XYZ" that was present in the previous tick, the entry for "XYZ" will
1141    /// also be removed from the output.
1142    ///
1143    /// # Example
1144    /// ```rust
1145    /// # use hydro_lang::prelude::*;
1146    /// # use futures::StreamExt;
1147    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148    /// let tick = process.tick();
1149    /// # // ticks are lazy by default, forces the second tick to run
1150    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1151    /// # let batch_first_tick = process
1152    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1153    /// #   .batch(&tick, nondet!(/** test */))
1154    /// #   .into_keyed();
1155    /// # let batch_second_tick = process
1156    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1157    /// #   .batch(&tick, nondet!(/** test */))
1158    /// #   .into_keyed()
1159    /// #   .defer_tick(); // appears on the second tick
1160    /// # let input_batch = batch_first_tick.chain(batch_second_tick).first();
1161    /// input_batch // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1162    ///     .latest()
1163    /// # .snapshot(&tick, nondet!(/** test */))
1164    /// # .entries()
1165    /// # .all_ticks()
1166    /// # }, |mut stream| async move {
1167    /// // asynchronously changes from { 1: 2, 2: 3 } ~> { 2: 4, 3: 5 }
1168    /// # for w in vec![(1, 2), (2, 3), (2, 4), (3, 5)] {
1169    /// #     assert_eq!(stream.next().await.unwrap(), w);
1170    /// # }
1171    /// # }));
1172    /// ```
1173    pub fn latest(self) -> KeyedSingleton<K, V, L, Unbounded> {
1174        KeyedSingleton::new(
1175            self.location.outer().clone(),
1176            HydroNode::YieldConcat {
1177                inner: Box::new(self.ir_node.into_inner()),
1178                metadata: self.location.outer().new_node_metadata(KeyedSingleton::<
1179                    K,
1180                    V,
1181                    L,
1182                    Unbounded,
1183                >::collection_kind(
1184                )),
1185            },
1186        )
1187    }
1188
1189    /// Synchronously yields this keyed singleton outside the tick as an unbounded keyed singleton,
1190    /// which will be updated with the latest set of entries inside the tick.
1191    ///
1192    /// Unlike [`KeyedSingleton::latest`], this preserves synchronous execution, as the output
1193    /// keyed singleton is emitted in an [`Atomic`] context that will process elements synchronously
1194    /// with the input keyed singleton's [`Tick`] context.
1195    pub fn latest_atomic(self) -> KeyedSingleton<K, V, Atomic<L>, Unbounded> {
1196        let out_location = Atomic {
1197            tick: self.location.clone(),
1198        };
1199
1200        KeyedSingleton::new(
1201            out_location.clone(),
1202            HydroNode::YieldConcat {
1203                inner: Box::new(self.ir_node.into_inner()),
1204                metadata: out_location.new_node_metadata(KeyedSingleton::<
1205                    K,
1206                    V,
1207                    Atomic<L>,
1208                    Unbounded,
1209                >::collection_kind()),
1210            },
1211        )
1212    }
1213
1214    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1215    /// tick `T` always has the entries of `self` at tick `T - 1`.
1216    ///
1217    /// At tick `0`, the output has no entries, since there is no previous tick.
1218    ///
1219    /// This operator enables stateful iterative processing with ticks, by sending data from one
1220    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1221    ///
1222    /// # Example
1223    /// ```rust
1224    /// # use hydro_lang::prelude::*;
1225    /// # use futures::StreamExt;
1226    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1227    /// let tick = process.tick();
1228    /// # // ticks are lazy by default, forces the second tick to run
1229    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1230    /// # let batch_first_tick = process
1231    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1232    /// #   .batch(&tick, nondet!(/** test */))
1233    /// #   .into_keyed();
1234    /// # let batch_second_tick = process
1235    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1236    /// #   .batch(&tick, nondet!(/** test */))
1237    /// #   .into_keyed()
1238    /// #   .defer_tick(); // appears on the second tick
1239    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1240    /// # batch_first_tick.chain(batch_second_tick).first();
1241    /// input_batch.clone().filter_key_not_in(
1242    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1243    /// )
1244    /// # .entries().all_ticks()
1245    /// # }, |mut stream| async move {
1246    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1247    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1248    /// #     assert_eq!(stream.next().await.unwrap(), w);
1249    /// # }
1250    /// # }));
1251    /// ```
1252    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1253        KeyedSingleton::new(
1254            self.location.clone(),
1255            HydroNode::DeferTick {
1256                input: Box::new(self.ir_node.into_inner()),
1257                metadata: self
1258                    .location
1259                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1260            },
1261        )
1262    }
1263}
1264
1265impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1266where
1267    L: Location<'a>,
1268{
1269    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1270    /// point in time.
1271    ///
1272    /// # Non-Determinism
1273    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1274    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1275    pub fn snapshot(
1276        self,
1277        tick: &Tick<L>,
1278        _nondet: NonDet,
1279    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1280        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1281        KeyedSingleton::new(
1282            tick.clone(),
1283            HydroNode::Batch {
1284                inner: Box::new(self.ir_node.into_inner()),
1285                metadata: tick
1286                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1287            },
1288        )
1289    }
1290}
1291
1292impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1293where
1294    L: Location<'a> + NoTick,
1295{
1296    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1297    /// state of the keyed singleton being atomically processed.
1298    ///
1299    /// # Non-Determinism
1300    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1301    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1302    pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1303        KeyedSingleton::new(
1304            self.location.clone().tick,
1305            HydroNode::Batch {
1306                inner: Box::new(self.ir_node.into_inner()),
1307                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1308                    K,
1309                    V,
1310                    Tick<L>,
1311                    Bounded,
1312                >::collection_kind(
1313                )),
1314            },
1315        )
1316    }
1317}
1318
1319impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1320where
1321    L: Location<'a> + NoTick,
1322{
1323    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1324    /// arrived since the previous batch was released.
1325    ///
1326    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1327    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1328    ///
1329    /// # Non-Determinism
1330    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1331    /// has a non-deterministic set of key-value pairs.
1332    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1333        self.atomic(tick).batch_atomic(nondet)
1334    }
1335}
1336
1337impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1338where
1339    L: Location<'a> + NoTick,
1340{
1341    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1342    /// atomically processed.
1343    ///
1344    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1345    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1346    ///
1347    /// # Non-Determinism
1348    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1349    /// has a non-deterministic set of key-value pairs.
1350    pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1351        let _ = nondet;
1352        KeyedSingleton::new(
1353            self.location.clone().tick,
1354            HydroNode::Batch {
1355                inner: Box::new(self.ir_node.into_inner()),
1356                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1357                    K,
1358                    V,
1359                    Tick<L>,
1360                    Bounded,
1361                >::collection_kind(
1362                )),
1363            },
1364        )
1365    }
1366}
1367
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// `key_count` on a keyed singleton built with `first()`: the count starts at zero and
    /// goes up by one for each new key that is sent.
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (entries_port, entries) = process.source_external_bincode(&external);
        let counts_port = entries
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut entries_in = nodes.connect(entries_port).await;
        let mut counts_out = nodes.connect(counts_port).await;

        deployment.start().await.unwrap();

        // Before anything is sent, there are no keys.
        assert_eq!(counts_out.next().await.unwrap(), 0);

        // Each step sends one entry and then checks the next reported key count.
        for (entry, expected_count) in [((1, 1), 1), ((2, 2), 2)] {
            entries_in.send(entry).await.unwrap();
            assert_eq!(counts_out.next().await.unwrap(), expected_count);
        }
    }

    /// `key_count` on a keyed singleton built with `fold`: sending another element for an
    /// existing key leaves the count unchanged, while a new key increases it.
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (entries_port, entries) = process.source_external_bincode(&external);
        let counts_port = entries
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut entries_in = nodes.connect(entries_port).await;
        let mut counts_out = nodes.connect(counts_port).await;

        deployment.start().await.unwrap();

        // Before anything is sent, there are no keys.
        assert_eq!(counts_out.next().await.unwrap(), 0);

        // Each step sends one entry and then checks the next reported key count.
        for (entry, expected_count) in [
            ((1, 1), 1),
            ((1, 2), 1),
            ((2, 2), 2),
            ((1, 1), 2),
            ((3, 1), 3),
        ] {
            entries_in.send(entry).await.unwrap();
            assert_eq!(counts_out.next().await.unwrap(), expected_count);
        }
    }

    /// `into_singleton` on a keyed singleton built with `first()`: the map starts empty and
    /// gains one entry for each new key that is sent.
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (entries_port, entries) = process.source_external_bincode(&external);
        let maps_port = entries
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut entries_in = nodes.connect(entries_port).await;
        let mut maps_out = nodes.connect(maps_port).await;

        deployment.start().await.unwrap();

        // Before anything is sent, the map is empty.
        assert_eq!(maps_out.next().await.unwrap(), HashMap::new());

        // Each step sends one entry and then checks the next reported map.
        for (entry, expected_map) in [
            ((1, 1), HashMap::from([(1, 1)])),
            ((2, 2), HashMap::from([(1, 1), (2, 2)])),
        ] {
            entries_in.send(entry).await.unwrap();
            assert_eq!(maps_out.next().await.unwrap(), expected_map);
        }
    }

    /// `into_singleton` on a keyed singleton built with a counting `fold`: each map value is
    /// the number of elements sent so far for that key.
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (entries_port, entries) = process.source_external_bincode(&external);
        let maps_port = entries
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut entries_in = nodes.connect(entries_port).await;
        let mut maps_out = nodes.connect(maps_port).await;

        deployment.start().await.unwrap();

        // Before anything is sent, the map is empty.
        assert_eq!(maps_out.next().await.unwrap(), HashMap::new());

        // Each step sends one entry and then checks the next reported map of per-key counts.
        for (entry, expected_map) in [
            ((1, 1), HashMap::from([(1, 1)])),
            ((1, 2), HashMap::from([(1, 2)])),
            ((2, 2), HashMap::from([(1, 2), (2, 1)])),
            ((1, 1), HashMap::from([(1, 3), (2, 1)])),
            ((3, 1), HashMap::from([(1, 3), (2, 1), (3, 1)])),
        ] {
            entries_in.send(entry).await.unwrap();
            assert_eq!(maps_out.next().await.unwrap(), expected_map);
        }
    }
}