hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20use crate::forward_handle::ForwardRef;
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
/// indicates that entries may be added over time, but once an entry is added it will never be
/// removed and its value will never change.
///
/// A bound is described along two axes: the boundedness of the underlying stream of entries
/// ([`KeyedSingletonBound::UnderlyingBound`]) and the boundedness of each entry's value
/// ([`KeyedSingletonBound::ValueBound`]).
// NOTE(review): the `KeyedSingleton` type docs describe `Unbounded` as also allowing removals,
// while the paragraph above says entries are not removed — confirm which is intended.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The type of the keyed singleton if the value for each key is immutable.
    ///
    /// The underlying bound is kept as-is; only the value bound is pinned to [`Bounded`].
    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;

    /// The type of the keyed singleton if the value for each key may change asynchronously.
    ///
    /// The underlying bound is kept as-is; only the value bound is switched to [`Unbounded`].
    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
51
// Fully unbounded: both the underlying stream and each entry's value are `Unbounded`.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    // Pinning the values while keeping the unbounded underlying stream yields `BoundedValue`.
    type WithBoundedValue = BoundedValue;
    // Already has unbounded values, so this maps to itself.
    type WithUnboundedValue = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Unbounded
    }
}
62
// Fully bounded: both the underlying stream and each entry's value are `Bounded`.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    // Already has bounded values, so this maps to itself.
    type WithBoundedValue = Bounded;
    // A bounded underlying stream with unbounded values is represented by the hidden
    // `UnreachableBound` placeholder, whose `bound_kind` panics if ever called.
    type WithUnboundedValue = UnreachableBound;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Bounded
    }
}
73
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change. The underlying stream of entries is
/// [`Unbounded`] (see [`KeyedSingletonBound::UnderlyingBound`]), so new entries may still appear
/// asynchronously.
pub struct BoundedValue;
78
// Unbounded underlying stream of entries, but each entry's value is `Bounded`.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    // Already has bounded values, so this maps to itself.
    type WithBoundedValue = BoundedValue;
    // Letting the values change as well yields the plain `Unbounded` bound.
    type WithUnboundedValue = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::BoundedValue
    }
}
89
// Placeholder bound pairing a `Bounded` underlying stream with `Unbounded` values. It exists so
// that `Bounded::WithUnboundedValue` can name a type; its `bound_kind` is `unreachable!`.
#[doc(hidden)]
pub struct UnreachableBound;
92
// Type-level placeholder only: the associated types satisfy the trait's constraints, but asking
// for the runtime bound kind panics.
impl KeyedSingletonBound for UnreachableBound {
    type UnderlyingBound = Bounded;
    type ValueBound = Unbounded;

    type WithBoundedValue = Bounded;
    type WithUnboundedValue = UnreachableBound;

    fn bound_kind() -> KeyedSingletonBoundKind {
        // Panics if reached; this bound is not expected to back a real collection.
        unreachable!("UnreachableBound cannot be instantiated")
    }
}
104
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    /// The location where this keyed singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node producing this collection. It sits in a [`RefCell`] so it can be replaced
    /// through a shared reference (the `Clone` impl swaps it for a `Tee` node in place).
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
127
128impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
129    for KeyedSingleton<K, V, Loc, Bound>
130{
131    fn clone(&self) -> Self {
132        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
133            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
134            *self.ir_node.borrow_mut() = HydroNode::Tee {
135                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
136                metadata: self.location.new_node_metadata(Self::collection_kind()),
137            };
138        }
139
140        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
141            KeyedSingleton {
142                location: self.location.clone(),
143                ir_node: HydroNode::Tee {
144                    inner: TeeNode(inner.0.clone()),
145                    metadata: metadata.clone(),
146                }
147                .into(),
148                _phantom: PhantomData,
149            }
150        } else {
151            unreachable!()
152        }
153    }
154}
155
156impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
157    for KeyedSingleton<K, V, L, B>
158where
159    L: Location<'a> + NoTick,
160{
161    type Location = L;
162
163    fn create_source(ident: syn::Ident, location: L) -> Self {
164        KeyedSingleton {
165            location: location.clone(),
166            ir_node: RefCell::new(HydroNode::CycleSource {
167                ident,
168                metadata: location.new_node_metadata(Self::collection_kind()),
169            }),
170            _phantom: PhantomData,
171        }
172    }
173}
174
175impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
176    for KeyedSingleton<K, V, L, B>
177where
178    L: Location<'a> + NoTick,
179{
180    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
181        assert_eq!(
182            Location::id(&self.location),
183            expected_location,
184            "locations do not match"
185        );
186        self.location
187            .flow_state()
188            .borrow_mut()
189            .push_root(HydroRoot::CycleSink {
190                ident,
191                input: Box::new(self.ir_node.into_inner()),
192                op_metadata: HydroIrOpMetadata::new(),
193            });
194    }
195}
196
197impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
198    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
199        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
200        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
201
202        KeyedSingleton {
203            location,
204            ir_node: RefCell::new(ir_node),
205            _phantom: PhantomData,
206        }
207    }
208
209    /// Returns the [`Location`] where this keyed singleton is being materialized.
210    pub fn location(&self) -> &L {
211        &self.location
212    }
213}
214
/// Counts the entries of a bounded keyed singleton by counting its flattened entry stream.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let all_entries = me.entries();
    all_entries.count()
}
221
/// Collects the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    // Start from an empty map and insert every `(key, value)` pair into it.
    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
