hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20use crate::forward_handle::ForwardRef;
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
31///
32/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
33/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
34/// indicates that entries may be added over time, but once an entry is added it will never be
35/// removed and its value will never change.
36pub trait KeyedSingletonBound {
37    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
38    type UnderlyingBound: Boundedness;
39    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
40    type ValueBound: Boundedness;
41
42    /// The type of the keyed singleton if the value for each key is immutable.
43    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
44
45    /// The type of the keyed singleton if the value for each key may change asynchronously.
46    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
47
48    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
49    fn bound_kind() -> KeyedSingletonBoundKind;
50}
51
52impl KeyedSingletonBound for Unbounded {
53    type UnderlyingBound = Unbounded;
54    type ValueBound = Unbounded;
55    type WithBoundedValue = BoundedValue;
56    type WithUnboundedValue = Unbounded;
57
58    fn bound_kind() -> KeyedSingletonBoundKind {
59        KeyedSingletonBoundKind::Unbounded
60    }
61}
62
63impl KeyedSingletonBound for Bounded {
64    type UnderlyingBound = Bounded;
65    type ValueBound = Bounded;
66    type WithBoundedValue = Bounded;
67    type WithUnboundedValue = UnreachableBound;
68
69    fn bound_kind() -> KeyedSingletonBoundKind {
70        KeyedSingletonBoundKind::Bounded
71    }
72}
73
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change.
///
/// The set of keys itself is unbounded: new entries may appear asynchronously over time, but an
/// entry is never removed once it has been added.
pub struct BoundedValue;
78
79impl KeyedSingletonBound for BoundedValue {
80    type UnderlyingBound = Unbounded;
81    type ValueBound = Bounded;
82    type WithBoundedValue = BoundedValue;
83    type WithUnboundedValue = Unbounded;
84
85    fn bound_kind() -> KeyedSingletonBoundKind {
86        KeyedSingletonBoundKind::BoundedValue
87    }
88}
89
90#[doc(hidden)]
91pub struct UnreachableBound;
92
93impl KeyedSingletonBound for UnreachableBound {
94    type UnderlyingBound = Bounded;
95    type ValueBound = Unbounded;
96
97    type WithBoundedValue = Bounded;
98    type WithUnboundedValue = UnreachableBound;
99
100    fn bound_kind() -> KeyedSingletonBoundKind {
101        unreachable!("UnreachableBound cannot be instantiated")
102    }
103}
104
105/// Mapping from keys of type `K` to values of type `V`.
106///
107/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
108/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
109/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
110/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
111/// keys cannot be removed and the value for each key is immutable.
112///
113/// Type Parameters:
114/// - `K`: the type of the key for each entry
115/// - `V`: the type of the value for each entry
116/// - `Loc`: the [`Location`] where the keyed singleton is materialized
117/// - `Bound`: tracks whether the entries are:
118///     - [`Bounded`] (local and finite)
119///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
120///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
121pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
122    pub(crate) location: Loc,
123    pub(crate) ir_node: RefCell<HydroNode>,
124
125    _phantom: PhantomData<(K, V, Loc, Bound)>,
126}
127
128impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
129    for KeyedSingleton<K, V, Loc, Bound>
130{
131    fn clone(&self) -> Self {
132        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
133            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
134            *self.ir_node.borrow_mut() = HydroNode::Tee {
135                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
136                metadata: self.location.new_node_metadata(Self::collection_kind()),
137            };
138        }
139
140        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
141            KeyedSingleton {
142                location: self.location.clone(),
143                ir_node: HydroNode::Tee {
144                    inner: TeeNode(inner.0.clone()),
145                    metadata: metadata.clone(),
146                }
147                .into(),
148                _phantom: PhantomData,
149            }
150        } else {
151            unreachable!()
152        }
153    }
154}
155
156impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
157    for KeyedSingleton<K, V, L, B>
158where
159    L: Location<'a> + NoTick,
160{
161    type Location = L;
162
163    fn create_source(ident: syn::Ident, location: L) -> Self {
164        KeyedSingleton {
165            location: location.clone(),
166            ir_node: RefCell::new(HydroNode::CycleSource {
167                ident,
168                metadata: location.new_node_metadata(Self::collection_kind()),
169            }),
170            _phantom: PhantomData,
171        }
172    }
173}
174
175impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
176    for KeyedSingleton<K, V, L, B>
177where
178    L: Location<'a> + NoTick,
179{
180    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
181        assert_eq!(
182            Location::id(&self.location),
183            expected_location,
184            "locations do not match"
185        );
186        self.location
187            .flow_state()
188            .borrow_mut()
189            .push_root(HydroRoot::CycleSink {
190                ident,
191                input: Box::new(self.ir_node.into_inner()),
192                op_metadata: HydroIrOpMetadata::new(),
193            });
194    }
195}
196
197impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
198    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
199        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
200        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
201
202        KeyedSingleton {
203            location,
204            ir_node: RefCell::new(ir_node),
205            _phantom: PhantomData,
206        }
207    }
208
209    /// Returns the [`Location`] where this keyed singleton is being materialized.
210    pub fn location(&self) -> &L {
211        &self.location
212    }
213}
214
/// Counts the entries of a bounded keyed singleton.
///
/// Used by `KeyedSingleton::key_count` on the per-tick snapshot of a keyed singleton whose
/// values may change.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let all_entries = me.entries();
    all_entries.count()
}
221
/// Folds the entries of a bounded keyed singleton into a single `HashMap`.
///
/// Used by `KeyedSingleton::into_singleton` on the per-tick snapshot of a keyed singleton whose
/// values may change.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let all_entries = me.entries();
    all_entries
        .assume_ordering(nondet!(
            /// Because this is a keyed singleton, there is only one value per key.
        ))
        .fold(
            q!(|| HashMap::new()),
            q!(|map, (k, v)| {
                map.insert(k, v);
            }),
        )
}
240
241impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
242    pub(crate) fn collection_kind() -> CollectionKind {
243        CollectionKind::KeyedSingleton {
244            bound: B::bound_kind(),
245            key_type: stageleft::quote_type::<K>().into(),
246            value_type: stageleft::quote_type::<V>().into(),
247        }
248    }
249
250    /// Transforms each value by invoking `f` on each element, with keys staying the same
251    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
252    ///
253    /// If you do not want to modify the stream and instead only want to view
254    /// each item use [`KeyedSingleton::inspect`] instead.
255    ///
256    /// # Example
257    /// ```rust
258    /// # #[cfg(feature = "deploy")] {
259    /// # use hydro_lang::prelude::*;
260    /// # use futures::StreamExt;
261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
262    /// let keyed_singleton = // { 1: 2, 2: 4 }
263    /// # process
264    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
265    /// #     .into_keyed()
266    /// #     .first();
267    /// keyed_singleton.map(q!(|v| v + 1))
268    /// #   .entries()
269    /// # }, |mut stream| async move {
270    /// // { 1: 3, 2: 5 }
271    /// # let mut results = Vec::new();
272    /// # for _ in 0..2 {
273    /// #     results.push(stream.next().await.unwrap());
274    /// # }
275    /// # results.sort();
276    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
277    /// # }));
278    /// # }
279    /// ```
280    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
281    where
282        F: Fn(V) -> U + 'a,
283    {
284        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
285        let map_f = q!({
286            let orig = f;
287            move |(k, v)| (k, orig(v))
288        })
289        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
290        .into();
291
292        KeyedSingleton::new(
293            self.location.clone(),
294            HydroNode::Map {
295                f: map_f,
296                input: Box::new(self.ir_node.into_inner()),
297                metadata: self
298                    .location
299                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
300            },
301        )
302    }
303
304    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
305    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
306    ///
307    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
308    /// the new value `U`. The key remains unchanged in the output.
309    ///
310    /// # Example
311    /// ```rust
312    /// # #[cfg(feature = "deploy")] {
313    /// # use hydro_lang::prelude::*;
314    /// # use futures::StreamExt;
315    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
316    /// let keyed_singleton = // { 1: 2, 2: 4 }
317    /// # process
318    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
319    /// #     .into_keyed()
320    /// #     .first();
321    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
322    /// #   .entries()
323    /// # }, |mut stream| async move {
324    /// // { 1: 3, 2: 6 }
325    /// # let mut results = Vec::new();
326    /// # for _ in 0..2 {
327    /// #     results.push(stream.next().await.unwrap());
328    /// # }
329    /// # results.sort();
330    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
331    /// # }));
332    /// # }
333    /// ```
334    pub fn map_with_key<U, F>(
335        self,
336        f: impl IntoQuotedMut<'a, F, L> + Copy,
337    ) -> KeyedSingleton<K, U, L, B>
338    where
339        F: Fn((K, V)) -> U + 'a,
340        K: Clone,
341    {
342        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
343        let map_f = q!({
344            let orig = f;
345            move |(k, v)| {
346                let out = orig((Clone::clone(&k), v));
347                (k, out)
348            }
349        })
350        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
351        .into();
352
353        KeyedSingleton::new(
354            self.location.clone(),
355            HydroNode::Map {
356                f: map_f,
357                input: Box::new(self.ir_node.into_inner()),
358                metadata: self
359                    .location
360                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
361            },
362        )
363    }
364
365    /// Gets the number of keys in the keyed singleton.
366    ///
367    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
368    /// since keys may be added / removed over time. When the set of keys changes, the count will
369    /// be asynchronously updated.
370    ///
371    /// # Example
372    /// ```rust
373    /// # #[cfg(feature = "deploy")] {
374    /// # use hydro_lang::prelude::*;
375    /// # use futures::StreamExt;
376    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
377    /// # let tick = process.tick();
378    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
379    /// # process
380    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
381    /// #     .into_keyed()
382    /// #     .batch(&tick, nondet!(/** test */))
383    /// #     .first();
384    /// keyed_singleton.key_count()
385    /// # .all_ticks()
386    /// # }, |mut stream| async move {
387    /// // 3
388    /// # assert_eq!(stream.next().await.unwrap(), 3);
389    /// # }));
390    /// # }
391    /// ```
392    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
393        if B::ValueBound::BOUNDED {
394            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
395                location: self.location,
396                ir_node: self.ir_node,
397                _phantom: PhantomData,
398            };
399
400            me.entries().count()
401        } else if L::is_top_level()
402            && let Some(tick) = self.location.try_tick()
403        {
404            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
405                location: self.location,
406                ir_node: self.ir_node,
407                _phantom: PhantomData,
408            };
409
410            let out =
411                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
412                    .latest();
413            Singleton::new(out.location, out.ir_node.into_inner())
414        } else {
415            panic!("Unbounded KeyedSingleton inside a tick");
416        }
417    }
418
419    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
420    ///
421    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
422    /// asynchronously as well.
423    ///
424    /// # Example
425    /// ```rust
426    /// # #[cfg(feature = "deploy")] {
427    /// # use hydro_lang::prelude::*;
428    /// # use futures::StreamExt;
429    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
430    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
431    /// # process
432    /// #     .source_iter(q!(vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())]))
433    /// #     .into_keyed()
434    /// #     .batch(&process.tick(), nondet!(/** test */))
435    /// #     .first();
436    /// keyed_singleton.into_singleton()
437    /// # .all_ticks()
438    /// # }, |mut stream| async move {
439    /// // { 1: "a", 2: "b", 3: "c" }
440    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())].into_iter().collect());
441    /// # }));
442    /// # }
443    /// ```
444    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
445    where
446        K: Eq + Hash,
447    {
448        if B::ValueBound::BOUNDED {
449            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
450                location: self.location,
451                ir_node: self.ir_node,
452                _phantom: PhantomData,
453            };
454
455            me.entries()
456                .assume_ordering(nondet!(
457                    /// Because this is a keyed singleton, there is only one value per key.
458                ))
459                .fold(
460                    q!(|| HashMap::new()),
461                    q!(|map, (k, v)| {
462                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
463                        map.insert(k, v);
464                    }),
465                )
466        } else if L::is_top_level()
467            && let Some(tick) = self.location.try_tick()
468        {
469            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
470                location: self.location,
471                ir_node: self.ir_node,
472                _phantom: PhantomData,
473            };
474
475            let out = into_singleton_inside_tick(
476                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
477            )
478            .latest();
479            Singleton::new(out.location, out.ir_node.into_inner())
480        } else {
481            panic!("Unbounded KeyedSingleton inside a tick");
482        }
483    }
484
485    /// An operator which allows you to "name" a `HydroNode`.
486    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
487    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
488        {
489            let mut node = self.ir_node.borrow_mut();
490            let metadata = node.metadata_mut();
491            metadata.tag = Some(name.to_string());
492        }
493        self
494    }
495}
496
497impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
498    KeyedSingleton<K, V, L, B>
499{
500    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
501    ///
502    /// The value for each key must be bounded, otherwise the resulting stream elements would be
503    /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
504    /// into the output.
505    ///
506    /// # Example
507    /// ```rust
508    /// # #[cfg(feature = "deploy")] {
509    /// # use hydro_lang::prelude::*;
510    /// # use futures::StreamExt;
511    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
512    /// let keyed_singleton = // { 1: 2, 2: 4 }
513    /// # process
514    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
515    /// #     .into_keyed()
516    /// #     .first();
517    /// keyed_singleton.entries()
518    /// # }, |mut stream| async move {
519    /// // (1, 2), (2, 4) in any order
520    /// # let mut results = Vec::new();
521    /// # for _ in 0..2 {
522    /// #     results.push(stream.next().await.unwrap());
523    /// # }
524    /// # results.sort();
525    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
526    /// # }));
527    /// # }
528    /// ```
529    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
530        self.into_keyed_stream().entries()
531    }
532
533    /// Flattens the keyed singleton into an unordered stream of just the values.
534    ///
535    /// The value for each key must be bounded, otherwise the resulting stream elements would be
536    /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
537    /// into the output.
538    ///
539    /// # Example
540    /// ```rust
541    /// # #[cfg(feature = "deploy")] {
542    /// # use hydro_lang::prelude::*;
543    /// # use futures::StreamExt;
544    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
545    /// let keyed_singleton = // { 1: 2, 2: 4 }
546    /// # process
547    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
548    /// #     .into_keyed()
549    /// #     .first();
550    /// keyed_singleton.values()
551    /// # }, |mut stream| async move {
552    /// // 2, 4 in any order
553    /// # let mut results = Vec::new();
554    /// # for _ in 0..2 {
555    /// #     results.push(stream.next().await.unwrap());
556    /// # }
557    /// # results.sort();
558    /// # assert_eq!(results, vec![2, 4]);
559    /// # }));
560    /// # }
561    /// ```
562    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
563        let map_f = q!(|(_, v)| v)
564            .splice_fn1_ctx::<(K, V), V>(&self.location)
565            .into();
566
567        Stream::new(
568            self.location.clone(),
569            HydroNode::Map {
570                f: map_f,
571                input: Box::new(self.ir_node.into_inner()),
572                metadata: self.location.new_node_metadata(Stream::<
573                    V,
574                    L,
575                    B::UnderlyingBound,
576                    NoOrder,
577                    ExactlyOnce,
578                >::collection_kind()),
579            },
580        )
581    }
582
583    /// Flattens the keyed singleton into an unordered stream of just the keys.
584    ///
585    /// The value for each key must be bounded, otherwise the removal of keys would result in
586    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
587    /// into the output.
588    ///
589    /// # Example
590    /// ```rust
591    /// # #[cfg(feature = "deploy")] {
592    /// # use hydro_lang::prelude::*;
593    /// # use futures::StreamExt;
594    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
595    /// let keyed_singleton = // { 1: 2, 2: 4 }
596    /// # process
597    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
598    /// #     .into_keyed()
599    /// #     .first();
600    /// keyed_singleton.keys()
601    /// # }, |mut stream| async move {
602    /// // 1, 2 in any order
603    /// # let mut results = Vec::new();
604    /// # for _ in 0..2 {
605    /// #     results.push(stream.next().await.unwrap());
606    /// # }
607    /// # results.sort();
608    /// # assert_eq!(results, vec![1, 2]);
609    /// # }));
610    /// # }
611    /// ```
612    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
613        self.entries().map(q!(|(k, _)| k))
614    }
615
616    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
617    /// entries whose keys are not in the provided stream.
618    ///
619    /// # Example
620    /// ```rust
621    /// # #[cfg(feature = "deploy")] {
622    /// # use hydro_lang::prelude::*;
623    /// # use futures::StreamExt;
624    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625    /// let tick = process.tick();
626    /// let keyed_singleton = // { 1: 2, 2: 4 }
627    /// # process
628    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
629    /// #     .into_keyed()
630    /// #     .first()
631    /// #     .batch(&tick, nondet!(/** test */));
632    /// let keys_to_remove = process
633    ///     .source_iter(q!(vec![1]))
634    ///     .batch(&tick, nondet!(/** test */));
635    /// keyed_singleton.filter_key_not_in(keys_to_remove)
636    /// #   .entries().all_ticks()
637    /// # }, |mut stream| async move {
638    /// // { 2: 4 }
639    /// # for w in vec![(2, 4)] {
640    /// #     assert_eq!(stream.next().await.unwrap(), w);
641    /// # }
642    /// # }));
643    /// # }
644    /// ```
645    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
646        self,
647        other: Stream<K, L, Bounded, O2, R2>,
648    ) -> Self
649    where
650        K: Hash + Eq,
651    {
652        check_matching_location(&self.location, &other.location);
653
654        KeyedSingleton::new(
655            self.location.clone(),
656            HydroNode::AntiJoin {
657                pos: Box::new(self.ir_node.into_inner()),
658                neg: Box::new(other.ir_node.into_inner()),
659                metadata: self.location.new_node_metadata(Self::collection_kind()),
660            },
661        )
662    }
663
664    /// An operator which allows you to "inspect" each value of a keyed singleton without
665    /// modifying it. The closure `f` is called on a reference to each value. This is
666    /// mainly useful for debugging, and should not be used to generate side-effects.
667    ///
668    /// # Example
669    /// ```rust
670    /// # #[cfg(feature = "deploy")] {
671    /// # use hydro_lang::prelude::*;
672    /// # use futures::StreamExt;
673    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674    /// let keyed_singleton = // { 1: 2, 2: 4 }
675    /// # process
676    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
677    /// #     .into_keyed()
678    /// #     .first();
679    /// keyed_singleton
680    ///     .inspect(q!(|v| println!("{}", v)))
681    /// #   .entries()
682    /// # }, |mut stream| async move {
683    /// // { 1: 2, 2: 4 }
684    /// # for w in vec![(1, 2), (2, 4)] {
685    /// #     assert_eq!(stream.next().await.unwrap(), w);
686    /// # }
687    /// # }));
688    /// # }
689    /// ```
690    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
691    where
692        F: Fn(&V) + 'a,
693    {
694        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
695        let inspect_f = q!({
696            let orig = f;
697            move |t: &(_, _)| orig(&t.1)
698        })
699        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
700        .into();
701
702        KeyedSingleton::new(
703            self.location.clone(),
704            HydroNode::Inspect {
705                f: inspect_f,
706                input: Box::new(self.ir_node.into_inner()),
707                metadata: self.location.new_node_metadata(Self::collection_kind()),
708            },
709        )
710    }
711
712    /// An operator which allows you to "inspect" each entry of a keyed singleton without
713    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
714    /// mainly useful for debugging, and should not be used to generate side-effects.
715    ///
716    /// # Example
717    /// ```rust
718    /// # #[cfg(feature = "deploy")] {
719    /// # use hydro_lang::prelude::*;
720    /// # use futures::StreamExt;
721    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
722    /// let keyed_singleton = // { 1: 2, 2: 4 }
723    /// # process
724    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
725    /// #     .into_keyed()
726    /// #     .first();
727    /// keyed_singleton
728    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
729    /// #   .entries()
730    /// # }, |mut stream| async move {
731    /// // { 1: 2, 2: 4 }
732    /// # for w in vec![(1, 2), (2, 4)] {
733    /// #     assert_eq!(stream.next().await.unwrap(), w);
734    /// # }
735    /// # }));
736    /// # }
737    /// ```
738    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
739    where
740        F: Fn(&(K, V)) + 'a,
741    {
742        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
743
744        KeyedSingleton::new(
745            self.location.clone(),
746            HydroNode::Inspect {
747                f: inspect_f,
748                input: Box::new(self.ir_node.into_inner()),
749                metadata: self.location.new_node_metadata(Self::collection_kind()),
750            },
751        )
752    }
753
754    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
755    ///
756    /// Because this method requires values to be bounded, the output [`Optional`] will only be
757    /// asynchronously updated if a new key is added that is higher than the previous max key.
758    ///
759    /// # Example
760    /// ```rust
761    /// # #[cfg(feature = "deploy")] {
762    /// # use hydro_lang::prelude::*;
763    /// # use futures::StreamExt;
764    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
765    /// let tick = process.tick();
766    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
767    /// # process
768    /// #     .source_iter(q!(vec![(1, 123), (2, 456), (0, 789)]))
769    /// #     .into_keyed()
770    /// #     .first();
771    /// keyed_singleton.get_max_key()
772    /// # .sample_eager(nondet!(/** test */))
773    /// # }, |mut stream| async move {
774    /// // (2, 456)
775    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
776    /// # }));
777    /// # }
778    /// ```
779    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
780    where
781        K: Ord,
782    {
783        self.entries()
784            .assume_ordering(nondet!(
785                /// There is only one element associated with each key, and the keys are totallly
786                /// ordered so we will produce a deterministic value. We can't call
787                /// `reduce_commutative_idempotent` because the closure technically isn't commutative
788                /// in the case where both passed entries have the same key but different values.
789                ///
790                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
791                /// the two inputs do not have the same key.
792            ))
793            .reduce_idempotent(q!({
794                move |curr, new| {
795                    if new.0 > curr.0 {
796                        *curr = new;
797                    }
798                }
799            }))
800    }
801
802    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
803    /// element, the value.
804    ///
805    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
806    ///
807    /// # Example
808    /// ```rust
809    /// # #[cfg(feature = "deploy")] {
810    /// # use hydro_lang::prelude::*;
811    /// # use futures::StreamExt;
812    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813    /// let keyed_singleton = // { 1: 2, 2: 4 }
814    /// # process
815    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
816    /// #     .into_keyed()
817    /// #     .first();
818    /// keyed_singleton
819    ///     .clone()
820    ///     .into_keyed_stream()
821    ///     .interleave(
822    ///         keyed_singleton.into_keyed_stream()
823    ///     )
824    /// #   .entries()
825    /// # }, |mut stream| async move {
826    /// /// // { 1: [2, 2], 2: [4, 4] }
827    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
828    /// #     assert_eq!(stream.next().await.unwrap(), w);
829    /// # }
830    /// # }));
831    /// # }
832    /// ```
833    pub fn into_keyed_stream(
834        self,
835    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
836        KeyedStream::new(
837            self.location.clone(),
838            HydroNode::Cast {
839                inner: Box::new(self.ir_node.into_inner()),
840                metadata: self.location.new_node_metadata(KeyedStream::<
841                    K,
842                    V,
843                    L,
844                    B::UnderlyingBound,
845                    TotalOrder,
846                    ExactlyOnce,
847                >::collection_kind()),
848            },
849        )
850    }
851}
852
853impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
854    /// Gets the value associated with a specific key from the keyed singleton.
855    ///
856    /// # Example
857    /// ```rust
858    /// # #[cfg(feature = "deploy")] {
859    /// # use hydro_lang::prelude::*;
860    /// # use futures::StreamExt;
861    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862    /// let tick = process.tick();
863    /// let keyed_data = process
864    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
865    ///     .into_keyed()
866    ///     .batch(&tick, nondet!(/** test */))
867    ///     .first();
868    /// let key = tick.singleton(q!(1));
869    /// keyed_data.get(key).all_ticks()
870    /// # }, |mut stream| async move {
871    /// // 2
872    /// # assert_eq!(stream.next().await.unwrap(), 2);
873    /// # }));
874    /// # }
875    /// ```
876    pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
877        self.entries()
878            .join(key.into_stream().map(q!(|k| (k, ()))))
879            .map(q!(|(_, (v, _))| v))
880            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
881            .first()
882    }
883
884    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
885    /// is some additional metadata, emits a keyed stream of lookup results where the key
886    /// is the same as before, but the value is a tuple of the lookup result and the metadata
887    /// of the request. If the key is not found, no output will be produced.
888    ///
889    /// # Example
890    /// ```rust
891    /// # #[cfg(feature = "deploy")] {
892    /// # use hydro_lang::prelude::*;
893    /// # use futures::StreamExt;
894    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
895    /// let tick = process.tick();
896    /// let keyed_data = process
897    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
898    ///     .into_keyed()
899    ///     .batch(&tick, nondet!(/** test */))
900    ///     .first();
901    /// let other_data = process
902    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
903    ///     .into_keyed()
904    ///     .batch(&tick, nondet!(/** test */));
905    /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
906    /// # }, |mut stream| async move {
907    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
908    /// # let mut results = vec![];
909    /// # for _ in 0..3 {
910    /// #     results.push(stream.next().await.unwrap());
911    /// # }
912    /// # results.sort();
913    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
914    /// # }));
915    /// # }
916    /// ```
917    pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
918        self,
919        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
920    ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
921        self.entries()
922            .weaker_retries::<R2>()
923            .join(requests.entries())
924            .into_keyed()
925    }
926
927    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
928    /// is some additional metadata, emits a keyed stream of lookup results where the key
929    /// is the same as before, but the value is a tuple of the lookup result (as `Option<V>`)
930    /// and the metadata of the request. Unlike `get_many_if_present`, this returns all request
931    /// keys, with `None` for keys that are not found.
932    ///
933    /// # Example
934    /// ```rust
935    /// # #[cfg(feature = "deploy")] {
936    /// # use hydro_lang::prelude::*;
937    /// # use futures::StreamExt;
938    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
939    /// let tick = process.tick();
940    /// let keyed_data = process
941    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
942    ///     .into_keyed()
943    ///     .batch(&tick, nondet!(/** test */))
944    ///     .first();
945    /// let other_data = process
946    ///     .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
947    ///     .into_keyed()
948    ///     .batch(&tick, nondet!(/** test */));
949    /// keyed_data.get_many(other_data).entries().all_ticks()
950    /// # }, |mut stream| async move {
951    /// // { 1: [(Some(10), 100)], 2: [(Some(20), 200)], 3: [(None, 300)] } in any order
952    /// # let mut results = vec![];
953    /// # for _ in 0..3 {
954    /// #     results.push(stream.next().await.unwrap());
955    /// # }
956    /// # results.sort();
957    /// # assert_eq!(results, vec![(1, (Some(10), 100)), (2, (Some(20), 200)), (3, (None, 300))]);
958    /// # }));
959    /// # }
960    /// ```
961    #[expect(clippy::type_complexity, reason = "stream types")]
962    pub fn get_many<O2: Ordering, R2: Retries, V2>(
963        self,
964        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
965    ) -> KeyedStream<K, (Option<V>, V2), Tick<L>, Bounded, NoOrder, R2>
966    where
967        K: Clone,
968        V: Clone,
969        V2: Clone,
970    {
971        let lookup_result = self.clone().get_many_if_present(requests.clone());
972        let missing_keys = requests.filter_key_not_in(self.keys()).weakest_ordering();
973
974        lookup_result
975            .map(q!(|(v, v2)| (Some(v), v2)))
976            .chain(missing_keys.map(q!(|v2| (None, v2))))
977    }
978
979    /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
980    /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
981    /// containing the value from `self` and an option of the value from `from`. If the key is not
982    /// present in `from`, the option will be [`None`].
983    ///
984    /// # Example
985    /// ```rust
986    /// # #[cfg(feature = "deploy")] {
987    /// # use hydro_lang::prelude::*;
988    /// # use futures::StreamExt;
989    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
990    /// # let tick = process.tick();
991    /// let requests = // { 1: 10, 2: 20 }
992    /// # process
993    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
994    /// #     .into_keyed()
995    /// #     .batch(&tick, nondet!(/** test */))
996    /// #     .first();
997    /// let other_data = // { 10: 100, 11: 101 }
998    /// # process
999    /// #     .source_iter(q!(vec![(10, 100), (11, 101)]))
1000    /// #     .into_keyed()
1001    /// #     .batch(&tick, nondet!(/** test */))
1002    /// #     .first();
1003    /// requests.get_from(other_data)
1004    /// # .entries().all_ticks()
1005    /// # }, |mut stream| async move {
1006    /// // { 1: (10, Some(100)), 2: (20, None) }
1007    /// # let mut results = vec![];
1008    /// # for _ in 0..2 {
1009    /// #     results.push(stream.next().await.unwrap());
1010    /// # }
1011    /// # results.sort();
1012    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
1013    /// # }));
1014    /// # }
1015    /// ```
1016    pub fn get_from<V2: Clone>(
1017        self,
1018        from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
1019    ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
1020    where
1021        K: Clone,
1022        V: Hash + Eq + Clone,
1023    {
1024        let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
1025        let lookup_result = from.get_many_if_present(to_lookup.clone());
1026        let missing_values =
1027            to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
1028        let result_stream = lookup_result
1029            .entries()
1030            .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
1031            .into_keyed()
1032            .chain(
1033                missing_values
1034                    .entries()
1035                    .map(q!(|(v, k)| (k, (v, None))))
1036                    .into_keyed(),
1037            );
1038
1039        KeyedSingleton::new(
1040            result_stream.location.clone(),
1041            HydroNode::Cast {
1042                inner: Box::new(result_stream.ir_node.into_inner()),
1043                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1044                    K,
1045                    (V, Option<V2>),
1046                    Tick<L>,
1047                    Bounded,
1048                >::collection_kind(
1049                )),
1050            },
1051        )
1052    }
1053}
1054
1055impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1056where
1057    L: Location<'a>,
1058{
1059    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1060    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1061    ///
1062    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1063    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1064    /// argument that declares where the keyed singleton will be atomically processed. Batching a
1065    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1066    /// batching into a different [`Tick`] will introduce asynchrony.
1067    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1068        let out_location = Atomic { tick: tick.clone() };
1069        KeyedSingleton::new(
1070            out_location.clone(),
1071            HydroNode::BeginAtomic {
1072                inner: Box::new(self.ir_node.into_inner()),
1073                metadata: out_location
1074                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1075            },
1076        )
1077    }
1078}
1079
1080impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1081where
1082    L: Location<'a> + NoTick,
1083{
1084    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1085    /// See [`KeyedSingleton::atomic`] for more details.
1086    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1087        KeyedSingleton::new(
1088            self.location.tick.l.clone(),
1089            HydroNode::EndAtomic {
1090                inner: Box::new(self.ir_node.into_inner()),
1091                metadata: self
1092                    .location
1093                    .tick
1094                    .l
1095                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1096            },
1097        )
1098    }
1099}
1100
1101impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1102    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1103    /// tick `T` always has the entries of `self` at tick `T - 1`.
1104    ///
1105    /// At tick `0`, the output has no entries, since there is no previous tick.
1106    ///
1107    /// This operator enables stateful iterative processing with ticks, by sending data from one
1108    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1109    ///
1110    /// # Example
1111    /// ```rust
1112    /// # #[cfg(feature = "deploy")] {
1113    /// # use hydro_lang::prelude::*;
1114    /// # use futures::StreamExt;
1115    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1116    /// let tick = process.tick();
1117    /// # // ticks are lazy by default, forces the second tick to run
1118    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1119    /// # let batch_first_tick = process
1120    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1121    /// #   .batch(&tick, nondet!(/** test */))
1122    /// #   .into_keyed();
1123    /// # let batch_second_tick = process
1124    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1125    /// #   .batch(&tick, nondet!(/** test */))
1126    /// #   .into_keyed()
1127    /// #   .defer_tick(); // appears on the second tick
1128    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1129    /// # batch_first_tick.chain(batch_second_tick).first();
1130    /// input_batch.clone().filter_key_not_in(
1131    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1132    /// )
1133    /// # .entries().all_ticks()
1134    /// # }, |mut stream| async move {
1135    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1136    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1137    /// #     assert_eq!(stream.next().await.unwrap(), w);
1138    /// # }
1139    /// # }));
1140    /// # }
1141    /// ```
1142    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1143        KeyedSingleton::new(
1144            self.location.clone(),
1145            HydroNode::DeferTick {
1146                input: Box::new(self.ir_node.into_inner()),
1147                metadata: self
1148                    .location
1149                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1150            },
1151        )
1152    }
1153}
1154
1155impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1156where
1157    L: Location<'a>,
1158{
1159    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1160    /// point in time.
1161    ///
1162    /// # Non-Determinism
1163    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1164    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1165    pub fn snapshot(
1166        self,
1167        tick: &Tick<L>,
1168        _nondet: NonDet,
1169    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1170        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1171        KeyedSingleton::new(
1172            tick.clone(),
1173            HydroNode::Batch {
1174                inner: Box::new(self.ir_node.into_inner()),
1175                metadata: tick
1176                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1177            },
1178        )
1179    }
1180}
1181
1182impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1183where
1184    L: Location<'a> + NoTick,
1185{
1186    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1187    /// state of the keyed singleton being atomically processed.
1188    ///
1189    /// # Non-Determinism
1190    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1191    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1192    pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1193        KeyedSingleton::new(
1194            self.location.clone().tick,
1195            HydroNode::Batch {
1196                inner: Box::new(self.ir_node.into_inner()),
1197                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1198                    K,
1199                    V,
1200                    Tick<L>,
1201                    Bounded,
1202                >::collection_kind(
1203                )),
1204            },
1205        )
1206    }
1207}
1208
1209impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1210where
1211    L: Location<'a>,
1212{
1213    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1214    ///
1215    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1216    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1217    /// is filtered out.
1218    ///
1219    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1220    /// not modify or take ownership of the values. If you need to modify the values while filtering
1221    /// use [`KeyedSingleton::filter_map`] instead.
1222    ///
1223    /// # Example
1224    /// ```rust
1225    /// # #[cfg(feature = "deploy")] {
1226    /// # use hydro_lang::prelude::*;
1227    /// # use futures::StreamExt;
1228    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1229    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1230    /// # process
1231    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1232    /// #     .into_keyed()
1233    /// #     .first();
1234    /// keyed_singleton.filter(q!(|&v| v > 1))
1235    /// #   .entries()
1236    /// # }, |mut stream| async move {
1237    /// // { 1: 2, 2: 4 }
1238    /// # let mut results = Vec::new();
1239    /// # for _ in 0..2 {
1240    /// #     results.push(stream.next().await.unwrap());
1241    /// # }
1242    /// # results.sort();
1243    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1244    /// # }));
1245    /// # }
1246    /// ```
1247    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1248    where
1249        F: Fn(&V) -> bool + 'a,
1250    {
1251        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1252        let filter_f = q!({
1253            let orig = f;
1254            move |t: &(_, _)| orig(&t.1)
1255        })
1256        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1257        .into();
1258
1259        KeyedSingleton::new(
1260            self.location.clone(),
1261            HydroNode::Filter {
1262                f: filter_f,
1263                input: Box::new(self.ir_node.into_inner()),
1264                metadata: self
1265                    .location
1266                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1267            },
1268        )
1269    }
1270
1271    /// An operator that both filters and maps values. It yields only the key-value pairs where
1272    /// the supplied closure `f` returns `Some(value)`.
1273    ///
1274    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1275    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1276    /// If it returns `None`, the key-value pair is filtered out.
1277    ///
1278    /// # Example
1279    /// ```rust
1280    /// # #[cfg(feature = "deploy")] {
1281    /// # use hydro_lang::prelude::*;
1282    /// # use futures::StreamExt;
1283    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1284    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1285    /// # process
1286    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1287    /// #     .into_keyed()
1288    /// #     .first();
1289    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1290    /// #   .entries()
1291    /// # }, |mut stream| async move {
1292    /// // { 1: 42, 3: 100 }
1293    /// # let mut results = Vec::new();
1294    /// # for _ in 0..2 {
1295    /// #     results.push(stream.next().await.unwrap());
1296    /// # }
1297    /// # results.sort();
1298    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1299    /// # }));
1300    /// # }
1301    /// ```
1302    pub fn filter_map<F, U>(
1303        self,
1304        f: impl IntoQuotedMut<'a, F, L> + Copy,
1305    ) -> KeyedSingleton<K, U, L, B>
1306    where
1307        F: Fn(V) -> Option<U> + 'a,
1308    {
1309        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1310        let filter_map_f = q!({
1311            let orig = f;
1312            move |(k, v)| orig(v).map(|o| (k, o))
1313        })
1314        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1315        .into();
1316
1317        KeyedSingleton::new(
1318            self.location.clone(),
1319            HydroNode::FilterMap {
1320                f: filter_map_f,
1321                input: Box::new(self.ir_node.into_inner()),
1322                metadata: self
1323                    .location
1324                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1325            },
1326        )
1327    }
1328
1329    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1330    /// arrived since the previous batch was released.
1331    ///
1332    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1333    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1334    ///
1335    /// # Non-Determinism
1336    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1337    /// has a non-deterministic set of key-value pairs.
1338    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1339    where
1340        L: NoTick,
1341    {
1342        self.atomic(tick).batch_atomic(nondet)
1343    }
1344}
1345
1346impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1347where
1348    L: Location<'a> + NoTick,
1349{
1350    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1351    /// atomically processed.
1352    ///
1353    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1354    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1355    ///
1356    /// # Non-Determinism
1357    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1358    /// has a non-deterministic set of key-value pairs.
1359    pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1360        let _ = nondet;
1361        KeyedSingleton::new(
1362            self.location.clone().tick,
1363            HydroNode::Batch {
1364                inner: Box::new(self.ir_node.into_inner()),
1365                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1366                    K,
1367                    V,
1368                    Tick<L>,
1369                    Bounded,
1370                >::collection_kind(
1371                )),
1372            },
1373        )
1374    }
1375}
1376
#[cfg(test)]
mod tests {
    // Deploy-only dependencies: async stream/sink helpers and the local deployment harness.
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    // Feeds external `(key, value)` pairs into a keyed singleton built with `first()` and checks
    // that `key_count` starts at 0 and grows by one for each newly inserted key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect both external ends before starting so no sampled output is missed.
        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 0);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);
    }

    // Same as above, but values are per-key counters produced by `fold`: repeated keys update a
    // value in place and must not change `key_count`; only new keys increase it.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 0);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        // Key 1 again: its counter changes, but the number of keys does not.
        external_in.send((1, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send((3, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // Checks that `into_singleton` on a `first()`-based keyed singleton exposes the full map,
    // starting from an empty map and accumulating one entry per new key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1)].into_iter().collect()
        );

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1), (2, 2)].into_iter().collect()
        );
    }

    // Checks that `into_singleton` on a `fold`-based (per-key counter) keyed singleton reflects
    // in-place value updates as well as newly inserted keys.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1)].into_iter().collect()
        );

        // The fold ignores the sent value and counts occurrences, so key 1 now maps to 2.
        external_in.send((1, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 2)].into_iter().collect()
        );

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 2), (2, 1)].into_iter().collect()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 3), (2, 1)].into_iter().collect()
        );

        external_in.send((3, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
        );
    }

    // Exhaustively simulates snapshotting a per-key counter. In every explored execution, the
    // largest element of the sorted output must be `(2, 1)`. The search is expected to explore
    // exactly 8 executions.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input();
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            input_port.send((1, 123));
            input_port.send((1, 456));
            input_port.send((2, 123));

            let all = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(all.last().unwrap(), &(2, 1));
        });

        assert_eq!(count, 8);
    }

    // Checks the outer-join semantics of `get_many`. Requests whose key exists in the keyed
    // singleton get `Some(value)`, and key 3, which is absent, gets `None`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn get_many_outer_join() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let keyed_data = node
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let requests = node
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out = keyed_data
            .get_many(requests)
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Output order is unspecified (`NoOrder`), so collect and sort before comparing.
        let mut results = vec![];
        for _ in 0..3 {
            results.push(external_out.next().await.unwrap());
        }
        results.sort();

        assert_eq!(
            results,
            vec![(1, (Some(10), 100)), (2, (Some(20), 200)), (3, (None, 300))]
        );
    }
}
1665}