Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use sealed::sealed;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_stream::KeyedStream;
15use super::optional::Optional;
16use super::singleton::Singleton;
17use super::sliced::sliced;
18use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::stream::{Ordering, Retries};
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::DeferTick;
30use crate::location::{Atomic, Location, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::properties::manual_proof;
34
35/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
36///
37/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
38/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
39/// indicates that entries may be added over time, but once an entry is added it will never be
40/// removed and its value will never change.
41pub trait KeyedSingletonBound {
42    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
43    type UnderlyingBound: Boundedness;
44    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
45    type ValueBound: Boundedness;
46
47    /// The type of the keyed singleton if the value for each key is immutable.
48    type WithBoundedValue: KeyedSingletonBound<
49            UnderlyingBound = Self::UnderlyingBound,
50            ValueBound = Bounded,
51            EraseMonotonic = Self::WithBoundedValue,
52        >;
53
54    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
55    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
56
57    /// The type of the keyed singleton if the value for each key is no longer monotonic.
58    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
59
60    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
61    fn bound_kind() -> KeyedSingletonBoundKind;
62}
63
64impl KeyedSingletonBound for Unbounded {
65    type UnderlyingBound = Unbounded;
66    type ValueBound = Unbounded;
67    type WithBoundedValue = BoundedValue;
68    type KeyedStreamToMonotone = MonotonicValue;
69    type EraseMonotonic = Unbounded;
70
71    fn bound_kind() -> KeyedSingletonBoundKind {
72        KeyedSingletonBoundKind::Unbounded
73    }
74}
75
76impl KeyedSingletonBound for Bounded {
77    type UnderlyingBound = Bounded;
78    type ValueBound = Bounded;
79    type WithBoundedValue = Bounded;
80    type KeyedStreamToMonotone = Bounded;
81    type EraseMonotonic = Bounded;
82
83    fn bound_kind() -> KeyedSingletonBoundKind {
84        KeyedSingletonBoundKind::Bounded
85    }
86}
87
88/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
89/// its value is bounded and will never change, but new entries may appear asynchronously
90pub struct BoundedValue;
91
92impl KeyedSingletonBound for BoundedValue {
93    type UnderlyingBound = Unbounded;
94    type ValueBound = Bounded;
95    type WithBoundedValue = BoundedValue;
96    type KeyedStreamToMonotone = BoundedValue;
97    type EraseMonotonic = BoundedValue;
98
99    fn bound_kind() -> KeyedSingletonBoundKind {
100        KeyedSingletonBoundKind::BoundedValue
101    }
102}
103
104/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
105/// it will never be removed, and the corresponding value will only increase monotonically.
106pub struct MonotonicValue;
107
108impl KeyedSingletonBound for MonotonicValue {
109    type UnderlyingBound = Unbounded;
110    type ValueBound = Unbounded;
111    type WithBoundedValue = BoundedValue;
112    type KeyedStreamToMonotone = MonotonicValue;
113    type EraseMonotonic = Unbounded;
114
115    fn bound_kind() -> KeyedSingletonBoundKind {
116        KeyedSingletonBoundKind::MonotonicValue
117    }
118}
119
120#[sealed]
121#[diagnostic::on_unimplemented(
122    message = "The keyed singleton must have monotonic values (`MonotonicValue`) or be bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
123    label = "required here",
124    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
125)]
126/// Marker trait that is implemented for [`KeyedSingletonBound`] types whose per-key values
127/// are monotonically non-decreasing (or bounded).
128pub trait IsKeyedMonotonic: KeyedSingletonBound {}
129
130#[sealed]
131#[diagnostic::do_not_recommend]
132impl IsKeyedMonotonic for MonotonicValue {}
133
134#[sealed]
135#[diagnostic::do_not_recommend]
136impl IsKeyedMonotonic for BoundedValue {}
137
138#[sealed]
139#[diagnostic::do_not_recommend]
140impl<B: IsBounded + KeyedSingletonBound> IsKeyedMonotonic for B {}
141
142/// Mapping from keys of type `K` to values of type `V`.
143///
144/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
145/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
146/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
147/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
148/// keys cannot be removed and the value for each key is immutable.
149///
150/// Type Parameters:
151/// - `K`: the type of the key for each entry
152/// - `V`: the type of the value for each entry
153/// - `Loc`: the [`Location`] where the keyed singleton is materialized
154/// - `Bound`: tracks whether the entries are:
155///     - [`Bounded`] (local and finite)
156///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
157///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
158pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
159    pub(crate) location: Loc,
160    pub(crate) ir_node: RefCell<HydroNode>,
161    pub(crate) flow_state: FlowState,
162
163    _phantom: PhantomData<(K, V, Loc, Bound)>,
164}
165
166impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
167    fn drop(&mut self) {
168        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
169        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
170            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
171                input: Box::new(ir_node),
172                op_metadata: HydroIrOpMetadata::new(),
173            });
174        }
175    }
176}
177
178impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
179    for KeyedSingleton<K, V, Loc, Bound>
180{
181    fn clone(&self) -> Self {
182        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
183            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
184            *self.ir_node.borrow_mut() = HydroNode::Tee {
185                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
186                metadata: self.location.new_node_metadata(Self::collection_kind()),
187            };
188        }
189
190        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
191            KeyedSingleton {
192                location: self.location.clone(),
193                flow_state: self.flow_state.clone(),
194                ir_node: HydroNode::Tee {
195                    inner: SharedNode(inner.0.clone()),
196                    metadata: metadata.clone(),
197                }
198                .into(),
199                _phantom: PhantomData,
200            }
201        } else {
202            unreachable!()
203        }
204    }
205}
206
207impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
208    for KeyedSingleton<K, V, L, B>
209where
210    L: Location<'a>,
211{
212    type Location = L;
213
214    fn create_source(cycle_id: CycleId, location: L) -> Self {
215        KeyedSingleton {
216            flow_state: location.flow_state().clone(),
217            location: location.clone(),
218            ir_node: RefCell::new(HydroNode::CycleSource {
219                cycle_id,
220                metadata: location.new_node_metadata(Self::collection_kind()),
221            }),
222            _phantom: PhantomData,
223        }
224    }
225}
226
227impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
228where
229    L: Location<'a>,
230{
231    type Location = Tick<L>;
232
233    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
234        KeyedSingleton::new(
235            location.clone(),
236            HydroNode::CycleSource {
237                cycle_id,
238                metadata: location.new_node_metadata(Self::collection_kind()),
239            },
240        )
241    }
242}
243
244impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
245where
246    L: Location<'a>,
247{
248    fn defer_tick(self) -> Self {
249        KeyedSingleton::defer_tick(self)
250    }
251}
252
253impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
254    for KeyedSingleton<K, V, L, B>
255where
256    L: Location<'a>,
257{
258    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
259        assert_eq!(
260            Location::id(&self.location),
261            expected_location,
262            "locations do not match"
263        );
264        self.location
265            .flow_state()
266            .borrow_mut()
267            .push_root(HydroRoot::CycleSink {
268                cycle_id,
269                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
270                op_metadata: HydroIrOpMetadata::new(),
271            });
272    }
273}
274
275impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
276where
277    L: Location<'a>,
278{
279    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
280        assert_eq!(
281            Location::id(&self.location),
282            expected_location,
283            "locations do not match"
284        );
285        self.location
286            .flow_state()
287            .borrow_mut()
288            .push_root(HydroRoot::CycleSink {
289                cycle_id,
290                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
291                op_metadata: HydroIrOpMetadata::new(),
292            });
293    }
294}
295
296impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
297    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
298        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
299        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
300
301        let flow_state = location.flow_state().clone();
302        KeyedSingleton {
303            location,
304            flow_state,
305            ir_node: RefCell::new(ir_node),
306            _phantom: PhantomData,
307        }
308    }
309
310    /// Returns the [`Location`] where this keyed singleton is being materialized.
311    pub fn location(&self) -> &L {
312        &self.location
313    }
314
315    /// Weakens the consistency of this live collection to not guarantee any consistency across
316    /// cluster members (if this collection is on a cluster).
317    pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::DropConsistency, B>
318    where
319        L: Location<'a>,
320    {
321        if L::consistency()
322            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
323        {
324            // already no consistency
325            KeyedSingleton::new(
326                self.location.drop_consistency(),
327                self.ir_node.replace(HydroNode::Placeholder),
328            )
329        } else {
330            KeyedSingleton::new(
331                self.location.drop_consistency(),
332                HydroNode::Cast {
333                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334                    metadata: self
335                        .location
336                        .drop_consistency()
337                        .new_node_metadata(
338                            KeyedSingleton::<K, V, L::DropConsistency, B>::collection_kind(),
339                        ),
340                },
341            )
342        }
343    }
344
345    /// Casts this live collection to have the consistency guarantees specified in the given
346    /// location type parameter. The developer must ensure that the strengthened consistency
347    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
348    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
349        self,
350        _proof: impl crate::properties::ConsistencyProof,
351    ) -> KeyedSingleton<K, V, L2, B>
352    where
353        L: Location<'a>,
354    {
355        if L::consistency() == L2::consistency() {
356            // already consistent
357            KeyedSingleton::new(
358                self.location.with_consistency_of(),
359                self.ir_node.replace(HydroNode::Placeholder),
360            )
361        } else {
362            KeyedSingleton::new(
363                self.location.with_consistency_of(),
364                HydroNode::AssertIsConsistent {
365                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
366                    trusted: false,
367                    metadata: self
368                        .location
369                        .clone()
370                        .with_consistency_of::<L2>()
371                        .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
372                },
373            )
374        }
375    }
376}
377
/// Counts the entries of a bounded keyed singleton, which equals its number of keys.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
384
/// Collects the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering_trusted(nondet!(
        /// There is only one element associated with each key. The closure technically
        /// isn't commutative in the case where both passed entries have the same key
        /// but different values.
        ///
        /// In the future, we may want to have an `assume!(...)` statement in the UDF that
        /// the key is never already present in the map.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