240
241impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
242    pub(crate) fn collection_kind() -> CollectionKind {
243        CollectionKind::KeyedSingleton {
244            bound: B::bound_kind(),
245            key_type: stageleft::quote_type::<K>().into(),
246            value_type: stageleft::quote_type::<V>().into(),
247        }
248    }
249
250    /// Transforms each value by invoking `f` on each element, with keys staying the same
251    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
252    ///
253    /// If you do not want to modify the stream and instead only want to view
254    /// each item use [`KeyedSingleton::inspect`] instead.
255    ///
256    /// # Example
257    /// ```rust
258    /// # use hydro_lang::prelude::*;
259    /// # use futures::StreamExt;
260    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
261    /// let keyed_singleton = // { 1: 2, 2: 4 }
262    /// # process
263    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
264    /// #     .into_keyed()
265    /// #     .first();
266    /// keyed_singleton.map(q!(|v| v + 1))
267    /// #   .entries()
268    /// # }, |mut stream| async move {
269    /// // { 1: 3, 2: 5 }
270    /// # let mut results = Vec::new();
271    /// # for _ in 0..2 {
272    /// #     results.push(stream.next().await.unwrap());
273    /// # }
274    /// # results.sort();
275    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
276    /// # }));
277    /// ```
278    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
279    where
280        F: Fn(V) -> U + 'a,
281    {
282        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
283        let map_f = q!({
284            let orig = f;
285            move |(k, v)| (k, orig(v))
286        })
287        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
288        .into();
289
290        KeyedSingleton::new(
291            self.location.clone(),
292            HydroNode::Map {
293                f: map_f,
294                input: Box::new(self.ir_node.into_inner()),
295                metadata: self
296                    .location
297                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
298            },
299        )
300    }
301
302    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
303    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
304    ///
305    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
306    /// the new value `U`. The key remains unchanged in the output.
307    ///
308    /// # Example
309    /// ```rust
310    /// # use hydro_lang::prelude::*;
311    /// # use futures::StreamExt;
312    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
313    /// let keyed_singleton = // { 1: 2, 2: 4 }
314    /// # process
315    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
316    /// #     .into_keyed()
317    /// #     .first();
318    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
319    /// #   .entries()
320    /// # }, |mut stream| async move {
321    /// // { 1: 3, 2: 6 }
322    /// # let mut results = Vec::new();
323    /// # for _ in 0..2 {
324    /// #     results.push(stream.next().await.unwrap());
325    /// # }
326    /// # results.sort();
327    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
328    /// # }));
329    /// ```
330    pub fn map_with_key<U, F>(
331        self,
332        f: impl IntoQuotedMut<'a, F, L> + Copy,
333    ) -> KeyedSingleton<K, U, L, B>
334    where
335        F: Fn((K, V)) -> U + 'a,
336        K: Clone,
337    {
338        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
339        let map_f = q!({
340            let orig = f;
341            move |(k, v)| {
342                let out = orig((Clone::clone(&k), v));
343                (k, out)
344            }
345        })
346        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
347        .into();
348
349        KeyedSingleton::new(
350            self.location.clone(),
351            HydroNode::Map {
352                f: map_f,
353                input: Box::new(self.ir_node.into_inner()),
354                metadata: self
355                    .location
356                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
357            },
358        )
359    }
360
361    /// Gets the number of keys in the keyed singleton.
362    ///
363    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
364    /// since keys may be added / removed over time. When the set of keys changes, the count will
365    /// be asynchronously updated.
366    ///
367    /// # Example
368    /// ```rust
369    /// # use hydro_lang::prelude::*;
370    /// # use futures::StreamExt;
371    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
372    /// # let tick = process.tick();
373    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
374    /// # process
375    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
376    /// #     .into_keyed()
377    /// #     .batch(&tick, nondet!(/** test */))
378    /// #     .first();
379    /// keyed_singleton.key_count()
380    /// # .all_ticks()
381    /// # }, |mut stream| async move {
382    /// // 3
383    /// # assert_eq!(stream.next().await.unwrap(), 3);
384    /// # }));
385    /// ```
    ///
    /// # Panics
    /// Panics with "Unbounded KeyedSingleton inside a tick" if the values are not bounded and
    /// either `L::is_top_level()` is false or `try_tick()` yields no tick for this location.
    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
        if B::ValueBound::BOUNDED {
            // Values are immutable: re-tag the same location / IR node with the
            // bounded-value form of `B` so that `entries()` is available, then count.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location,
                ir_node: self.ir_node,
                _phantom: PhantomData,
            };

            me.entries().count()
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            // Values may change: re-tag with the unbounded-value form of `B`, count a
            // snapshot taken in `tick`, and convert the per-tick result back with `latest()`.
            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
                location: self.location,
                ir_node: self.ir_node,
                _phantom: PhantomData,
            };

            let out =
                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
                    .latest();
            // Rebuild the singleton from its parts so it carries this method's return type.
            Singleton::new(out.location, out.ir_node.into_inner())
        } else {
            panic!("Unbounded KeyedSingleton inside a tick");
        }
    }
412
413    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
414    ///
415    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
416    /// asynchronously as well.
417    ///
418    /// # Example
419    /// ```rust
420    /// # use hydro_lang::prelude::*;
421    /// # use futures::StreamExt;
422    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
423    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
424    /// # process
425    /// #     .source_iter(q!(vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())]))
426    /// #     .into_keyed()
427    /// #     .batch(&process.tick(), nondet!(/** test */))
428    /// #     .first();
429    /// keyed_singleton.into_singleton()
430    /// # .all_ticks()
431    /// # }, |mut stream| async move {
432    /// // { 1: "a", 2: "b", 3: "c" }
433    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())].into_iter().collect());
434    /// # }));
435    /// ```
    ///
    /// # Panics
    /// Panics with "Unbounded KeyedSingleton inside a tick" if the values are not bounded and
    /// either `L::is_top_level()` is false or `try_tick()` yields no tick for this location.
    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
    where
        K: Eq + Hash,
    {
        if B::ValueBound::BOUNDED {
            // Values are immutable: re-tag the same location / IR node with the
            // bounded-value form of `B` so that `entries()` is available, then fold the
            // entries into a map.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location,
                ir_node: self.ir_node,
                _phantom: PhantomData,
            };

            me.entries()
                .assume_ordering(nondet!(
                    /// Because this is a keyed singleton, there is only one value per key.
                ))
                .fold(
                    q!(|| HashMap::new()),
                    q!(|map, (k, v)| {
                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
                        map.insert(k, v);
                    }),
                )
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            // Values may change: re-tag with the unbounded-value form of `B`, build the map
            // from a snapshot taken in `tick`, and convert the per-tick result back with
            // `latest()`.
            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
                location: self.location,
                ir_node: self.ir_node,
                _phantom: PhantomData,
            };

            let out = into_singleton_inside_tick(
                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();
            // Rebuild the singleton from its parts so it carries this method's return type.
            Singleton::new(out.location, out.ir_node.into_inner())
        } else {
            panic!("Unbounded KeyedSingleton inside a tick");
        }
    }
476
477    /// An operator which allows you to "name" a `HydroNode`.
478    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
479    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
480        {
481            let mut node = self.ir_node.borrow_mut();
482            let metadata = node.metadata_mut();
483            metadata.tag = Some(name.to_string());
484        }
485        self
486    }
487}
488
489impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
490    KeyedSingleton<K, V, L, B>
491{
492    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
493    ///
494    /// The value for each key must be bounded, otherwise the resulting stream elements would be
    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
496    /// into the output.
497    ///
498    /// # Example
499    /// ```rust
500    /// # use hydro_lang::prelude::*;
501    /// # use futures::StreamExt;
502    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
503    /// let keyed_singleton = // { 1: 2, 2: 4 }
504    /// # process
505    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
506    /// #     .into_keyed()
507    /// #     .first();
508    /// keyed_singleton.entries()
509    /// # }, |mut stream| async move {
510    /// // (1, 2), (2, 4) in any order
511    /// # let mut results = Vec::new();
512    /// # for _ in 0..2 {
513    /// #     results.push(stream.next().await.unwrap());
514    /// # }
515    /// # results.sort();
516    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
517    /// # }));
518    /// ```
519    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
520        self.into_keyed_stream().entries()
521    }
522
523    /// Flattens the keyed singleton into an unordered stream of just the values.
524    ///
525    /// The value for each key must be bounded, otherwise the resulting stream elements would be
    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