408
409impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
410    pub(crate) fn collection_kind() -> CollectionKind {
411        CollectionKind::KeyedSingleton {
412            bound: B::bound_kind(),
413            key_type: stageleft::quote_type::<K>().into(),
414            value_type: stageleft::quote_type::<V>().into(),
415        }
416    }
417
418    /// Transforms each value by invoking `f` on each element, with keys staying the same
419    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
420    ///
421    /// If you do not want to modify the stream and instead only want to view
422    /// each item use [`KeyedSingleton::inspect`] instead.
423    ///
424    /// # Example
425    /// ```rust
426    /// # #[cfg(feature = "deploy")] {
427    /// # use hydro_lang::prelude::*;
428    /// # use futures::StreamExt;
429    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
430    /// let keyed_singleton = // { 1: 2, 2: 4 }
431    /// # process
432    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
433    /// #     .into_keyed()
434    /// #     .first();
435    /// keyed_singleton.map(q!(|v| v + 1))
436    /// #   .entries()
437    /// # }, |mut stream| async move {
438    /// // { 1: 3, 2: 5 }
439    /// # let mut results = Vec::new();
440    /// # for _ in 0..2 {
441    /// #     results.push(stream.next().await.unwrap());
442    /// # }
443    /// # results.sort();
444    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
445    /// # }));
446    /// # }
447    /// ```
448    pub fn map<U, F>(
449        self,
450        f: impl IntoQuotedMut<'a, F, L> + Copy,
451    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
452    where
453        F: Fn(V) -> U + 'a,
454    {
455        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
456        let map_f = q!({
457            let orig = f;
458            move |(k, v)| (k, orig(v))
459        })
460        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
461        .into();
462
463        KeyedSingleton::new(
464            self.location.clone(),
465            HydroNode::Map {
466                f: map_f,
467                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
468                metadata: self.location.new_node_metadata(KeyedSingleton::<
469                    K,
470                    U,
471                    L,
472                    B::EraseMonotonic,
473                >::collection_kind()),
474            },
475        )
476    }
477
478    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
479    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
480    ///
481    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
482    /// the new value `U`. The key remains unchanged in the output.
483    ///
484    /// # Example
485    /// ```rust
486    /// # #[cfg(feature = "deploy")] {
487    /// # use hydro_lang::prelude::*;
488    /// # use futures::StreamExt;
489    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
490    /// let keyed_singleton = // { 1: 2, 2: 4 }
491    /// # process
492    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
493    /// #     .into_keyed()
494    /// #     .first();
495    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
496    /// #   .entries()
497    /// # }, |mut stream| async move {
498    /// // { 1: 3, 2: 6 }
499    /// # let mut results = Vec::new();
500    /// # for _ in 0..2 {
501    /// #     results.push(stream.next().await.unwrap());
502    /// # }
503    /// # results.sort();
504    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
505    /// # }));
506    /// # }
507    /// ```
508    pub fn map_with_key<U, F>(
509        self,
510        f: impl IntoQuotedMut<'a, F, L> + Copy,
511    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
512    where
513        F: Fn((K, V)) -> U + 'a,
514        K: Clone,
515    {
516        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
517        let map_f = q!({
518            let orig = f;
519            move |(k, v)| {
520                let out = orig((Clone::clone(&k), v));
521                (k, out)
522            }
523        })
524        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
525        .into();
526
527        KeyedSingleton::new(
528            self.location.clone(),
529            HydroNode::Map {
530                f: map_f,
531                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
532                metadata: self.location.new_node_metadata(KeyedSingleton::<
533                    K,
534                    U,
535                    L,
536                    B::EraseMonotonic,
537                >::collection_kind()),
538            },
539        )
540    }
541
542    /// Gets the number of keys in the keyed singleton.
543    ///
544    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
545    /// since keys may be added / removed over time. When the set of keys changes, the count will
546    /// be asynchronously updated.
547    ///
548    /// # Example
549    /// ```rust
550    /// # #[cfg(feature = "deploy")] {
551    /// # use hydro_lang::prelude::*;
552    /// # use futures::StreamExt;
553    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
554    /// # let tick = process.tick();
555    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
556    /// # process
557    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
558    /// #     .into_keyed()
559    /// #     .batch(&tick, nondet!(/** test */))
560    /// #     .first();
561    /// keyed_singleton.key_count()
562    /// # .all_ticks()
563    /// # }, |mut stream| async move {
564    /// // 3
565    /// # assert_eq!(stream.next().await.unwrap(), 3);
566    /// # }));
567    /// # }
568    /// ```
569    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
570        if B::ValueBound::BOUNDED {
571            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
572                location: self.location.clone(),
573                flow_state: self.flow_state.clone(),
574                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
575                _phantom: PhantomData,
576            };
577
578            me.entries().count().ignore_monotonic()
579        } else if L::is_top_level()
580            && let Some(tick) = self.location.try_tick()
581            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
582        {
583            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
584                self.location.clone(),
585                self.ir_node.replace(HydroNode::Placeholder),
586            );
587
588            let out =
589                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
590                    .latest();
591            Singleton::new(
592                self.location.clone(),
593                out.ir_node.replace(HydroNode::Placeholder),
594            )
595        } else {
596            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
597        }
598    }
599
600    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
601    ///
602    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
603    /// asynchronously as well.
604    ///
605    /// # Example
606    /// ```rust
607    /// # #[cfg(feature = "deploy")] {
608    /// # use hydro_lang::prelude::*;
609    /// # use futures::StreamExt;
610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
612    /// # process
613    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
614    /// #     .into_keyed()
615    /// #     .batch(&process.tick(), nondet!(/** test */))
616    /// #     .first();
617    /// keyed_singleton.into_singleton()
618    /// # .all_ticks()
619    /// # }, |mut stream| async move {
620    /// // { 1: "a", 2: "b", 3: "c" }
621    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
622    /// # }));
623    /// # }
624    /// ```
625    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
626    where
627        K: Eq + Hash,
628    {
629        if B::ValueBound::BOUNDED {
630            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
631                location: self.location.clone(),
632                flow_state: self.flow_state.clone(),
633                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
634                _phantom: PhantomData,
635            };
636
637            me.entries()
638                .assume_ordering_trusted(nondet!(
639                    /// There is only one element associated with each key. The closure technically
640                    /// isn't commutative in the case where both passed entries have the same key
641                    /// but different values.
642                    ///
643                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
644                    /// the key is never already present in the map.
645                ))
646                .fold(
647                    q!(|| HashMap::new()),
648                    q!(|map, (k, v)| {
649                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
650                        map.insert(k, v);
651                    }),
652                )
653        } else if L::is_top_level()
654            && let Some(tick) = self.location.try_tick()
655            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
656        {
657            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
658                self.location.clone(),
659                self.ir_node.replace(HydroNode::Placeholder),
660            );
661
662            let out = into_singleton_inside_tick(
663                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
664            )
665            .latest();
666            Singleton::new(
667                self.location.clone(),
668                out.ir_node.replace(HydroNode::Placeholder),
669            )
670        } else {
671            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
672        }
673    }
674
675    /// An operator which allows you to "name" a `HydroNode`.
676    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
677    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
678        {
679            let mut node = self.ir_node.borrow_mut();
680            let metadata = node.metadata_mut();
681            metadata.tag = Some(name.to_owned());
682        }
683        self
684    }
685
686    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
687    /// implies that `B == Bounded`.
688    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
689    where
690        B: IsBounded,
691    {
692        KeyedSingleton::new(
693            self.location.clone(),
694            self.ir_node.replace(HydroNode::Placeholder),
695        )
696    }
697
698    /// Gets the value associated with a specific key from the keyed singleton.
699    /// Returns `None` if the key is `None` or there is no associated value.
700    ///
701    /// # Example
702    /// ```rust
703    /// # #[cfg(feature = "deploy")] {
704    /// # use hydro_lang::prelude::*;
705    /// # use futures::StreamExt;
706    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
707    /// let tick = process.tick();
708    /// let keyed_data = process
709    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
710    ///     .into_keyed()
711    ///     .batch(&tick, nondet!(/** test */))
712    ///     .first();
713    /// let key = tick.singleton(q!(1));
714    /// keyed_data.get(key).all_ticks()
715    /// # }, |mut stream| async move {
716    /// // 2
717    /// # assert_eq!(stream.next().await.unwrap(), 2);
718    /// # }));
719    /// # }
720    /// ```
721    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
722    where
723        B: IsBounded,
724        K: Hash + Eq + Clone,
725        V: Clone,
726    {
727        self.make_bounded()
728            .into_keyed_stream()
729            .get(key)
730            .cast_at_most_one_element()
731    }
732
733    /// Emit a keyed stream containing keys shared between the keyed singleton and the
734    /// keyed stream, where each value in the output keyed stream is a tuple of
735    /// (the keyed singleton's value, the keyed stream's value).
736    ///
737    /// # Example
738    /// ```rust
739    /// # #[cfg(feature = "deploy")] {
740    /// # use hydro_lang::prelude::*;
741    /// # use futures::StreamExt;
742    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
743    /// let tick = process.tick();
744    /// let keyed_data = process
745    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
746    ///     .into_keyed()
747    ///     .batch(&tick, nondet!(/** test */))
748    ///     .first();
749    /// let other_data = process
750    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
751    ///     .into_keyed()
752    ///     .batch(&tick, nondet!(/** test */));
753    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
754    /// # }, |mut stream| async move {
755    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
756    /// # let mut results = vec![];
757    /// # for _ in 0..3 {
758    /// #     results.push(stream.next().await.unwrap());
759    /// # }
760    /// # results.sort();
761    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
762    /// # }));
763    /// # }
764    /// ```
765    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
766        self,
767        other: KeyedStream<K, V2, L, B2, O2, R2>,
768    ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
769    where
770        B: IsBounded,
771        K: Eq + Hash + Clone,
772        V: Clone,
773        V2: Clone,
774    {
775        // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
776        // always produces deterministic order per key (nested loop join), this could just use
777        // `join_keyed_stream` without constructing IRs manually
778        KeyedStream::new(
779            self.location.clone(),
780            HydroNode::Join {
781                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
782                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
783                metadata: self
784                    .location
785                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
786            },
787        )
788    }
789
790    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
791    /// where each value in the output keyed singleton is a tuple of
792    /// (self.value, other.value).
793    ///
794    /// # Example
795    /// ```rust
796    /// # #[cfg(feature = "deploy")] {
797    /// # use hydro_lang::prelude::*;
798    /// # use futures::StreamExt;
799    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
800    /// # let tick = process.tick();
801    /// let requests = // { 1: 10, 2: 20, 3: 30 }
802    /// # process
803    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
804    /// #     .into_keyed()
805    /// #     .batch(&tick, nondet!(/** test */))
806    /// #     .first();
807    /// let other = // { 1: 100, 2: 200, 4: 400 }
808    /// # process
809    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
810    /// #     .into_keyed()
811    /// #     .batch(&tick, nondet!(/** test */))
812    /// #     .first();
813    /// requests.join_keyed_singleton(other)
814    /// # .entries().all_ticks()
815    /// # }, |mut stream| async move {
816    /// // { 1: (10, 100), 2: (20, 200) }
817    /// # let mut results = vec![];
818    /// # for _ in 0..2 {
819    /// #     results.push(stream.next().await.unwrap());
820    /// # }
821    /// # results.sort();
822    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
823    /// # }));
824    /// # }
825    /// ```
826    pub fn join_keyed_singleton<V2: Clone>(
827        self,
828        other: KeyedSingleton<K, V2, L, Bounded>,
829    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
830    where
831        B: IsBounded,
832        K: Eq + Hash + Clone,
833        V: Clone,
834    {
835        let result_stream = self
836            .make_bounded()
837            .entries()
838            .join(other.entries())
839            .into_keyed();
840
841        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
842        result_stream.cast_at_most_one_entry_per_key()
843    }
844
845    /// For each value in `self`, find the matching key in `lookup`.
846    /// The output is a keyed singleton with the key from `self`, and a value
847    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
848    /// If the key is not present in `lookup`, the option will be [`None`].
849    ///
850    /// # Example
851    /// ```rust
852    /// # #[cfg(feature = "deploy")] {
853    /// # use hydro_lang::prelude::*;
854    /// # use futures::StreamExt;
855    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
856    /// # let tick = process.tick();
857    /// let requests = // { 1: 10, 2: 20 }
858    /// # process
859    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
860    /// #     .into_keyed()
861    /// #     .batch(&tick, nondet!(/** test */))
862    /// #     .first();
863    /// let other_data = // { 10: 100, 11: 110 }
864    /// # process
865    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
866    /// #     .into_keyed()
867    /// #     .batch(&tick, nondet!(/** test */))
868    /// #     .first();
869    /// requests.lookup_keyed_singleton(other_data)
870    /// # .entries().all_ticks()
871    /// # }, |mut stream| async move {
872    /// // { 1: (10, Some(100)), 2: (20, None) }
873    /// # let mut results = vec![];
874    /// # for _ in 0..2 {
875    /// #     results.push(stream.next().await.unwrap());
876    /// # }
877    /// # results.sort();
878    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
879    /// # }));
880    /// # }
881    /// ```
882    pub fn lookup_keyed_singleton<V2>(
883        self,
884        lookup: KeyedSingleton<V, V2, L, Bounded>,
885    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
886    where
887        B: IsBounded,
888        K: Eq + Hash + Clone,
889        V: Eq + Hash + Clone,
890        V2: Clone,
891    {
892        let result_stream = self
893            .make_bounded()
894            .into_keyed_stream()
895            .lookup_keyed_stream(lookup.into_keyed_stream());
896
897        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
898        result_stream.cast_at_most_one_entry_per_key()
899    }
900
901    /// For each value in `self`, find the matching key in `lookup`.
902    /// The output is a keyed stream with the key from `self`, and a value
903    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
904    /// If the key is not present in `lookup`, the option will be [`None`].
905    ///
906    /// # Example
907    /// ```rust
908    /// # #[cfg(feature = "deploy")] {
909    /// # use hydro_lang::prelude::*;
910    /// # use futures::StreamExt;
911    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
912    /// # let tick = process.tick();
913    /// let requests = // { 1: 10, 2: 20 }
914    /// # process
915    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
916    /// #     .into_keyed()
917    /// #     .batch(&tick, nondet!(/** test */))
918    /// #     .first();
    /// let other_data = // { 10: [100, 110] }
920    /// # process
921    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
922    /// #     .into_keyed()
923    /// #     .batch(&tick, nondet!(/** test */));
924    /// requests.lookup_keyed_stream(other_data)
925    /// # .entries().all_ticks()
926    /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: [(20, None)] }
928    /// # let mut results = vec![];
929    /// # for _ in 0..3 {
930    /// #     results.push(stream.next().await.unwrap());
931    /// # }
932    /// # results.sort();
933    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
934    /// # }));
935    /// # }
936    /// ```
937    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
938        self,
939        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
940    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
941    where
942        B: IsBounded,
943        K: Eq + Hash + Clone,
944        V: Eq + Hash + Clone,
945        V2: Clone,
946    {
947        self.make_bounded()
948            .entries()
949            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
950            .into_keyed()
951            .lookup_keyed_stream(lookup)
952    }
953
954    /// For each key present in both `self` and `thresholds`, emits a [`KeyedStream`] event the first
955    /// time that key's value becomes greater than or equal to the corresponding threshold value.
956    /// The emitted value for each key is the threshold value itself.
957    ///
958    /// This requires the keyed singleton to have monotonic values ([`MonotonicValue`] or [`Bounded`]),
959    /// because otherwise the threshold detection would be non-deterministic.
960    ///
961    /// The `thresholds` parameter is a [`BoundedValue`] keyed singleton mapping each key to its
962    /// threshold. Thresholds may arrive asynchronously (new keys appear over time), but once set
963    /// for a key, the threshold value is fixed. Late-arriving thresholds are checked against the
964    /// current snapshot value immediately.
965    ///
966    /// # Example
967    /// ```rust,ignore
968    /// use hydro_lang::prelude::*;
969    ///
970    /// // Given a monotonically increasing keyed singleton (e.g. from fold with monotone proof)
971    /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
972    ///     .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 is monotone */)));
973    ///
974    /// // BoundedValue keyed singleton of thresholds (from .first())
975    /// let thresholds = threshold_source.into_keyed().first();
976    ///
977    /// // Emits (key, threshold_value) the first time each key's value >= threshold
978    /// let crossed = counts.threshold_greater_or_equal(thresholds);
979    /// ```
980    pub fn threshold_greater_or_equal(
981        self,
982        thresholds: KeyedSingleton<K, V, L, BoundedValue>,
983    ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
984    where
985        K: Clone + Eq + Hash,
986        V: Clone + PartialOrd,
987        B: IsKeyedMonotonic,
988    {
989        let self_location = self.location.clone();
990        match B::bound_kind() {
991            KeyedSingletonBoundKind::Bounded => {
992                // Bounded case: self is already fixed, just join and filter
993                let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
994                    self.location.clone(),
995                    self.ir_node.replace(HydroNode::Placeholder),
996                );
997                let result = me
998                    .entries()
999                    .join(thresholds.entries())
1000                    .filter_map(q!(|(k, (val, thresh))| {
1001                        if val >= thresh {
1002                            Some((k, thresh))
1003                        } else {
1004                            None
1005                        }
1006                    }))
1007                    .into_keyed();
1008                KeyedStream::new(
1009                    result.location.clone(),
1010                    result.ir_node.replace(HydroNode::Placeholder),
1011                )
1012            }
1013            KeyedSingletonBoundKind::MonotonicValue => {
1014                let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
1015                    self.location.clone(),
1016                    self.ir_node.replace(HydroNode::Placeholder),
1017                );
1018
1019                let result = sliced! {
1020                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1021                    let thresh_snapshot =
1022                        use(thresholds, nondet!(/** thresholds are deterministic */));
1023                    let mut already_crossed =
1024                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1025
1026                    let joined = thresh_snapshot.entries().join(snapshot.entries());
1027                    let passed = joined
1028                        .filter(q!(|(_, (thresh, val))| *val >= *thresh))
1029                        .map(q!(|(k, (thresh, _))| (k, thresh)));
1030
1031                    let newly_crossed = passed.anti_join(already_crossed.clone());
1032                    already_crossed =
1033                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1034
1035                    newly_crossed.into_keyed()
1036                };
1037
1038                KeyedStream::new(
1039                    self_location,
1040                    result.ir_node.replace(HydroNode::Placeholder),
1041                )
1042            }
1043            KeyedSingletonBoundKind::BoundedValue => {
1044                let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
1045                    self.location.clone(),
1046                    self.ir_node.replace(HydroNode::Placeholder),
1047                );
1048
1049                let result = sliced! {
1050                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1051                    let thresh_snapshot =
1052                        use(thresholds, nondet!(/** thresholds are deterministic */));
1053                    let mut already_crossed =
1054                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1055
1056                    let joined = thresh_snapshot.entries().join(snapshot.entries());
1057                    let passed = joined
1058                        .filter(q!(|(_, (thresh, val))| *val >= *thresh))
1059                        .map(q!(|(k, (thresh, _))| (k, thresh)));
1060
1061                    let newly_crossed = passed.anti_join(already_crossed.clone());
1062                    already_crossed =
1063                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1064
1065                    newly_crossed.into_keyed()
1066                };
1067
1068                KeyedStream::new(
1069                    self_location,
1070                    result.ir_node.replace(HydroNode::Placeholder),
1071                )
1072            }
1073            _ => {
1074                unreachable!(
1075                    "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
1076                )
1077            }
1078        }
1079    }
1080
1081    /// Like [`Self::threshold_greater_or_equal`], but uses a single [`Singleton`] threshold
1082    /// shared across all keys. Emits a `(K, V)` event for each key the first time that key's
1083    /// value becomes >= the threshold. The emitted value is the threshold itself.
1084    ///
1085    /// Because the threshold is a [`Bounded`] singleton, it is a compile-time constant and
1086    /// does not carry ongoing memory cost.
1087    ///
1088    /// # Example
1089    /// ```rust,ignore
1090    /// use hydro_lang::prelude::*;
1091    ///
1092    /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
1093    ///     .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 */)));
1094    ///
1095    /// let threshold = process.singleton(q!(5usize));
1096    /// let crossed = counts.threshold_greater_or_equal_uniform(threshold);
1097    /// ```
1098    pub fn threshold_greater_or_equal_uniform(
1099        self,
1100        threshold: Singleton<V, L, Bounded>,
1101    ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
1102    where
1103        K: Clone + Eq + Hash,
1104        V: Clone + PartialOrd,
1105        B: IsKeyedMonotonic,
1106    {
1107        let self_location = self.location.clone();
1108        match B::bound_kind() {
1109            KeyedSingletonBoundKind::Bounded => {
1110                let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
1111                    self.location.clone(),
1112                    self.ir_node.replace(HydroNode::Placeholder),
1113                );
1114                let result = me
1115                    .entries()
1116                    .cross_singleton(threshold)
1117                    .filter_map(q!(|((k, val), thresh)| {
1118                        if val >= thresh {
1119                            Some((k, thresh))
1120                        } else {
1121                            None
1122                        }
1123                    }))
1124                    .into_keyed();
1125                KeyedStream::new(
1126                    result.location.clone(),
1127                    result.ir_node.replace(HydroNode::Placeholder),
1128                )
1129            }
1130            KeyedSingletonBoundKind::MonotonicValue => {
1131                let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
1132                    self.location.clone(),
1133                    self.ir_node.replace(HydroNode::Placeholder),
1134                );
1135
1136                let result = sliced! {
1137                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1138                    let mut already_crossed =
1139                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1140
1141                    let tick = snapshot.location().clone();
1142                    let thresh_in_tick = threshold.clone_into_tick(&tick);
1143
1144                    let crossing = snapshot
1145                        .entries()
1146                        .cross_singleton(thresh_in_tick)
1147                        .filter_map(q!(|((k, val), thresh)| {
1148                            if val >= thresh {
1149                                Some((k, thresh))
1150                            } else {
1151                                None
1152                            }
1153                        }));
1154
1155                    let newly_crossed = crossing.anti_join(already_crossed.clone());
1156                    already_crossed =
1157                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1158
1159                    newly_crossed.into_keyed()
1160                };
1161
1162                KeyedStream::new(
1163                    self_location,
1164                    result.ir_node.replace(HydroNode::Placeholder),
1165                )
1166            }
1167            KeyedSingletonBoundKind::BoundedValue => {
1168                let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
1169                    self.location.clone(),
1170                    self.ir_node.replace(HydroNode::Placeholder),
1171                );
1172
1173                let result = sliced! {
1174                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1175                    let mut already_crossed =
1176                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1177
1178                    let tick = snapshot.location().clone();
1179                    let thresh_in_tick = threshold.clone_into_tick(&tick);
1180
1181                    let crossing = snapshot
1182                        .entries()
1183                        .cross_singleton(thresh_in_tick)
1184                        .filter_map(q!(|((k, val), thresh)| {
1185                            if val >= thresh {
1186                                Some((k, thresh))
1187                            } else {
1188                                None
1189                            }
1190                        }));
1191
1192                    let newly_crossed = crossing.anti_join(already_crossed.clone());
1193                    already_crossed =
1194                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1195
1196                    newly_crossed.into_keyed()
1197                };
1198
1199                KeyedStream::new(
1200                    self_location,
1201                    result.ir_node.replace(HydroNode::Placeholder),
1202                )
1203            }
1204            _ => {
1205                unreachable!(
1206                    "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
1207                )
1208            }
1209        }
1210    }
1211}
1212
1213impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
1214    KeyedSingleton<K, V, L, B>
1215{
1216    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
1217    ///
1218    /// The value for each key must be bounded, otherwise the resulting stream elements would be
1219    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1220    /// into the output.
1221    ///
1222    /// # Example
1223    /// ```rust
1224    /// # #[cfg(feature = "deploy")] {
1225    /// # use hydro_lang::prelude::*;
1226    /// # use futures::StreamExt;
1227    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1228    /// let keyed_singleton = // { 1: 2, 2: 4 }
1229    /// # process
1230    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1231    /// #     .into_keyed()
1232    /// #     .first();
1233    /// keyed_singleton.entries()
1234    /// # }, |mut stream| async move {
1235    /// // (1, 2), (2, 4) in any order
1236    /// # let mut results = Vec::new();
1237    /// # for _ in 0..2 {
1238    /// #     results.push(stream.next().await.unwrap());
1239    /// # }
1240    /// # results.sort();
1241    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1242    /// # }));
1243    /// # }
1244    /// ```
1245    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1246        self.into_keyed_stream().entries()
1247    }
1248
1249    /// Flattens the keyed singleton into an unordered stream of just the values.
1250    ///
1251    /// The value for each key must be bounded, otherwise the resulting stream elements would be
1252    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1253    /// into the output.
1254    ///
1255    /// # Example
1256    /// ```rust
1257    /// # #[cfg(feature = "deploy")] {
1258    /// # use hydro_lang::prelude::*;
1259    /// # use futures::StreamExt;
1260    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1261    /// let keyed_singleton = // { 1: 2, 2: 4 }
1262    /// # process
1263    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1264    /// #     .into_keyed()
1265    /// #     .first();
1266    /// keyed_singleton.values()
1267    /// # }, |mut stream| async move {
1268    /// // 2, 4 in any order
1269    /// # let mut results = Vec::new();
1270    /// # for _ in 0..2 {
1271    /// #     results.push(stream.next().await.unwrap());
1272    /// # }
1273    /// # results.sort();
1274    /// # assert_eq!(results, vec![2, 4]);
1275    /// # }));
1276    /// # }
1277    /// ```
1278    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1279        let map_f = q!(|(_, v)| v)
1280            .splice_fn1_ctx::<(K, V), V>(&self.location)
1281            .into();
1282
1283        Stream::new(
1284            self.location.clone(),
1285            HydroNode::Map {
1286                f: map_f,
1287                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1288                metadata: self.location.new_node_metadata(Stream::<
1289                    V,
1290                    L,
1291                    B::UnderlyingBound,
1292                    NoOrder,
1293                    ExactlyOnce,
1294                >::collection_kind()),
1295            },
1296        )
1297    }
1298
1299    /// Flattens the keyed singleton into an unordered stream of just the keys.
1300    ///
1301    /// The value for each key must be bounded, otherwise the removal of keys would result in
1302    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1303    /// into the output.
1304    ///
1305    /// # Example
1306    /// ```rust
1307    /// # #[cfg(feature = "deploy")] {
1308    /// # use hydro_lang::prelude::*;
1309    /// # use futures::StreamExt;
1310    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1311    /// let keyed_singleton = // { 1: 2, 2: 4 }
1312    /// # process
1313    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1314    /// #     .into_keyed()
1315    /// #     .first();
1316    /// keyed_singleton.keys()
1317    /// # }, |mut stream| async move {
1318    /// // 1, 2 in any order
1319    /// # let mut results = Vec::new();
1320    /// # for _ in 0..2 {
1321    /// #     results.push(stream.next().await.unwrap());
1322    /// # }
1323    /// # results.sort();
1324    /// # assert_eq!(results, vec![1, 2]);
1325    /// # }));
1326    /// # }
1327    /// ```
1328    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1329        self.entries().map(q!(|(k, _)| k))
1330    }
1331
1332    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1333    /// entries whose keys are not in the provided stream.
1334    ///
1335    /// # Example
1336    /// ```rust
1337    /// # #[cfg(feature = "deploy")] {
1338    /// # use hydro_lang::prelude::*;
1339    /// # use futures::StreamExt;
1340    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1341    /// let tick = process.tick();
1342    /// let keyed_singleton = // { 1: 2, 2: 4 }
1343    /// # process
1344    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1345    /// #     .into_keyed()
1346    /// #     .first()
1347    /// #     .batch(&tick, nondet!(/** test */));
1348    /// let keys_to_remove = process
1349    ///     .source_iter(q!(vec![1]))
1350    ///     .batch(&tick, nondet!(/** test */));
1351    /// keyed_singleton.filter_key_not_in(keys_to_remove)
1352    /// #   .entries().all_ticks()
1353    /// # }, |mut stream| async move {
1354    /// // { 2: 4 }
1355    /// # for w in vec![(2, 4)] {
1356    /// #     assert_eq!(stream.next().await.unwrap(), w);
1357    /// # }
1358    /// # }));
1359    /// # }
1360    /// ```
1361    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1362        self,
1363        other: Stream<K, L, Bounded, O2, R2>,
1364    ) -> Self
1365    where
1366        K: Hash + Eq,
1367    {
1368        check_matching_location(&self.location, &other.location);
1369
1370        KeyedSingleton::new(
1371            self.location.clone(),
1372            HydroNode::AntiJoin {
1373                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1374                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1375                metadata: self.location.new_node_metadata(Self::collection_kind()),
1376            },
1377        )
1378    }
1379
1380    /// An operator which allows you to "inspect" each value of a keyed singleton without
1381    /// modifying it. The closure `f` is called on a reference to each value. This is
1382    /// mainly useful for debugging, and should not be used to generate side-effects.
1383    ///
1384    /// # Example
1385    /// ```rust
1386    /// # #[cfg(feature = "deploy")] {
1387    /// # use hydro_lang::prelude::*;
1388    /// # use futures::StreamExt;
1389    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1390    /// let keyed_singleton = // { 1: 2, 2: 4 }
1391    /// # process
1392    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1393    /// #     .into_keyed()
1394    /// #     .first();
1395    /// keyed_singleton
1396    ///     .inspect(q!(|v| println!("{}", v)))
1397    /// #   .entries()
1398    /// # }, |mut stream| async move {
1399    /// // { 1: 2, 2: 4 }
1400    /// # for w in vec![(1, 2), (2, 4)] {
1401    /// #     assert_eq!(stream.next().await.unwrap(), w);
1402    /// # }
1403    /// # }));
1404    /// # }
1405    /// ```
1406    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1407    where
1408        F: Fn(&V) + 'a,
1409    {
1410        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1411        let inspect_f = q!({
1412            let orig = f;
1413            move |t: &(_, _)| orig(&t.1)
1414        })
1415        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1416        .into();
1417
1418        KeyedSingleton::new(
1419            self.location.clone(),
1420            HydroNode::Inspect {
1421                f: inspect_f,
1422                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1423                metadata: self.location.new_node_metadata(Self::collection_kind()),
1424            },
1425        )
1426    }
1427
1428    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1429    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1430    /// mainly useful for debugging, and should not be used to generate side-effects.
1431    ///
1432    /// # Example
1433    /// ```rust
1434    /// # #[cfg(feature = "deploy")] {
1435    /// # use hydro_lang::prelude::*;
1436    /// # use futures::StreamExt;
1437    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1438    /// let keyed_singleton = // { 1: 2, 2: 4 }
1439    /// # process
1440    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1441    /// #     .into_keyed()
1442    /// #     .first();
1443    /// keyed_singleton
1444    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1445    /// #   .entries()
1446    /// # }, |mut stream| async move {
1447    /// // { 1: 2, 2: 4 }
1448    /// # for w in vec![(1, 2), (2, 4)] {
1449    /// #     assert_eq!(stream.next().await.unwrap(), w);
1450    /// # }
1451    /// # }));
1452    /// # }
1453    /// ```
1454    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1455    where
1456        F: Fn(&(K, V)) + 'a,
1457    {
1458        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1459
1460        KeyedSingleton::new(
1461            self.location.clone(),
1462            HydroNode::Inspect {
1463                f: inspect_f,
1464                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1465                metadata: self.location.new_node_metadata(Self::collection_kind()),
1466            },
1467        )
1468    }
1469
1470    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1471    ///
1472    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1473    /// asynchronously updated if a new key is added that is higher than the previous max key.
1474    ///
1475    /// # Example
1476    /// ```rust
1477    /// # #[cfg(feature = "deploy")] {
1478    /// # use hydro_lang::prelude::*;
1479    /// # use futures::StreamExt;
1480    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1481    /// let tick = process.tick();
1482    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1483    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1484    /// #     .into_keyed()
1485    /// #     .first();
1486    /// keyed_singleton.get_max_key()
1487    /// # .sample_eager(nondet!(/** test */))
1488    /// # }, |mut stream| async move {
1489    /// // (2, 456)
1490    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1491    /// # }));
1492    /// # }
1493    /// ```
1494    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1495    where
1496        K: Ord,
1497    {
1498        self.entries()
1499            .assume_ordering_trusted(nondet!(
1500                /// There is only one element associated with each key, and the keys are totallly
1501                /// ordered so we will produce a deterministic value. The closure technically
1502                /// isn't commutative in the case where both passed entries have the same key
1503                /// but different values.
1504                ///
1505                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1506                /// the two inputs do not have the same key.
1507            ))
1508            .reduce(q!(
1509                move |curr, new| {
1510                    if new.0 > curr.0 {
1511                        *curr = new;
1512                    }
1513                },
1514                idempotent = manual_proof!(/** repeated elements are ignored */)
1515            ))
1516    }
1517
1518    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1519    /// element, the value.
1520    ///
1521    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1522    ///
1523    /// # Example
1524    /// ```rust
1525    /// # #[cfg(feature = "deploy")] {
1526    /// # use hydro_lang::prelude::*;
1527    /// # use futures::StreamExt;
1528    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1529    /// let keyed_singleton = // { 1: 2, 2: 4 }
1530    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1531    /// #     .into_keyed()
1532    /// #     .first();
1533    /// keyed_singleton
1534    ///     .clone()
1535    ///     .into_keyed_stream()
1536    ///     .merge_unordered(
1537    ///         keyed_singleton.into_keyed_stream()
1538    ///     )
1539    /// #   .entries()
1540    /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
1542    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1543    /// #     assert_eq!(stream.next().await.unwrap(), w);
1544    /// # }
1545    /// # }));
1546    /// # }
1547    /// ```
1548    pub fn into_keyed_stream(
1549        self,
1550    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1551        KeyedStream::new(
1552            self.location.clone(),
1553            HydroNode::Cast {
1554                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1555                metadata: self.location.new_node_metadata(KeyedStream::<
1556                    K,
1557                    V,
1558                    L,
1559                    B::UnderlyingBound,
1560                    TotalOrder,
1561                    ExactlyOnce,
1562                >::collection_kind()),
1563            },
1564        )
1565    }
1566}
1567
1568impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1569where
1570    L: Location<'a>,
1571{
1572    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1573    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1574    ///
1575    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1576    /// processed before an acknowledgement is emitted.
1577    pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1578        let id = self.location.flow_state().borrow_mut().next_clock_id();
1579        let out_location = Atomic {
1580            tick: Tick {
1581                id,
1582                l: self.location.clone(),
1583            },
1584        };
1585        KeyedSingleton::new(
1586            out_location.clone(),
1587            HydroNode::BeginAtomic {
1588                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1589                metadata: out_location
1590                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1591            },
1592        )
1593    }
1594}
1595
1596impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1597where
1598    L: Location<'a>,
1599{
1600    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1601    /// See [`KeyedSingleton::atomic`] for more details.
1602    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1603        KeyedSingleton::new(
1604            self.location.tick.l.clone(),
1605            HydroNode::EndAtomic {
1606                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1607                metadata: self
1608                    .location
1609                    .tick
1610                    .l
1611                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1612            },
1613        )
1614    }
1615}
1616
1617impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1618    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1619    /// tick `T` always has the entries of `self` at tick `T - 1`.
1620    ///
1621    /// At tick `0`, the output has no entries, since there is no previous tick.
1622    ///
1623    /// This operator enables stateful iterative processing with ticks, by sending data from one
1624    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1625    ///
1626    /// # Example
1627    /// ```rust
1628    /// # #[cfg(feature = "deploy")] {
1629    /// # use hydro_lang::prelude::*;
1630    /// # use futures::StreamExt;
1631    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1632    /// let tick = process.tick();
1633    /// # // ticks are lazy by default, forces the second tick to run
1634    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1635    /// # let batch_first_tick = process
1636    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1637    /// #   .batch(&tick, nondet!(/** test */))
1638    /// #   .into_keyed();
1639    /// # let batch_second_tick = process
1640    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1641    /// #   .batch(&tick, nondet!(/** test */))
1642    /// #   .into_keyed()
1643    /// #   .defer_tick(); // appears on the second tick
1644    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1645    /// # batch_first_tick.chain(batch_second_tick).first();
1646    /// input_batch.clone().filter_key_not_in(
1647    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1648    /// )
1649    /// # .entries().all_ticks()
1650    /// # }, |mut stream| async move {
1651    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1652    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1653    /// #     assert_eq!(stream.next().await.unwrap(), w);
1654    /// # }
1655    /// # }));
1656    /// # }
1657    /// ```
1658    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1659        KeyedSingleton::new(
1660            self.location.clone(),
1661            HydroNode::DeferTick {
1662                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1663                metadata: self
1664                    .location
1665                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1666            },
1667        )
1668    }
1669}
1670
1671impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1672where
1673    L: Location<'a>,
1674{
1675    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1676    /// point in time.
1677    ///
1678    /// # Non-Determinism
1679    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1680    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1681    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1682        self,
1683        tick: &Tick<L2>,
1684        _nondet: NonDet,
1685    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1686        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1687        KeyedSingleton::new(
1688            tick.drop_consistency(),
1689            HydroNode::Batch {
1690                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1691                metadata: tick
1692                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1693            },
1694        )
1695    }
1696}
1697
1698impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1699where
1700    L: Location<'a>,
1701{
1702    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1703    /// state of the keyed singleton being atomically processed.
1704    ///
1705    /// # Non-Determinism
1706    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1707    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1708    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1709        self,
1710        tick: &Tick<L2>,
1711        _nondet: NonDet,
1712    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1713        KeyedSingleton::new(
1714            tick.drop_consistency(),
1715            HydroNode::Batch {
1716                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1717                metadata: tick
1718                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1719            },
1720        )
1721    }
1722}
1723
1724impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1725where
1726    L: Location<'a>,
1727{
1728    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1729    ///
1730    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1731    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1732    /// is filtered out.
1733    ///
1734    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1735    /// not modify or take ownership of the values. If you need to modify the values while filtering
1736    /// use [`KeyedSingleton::filter_map`] instead.
1737    ///
1738    /// # Example
1739    /// ```rust
1740    /// # #[cfg(feature = "deploy")] {
1741    /// # use hydro_lang::prelude::*;
1742    /// # use futures::StreamExt;
1743    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1744    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1745    /// # process
1746    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1747    /// #     .into_keyed()
1748    /// #     .first();
1749    /// keyed_singleton.filter(q!(|&v| v > 1))
1750    /// #   .entries()
1751    /// # }, |mut stream| async move {
1752    /// // { 1: 2, 2: 4 }
1753    /// # let mut results = Vec::new();
1754    /// # for _ in 0..2 {
1755    /// #     results.push(stream.next().await.unwrap());
1756    /// # }
1757    /// # results.sort();
1758    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1759    /// # }));
1760    /// # }
1761    /// ```
1762    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1763    where
1764        F: Fn(&V) -> bool + 'a,
1765    {
1766        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1767        let filter_f = q!({
1768            let orig = f;
1769            move |t: &(_, _)| orig(&t.1)
1770        })
1771        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1772        .into();
1773
1774        KeyedSingleton::new(
1775            self.location.clone(),
1776            HydroNode::Filter {
1777                f: filter_f,
1778                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1779                metadata: self
1780                    .location
1781                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1782            },
1783        )
1784    }
1785
1786    /// An operator that both filters and maps values. It yields only the key-value pairs where
1787    /// the supplied closure `f` returns `Some(value)`.
1788    ///
1789    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1790    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1791    /// If it returns `None`, the key-value pair is filtered out.
1792    ///
1793    /// # Example
1794    /// ```rust
1795    /// # #[cfg(feature = "deploy")] {
1796    /// # use hydro_lang::prelude::*;
1797    /// # use futures::StreamExt;
1798    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1799    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1800    /// # process
1801    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1802    /// #     .into_keyed()
1803    /// #     .first();
1804    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1805    /// #   .entries()
1806    /// # }, |mut stream| async move {
1807    /// // { 1: 42, 3: 100 }
1808    /// # let mut results = Vec::new();
1809    /// # for _ in 0..2 {
1810    /// #     results.push(stream.next().await.unwrap());
1811    /// # }
1812    /// # results.sort();
1813    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1814    /// # }));
1815    /// # }
1816    /// ```
1817    pub fn filter_map<F, U>(
1818        self,
1819        f: impl IntoQuotedMut<'a, F, L> + Copy,
1820    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1821    where
1822        F: Fn(V) -> Option<U> + 'a,
1823    {
1824        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1825        let filter_map_f = q!({
1826            let orig = f;
1827            move |(k, v)| orig(v).map(|o| (k, o))
1828        })
1829        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1830        .into();
1831
1832        KeyedSingleton::new(
1833            self.location.clone(),
1834            HydroNode::FilterMap {
1835                f: filter_map_f,
1836                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1837                metadata: self.location.new_node_metadata(KeyedSingleton::<
1838                    K,
1839                    U,
1840                    L,
1841                    B::EraseMonotonic,
1842                >::collection_kind()),
1843            },
1844        )
1845    }
1846
1847    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1848    /// arrived since the previous batch was released.
1849    ///
1850    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1851    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1852    ///
1853    /// # Non-Determinism
1854    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1855    /// has a non-deterministic set of key-value pairs.
1856    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1857        self,
1858        tick: &Tick<L2>,
1859        _nondet: NonDet,
1860    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1861        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1862        KeyedSingleton::new(
1863            tick.drop_consistency(),
1864            HydroNode::Batch {
1865                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1866                metadata: tick
1867                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1868            },
1869        )
1870    }
1871}
1872
1873impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1874where
1875    L: Location<'a>,
1876{
1877    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1878    /// atomically processed.
1879    ///
1880    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1881    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1882    ///
1883    /// # Non-Determinism
1884    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1885    /// has a non-deterministic set of key-value pairs.
1886    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1887        self,
1888        tick: &Tick<L2>,
1889        nondet: NonDet,
1890    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1891        let _ = nondet;
1892        KeyedSingleton::new(
1893            tick.drop_consistency(),
1894            HydroNode::Batch {
1895                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1896                metadata: tick
1897                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1898            },
1899        )
1900    }
1901}
1902
1903#[cfg(test)]
1904mod tests {
1905    #[cfg(feature = "deploy")]
1906    use futures::{SinkExt, StreamExt};
1907    #[cfg(feature = "deploy")]
1908    use hydro_deploy::Deployment;
1909    #[cfg(any(feature = "deploy", feature = "sim"))]
1910    use stageleft::q;
1911
1912    #[cfg(any(feature = "deploy", feature = "sim"))]
1913    use crate::compile::builder::FlowBuilder;
1914    #[cfg(any(feature = "deploy", feature = "sim"))]
1915    use crate::location::Location;
1916    #[cfg(any(feature = "deploy", feature = "sim"))]
1917    use crate::nondet::nondet;
1918
1919    #[cfg(feature = "deploy")]
1920    #[tokio::test]
1921    async fn key_count_bounded_value() {
1922        let mut deployment = Deployment::new();
1923
1924        let mut flow = FlowBuilder::new();
1925        let node = flow.process::<()>();
1926        let external = flow.external::<()>();
1927
1928        let (input_port, input) = node.source_external_bincode(&external);
1929        let out = input
1930            .into_keyed()
1931            .first()
1932            .key_count()
1933            .sample_eager(nondet!(/** test */))
1934            .send_bincode_external(&external);
1935
1936        let nodes = flow
1937            .with_process(&node, deployment.Localhost())
1938            .with_external(&external, deployment.Localhost())
1939            .deploy(&mut deployment);
1940
1941        deployment.deploy().await.unwrap();
1942
1943        let mut external_in = nodes.connect(input_port).await;
1944        let mut external_out = nodes.connect(out).await;
1945
1946        deployment.start().await.unwrap();
1947
1948        assert_eq!(external_out.next().await.unwrap(), 0);
1949
1950        external_in.send((1, 1)).await.unwrap();
1951        assert_eq!(external_out.next().await.unwrap(), 1);
1952
1953        external_in.send((2, 2)).await.unwrap();
1954        assert_eq!(external_out.next().await.unwrap(), 2);
1955    }
1956
1957    #[cfg(feature = "deploy")]
1958    #[tokio::test]
1959    async fn key_count_unbounded_value() {
1960        let mut deployment = Deployment::new();
1961
1962        let mut flow = FlowBuilder::new();
1963        let node = flow.process::<()>();
1964        let external = flow.external::<()>();
1965
1966        let (input_port, input) = node.source_external_bincode(&external);
1967        let out = input
1968            .into_keyed()
1969            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1970            .key_count()
1971            .sample_eager(nondet!(/** test */))
1972            .send_bincode_external(&external);
1973
1974        let nodes = flow
1975            .with_process(&node, deployment.Localhost())
1976            .with_external(&external, deployment.Localhost())
1977            .deploy(&mut deployment);
1978
1979        deployment.deploy().await.unwrap();
1980
1981        let mut external_in = nodes.connect(input_port).await;
1982        let mut external_out = nodes.connect(out).await;
1983
1984        deployment.start().await.unwrap();
1985
1986        assert_eq!(external_out.next().await.unwrap(), 0);
1987
1988        external_in.send((1, 1)).await.unwrap();
1989        assert_eq!(external_out.next().await.unwrap(), 1);
1990
1991        external_in.send((1, 2)).await.unwrap();
1992        assert_eq!(external_out.next().await.unwrap(), 1);
1993
1994        external_in.send((2, 2)).await.unwrap();
1995        assert_eq!(external_out.next().await.unwrap(), 2);
1996
1997        external_in.send((1, 1)).await.unwrap();
1998        assert_eq!(external_out.next().await.unwrap(), 2);
1999
2000        external_in.send((3, 1)).await.unwrap();
2001        assert_eq!(external_out.next().await.unwrap(), 3);
2002    }
2003
2004    #[cfg(feature = "deploy")]
2005    #[tokio::test]
2006    async fn into_singleton_bounded_value() {
2007        let mut deployment = Deployment::new();
2008
2009        let mut flow = FlowBuilder::new();
2010        let node = flow.process::<()>();
2011        let external = flow.external::<()>();
2012
2013        let (input_port, input) = node.source_external_bincode(&external);
2014        let out = input
2015            .into_keyed()
2016            .first()
2017            .into_singleton()
2018            .sample_eager(nondet!(/** test */))
2019            .send_bincode_external(&external);
2020
2021        let nodes = flow
2022            .with_process(&node, deployment.Localhost())
2023            .with_external(&external, deployment.Localhost())
2024            .deploy(&mut deployment);
2025
2026        deployment.deploy().await.unwrap();
2027
2028        let mut external_in = nodes.connect(input_port).await;
2029        let mut external_out = nodes.connect(out).await;
2030
2031        deployment.start().await.unwrap();
2032
2033        assert_eq!(
2034            external_out.next().await.unwrap(),
2035            std::collections::HashMap::new()
2036        );
2037
2038        external_in.send((1, 1)).await.unwrap();
2039        assert_eq!(
2040            external_out.next().await.unwrap(),
2041            vec![(1, 1)].into_iter().collect()
2042        );
2043
2044        external_in.send((2, 2)).await.unwrap();
2045        assert_eq!(
2046            external_out.next().await.unwrap(),
2047            vec![(1, 1), (2, 2)].into_iter().collect()
2048        );
2049    }
2050
2051    #[cfg(feature = "deploy")]
2052    #[tokio::test]
2053    async fn into_singleton_unbounded_value() {
2054        let mut deployment = Deployment::new();
2055
2056        let mut flow = FlowBuilder::new();
2057        let node = flow.process::<()>();
2058        let external = flow.external::<()>();
2059
2060        let (input_port, input) = node.source_external_bincode(&external);
2061        let out = input
2062            .into_keyed()
2063            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
2064            .into_singleton()
2065            .sample_eager(nondet!(/** test */))
2066            .send_bincode_external(&external);
2067
2068        let nodes = flow
2069            .with_process(&node, deployment.Localhost())
2070            .with_external(&external, deployment.Localhost())
2071            .deploy(&mut deployment);
2072
2073        deployment.deploy().await.unwrap();
2074
2075        let mut external_in = nodes.connect(input_port).await;
2076        let mut external_out = nodes.connect(out).await;
2077
2078        deployment.start().await.unwrap();
2079
2080        assert_eq!(
2081            external_out.next().await.unwrap(),
2082            std::collections::HashMap::new()
2083        );
2084
2085        external_in.send((1, 1)).await.unwrap();
2086        assert_eq!(
2087            external_out.next().await.unwrap(),
2088            vec![(1, 1)].into_iter().collect()
2089        );
2090
2091        external_in.send((1, 2)).await.unwrap();
2092        assert_eq!(
2093            external_out.next().await.unwrap(),
2094            vec![(1, 2)].into_iter().collect()
2095        );
2096
2097        external_in.send((2, 2)).await.unwrap();
2098        assert_eq!(
2099            external_out.next().await.unwrap(),
2100            vec![(1, 2), (2, 1)].into_iter().collect()
2101        );
2102
2103        external_in.send((1, 1)).await.unwrap();
2104        assert_eq!(
2105            external_out.next().await.unwrap(),
2106            vec![(1, 3), (2, 1)].into_iter().collect()
2107        );
2108
2109        external_in.send((3, 1)).await.unwrap();
2110        assert_eq!(
2111            external_out.next().await.unwrap(),
2112            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
2113        );
2114    }
2115
2116    #[cfg(feature = "sim")]
2117    #[test]
2118    fn sim_unbounded_singleton_snapshot() {
2119        let mut flow = FlowBuilder::new();
2120        let node = flow.process::<()>();
2121
2122        let (input_port, input) = node.sim_input();
2123        let output = input
2124            .into_keyed()
2125            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
2126            .snapshot(&node.tick(), nondet!(/** test */))
2127            .entries()
2128            .all_ticks()
2129            .sim_output();
2130
2131        let count = flow.sim().exhaustive(async || {
2132            input_port.send((1, 123));
2133            input_port.send((1, 456));
2134            input_port.send((2, 123));
2135
2136            let all = output.collect_sorted::<Vec<_>>().await;
2137            assert_eq!(all.last().unwrap(), &(2, 1));
2138        });
2139
2140        assert_eq!(count, 8);
2141    }
2142
2143    #[cfg(feature = "deploy")]
2144    #[tokio::test]
2145    async fn join_keyed_stream() {
2146        let mut deployment = Deployment::new();
2147
2148        let mut flow = FlowBuilder::new();
2149        let node = flow.process::<()>();
2150        let external = flow.external::<()>();
2151
2152        let tick = node.tick();
2153        let keyed_data = node
2154            .source_iter(q!(vec![(1, 10), (2, 20)]))
2155            .into_keyed()
2156            .batch(&tick, nondet!(/** test */))
2157            .first();
2158        let requests = node
2159            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
2160            .into_keyed()
2161            .batch(&tick, nondet!(/** test */));
2162
2163        let out = keyed_data
2164            .join_keyed_stream(requests)
2165            .entries()
2166            .all_ticks()
2167            .send_bincode_external(&external);
2168
2169        let nodes = flow
2170            .with_process(&node, deployment.Localhost())
2171            .with_external(&external, deployment.Localhost())
2172            .deploy(&mut deployment);
2173
2174        deployment.deploy().await.unwrap();
2175
2176        let mut external_out = nodes.connect(out).await;
2177
2178        deployment.start().await.unwrap();
2179
2180        let mut results = vec![];
2181        for _ in 0..2 {
2182            results.push(external_out.next().await.unwrap());
2183        }
2184        results.sort();
2185
2186        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
2187    }
2188
2189    #[cfg(feature = "sim")]
2190    #[test]
2191    fn threshold_greater_or_equal_monotonic() {
2192        let mut flow = FlowBuilder::new();
2193        let node = flow.process::<()>();
2194
2195        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2196        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2197
2198        // Create a monotonically increasing keyed singleton via fold with monotone proof
2199        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
2200            input.into_keyed().fold(
2201                q!(|| 0usize),
2202                q!(
2203                    |acc, v| *acc += v,
2204                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
2205                ),
2206            );
2207
2208        // BoundedValue keyed singleton of thresholds (from .first() on unbounded stream)
2209        let thresholds = thresh_input.into_keyed().first();
2210
2211        let output = counts
2212            .threshold_greater_or_equal(thresholds)
2213            .entries()
2214            .sim_output();
2215
2216        let count = flow.sim().exhaustive(async || {
2217            // Set thresholds: key 1 needs value >= 5, key 2 needs value >= 10
2218            thresh_port.send((1, 5));
2219            thresh_port.send((2, 10));
2220
2221            // key 1 gets increments: 3 + 3 = 6, which is >= 5 ✓
2222            input_port.send((1, 3));
2223            input_port.send((1, 3));
2224            // key 2 gets increments: 3 + 3 = 6, which is < 10 ✗
2225            input_port.send((2, 3));
2226            input_port.send((2, 3));
2227
2228            let results = output.collect_sorted::<Vec<_>>().await;
2229            assert_eq!(results, vec![(1, 5)]);
2230        });
2231
2232        assert!(count > 0);
2233    }
2234
2235    #[cfg(feature = "sim")]
2236    #[test]
2237    fn threshold_greater_or_equal_uniform() {
2238        let mut flow = FlowBuilder::new();
2239        let node = flow.process::<()>();
2240
2241        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2242
2243        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
2244            input.into_keyed().fold(
2245                q!(|| 0usize),
2246                q!(
2247                    |acc, v| *acc += v,
2248                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
2249                ),
2250            );
2251
2252        // Uniform threshold: all keys need value >= 5
2253        let threshold = node.singleton(q!(5usize));
2254
2255        let output = counts
2256            .threshold_greater_or_equal_uniform(threshold)
2257            .entries()
2258            .sim_output();
2259
2260        let count = flow.sim().exhaustive(async || {
2261            // key 1: 3 + 3 = 6 >= 5 ✓
2262            input_port.send((1, 3));
2263            input_port.send((1, 3));
2264            // key 2: 2 + 2 = 4 < 5 ✗
2265            input_port.send((2, 2));
2266            input_port.send((2, 2));
2267
2268            let results = output.collect_sorted::<Vec<_>>().await;
2269            assert_eq!(results, vec![(1, 5)]);
2270        });
2271
2272        assert!(count > 0);
2273    }
2274
2275    #[cfg(feature = "sim")]
2276    #[test]
2277    fn threshold_greater_or_equal_bounded_value() {
2278        let mut flow = FlowBuilder::new();
2279        let node = flow.process::<()>();
2280
2281        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2282        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2283
2284        // BoundedValue keyed singleton (values fixed once per key via .first())
2285        let values = input.into_keyed().first();
2286
2287        // BoundedValue keyed singleton of thresholds
2288        let thresholds = thresh_input.into_keyed().first();
2289
2290        let output = values
2291            .threshold_greater_or_equal(thresholds)
2292            .entries()
2293            .sim_output();
2294
2295        let count = flow.sim().exhaustive(async || {
2296            // Set thresholds: key 1 needs >= 3, key 2 needs >= 10
2297            thresh_port.send((1, 3));
2298            thresh_port.send((2, 10));
2299
2300            // key 1 gets value 5 >= 3 ✓, key 2 gets value 4 < 10 ✗
2301            input_port.send((1, 5));
2302            input_port.send((2, 4));
2303
2304            let results = output.collect_sorted::<Vec<_>>().await;
2305            assert_eq!(results, vec![(1, 3)]);
2306        });
2307
2308        assert!(count > 0);
2309    }
2310
2311    #[cfg(feature = "sim")]
2312    #[test]
2313    fn threshold_greater_or_equal_uniform_bounded_value() {
2314        let mut flow = FlowBuilder::new();
2315        let node = flow.process::<()>();
2316
2317        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2318
2319        // BoundedValue keyed singleton (values fixed once per key via .first())
2320        let values = input.into_keyed().first();
2321
2322        // Uniform threshold: all keys need value >= 5
2323        let threshold = node.singleton(q!(5usize));
2324
2325        let output = values
2326            .threshold_greater_or_equal_uniform(threshold)
2327            .entries()
2328            .sim_output();
2329
2330        let count = flow.sim().exhaustive(async || {
2331            // key 1 gets value 7 >= 5 ✓, key 2 gets value 3 < 5 ✗
2332            input_port.send((1, 7));
2333            input_port.send((2, 3));
2334
2335            let results = output.collect_sorted::<Vec<_>>().await;
2336            assert_eq!(results, vec![(1, 5)]);
2337        });
2338
2339        assert!(count > 0);
2340    }
2341
2342    #[cfg(feature = "sim")]
2343    #[test]
2344    fn threshold_greater_or_equal_bounded() {
2345        let mut flow = FlowBuilder::new();
2346        let node = flow.process::<()>();
2347
2348        // Bounded keyed singleton (fully known upfront)
2349        let values = node
2350            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
2351            .into_keyed()
2352            .first();
2353
2354        // BoundedValue thresholds (from async source)
2355        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2356        let thresholds = thresh_input.into_keyed().first();
2357
2358        let output = values
2359            .threshold_greater_or_equal(thresholds)
2360            .entries()
2361            .sim_output();
2362
2363        let count = flow.sim().exhaustive(async || {
2364            thresh_port.send((1, 5));
2365            thresh_port.send((2, 10));
2366
2367            // key 1: 6 >= 5 ✓, key 2: 4 < 10 ✗
2368            let results = output.collect_sorted::<Vec<_>>().await;
2369            assert_eq!(results, vec![(1, 5)]);
2370        });
2371
2372        assert!(count > 0);
2373    }
2374
2375    #[cfg(feature = "sim")]
2376    #[test]
2377    fn threshold_greater_or_equal_uniform_bounded() {
2378        let mut flow = FlowBuilder::new();
2379        let node = flow.process::<()>();
2380
2381        let values = node
2382            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
2383            .into_keyed()
2384            .first();
2385        let threshold = node.singleton(q!(5usize));
2386
2387        let output = values
2388            .threshold_greater_or_equal_uniform(threshold)
2389            .entries()
2390            .sim_output();
2391
2392        let count = flow.sim().exhaustive(async || {
2393            // key 1: 6 >= 5 ✓, key 2: 4 < 5 ✗
2394            let results = output.collect_sorted::<Vec<_>>().await;
2395            assert_eq!(results, vec![(1, 5)]);
2396        });
2397
2398        assert!(count > 0);
2399    }
2400}