527    /// into the output.
528    ///
529    /// # Example
530    /// ```rust
531    /// # use hydro_lang::prelude::*;
532    /// # use futures::StreamExt;
533    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
534    /// let keyed_singleton = // { 1: 2, 2: 4 }
535    /// # process
536    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
537    /// #     .into_keyed()
538    /// #     .first();
539    /// keyed_singleton.values()
540    /// # }, |mut stream| async move {
541    /// // 2, 4 in any order
542    /// # let mut results = Vec::new();
543    /// # for _ in 0..2 {
544    /// #     results.push(stream.next().await.unwrap());
545    /// # }
546    /// # results.sort();
547    /// # assert_eq!(results, vec![2, 4]);
548    /// # }));
549    /// ```
550    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
551        let map_f = q!(|(_, v)| v)
552            .splice_fn1_ctx::<(K, V), V>(&self.location)
553            .into();
554
555        Stream::new(
556            self.location.clone(),
557            HydroNode::Map {
558                f: map_f,
559                input: Box::new(self.ir_node.into_inner()),
560                metadata: self.location.new_node_metadata(Stream::<
561                    V,
562                    L,
563                    B::UnderlyingBound,
564                    NoOrder,
565                    ExactlyOnce,
566                >::collection_kind()),
567            },
568        )
569    }
570
571    /// Flattens the keyed singleton into an unordered stream of just the keys.
572    ///
573    /// The value for each key must be bounded, otherwise the removal of keys would result in
574    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
575    /// into the output.
576    ///
577    /// # Example
578    /// ```rust
579    /// # use hydro_lang::prelude::*;
580    /// # use futures::StreamExt;
581    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
582    /// let keyed_singleton = // { 1: 2, 2: 4 }
583    /// # process
584    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
585    /// #     .into_keyed()
586    /// #     .first();
587    /// keyed_singleton.keys()
588    /// # }, |mut stream| async move {
589    /// // 1, 2 in any order
590    /// # let mut results = Vec::new();
591    /// # for _ in 0..2 {
592    /// #     results.push(stream.next().await.unwrap());
593    /// # }
594    /// # results.sort();
595    /// # assert_eq!(results, vec![1, 2]);
596    /// # }));
597    /// ```
598    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
599        self.entries().map(q!(|(k, _)| k))
600    }
601
602    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
603    /// entries whose keys are not in the provided stream.
604    ///
605    /// # Example
606    /// ```rust
607    /// # use hydro_lang::prelude::*;
608    /// # use futures::StreamExt;
609    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
610    /// let tick = process.tick();
611    /// let keyed_singleton = // { 1: 2, 2: 4 }
612    /// # process
613    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
614    /// #     .into_keyed()
615    /// #     .first()
616    /// #     .batch(&tick, nondet!(/** test */));
617    /// let keys_to_remove = process
618    ///     .source_iter(q!(vec![1]))
619    ///     .batch(&tick, nondet!(/** test */));
620    /// keyed_singleton.filter_key_not_in(keys_to_remove)
621    /// #   .entries().all_ticks()
622    /// # }, |mut stream| async move {
623    /// // { 2: 4 }
624    /// # for w in vec![(2, 4)] {
625    /// #     assert_eq!(stream.next().await.unwrap(), w);
626    /// # }
627    /// # }));
628    /// ```
629    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
630        self,
631        other: Stream<K, L, Bounded, O2, R2>,
632    ) -> Self
633    where
634        K: Hash + Eq,
635    {
636        check_matching_location(&self.location, &other.location);
637
638        KeyedSingleton::new(
639            self.location.clone(),
640            HydroNode::AntiJoin {
641                pos: Box::new(self.ir_node.into_inner()),
642                neg: Box::new(other.ir_node.into_inner()),
643                metadata: self.location.new_node_metadata(Self::collection_kind()),
644            },
645        )
646    }
647
648    /// An operator which allows you to "inspect" each value of a keyed singleton without
649    /// modifying it. The closure `f` is called on a reference to each value. This is
650    /// mainly useful for debugging, and should not be used to generate side-effects.
651    ///
652    /// # Example
653    /// ```rust
654    /// # use hydro_lang::prelude::*;
655    /// # use futures::StreamExt;
656    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
657    /// let keyed_singleton = // { 1: 2, 2: 4 }
658    /// # process
659    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
660    /// #     .into_keyed()
661    /// #     .first();
662    /// keyed_singleton
663    ///     .inspect(q!(|v| println!("{}", v)))
664    /// #   .entries()
665    /// # }, |mut stream| async move {
666    /// // { 1: 2, 2: 4 }
667    /// # for w in vec![(1, 2), (2, 4)] {
668    /// #     assert_eq!(stream.next().await.unwrap(), w);
669    /// # }
670    /// # }));
671    /// ```
672    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
673    where
674        F: Fn(&V) + 'a,
675    {
676        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
677        let inspect_f = q!({
678            let orig = f;
679            move |t: &(_, _)| orig(&t.1)
680        })
681        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
682        .into();
683
684        KeyedSingleton::new(
685            self.location.clone(),
686            HydroNode::Inspect {
687                f: inspect_f,
688                input: Box::new(self.ir_node.into_inner()),
689                metadata: self.location.new_node_metadata(Self::collection_kind()),
690            },
691        )
692    }
693
694    /// An operator which allows you to "inspect" each entry of a keyed singleton without
695    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
696    /// mainly useful for debugging, and should not be used to generate side-effects.
697    ///
698    /// # Example
699    /// ```rust
700    /// # use hydro_lang::prelude::*;
701    /// # use futures::StreamExt;
702    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
703    /// let keyed_singleton = // { 1: 2, 2: 4 }
704    /// # process
705    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
706    /// #     .into_keyed()
707    /// #     .first();
708    /// keyed_singleton
709    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
710    /// #   .entries()
711    /// # }, |mut stream| async move {
712    /// // { 1: 2, 2: 4 }
713    /// # for w in vec![(1, 2), (2, 4)] {
714    /// #     assert_eq!(stream.next().await.unwrap(), w);
715    /// # }
716    /// # }));
717    /// ```
718    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
719    where
720        F: Fn(&(K, V)) + 'a,
721    {
722        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
723
724        KeyedSingleton::new(
725            self.location.clone(),
726            HydroNode::Inspect {
727                f: inspect_f,
728                input: Box::new(self.ir_node.into_inner()),
729                metadata: self.location.new_node_metadata(Self::collection_kind()),
730            },
731        )
732    }
733
734    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
735    ///
736    /// Because this method requires values to be bounded, the output [`Optional`] will only be
737    /// asynchronously updated if a new key is added that is higher than the previous max key.
738    ///
739    /// # Example
740    /// ```rust
741    /// # use hydro_lang::prelude::*;
742    /// # use futures::StreamExt;
743    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
744    /// let tick = process.tick();
745    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
746    /// # process
747    /// #     .source_iter(q!(vec![(1, 123), (2, 456), (0, 789)]))
748    /// #     .into_keyed()
749    /// #     .first();
750    /// keyed_singleton.get_max_key()
751    /// # .sample_eager(nondet!(/** test */))
752    /// # }, |mut stream| async move {
753    /// // (2, 456)
754    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
755    /// # }));
756    /// ```
757    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
758    where
759        K: Ord,
760    {
761        self.entries()
762            .assume_ordering(nondet!(
763                /// There is only one element associated with each key, and the keys are totallly
764                /// ordered so we will produce a deterministic value. We can't call
765                /// `reduce_commutative_idempotent` because the closure technically isn't commutative
766                /// in the case where both passed entries have the same key but different values.
767                ///
768                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
769                /// the two inputs do not have the same key.
770            ))
771            .reduce_idempotent(q!({
772                move |curr, new| {
773                    if new.0 > curr.0 {
774                        *curr = new;
775                    }
776                }
777            }))
778    }
779
780    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
781    /// element, the value.
782    ///
783    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
784    ///
785    /// # Example
786    /// ```rust
787    /// # use hydro_lang::prelude::*;
788    /// # use futures::StreamExt;
789    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
790    /// let keyed_singleton = // { 1: 2, 2: 4 }
791    /// # process
792    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
793    /// #     .into_keyed()
794    /// #     .first();
795    /// keyed_singleton
796    ///     .clone()
797    ///     .into_keyed_stream()
798    ///     .interleave(
799    ///         keyed_singleton.into_keyed_stream()
800    ///     )
801    /// #   .entries()
802    /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
804    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
805    /// #     assert_eq!(stream.next().await.unwrap(), w);
806    /// # }
807    /// # }));
808    /// ```
809    pub fn into_keyed_stream(
810        self,
811    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
812        KeyedStream::new(
813            self.location.clone(),
814            HydroNode::Cast {
815                inner: Box::new(self.ir_node.into_inner()),
816                metadata: self.location.new_node_metadata(KeyedStream::<
817                    K,
818                    V,
819                    L,
820                    B::UnderlyingBound,
821                    TotalOrder,
822                    ExactlyOnce,
823                >::collection_kind()),
824            },
825        )
826    }
827}
828
829impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
830    /// Gets the value associated with a specific key from the keyed singleton.
831    ///
832    /// # Example
833    /// ```rust
834    /// # use hydro_lang::prelude::*;
835    /// # use futures::StreamExt;
836    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
837    /// let tick = process.tick();
838    /// let keyed_data = process
839    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
840    ///     .into_keyed()
841    ///     .batch(&tick, nondet!(/** test */))
842    ///     .first();
843    /// let key = tick.singleton(q!(1));
844    /// keyed_data.get(key).all_ticks()
845    /// # }, |mut stream| async move {
846    /// // 2
847    /// # assert_eq!(stream.next().await.unwrap(), 2);
848    /// # }));
849    /// ```
850    pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
851        self.entries()
852            .join(key.into_stream().map(q!(|k| (k, ()))))
853            .map(q!(|(_, (v, _))| v))
854            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
855            .first()
856    }
857
858    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
859    /// is some additional metadata, emits a keyed stream of lookup results where the key
860    /// is the same as before, but the value is a tuple of the lookup result and the metadata
861    /// of the request. If the key is not found, no output will be produced.
862    ///
863    /// # Example
864    /// ```rust
865    /// # use hydro_lang::prelude::*;
866    /// # use futures::StreamExt;
867    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
868    /// let tick = process.tick();
869    /// let keyed_data = process
870    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
871    ///     .into_keyed()
872    ///     .batch(&tick, nondet!(/** test */))
873    ///     .first();
874    /// let other_data = process
875    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
876    ///     .into_keyed()
877    ///     .batch(&tick, nondet!(/** test */));
878    /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
879    /// # }, |mut stream| async move {
880    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
881    /// # let mut results = vec![];
882    /// # for _ in 0..3 {
883    /// #     results.push(stream.next().await.unwrap());
884    /// # }
885    /// # results.sort();
886    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
887    /// # }));
888    /// ```
889    pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
890        self,
891        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
892    ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
893        self.entries()
894            .weaker_retries::<R2>()
895            .join(requests.entries())
896            .into_keyed()
897    }
898
899    /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
900    /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
901    /// containing the value from `self` and an option of the value from `from`. If the key is not
902    /// present in `from`, the option will be [`None`].
903    ///
904    /// # Example
905    /// ```rust
906    /// # use hydro_lang::prelude::*;
907    /// # use futures::StreamExt;
908    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
909    /// # let tick = process.tick();
910    /// let requests = // { 1: 10, 2: 20 }
911    /// # process
912    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
913    /// #     .into_keyed()
914    /// #     .batch(&tick, nondet!(/** test */))
915    /// #     .first();
916    /// let other_data = // { 10: 100, 11: 101 }
917    /// # process
918    /// #     .source_iter(q!(vec![(10, 100), (11, 101)]))
919    /// #     .into_keyed()
920    /// #     .batch(&tick, nondet!(/** test */))
921    /// #     .first();
922    /// requests.get_from(other_data)
923    /// # .entries().all_ticks()
924    /// # }, |mut stream| async move {
925    /// // { 1: (10, Some(100)), 2: (20, None) }
926    /// # let mut results = vec![];
927    /// # for _ in 0..2 {
928    /// #     results.push(stream.next().await.unwrap());
929    /// # }
930    /// # results.sort();
931    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
932    /// # }));
933    /// ```
934    pub fn get_from<V2: Clone>(
935        self,
936        from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
937    ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
938    where
939        K: Clone,
940        V: Hash + Eq + Clone,
941    {
942        let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
943        let lookup_result = from.get_many_if_present(to_lookup.clone());
944        let missing_values =
945            to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
946        let result_stream = lookup_result
947            .entries()
948            .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
949            .into_keyed()
950            .chain(
951                missing_values
952                    .entries()
953                    .map(q!(|(v, k)| (k, (v, None))))
954                    .into_keyed(),
955            );
956
957        KeyedSingleton::new(
958            result_stream.location.clone(),
959            HydroNode::Cast {
960                inner: Box::new(result_stream.ir_node.into_inner()),
961                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
962                    K,
963                    (V, Option<V2>),
964                    Tick<L>,
965                    Bounded,
966                >::collection_kind(
967                )),
968            },
969        )
970    }
971}
972
973impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
974where
975    L: Location<'a>,
976{
977    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
978    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
979    ///
980    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
981    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
982    /// argument that declares where the keyed singleton will be atomically processed. Batching a
983    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
984    /// batching into a different [`Tick`] will introduce asynchrony.
985    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
986        let out_location = Atomic { tick: tick.clone() };
987        KeyedSingleton::new(
988            out_location.clone(),
989            HydroNode::BeginAtomic {
990                inner: Box::new(self.ir_node.into_inner()),
991                metadata: out_location
992                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
993            },
994        )
995    }
996}
997
998impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
999where
1000    L: Location<'a> + NoTick,
1001{
1002    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1003    /// See [`KeyedSingleton::atomic`] for more details.
1004    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1005        KeyedSingleton::new(
1006            self.location.tick.l.clone(),
1007            HydroNode::EndAtomic {
1008                inner: Box::new(self.ir_node.into_inner()),
1009                metadata: self
1010                    .location
1011                    .tick
1012                    .l
1013                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1014            },
1015        )
1016    }
1017}
1018
1019impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1020    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1021    /// tick `T` always has the entries of `self` at tick `T - 1`.
1022    ///
1023    /// At tick `0`, the output has no entries, since there is no previous tick.
1024    ///
1025    /// This operator enables stateful iterative processing with ticks, by sending data from one
1026    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1027    ///
1028    /// # Example
1029    /// ```rust
1030    /// # use hydro_lang::prelude::*;
1031    /// # use futures::StreamExt;
1032    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1033    /// let tick = process.tick();
1034    /// # // ticks are lazy by default, forces the second tick to run
1035    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1036    /// # let batch_first_tick = process
1037    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1038    /// #   .batch(&tick, nondet!(/** test */))
1039    /// #   .into_keyed();
1040    /// # let batch_second_tick = process
1041    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1042    /// #   .batch(&tick, nondet!(/** test */))
1043    /// #   .into_keyed()
1044    /// #   .defer_tick(); // appears on the second tick
1045    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1046    /// # batch_first_tick.chain(batch_second_tick).first();
1047    /// input_batch.clone().filter_key_not_in(
1048    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1049    /// )
1050    /// # .entries().all_ticks()
1051    /// # }, |mut stream| async move {
1052    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1053    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1054    /// #     assert_eq!(stream.next().await.unwrap(), w);
1055    /// # }
1056    /// # }));
1057    /// ```
1058    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1059        KeyedSingleton::new(
1060            self.location.clone(),
1061            HydroNode::DeferTick {
1062                input: Box::new(self.ir_node.into_inner()),
1063                metadata: self
1064                    .location
1065                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1066            },
1067        )
1068    }
1069}
1070
1071impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1072where
1073    L: Location<'a>,
1074{
1075    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1076    /// point in time.
1077    ///
1078    /// # Non-Determinism
1079    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1080    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1081    pub fn snapshot(
1082        self,
1083        tick: &Tick<L>,
1084        _nondet: NonDet,
1085    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1086        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1087        KeyedSingleton::new(
1088            tick.clone(),
1089            HydroNode::Batch {
1090                inner: Box::new(self.ir_node.into_inner()),
1091                metadata: tick
1092                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1093            },
1094        )
1095    }
1096}
1097
1098impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1099where
1100    L: Location<'a> + NoTick,
1101{
1102    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1103    /// state of the keyed singleton being atomically processed.
1104    ///
1105    /// # Non-Determinism
1106    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1107    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1108    pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1109        KeyedSingleton::new(
1110            self.location.clone().tick,
1111            HydroNode::Batch {
1112                inner: Box::new(self.ir_node.into_inner()),
1113                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1114                    K,
1115                    V,
1116                    Tick<L>,
1117                    Bounded,
1118                >::collection_kind(
1119                )),
1120            },
1121        )
1122    }
1123}
1124
1125impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1126where
1127    L: Location<'a>,
1128{
1129    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1130    ///
1131    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1132    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1133    /// is filtered out.
1134    ///
1135    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1136    /// not modify or take ownership of the values. If you need to modify the values while filtering
1137    /// use [`KeyedSingleton::filter_map`] instead.
1138    ///
1139    /// # Example
1140    /// ```rust
1141    /// # use hydro_lang::prelude::*;
1142    /// # use futures::StreamExt;
1143    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1144    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1145    /// # process
1146    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1147    /// #     .into_keyed()
1148    /// #     .first();
1149    /// keyed_singleton.filter(q!(|&v| v > 1))
1150    /// #   .entries()
1151    /// # }, |mut stream| async move {
1152    /// // { 1: 2, 2: 4 }
1153    /// # let mut results = Vec::new();
1154    /// # for _ in 0..2 {
1155    /// #     results.push(stream.next().await.unwrap());
1156    /// # }
1157    /// # results.sort();
1158    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1159    /// # }));
1160    /// ```
1161    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1162    where
1163        F: Fn(&V) -> bool + 'a,
1164    {
1165        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1166        let filter_f = q!({
1167            let orig = f;
1168            move |t: &(_, _)| orig(&t.1)
1169        })
1170        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1171        .into();
1172
1173        KeyedSingleton::new(
1174            self.location.clone(),
1175            HydroNode::Filter {
1176                f: filter_f,
1177                input: Box::new(self.ir_node.into_inner()),
1178                metadata: self
1179                    .location
1180                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1181            },
1182        )
1183    }
1184
1185    /// An operator that both filters and maps values. It yields only the key-value pairs where
1186    /// the supplied closure `f` returns `Some(value)`.
1187    ///
1188    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1189    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1190    /// If it returns `None`, the key-value pair is filtered out.
1191    ///
1192    /// # Example
1193    /// ```rust
1194    /// # use hydro_lang::prelude::*;
1195    /// # use futures::StreamExt;
1196    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1197    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1198    /// # process
1199    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1200    /// #     .into_keyed()
1201    /// #     .first();
1202    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1203    /// #   .entries()
1204    /// # }, |mut stream| async move {
1205    /// // { 1: 42, 3: 100 }
1206    /// # let mut results = Vec::new();
1207    /// # for _ in 0..2 {
1208    /// #     results.push(stream.next().await.unwrap());
1209    /// # }
1210    /// # results.sort();
1211    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1212    /// # }));
1213    /// ```
1214    pub fn filter_map<F, U>(
1215        self,
1216        f: impl IntoQuotedMut<'a, F, L> + Copy,
1217    ) -> KeyedSingleton<K, U, L, B>
1218    where
1219        F: Fn(V) -> Option<U> + 'a,
1220    {
1221        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1222        let filter_map_f = q!({
1223            let orig = f;
1224            move |(k, v)| orig(v).map(|o| (k, o))
1225        })
1226        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1227        .into();
1228
1229        KeyedSingleton::new(
1230            self.location.clone(),
1231            HydroNode::FilterMap {
1232                f: filter_map_f,
1233                input: Box::new(self.ir_node.into_inner()),
1234                metadata: self
1235                    .location
1236                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1237            },
1238        )
1239    }
1240
1241    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1242    /// arrived since the previous batch was released.
1243    ///
1244    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1245    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1246    ///
1247    /// # Non-Determinism
1248    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1249    /// has a non-deterministic set of key-value pairs.
1250    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1251    where
1252        L: NoTick,
1253    {
1254        self.atomic(tick).batch_atomic(nondet)
1255    }
1256}
1257
1258impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1259where
1260    L: Location<'a> + NoTick,
1261{
1262    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1263    /// atomically processed.
1264    ///
1265    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1266    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1267    ///
1268    /// # Non-Determinism
1269    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1270    /// has a non-deterministic set of key-value pairs.
1271    pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1272        let _ = nondet;
1273        KeyedSingleton::new(
1274            self.location.clone().tick,
1275            HydroNode::Batch {
1276                inner: Box::new(self.ir_node.into_inner()),
1277                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1278                    K,
1279                    V,
1280                    Tick<L>,
1281                    Bounded,
1282                >::collection_kind(
1283                )),
1284            },
1285        )
1286    }
1287}
1288
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use std::collections::HashMap;

    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// `key_count` over a keyed singleton built with `first()`: the sampled count starts at 0
    /// and goes up by one for each new key that is sent.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let first_per_key = input.into_keyed().first();
        let out = first_per_key
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let deployed = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = deployed.connect(input_port).await;
        let mut from_node = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet.
        assert_eq!(from_node.next().await.unwrap(), 0);

        to_node.send((1, 1)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), 1);

        to_node.send((2, 2)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), 2);
    }

    /// `key_count` over a keyed singleton built with `fold`: sending another element for a key
    /// that already exists produces a new sample, but the number of keys stays the same.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let count_per_key = input.into_keyed().fold(q!(|| 0), q!(|acc, _| *acc += 1));
        let out = count_per_key
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let deployed = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = deployed.connect(input_port).await;
        let mut from_node = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet.
        assert_eq!(from_node.next().await.unwrap(), 0);

        to_node.send((1, 1)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), 1);

        // Key 1 again: still a single key.
        to_node.send((1, 2)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), 1);

        to_node.send((2, 2)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), 2);

        // Key 1 a third time: still two keys.
        to_node.send((1, 1)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), 2);

        to_node.send((3, 1)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), 3);
    }

    /// `into_singleton` over a keyed singleton built with `first()`: the sampled map starts out
    /// empty and gains one entry for each new key that is sent.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let first_per_key = input.into_keyed().first();
        let out = first_per_key
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let deployed = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = deployed.connect(input_port).await;
        let mut from_node = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet.
        assert_eq!(from_node.next().await.unwrap(), HashMap::new());

        to_node.send((1, 1)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), HashMap::from([(1, 1)]));

        to_node.send((2, 2)).await.unwrap();
        assert_eq!(
            from_node.next().await.unwrap(),
            HashMap::from([(1, 1), (2, 2)])
        );
    }

    /// `into_singleton` over a keyed singleton built with `fold`: each sampled map holds, for
    /// every key seen so far, the number of elements that were sent for that key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let count_per_key = input.into_keyed().fold(q!(|| 0), q!(|acc, _| *acc += 1));
        let out = count_per_key
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let deployed = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = deployed.connect(input_port).await;
        let mut from_node = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet.
        assert_eq!(from_node.next().await.unwrap(), HashMap::new());

        to_node.send((1, 1)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), HashMap::from([(1, 1)]));

        // Second element for key 1.
        to_node.send((1, 2)).await.unwrap();
        assert_eq!(from_node.next().await.unwrap(), HashMap::from([(1, 2)]));

        to_node.send((2, 2)).await.unwrap();
        assert_eq!(
            from_node.next().await.unwrap(),
            HashMap::from([(1, 2), (2, 1)])
        );

        // Third element for key 1.
        to_node.send((1, 1)).await.unwrap();
        assert_eq!(
            from_node.next().await.unwrap(),
            HashMap::from([(1, 3), (2, 1)])
        );

        to_node.send((3, 1)).await.unwrap();
        assert_eq!(
            from_node.next().await.unwrap(),
            HashMap::from([(1, 3), (2, 1), (3, 1)])
        );
    }

    /// Runs the exhaustive simulator over snapshots of a per-key element count and checks both
    /// the last sorted output and the value returned by `exhaustive`.
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let count_per_key = input.into_keyed().fold(q!(|| 0), q!(|acc, _| *acc += 1));
        let out = count_per_key
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let explored = flow.sim().exhaustive(async |mut instance| {
            let to_node = instance.connect(&input_port);
            let from_node = instance.connect(&out);

            instance.launch();

            to_node.send((1, 123));
            to_node.send((1, 456));
            to_node.send((2, 123));

            // Key 2 receives a single element, so `(2, 1)` is expected to be the last entry of
            // the sorted output.
            let all = from_node.collect_sorted::<Vec<_>>().await;
            assert_eq!(all.last().unwrap(), &(2, 1));
        });

        // NOTE(review): presumably the number of distinct executions explored — confirm against
        // the simulator's `exhaustive` documentation.
        assert_eq!(explored, 8);
    }
}