hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use sealed::sealed;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_stream::KeyedStream;
15use super::optional::Optional;
16use super::singleton::Singleton;
17use super::sliced::sliced;
18use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::stream::{Ordering, Retries};
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::DeferTick;
30use crate::location::{Atomic, Location, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::properties::manual_proof;
34
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// removed / changed), this also covers two variants specific to keyed singletons:
/// [`BoundedValue`], which indicates that entries may be added over time, but once an entry is
/// added it will never be removed and its value will never change, and [`MonotonicValue`], which
/// indicates that entries are never removed and each value only increases monotonically.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The type of the keyed singleton if the value for each key is immutable.
    ///
    /// The result keeps the same underlying bound, has a [`Bounded`] value bound, and is a
    /// fixed point of [`KeyedSingletonBound::EraseMonotonic`].
    type WithBoundedValue: KeyedSingletonBound<
            UnderlyingBound = Self::UnderlyingBound,
            ValueBound = Bounded,
            EraseMonotonic = Self::WithBoundedValue,
        >;

    /// The [`KeyedSingletonBound`] of a [`KeyedSingleton`] if it is produced from a
    /// [`KeyedStream`] with [`Self`] boundedness.
    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// The type of the keyed singleton if the value for each key is no longer monotonic.
    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
63
// `Unbounded`: neither the set of keys nor the per-key values are fixed.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    // Freezing each value while the underlying stream stays unbounded gives `BoundedValue`.
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = MonotonicValue;
    // `Unbounded` carries no monotonicity marker, so erasing it maps back to itself.
    type EraseMonotonic = Unbounded;

    /// Reports the runtime tag for this bound: [`KeyedSingletonBoundKind::Unbounded`].
    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Unbounded
    }
}
75
// `Bounded`: every associated bound is `Bounded`, so all of the conversions are identities.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    type WithBoundedValue = Bounded;
    type KeyedStreamToMonotone = Bounded;
    type EraseMonotonic = Bounded;

    /// Reports the runtime tag for this bound: [`KeyedSingletonBoundKind::Bounded`].
    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Bounded
    }
}
87
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// its value is bounded and will never change, but new entries may appear asynchronously.
pub struct BoundedValue;

// `BoundedValue`: the underlying stream is unbounded (keys keep arriving) while each value is
// bounded, so every conversion below maps back to `BoundedValue` itself.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = BoundedValue;
    type EraseMonotonic = BoundedValue;

    /// Reports the runtime tag for this bound: [`KeyedSingletonBoundKind::BoundedValue`].
    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::BoundedValue
    }
}
103
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// it will never be removed, and the corresponding value will only increase monotonically.
pub struct MonotonicValue;

// `MonotonicValue`: both the underlying stream and the per-key values are unbounded; only the
// monotonicity marker distinguishes it from plain `Unbounded`.
impl KeyedSingletonBound for MonotonicValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = MonotonicValue;
    // Dropping the monotonicity marker leaves a plain `Unbounded` keyed singleton.
    type EraseMonotonic = Unbounded;

    /// Reports the runtime tag for this bound: [`KeyedSingletonBoundKind::MonotonicValue`].
    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::MonotonicValue
    }
}
119
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The keyed singleton must have monotonic values (`MonotonicValue`) or be bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for [`KeyedSingletonBound`] types whose per-key values
/// are monotonically non-decreasing (or bounded).
///
/// It is implemented for [`MonotonicValue`], for [`BoundedValue`], and for every bound that
/// is [`IsBounded`]. The trait is sealed, so it cannot be implemented outside this module.
pub trait IsKeyedMonotonic: KeyedSingletonBound {}

// Values that only ever increase are monotonic by definition.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsKeyedMonotonic for MonotonicValue {}

// A value that never changes once set is accepted as monotonic as well.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsKeyedMonotonic for BoundedValue {}

// Blanket impl covering any fully bounded keyed singleton bound.
#[sealed]
#[diagnostic::do_not_recommend]
impl<B: IsBounded + KeyedSingletonBound> IsKeyedMonotonic for B {}
141
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable, or [`MonotonicValue`] to
/// declare that keys cannot be removed and the value for each key only increases monotonically.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
///     - [`MonotonicValue`] (asynchronous with monotonically increasing values for each key and
///       no removals)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // The location this collection is materialized at.
    pub(crate) location: Loc,
    // The IR node that produces this collection. It sits in a `RefCell` so that methods can swap
    // it out for `HydroNode::Placeholder` when the node is moved into a downstream operator.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow being built; the `Drop` impl uses it to register an unconsumed node.
    pub(crate) flow_state: FlowState,

    // Ties the otherwise-unused type parameters to the struct without storing any data.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
165
166impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
167 fn drop(&mut self) {
168 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
169 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
170 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
171 input: Box::new(ir_node),
172 op_metadata: HydroIrOpMetadata::new(),
173 });
174 }
175 }
176}
177
178impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
179 for KeyedSingleton<K, V, Loc, Bound>
180{
181 fn clone(&self) -> Self {
182 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
183 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
184 *self.ir_node.borrow_mut() = HydroNode::Tee {
185 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
186 metadata: self.location.new_node_metadata(Self::collection_kind()),
187 };
188 }
189
190 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
191 KeyedSingleton {
192 location: self.location.clone(),
193 flow_state: self.flow_state.clone(),
194 ir_node: HydroNode::Tee {
195 inner: SharedNode(inner.0.clone()),
196 metadata: metadata.clone(),
197 }
198 .into(),
199 _phantom: PhantomData,
200 }
201 } else {
202 unreachable!()
203 }
204 }
205}
206
207impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
208 for KeyedSingleton<K, V, L, B>
209where
210 L: Location<'a>,
211{
212 type Location = L;
213
214 fn create_source(cycle_id: CycleId, location: L) -> Self {
215 KeyedSingleton {
216 flow_state: location.flow_state().clone(),
217 location: location.clone(),
218 ir_node: RefCell::new(HydroNode::CycleSource {
219 cycle_id,
220 metadata: location.new_node_metadata(Self::collection_kind()),
221 }),
222 _phantom: PhantomData,
223 }
224 }
225}
226
227impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
228where
229 L: Location<'a>,
230{
231 type Location = Tick<L>;
232
233 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
234 KeyedSingleton::new(
235 location.clone(),
236 HydroNode::CycleSource {
237 cycle_id,
238 metadata: location.new_node_metadata(Self::collection_kind()),
239 },
240 )
241 }
242}
243
// Exposes the `defer_tick` operation of bounded, in-tick keyed singletons through the
// `DeferTick` trait.
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Forwards to `KeyedSingleton::defer_tick`.
    // NOTE(review): this relies on an inherent `defer_tick` method defined elsewhere in the file;
    // if none exists for these type parameters the path would resolve back to this trait method.
    fn defer_tick(self) -> Self {
        KeyedSingleton::defer_tick(self)
    }
}
252
253impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
254 for KeyedSingleton<K, V, L, B>
255where
256 L: Location<'a>,
257{
258 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
259 assert_eq!(
260 Location::id(&self.location),
261 expected_location,
262 "locations do not match"
263 );
264 self.location
265 .flow_state()
266 .borrow_mut()
267 .push_root(HydroRoot::CycleSink {
268 cycle_id,
269 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
270 op_metadata: HydroIrOpMetadata::new(),
271 });
272 }
273}
274
275impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
276where
277 L: Location<'a>,
278{
279 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
280 assert_eq!(
281 Location::id(&self.location),
282 expected_location,
283 "locations do not match"
284 );
285 self.location
286 .flow_state()
287 .borrow_mut()
288 .push_root(HydroRoot::CycleSink {
289 cycle_id,
290 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
291 op_metadata: HydroIrOpMetadata::new(),
292 });
293 }
294}
295
296impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
297 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
298 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
299 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
300
301 let flow_state = location.flow_state().clone();
302 KeyedSingleton {
303 location,
304 flow_state,
305 ir_node: RefCell::new(ir_node),
306 _phantom: PhantomData,
307 }
308 }
309
    /// Returns the [`Location`] where this keyed singleton is being materialized.
    ///
    /// The location is borrowed from the collection, not cloned.
    pub fn location(&self) -> &L {
        &self.location
    }
314
315 /// Weakens the consistency of this live collection to not guarantee any consistency across
316 /// cluster members (if this collection is on a cluster).
317 pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::DropConsistency, B>
318 where
319 L: Location<'a>,
320 {
321 if L::consistency()
322 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
323 {
324 // already no consistency
325 KeyedSingleton::new(
326 self.location.drop_consistency(),
327 self.ir_node.replace(HydroNode::Placeholder),
328 )
329 } else {
330 KeyedSingleton::new(
331 self.location.drop_consistency(),
332 HydroNode::Cast {
333 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334 metadata: self
335 .location
336 .drop_consistency()
337 .new_node_metadata(
338 KeyedSingleton::<K, V, L::DropConsistency, B>::collection_kind(),
339 ),
340 },
341 )
342 }
343 }
344
345 /// Casts this live collection to have the consistency guarantees specified in the given
346 /// location type parameter. The developer must ensure that the strengthened consistency
347 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
348 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
349 self,
350 _proof: impl crate::properties::ConsistencyProof,
351 ) -> KeyedSingleton<K, V, L2, B>
352 where
353 L: Location<'a>,
354 {
355 if L::consistency() == L2::consistency() {
356 // already consistent
357 KeyedSingleton::new(
358 self.location.with_consistency_of(),
359 self.ir_node.replace(HydroNode::Placeholder),
360 )
361 } else {
362 KeyedSingleton::new(
363 self.location.with_consistency_of(),
364 HydroNode::AssertIsConsistent {
365 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
366 trusted: false,
367 metadata: self
368 .location
369 .clone()
370 .with_consistency_of::<L2>()
371 .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
372 },
373 )
374 }
375 }
376}
377
/// Counts the entries of a bounded keyed singleton.
///
/// Used by [`KeyedSingleton::key_count`] on the snapshot it takes inside a tick.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    // Each key has exactly one entry, so the number of entries is the number of keys.
    let entries = me.entries();
    entries.count()
}
384
/// Collects the entries of a bounded keyed singleton into a single `HashMap`.
///
/// Used by [`KeyedSingleton::into_singleton`] on the snapshot it takes inside a tick.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let unordered_entries = me.entries();

    let ordered_entries = unordered_entries.assume_ordering_trusted(nondet!(
        /// There is only one element associated with each key. The closure technically
        /// isn't commutative in the case where both passed entries have the same key
        /// but different values.
        ///
        /// In the future, we may want to have an `assume!(...)` statement in the UDF that
        /// the key is never already present in the map.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
408
409impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
410 pub(crate) fn collection_kind() -> CollectionKind {
411 CollectionKind::KeyedSingleton {
412 bound: B::bound_kind(),
413 key_type: stageleft::quote_type::<K>().into(),
414 value_type: stageleft::quote_type::<V>().into(),
415 }
416 }
417
418 /// Transforms each value by invoking `f` on each element, with keys staying the same
419 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
420 ///
421 /// If you do not want to modify the stream and instead only want to view
422 /// each item use [`KeyedSingleton::inspect`] instead.
423 ///
424 /// # Example
425 /// ```rust
426 /// # #[cfg(feature = "deploy")] {
427 /// # use hydro_lang::prelude::*;
428 /// # use futures::StreamExt;
429 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
430 /// let keyed_singleton = // { 1: 2, 2: 4 }
431 /// # process
432 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
433 /// # .into_keyed()
434 /// # .first();
435 /// keyed_singleton.map(q!(|v| v + 1))
436 /// # .entries()
437 /// # }, |mut stream| async move {
438 /// // { 1: 3, 2: 5 }
439 /// # let mut results = Vec::new();
440 /// # for _ in 0..2 {
441 /// # results.push(stream.next().await.unwrap());
442 /// # }
443 /// # results.sort();
444 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
445 /// # }));
446 /// # }
447 /// ```
448 pub fn map<U, F>(
449 self,
450 f: impl IntoQuotedMut<'a, F, L> + Copy,
451 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
452 where
453 F: Fn(V) -> U + 'a,
454 {
455 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
456 let map_f = q!({
457 let orig = f;
458 move |(k, v)| (k, orig(v))
459 })
460 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
461 .into();
462
463 KeyedSingleton::new(
464 self.location.clone(),
465 HydroNode::Map {
466 f: map_f,
467 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
468 metadata: self.location.new_node_metadata(KeyedSingleton::<
469 K,
470 U,
471 L,
472 B::EraseMonotonic,
473 >::collection_kind()),
474 },
475 )
476 }
477
478 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
479 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
480 ///
481 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
482 /// the new value `U`. The key remains unchanged in the output.
483 ///
484 /// # Example
485 /// ```rust
486 /// # #[cfg(feature = "deploy")] {
487 /// # use hydro_lang::prelude::*;
488 /// # use futures::StreamExt;
489 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
490 /// let keyed_singleton = // { 1: 2, 2: 4 }
491 /// # process
492 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
493 /// # .into_keyed()
494 /// # .first();
495 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
496 /// # .entries()
497 /// # }, |mut stream| async move {
498 /// // { 1: 3, 2: 6 }
499 /// # let mut results = Vec::new();
500 /// # for _ in 0..2 {
501 /// # results.push(stream.next().await.unwrap());
502 /// # }
503 /// # results.sort();
504 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
505 /// # }));
506 /// # }
507 /// ```
508 pub fn map_with_key<U, F>(
509 self,
510 f: impl IntoQuotedMut<'a, F, L> + Copy,
511 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
512 where
513 F: Fn((K, V)) -> U + 'a,
514 K: Clone,
515 {
516 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
517 let map_f = q!({
518 let orig = f;
519 move |(k, v)| {
520 let out = orig((Clone::clone(&k), v));
521 (k, out)
522 }
523 })
524 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
525 .into();
526
527 KeyedSingleton::new(
528 self.location.clone(),
529 HydroNode::Map {
530 f: map_f,
531 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
532 metadata: self.location.new_node_metadata(KeyedSingleton::<
533 K,
534 U,
535 L,
536 B::EraseMonotonic,
537 >::collection_kind()),
538 },
539 )
540 }
541
542 /// Gets the number of keys in the keyed singleton.
543 ///
544 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
545 /// since keys may be added / removed over time. When the set of keys changes, the count will
546 /// be asynchronously updated.
547 ///
548 /// # Example
549 /// ```rust
550 /// # #[cfg(feature = "deploy")] {
551 /// # use hydro_lang::prelude::*;
552 /// # use futures::StreamExt;
553 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
554 /// # let tick = process.tick();
555 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
556 /// # process
557 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
558 /// # .into_keyed()
559 /// # .batch(&tick, nondet!(/** test */))
560 /// # .first();
561 /// keyed_singleton.key_count()
562 /// # .all_ticks()
563 /// # }, |mut stream| async move {
564 /// // 3
565 /// # assert_eq!(stream.next().await.unwrap(), 3);
566 /// # }));
567 /// # }
568 /// ```
    ///
    /// # Panics
    /// Panics when the per-key values are not bounded and any of the following holds: the
    /// location is not top-level, no tick can be obtained from it, or the bound kind is not
    /// [`KeyedSingletonBoundKind::Unbounded`].
    // NOTE(review): `MonotonicValue` has an unbounded value bound and its own bound kind, so it
    // always reaches the `panic!` below. The panic message also names `BoundedValue`, which has
    // a bounded value bound and therefore takes the first branch. Confirm both are intended.
    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
        if B::ValueBound::BOUNDED {
            // Values never change, so counting the entries is enough. The handle is rebuilt with
            // a struct literal (not `KeyedSingleton::new`), which skips the debug assertions.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
                _phantom: PhantomData,
            };

            me.entries().count().ignore_monotonic()
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
        {
            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            );

            // Count a snapshot of the entries inside the tick, then take the latest count back
            // out of the tick.
            let out =
                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
                    .latest();
            // Re-wrap the resulting node so the return type carries `B::UnderlyingBound`.
            Singleton::new(
                self.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
        }
    }
599
600 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
601 ///
602 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
603 /// asynchronously as well.
604 ///
605 /// # Example
606 /// ```rust
607 /// # #[cfg(feature = "deploy")] {
608 /// # use hydro_lang::prelude::*;
609 /// # use futures::StreamExt;
610 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
612 /// # process
613 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
614 /// # .into_keyed()
615 /// # .batch(&process.tick(), nondet!(/** test */))
616 /// # .first();
617 /// keyed_singleton.into_singleton()
618 /// # .all_ticks()
619 /// # }, |mut stream| async move {
620 /// // { 1: "a", 2: "b", 3: "c" }
621 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
622 /// # }));
623 /// # }
624 /// ```
    ///
    /// # Panics
    /// Panics when the per-key values are not bounded and any of the following holds: the
    /// location is not top-level, no tick can be obtained from it, or the bound kind is not
    /// [`KeyedSingletonBoundKind::Unbounded`].
    // NOTE(review): `MonotonicValue` has an unbounded value bound and its own bound kind, so it
    // always reaches the `panic!` below. The panic message also names `BoundedValue`, which has
    // a bounded value bound and therefore takes the first branch. Confirm both are intended.
    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
    where
        K: Eq + Hash,
    {
        if B::ValueBound::BOUNDED {
            // Values never change, so the entries can be folded into a map directly. The handle
            // is rebuilt with a struct literal (not `KeyedSingleton::new`), which skips the
            // debug assertions.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
                _phantom: PhantomData,
            };

            me.entries()
                .assume_ordering_trusted(nondet!(
                    /// There is only one element associated with each key. The closure technically
                    /// isn't commutative in the case where both passed entries have the same key
                    /// but different values.
                    ///
                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
                    /// the key is never already present in the map.
                ))
                .fold(
                    q!(|| HashMap::new()),
                    q!(|map, (k, v)| {
                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
                        map.insert(k, v);
                    }),
                )
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
        {
            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            );

            // Build the map from a snapshot of the entries inside the tick, then take the latest
            // map back out of the tick.
            let out = into_singleton_inside_tick(
                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();
            // Re-wrap the resulting node so the return type carries `B::UnderlyingBound`.
            Singleton::new(
                self.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
        }
    }
674
675 /// An operator which allows you to "name" a `HydroNode`.
676 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
677 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
678 {
679 let mut node = self.ir_node.borrow_mut();
680 let metadata = node.metadata_mut();
681 metadata.tag = Some(name.to_owned());
682 }
683 self
684 }
685
686 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
687 /// implies that `B == Bounded`.
688 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
689 where
690 B: IsBounded,
691 {
692 KeyedSingleton::new(
693 self.location.clone(),
694 self.ir_node.replace(HydroNode::Placeholder),
695 )
696 }
697
698 /// Gets the value associated with a specific key from the keyed singleton.
699 /// Returns `None` if the key is `None` or there is no associated value.
700 ///
701 /// # Example
702 /// ```rust
703 /// # #[cfg(feature = "deploy")] {
704 /// # use hydro_lang::prelude::*;
705 /// # use futures::StreamExt;
706 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
707 /// let tick = process.tick();
708 /// let keyed_data = process
709 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
710 /// .into_keyed()
711 /// .batch(&tick, nondet!(/** test */))
712 /// .first();
713 /// let key = tick.singleton(q!(1));
714 /// keyed_data.get(key).all_ticks()
715 /// # }, |mut stream| async move {
716 /// // 2
717 /// # assert_eq!(stream.next().await.unwrap(), 2);
718 /// # }));
719 /// # }
720 /// ```
721 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
722 where
723 B: IsBounded,
724 K: Hash + Eq + Clone,
725 V: Clone,
726 {
727 self.make_bounded()
728 .into_keyed_stream()
729 .get(key)
730 .cast_at_most_one_element()
731 }
732
733 /// Emit a keyed stream containing keys shared between the keyed singleton and the
734 /// keyed stream, where each value in the output keyed stream is a tuple of
735 /// (the keyed singleton's value, the keyed stream's value).
736 ///
737 /// # Example
738 /// ```rust
739 /// # #[cfg(feature = "deploy")] {
740 /// # use hydro_lang::prelude::*;
741 /// # use futures::StreamExt;
742 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
743 /// let tick = process.tick();
744 /// let keyed_data = process
745 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
746 /// .into_keyed()
747 /// .batch(&tick, nondet!(/** test */))
748 /// .first();
749 /// let other_data = process
750 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
751 /// .into_keyed()
752 /// .batch(&tick, nondet!(/** test */));
753 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
754 /// # }, |mut stream| async move {
755 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
756 /// # let mut results = vec![];
757 /// # for _ in 0..3 {
758 /// # results.push(stream.next().await.unwrap());
759 /// # }
760 /// # results.sort();
761 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
762 /// # }));
763 /// # }
764 /// ```
765 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
766 self,
767 other: KeyedStream<K, V2, L, B2, O2, R2>,
768 ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
769 where
770 B: IsBounded,
771 K: Eq + Hash + Clone,
772 V: Clone,
773 V2: Clone,
774 {
775 // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
776 // always produces deterministic order per key (nested loop join), this could just use
777 // `join_keyed_stream` without constructing IRs manually
778 KeyedStream::new(
779 self.location.clone(),
780 HydroNode::Join {
781 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
782 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
783 metadata: self
784 .location
785 .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
786 },
787 )
788 }
789
790 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
791 /// where each value in the output keyed singleton is a tuple of
792 /// (self.value, other.value).
793 ///
794 /// # Example
795 /// ```rust
796 /// # #[cfg(feature = "deploy")] {
797 /// # use hydro_lang::prelude::*;
798 /// # use futures::StreamExt;
799 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
800 /// # let tick = process.tick();
801 /// let requests = // { 1: 10, 2: 20, 3: 30 }
802 /// # process
803 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
804 /// # .into_keyed()
805 /// # .batch(&tick, nondet!(/** test */))
806 /// # .first();
807 /// let other = // { 1: 100, 2: 200, 4: 400 }
808 /// # process
809 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
810 /// # .into_keyed()
811 /// # .batch(&tick, nondet!(/** test */))
812 /// # .first();
813 /// requests.join_keyed_singleton(other)
814 /// # .entries().all_ticks()
815 /// # }, |mut stream| async move {
816 /// // { 1: (10, 100), 2: (20, 200) }
817 /// # let mut results = vec![];
818 /// # for _ in 0..2 {
819 /// # results.push(stream.next().await.unwrap());
820 /// # }
821 /// # results.sort();
822 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
823 /// # }));
824 /// # }
825 /// ```
826 pub fn join_keyed_singleton<V2: Clone>(
827 self,
828 other: KeyedSingleton<K, V2, L, Bounded>,
829 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
830 where
831 B: IsBounded,
832 K: Eq + Hash + Clone,
833 V: Clone,
834 {
835 let result_stream = self
836 .make_bounded()
837 .entries()
838 .join(other.entries())
839 .into_keyed();
840
841 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
842 result_stream.cast_at_most_one_entry_per_key()
843 }
844
845 /// For each value in `self`, find the matching key in `lookup`.
846 /// The output is a keyed singleton with the key from `self`, and a value
847 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
848 /// If the key is not present in `lookup`, the option will be [`None`].
849 ///
850 /// # Example
851 /// ```rust
852 /// # #[cfg(feature = "deploy")] {
853 /// # use hydro_lang::prelude::*;
854 /// # use futures::StreamExt;
855 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
856 /// # let tick = process.tick();
857 /// let requests = // { 1: 10, 2: 20 }
858 /// # process
859 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
860 /// # .into_keyed()
861 /// # .batch(&tick, nondet!(/** test */))
862 /// # .first();
863 /// let other_data = // { 10: 100, 11: 110 }
864 /// # process
865 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
866 /// # .into_keyed()
867 /// # .batch(&tick, nondet!(/** test */))
868 /// # .first();
869 /// requests.lookup_keyed_singleton(other_data)
870 /// # .entries().all_ticks()
871 /// # }, |mut stream| async move {
872 /// // { 1: (10, Some(100)), 2: (20, None) }
873 /// # let mut results = vec![];
874 /// # for _ in 0..2 {
875 /// # results.push(stream.next().await.unwrap());
876 /// # }
877 /// # results.sort();
878 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
879 /// # }));
880 /// # }
881 /// ```
882 pub fn lookup_keyed_singleton<V2>(
883 self,
884 lookup: KeyedSingleton<V, V2, L, Bounded>,
885 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
886 where
887 B: IsBounded,
888 K: Eq + Hash + Clone,
889 V: Eq + Hash + Clone,
890 V2: Clone,
891 {
892 let result_stream = self
893 .make_bounded()
894 .into_keyed_stream()
895 .lookup_keyed_stream(lookup.into_keyed_stream());
896
897 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
898 result_stream.cast_at_most_one_entry_per_key()
899 }
900
901 /// For each value in `self`, find the matching key in `lookup`.
902 /// The output is a keyed stream with the key from `self`, and a value
903 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
904 /// If the key is not present in `lookup`, the option will be [`None`].
905 ///
906 /// # Example
907 /// ```rust
908 /// # #[cfg(feature = "deploy")] {
909 /// # use hydro_lang::prelude::*;
910 /// # use futures::StreamExt;
911 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
912 /// # let tick = process.tick();
913 /// let requests = // { 1: 10, 2: 20 }
914 /// # process
915 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
916 /// # .into_keyed()
917 /// # .batch(&tick, nondet!(/** test */))
918 /// # .first();
919 /// let other_data = // { 10: 100, 10: 110 }
920 /// # process
921 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
922 /// # .into_keyed()
923 /// # .batch(&tick, nondet!(/** test */));
924 /// requests.lookup_keyed_stream(other_data)
925 /// # .entries().all_ticks()
926 /// # }, |mut stream| async move {
927 /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
928 /// # let mut results = vec![];
929 /// # for _ in 0..3 {
930 /// # results.push(stream.next().await.unwrap());
931 /// # }
932 /// # results.sort();
933 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
934 /// # }));
935 /// # }
936 /// ```
937 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
938 self,
939 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
940 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
941 where
942 B: IsBounded,
943 K: Eq + Hash + Clone,
944 V: Eq + Hash + Clone,
945 V2: Clone,
946 {
947 self.make_bounded()
948 .entries()
949 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
950 .into_keyed()
951 .lookup_keyed_stream(lookup)
952 }
953
954 /// For each key present in both `self` and `thresholds`, emits a [`KeyedStream`] event the first
955 /// time that key's value becomes greater than or equal to the corresponding threshold value.
956 /// The emitted value for each key is the threshold value itself.
957 ///
958 /// This requires monotonic values ([`MonotonicValue`], [`BoundedValue`], or [`Bounded`]),
959 /// because otherwise the threshold detection would be non-deterministic.
960 ///
961 /// The `thresholds` parameter is a [`BoundedValue`] keyed singleton mapping each key to its
962 /// threshold. Thresholds may arrive asynchronously (new keys appear over time), but once set
963 /// for a key, the threshold value is fixed. Late-arriving thresholds are checked against the
964 /// current snapshot value immediately.
965 ///
966 /// # Example
967 /// ```rust,ignore
968 /// use hydro_lang::prelude::*;
969 ///
970 /// // Given a monotonically increasing keyed singleton (e.g. from fold with monotone proof)
971 /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
972 /// .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 is monotone */)));
973 ///
974 /// // BoundedValue keyed singleton of thresholds (from .first())
975 /// let thresholds = threshold_source.into_keyed().first();
976 ///
977 /// // Emits (key, threshold_value) the first time each key's value >= threshold
978 /// let crossed = counts.threshold_greater_or_equal(thresholds);
979 /// ```
980 pub fn threshold_greater_or_equal(
981 self,
982 thresholds: KeyedSingleton<K, V, L, BoundedValue>,
983 ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
984 where
985 K: Clone + Eq + Hash,
986 V: Clone + PartialOrd,
987 B: IsKeyedMonotonic,
988 {
989 let self_location = self.location.clone();
990 match B::bound_kind() {
991 KeyedSingletonBoundKind::Bounded => {
992 // Bounded case: self is already fixed, just join and filter
993 let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
994 self.location.clone(),
995 self.ir_node.replace(HydroNode::Placeholder),
996 );
997 let result = me
998 .entries()
999 .join(thresholds.entries())
1000 .filter_map(q!(|(k, (val, thresh))| {
1001 if val >= thresh {
1002 Some((k, thresh))
1003 } else {
1004 None
1005 }
1006 }))
1007 .into_keyed();
1008 KeyedStream::new(
1009 result.location.clone(),
1010 result.ir_node.replace(HydroNode::Placeholder),
1011 )
1012 }
1013 KeyedSingletonBoundKind::MonotonicValue => {
1014 let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
1015 self.location.clone(),
1016 self.ir_node.replace(HydroNode::Placeholder),
1017 );
1018
1019 let result = sliced! {
1020 let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1021 let thresh_snapshot =
1022 use(thresholds, nondet!(/** thresholds are deterministic */));
1023 let mut already_crossed =
1024 use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1025
1026 let joined = thresh_snapshot.entries().join(snapshot.entries());
1027 let passed = joined
1028 .filter(q!(|(_, (thresh, val))| *val >= *thresh))
1029 .map(q!(|(k, (thresh, _))| (k, thresh)));
1030
1031 let newly_crossed = passed.anti_join(already_crossed.clone());
1032 already_crossed =
1033 already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1034
1035 newly_crossed.into_keyed()
1036 };
1037
1038 KeyedStream::new(
1039 self_location,
1040 result.ir_node.replace(HydroNode::Placeholder),
1041 )
1042 }
1043 KeyedSingletonBoundKind::BoundedValue => {
1044 let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
1045 self.location.clone(),
1046 self.ir_node.replace(HydroNode::Placeholder),
1047 );
1048
1049 let result = sliced! {
1050 let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1051 let thresh_snapshot =
1052 use(thresholds, nondet!(/** thresholds are deterministic */));
1053 let mut already_crossed =
1054 use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1055
1056 let joined = thresh_snapshot.entries().join(snapshot.entries());
1057 let passed = joined
1058 .filter(q!(|(_, (thresh, val))| *val >= *thresh))
1059 .map(q!(|(k, (thresh, _))| (k, thresh)));
1060
1061 let newly_crossed = passed.anti_join(already_crossed.clone());
1062 already_crossed =
1063 already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1064
1065 newly_crossed.into_keyed()
1066 };
1067
1068 KeyedStream::new(
1069 self_location,
1070 result.ir_node.replace(HydroNode::Placeholder),
1071 )
1072 }
1073 _ => {
1074 unreachable!(
1075 "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
1076 )
1077 }
1078 }
1079 }
1080
1081 /// Like [`Self::threshold_greater_or_equal`], but uses a single [`Singleton`] threshold
1082 /// shared across all keys. Emits a `(K, V)` event for each key the first time that key's
1083 /// value becomes >= the threshold. The emitted value is the threshold itself.
1084 ///
1085 /// Because the threshold is a [`Bounded`] singleton, its value is fixed for the lifetime of the
1086 /// computation and does not carry ongoing memory cost.
1087 ///
1088 /// # Example
1089 /// ```rust,ignore
1090 /// use hydro_lang::prelude::*;
1091 ///
1092 /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
1093 /// .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 */)));
1094 ///
1095 /// let threshold = process.singleton(q!(5usize));
1096 /// let crossed = counts.threshold_greater_or_equal_uniform(threshold);
1097 /// ```
1098 pub fn threshold_greater_or_equal_uniform(
1099 self,
1100 threshold: Singleton<V, L, Bounded>,
1101 ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
1102 where
1103 K: Clone + Eq + Hash,
1104 V: Clone + PartialOrd,
1105 B: IsKeyedMonotonic,
1106 {
1107 let self_location = self.location.clone();
1108 match B::bound_kind() {
1109 KeyedSingletonBoundKind::Bounded => {
1110 let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
1111 self.location.clone(),
1112 self.ir_node.replace(HydroNode::Placeholder),
1113 );
1114 let result = me
1115 .entries()
1116 .cross_singleton(threshold)
1117 .filter_map(q!(|((k, val), thresh)| {
1118 if val >= thresh {
1119 Some((k, thresh))
1120 } else {
1121 None
1122 }
1123 }))
1124 .into_keyed();
1125 KeyedStream::new(
1126 result.location.clone(),
1127 result.ir_node.replace(HydroNode::Placeholder),
1128 )
1129 }
1130 KeyedSingletonBoundKind::MonotonicValue => {
1131 let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
1132 self.location.clone(),
1133 self.ir_node.replace(HydroNode::Placeholder),
1134 );
1135
1136 let result = sliced! {
1137 let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1138 let mut already_crossed =
1139 use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1140
1141 let tick = snapshot.location().clone();
1142 let thresh_in_tick = threshold.clone_into_tick(&tick);
1143
1144 let crossing = snapshot
1145 .entries()
1146 .cross_singleton(thresh_in_tick)
1147 .filter_map(q!(|((k, val), thresh)| {
1148 if val >= thresh {
1149 Some((k, thresh))
1150 } else {
1151 None
1152 }
1153 }));
1154
1155 let newly_crossed = crossing.anti_join(already_crossed.clone());
1156 already_crossed =
1157 already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1158
1159 newly_crossed.into_keyed()
1160 };
1161
1162 KeyedStream::new(
1163 self_location,
1164 result.ir_node.replace(HydroNode::Placeholder),
1165 )
1166 }
1167 KeyedSingletonBoundKind::BoundedValue => {
1168 let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
1169 self.location.clone(),
1170 self.ir_node.replace(HydroNode::Placeholder),
1171 );
1172
1173 let result = sliced! {
1174 let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1175 let mut already_crossed =
1176 use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1177
1178 let tick = snapshot.location().clone();
1179 let thresh_in_tick = threshold.clone_into_tick(&tick);
1180
1181 let crossing = snapshot
1182 .entries()
1183 .cross_singleton(thresh_in_tick)
1184 .filter_map(q!(|((k, val), thresh)| {
1185 if val >= thresh {
1186 Some((k, thresh))
1187 } else {
1188 None
1189 }
1190 }));
1191
1192 let newly_crossed = crossing.anti_join(already_crossed.clone());
1193 already_crossed =
1194 already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1195
1196 newly_crossed.into_keyed()
1197 };
1198
1199 KeyedStream::new(
1200 self_location,
1201 result.ir_node.replace(HydroNode::Placeholder),
1202 )
1203 }
1204 _ => {
1205 unreachable!(
1206 "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
1207 )
1208 }
1209 }
1210 }
1211}
1212
1213impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
1214 KeyedSingleton<K, V, L, B>
1215{
1216 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
1217 ///
1218 /// The value for each key must be bounded, otherwise the resulting stream elements would be
1219 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1220 /// into the output.
1221 ///
1222 /// # Example
1223 /// ```rust
1224 /// # #[cfg(feature = "deploy")] {
1225 /// # use hydro_lang::prelude::*;
1226 /// # use futures::StreamExt;
1227 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1228 /// let keyed_singleton = // { 1: 2, 2: 4 }
1229 /// # process
1230 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1231 /// # .into_keyed()
1232 /// # .first();
1233 /// keyed_singleton.entries()
1234 /// # }, |mut stream| async move {
1235 /// // (1, 2), (2, 4) in any order
1236 /// # let mut results = Vec::new();
1237 /// # for _ in 0..2 {
1238 /// # results.push(stream.next().await.unwrap());
1239 /// # }
1240 /// # results.sort();
1241 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1242 /// # }));
1243 /// # }
1244 /// ```
1245 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1246 self.into_keyed_stream().entries()
1247 }
1248
1249 /// Flattens the keyed singleton into an unordered stream of just the values.
1250 ///
1251 /// The value for each key must be bounded, otherwise the resulting stream elements would be
1252 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1253 /// into the output.
1254 ///
1255 /// # Example
1256 /// ```rust
1257 /// # #[cfg(feature = "deploy")] {
1258 /// # use hydro_lang::prelude::*;
1259 /// # use futures::StreamExt;
1260 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1261 /// let keyed_singleton = // { 1: 2, 2: 4 }
1262 /// # process
1263 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1264 /// # .into_keyed()
1265 /// # .first();
1266 /// keyed_singleton.values()
1267 /// # }, |mut stream| async move {
1268 /// // 2, 4 in any order
1269 /// # let mut results = Vec::new();
1270 /// # for _ in 0..2 {
1271 /// # results.push(stream.next().await.unwrap());
1272 /// # }
1273 /// # results.sort();
1274 /// # assert_eq!(results, vec![2, 4]);
1275 /// # }));
1276 /// # }
1277 /// ```
1278 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1279 let map_f = q!(|(_, v)| v)
1280 .splice_fn1_ctx::<(K, V), V>(&self.location)
1281 .into();
1282
1283 Stream::new(
1284 self.location.clone(),
1285 HydroNode::Map {
1286 f: map_f,
1287 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1288 metadata: self.location.new_node_metadata(Stream::<
1289 V,
1290 L,
1291 B::UnderlyingBound,
1292 NoOrder,
1293 ExactlyOnce,
1294 >::collection_kind()),
1295 },
1296 )
1297 }
1298
1299 /// Flattens the keyed singleton into an unordered stream of just the keys.
1300 ///
1301 /// The value for each key must be bounded, otherwise the removal of keys would result in
1302 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1303 /// into the output.
1304 ///
1305 /// # Example
1306 /// ```rust
1307 /// # #[cfg(feature = "deploy")] {
1308 /// # use hydro_lang::prelude::*;
1309 /// # use futures::StreamExt;
1310 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1311 /// let keyed_singleton = // { 1: 2, 2: 4 }
1312 /// # process
1313 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1314 /// # .into_keyed()
1315 /// # .first();
1316 /// keyed_singleton.keys()
1317 /// # }, |mut stream| async move {
1318 /// // 1, 2 in any order
1319 /// # let mut results = Vec::new();
1320 /// # for _ in 0..2 {
1321 /// # results.push(stream.next().await.unwrap());
1322 /// # }
1323 /// # results.sort();
1324 /// # assert_eq!(results, vec![1, 2]);
1325 /// # }));
1326 /// # }
1327 /// ```
1328 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1329 self.entries().map(q!(|(k, _)| k))
1330 }
1331
1332 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1333 /// entries whose keys are not in the provided stream.
1334 ///
1335 /// # Example
1336 /// ```rust
1337 /// # #[cfg(feature = "deploy")] {
1338 /// # use hydro_lang::prelude::*;
1339 /// # use futures::StreamExt;
1340 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1341 /// let tick = process.tick();
1342 /// let keyed_singleton = // { 1: 2, 2: 4 }
1343 /// # process
1344 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1345 /// # .into_keyed()
1346 /// # .first()
1347 /// # .batch(&tick, nondet!(/** test */));
1348 /// let keys_to_remove = process
1349 /// .source_iter(q!(vec![1]))
1350 /// .batch(&tick, nondet!(/** test */));
1351 /// keyed_singleton.filter_key_not_in(keys_to_remove)
1352 /// # .entries().all_ticks()
1353 /// # }, |mut stream| async move {
1354 /// // { 2: 4 }
1355 /// # for w in vec![(2, 4)] {
1356 /// # assert_eq!(stream.next().await.unwrap(), w);
1357 /// # }
1358 /// # }));
1359 /// # }
1360 /// ```
1361 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1362 self,
1363 other: Stream<K, L, Bounded, O2, R2>,
1364 ) -> Self
1365 where
1366 K: Hash + Eq,
1367 {
1368 check_matching_location(&self.location, &other.location);
1369
1370 KeyedSingleton::new(
1371 self.location.clone(),
1372 HydroNode::AntiJoin {
1373 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1374 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1375 metadata: self.location.new_node_metadata(Self::collection_kind()),
1376 },
1377 )
1378 }
1379
1380 /// An operator which allows you to "inspect" each value of a keyed singleton without
1381 /// modifying it. The closure `f` is called on a reference to each value. This is
1382 /// mainly useful for debugging, and should not be used to generate side-effects.
1383 ///
1384 /// # Example
1385 /// ```rust
1386 /// # #[cfg(feature = "deploy")] {
1387 /// # use hydro_lang::prelude::*;
1388 /// # use futures::StreamExt;
1389 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1390 /// let keyed_singleton = // { 1: 2, 2: 4 }
1391 /// # process
1392 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1393 /// # .into_keyed()
1394 /// # .first();
1395 /// keyed_singleton
1396 /// .inspect(q!(|v| println!("{}", v)))
1397 /// # .entries()
1398 /// # }, |mut stream| async move {
1399 /// // { 1: 2, 2: 4 }
1400 /// # for w in vec![(1, 2), (2, 4)] {
1401 /// # assert_eq!(stream.next().await.unwrap(), w);
1402 /// # }
1403 /// # }));
1404 /// # }
1405 /// ```
1406 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1407 where
1408 F: Fn(&V) + 'a,
1409 {
1410 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1411 let inspect_f = q!({
1412 let orig = f;
1413 move |t: &(_, _)| orig(&t.1)
1414 })
1415 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1416 .into();
1417
1418 KeyedSingleton::new(
1419 self.location.clone(),
1420 HydroNode::Inspect {
1421 f: inspect_f,
1422 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1423 metadata: self.location.new_node_metadata(Self::collection_kind()),
1424 },
1425 )
1426 }
1427
1428 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1429 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1430 /// mainly useful for debugging, and should not be used to generate side-effects.
1431 ///
1432 /// # Example
1433 /// ```rust
1434 /// # #[cfg(feature = "deploy")] {
1435 /// # use hydro_lang::prelude::*;
1436 /// # use futures::StreamExt;
1437 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1438 /// let keyed_singleton = // { 1: 2, 2: 4 }
1439 /// # process
1440 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1441 /// # .into_keyed()
1442 /// # .first();
1443 /// keyed_singleton
1444 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1445 /// # .entries()
1446 /// # }, |mut stream| async move {
1447 /// // { 1: 2, 2: 4 }
1448 /// # for w in vec![(1, 2), (2, 4)] {
1449 /// # assert_eq!(stream.next().await.unwrap(), w);
1450 /// # }
1451 /// # }));
1452 /// # }
1453 /// ```
1454 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1455 where
1456 F: Fn(&(K, V)) + 'a,
1457 {
1458 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1459
1460 KeyedSingleton::new(
1461 self.location.clone(),
1462 HydroNode::Inspect {
1463 f: inspect_f,
1464 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1465 metadata: self.location.new_node_metadata(Self::collection_kind()),
1466 },
1467 )
1468 }
1469
1470 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1471 ///
1472 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1473 /// asynchronously updated if a new key is added that is higher than the previous max key.
1474 ///
1475 /// # Example
1476 /// ```rust
1477 /// # #[cfg(feature = "deploy")] {
1478 /// # use hydro_lang::prelude::*;
1479 /// # use futures::StreamExt;
1480 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1481 /// let tick = process.tick();
1482 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1483 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1484 /// # .into_keyed()
1485 /// # .first();
1486 /// keyed_singleton.get_max_key()
1487 /// # .sample_eager(nondet!(/** test */))
1488 /// # }, |mut stream| async move {
1489 /// // (2, 456)
1490 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1491 /// # }));
1492 /// # }
1493 /// ```
1494 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1495 where
1496 K: Ord,
1497 {
1498 self.entries()
1499 .assume_ordering_trusted(nondet!(
1500 /// There is only one element associated with each key, and the keys are totallly
1501 /// ordered so we will produce a deterministic value. The closure technically
1502 /// isn't commutative in the case where both passed entries have the same key
1503 /// but different values.
1504 ///
1505 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1506 /// the two inputs do not have the same key.
1507 ))
1508 .reduce(q!(
1509 move |curr, new| {
1510 if new.0 > curr.0 {
1511 *curr = new;
1512 }
1513 },
1514 idempotent = manual_proof!(/** repeated elements are ignored */)
1515 ))
1516 }
1517
1518 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1519 /// element, the value.
1520 ///
1521 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1522 ///
1523 /// # Example
1524 /// ```rust
1525 /// # #[cfg(feature = "deploy")] {
1526 /// # use hydro_lang::prelude::*;
1527 /// # use futures::StreamExt;
1528 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1529 /// let keyed_singleton = // { 1: 2, 2: 4 }
1530 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1531 /// # .into_keyed()
1532 /// # .first();
1533 /// keyed_singleton
1534 /// .clone()
1535 /// .into_keyed_stream()
1536 /// .merge_unordered(
1537 /// keyed_singleton.into_keyed_stream()
1538 /// )
1539 /// # .entries()
1540 /// # }, |mut stream| async move {
1541 /// // { 1: [2, 2], 2: [4, 4] }
1542 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1543 /// # assert_eq!(stream.next().await.unwrap(), w);
1544 /// # }
1545 /// # }));
1546 /// # }
1547 /// ```
1548 pub fn into_keyed_stream(
1549 self,
1550 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1551 KeyedStream::new(
1552 self.location.clone(),
1553 HydroNode::Cast {
1554 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1555 metadata: self.location.new_node_metadata(KeyedStream::<
1556 K,
1557 V,
1558 L,
1559 B::UnderlyingBound,
1560 TotalOrder,
1561 ExactlyOnce,
1562 >::collection_kind()),
1563 },
1564 )
1565 }
1566}
1567
1568impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1569where
1570 L: Location<'a>,
1571{
1572 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1573 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1574 ///
1575 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1576 /// processed before an acknowledgement is emitted.
1577 pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1578 let id = self.location.flow_state().borrow_mut().next_clock_id();
1579 let out_location = Atomic {
1580 tick: Tick {
1581 id,
1582 l: self.location.clone(),
1583 },
1584 };
1585 KeyedSingleton::new(
1586 out_location.clone(),
1587 HydroNode::BeginAtomic {
1588 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1589 metadata: out_location
1590 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1591 },
1592 )
1593 }
1594}
1595
1596impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1597where
1598 L: Location<'a>,
1599{
1600 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1601 /// See [`KeyedSingleton::atomic`] for more details.
1602 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1603 KeyedSingleton::new(
1604 self.location.tick.l.clone(),
1605 HydroNode::EndAtomic {
1606 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1607 metadata: self
1608 .location
1609 .tick
1610 .l
1611 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1612 },
1613 )
1614 }
1615}
1616
1617impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1618 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1619 /// tick `T` always has the entries of `self` at tick `T - 1`.
1620 ///
1621 /// At tick `0`, the output has no entries, since there is no previous tick.
1622 ///
1623 /// This operator enables stateful iterative processing with ticks, by sending data from one
1624 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1625 ///
1626 /// # Example
1627 /// ```rust
1628 /// # #[cfg(feature = "deploy")] {
1629 /// # use hydro_lang::prelude::*;
1630 /// # use futures::StreamExt;
1631 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1632 /// let tick = process.tick();
1633 /// # // ticks are lazy by default, forces the second tick to run
1634 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1635 /// # let batch_first_tick = process
1636 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1637 /// # .batch(&tick, nondet!(/** test */))
1638 /// # .into_keyed();
1639 /// # let batch_second_tick = process
1640 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1641 /// # .batch(&tick, nondet!(/** test */))
1642 /// # .into_keyed()
1643 /// # .defer_tick(); // appears on the second tick
1644 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1645 /// # batch_first_tick.chain(batch_second_tick).first();
1646 /// input_batch.clone().filter_key_not_in(
1647 /// input_batch.defer_tick().keys() // keys present in the previous tick
1648 /// )
1649 /// # .entries().all_ticks()
1650 /// # }, |mut stream| async move {
1651 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1652 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1653 /// # assert_eq!(stream.next().await.unwrap(), w);
1654 /// # }
1655 /// # }));
1656 /// # }
1657 /// ```
1658 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1659 KeyedSingleton::new(
1660 self.location.clone(),
1661 HydroNode::DeferTick {
1662 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1663 metadata: self
1664 .location
1665 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1666 },
1667 )
1668 }
1669}
1670
1671impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1672where
1673 L: Location<'a>,
1674{
1675 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1676 /// point in time.
1677 ///
1678 /// # Non-Determinism
1679 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1680 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1681 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1682 self,
1683 tick: &Tick<L2>,
1684 _nondet: NonDet,
1685 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1686 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1687 KeyedSingleton::new(
1688 tick.drop_consistency(),
1689 HydroNode::Batch {
1690 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1691 metadata: tick
1692 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1693 },
1694 )
1695 }
1696}
1697
1698impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1699where
1700 L: Location<'a>,
1701{
1702 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1703 /// state of the keyed singleton being atomically processed.
1704 ///
1705 /// # Non-Determinism
1706 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1707 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1708 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1709 self,
1710 tick: &Tick<L2>,
1711 _nondet: NonDet,
1712 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1713 KeyedSingleton::new(
1714 tick.drop_consistency(),
1715 HydroNode::Batch {
1716 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1717 metadata: tick
1718 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1719 },
1720 )
1721 }
1722}
1723
1724impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1725where
1726 L: Location<'a>,
1727{
1728 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1729 ///
1730 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1731 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1732 /// is filtered out.
1733 ///
1734 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1735 /// not modify or take ownership of the values. If you need to modify the values while filtering,
1736 /// use [`KeyedSingleton::filter_map`] instead.
1737 ///
1738 /// # Example
1739 /// ```rust
1740 /// # #[cfg(feature = "deploy")] {
1741 /// # use hydro_lang::prelude::*;
1742 /// # use futures::StreamExt;
1743 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1744 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1745 /// # process
1746 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1747 /// # .into_keyed()
1748 /// # .first();
1749 /// keyed_singleton.filter(q!(|&v| v > 1))
1750 /// # .entries()
1751 /// # }, |mut stream| async move {
1752 /// // { 1: 2, 2: 4 }
1753 /// # let mut results = Vec::new();
1754 /// # for _ in 0..2 {
1755 /// # results.push(stream.next().await.unwrap());
1756 /// # }
1757 /// # results.sort();
1758 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1759 /// # }));
1760 /// # }
1761 /// ```
1762 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1763 where
1764 F: Fn(&V) -> bool + 'a,
1765 {
1766 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1767 let filter_f = q!({
1768 let orig = f;
1769 move |t: &(_, _)| orig(&t.1)
1770 })
1771 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1772 .into();
1773
1774 KeyedSingleton::new(
1775 self.location.clone(),
1776 HydroNode::Filter {
1777 f: filter_f,
1778 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1779 metadata: self
1780 .location
1781 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1782 },
1783 )
1784 }
1785
1786 /// An operator that both filters and maps values. It yields only the key-value pairs where
1787 /// the supplied closure `f` returns `Some(value)`.
1788 ///
1789 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1790 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1791 /// If it returns `None`, the key-value pair is filtered out.
1792 ///
1793 /// # Example
1794 /// ```rust
1795 /// # #[cfg(feature = "deploy")] {
1796 /// # use hydro_lang::prelude::*;
1797 /// # use futures::StreamExt;
1798 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1799 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1800 /// # process
1801 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1802 /// # .into_keyed()
1803 /// # .first();
1804 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1805 /// # .entries()
1806 /// # }, |mut stream| async move {
1807 /// // { 1: 42, 3: 100 }
1808 /// # let mut results = Vec::new();
1809 /// # for _ in 0..2 {
1810 /// # results.push(stream.next().await.unwrap());
1811 /// # }
1812 /// # results.sort();
1813 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1814 /// # }));
1815 /// # }
1816 /// ```
1817 pub fn filter_map<F, U>(
1818 self,
1819 f: impl IntoQuotedMut<'a, F, L> + Copy,
1820 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1821 where
1822 F: Fn(V) -> Option<U> + 'a,
1823 {
1824 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1825 let filter_map_f = q!({
1826 let orig = f;
1827 move |(k, v)| orig(v).map(|o| (k, o))
1828 })
1829 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1830 .into();
1831
1832 KeyedSingleton::new(
1833 self.location.clone(),
1834 HydroNode::FilterMap {
1835 f: filter_map_f,
1836 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1837 metadata: self.location.new_node_metadata(KeyedSingleton::<
1838 K,
1839 U,
1840 L,
1841 B::EraseMonotonic,
1842 >::collection_kind()),
1843 },
1844 )
1845 }
1846
1847 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1848 /// arrived since the previous batch was released.
1849 ///
1850 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1851 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1852 ///
1853 /// # Non-Determinism
1854 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1855 /// has a non-deterministic set of key-value pairs.
1856 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1857 self,
1858 tick: &Tick<L2>,
1859 _nondet: NonDet,
1860 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1861 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1862 KeyedSingleton::new(
1863 tick.drop_consistency(),
1864 HydroNode::Batch {
1865 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1866 metadata: tick
1867 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1868 },
1869 )
1870 }
1871}
1872
1873impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1874where
1875 L: Location<'a>,
1876{
1877 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1878 /// atomically processed.
1879 ///
1880 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1881 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1882 ///
1883 /// # Non-Determinism
1884 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1885 /// has a non-deterministic set of key-value pairs.
1886 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1887 self,
1888 tick: &Tick<L2>,
1889 nondet: NonDet,
1890 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1891 let _ = nondet;
1892 KeyedSingleton::new(
1893 tick.drop_consistency(),
1894 HydroNode::Batch {
1895 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1896 metadata: tick
1897 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1898 },
1899 )
1900 }
1901}
1902
1903#[cfg(test)]
1904mod tests {
1905 #[cfg(feature = "deploy")]
1906 use futures::{SinkExt, StreamExt};
1907 #[cfg(feature = "deploy")]
1908 use hydro_deploy::Deployment;
1909 #[cfg(any(feature = "deploy", feature = "sim"))]
1910 use stageleft::q;
1911
1912 #[cfg(any(feature = "deploy", feature = "sim"))]
1913 use crate::compile::builder::FlowBuilder;
1914 #[cfg(any(feature = "deploy", feature = "sim"))]
1915 use crate::location::Location;
1916 #[cfg(any(feature = "deploy", feature = "sim"))]
1917 use crate::nondet::nondet;
1918
#[cfg(feature = "deploy")]
#[tokio::test]
async fn key_count_bounded_value() {
    // Feeds entries from an external input into `.first().key_count()` and checks the sampled
    // count after each one.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = process.source_external_bincode(&external);
    let output_port = input
        .into_keyed()
        .first()
        .key_count()
        .sample_eager(nondet!(/** test */))
        .send_bincode_external(&external);

    let deployed = flow
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut sender = deployed.connect(input_port).await;
    let mut receiver = deployed.connect(output_port).await;

    deployment.start().await.unwrap();

    // Nothing has been sent yet, so the first sample must report zero keys.
    assert_eq!(receiver.next().await.unwrap(), 0);

    // Each entry uses a key that has not been seen before, so the count grows by one per send.
    for (entry, expected_count) in [((1, 1), 1), ((2, 2), 2)] {
        sender.send(entry).await.unwrap();
        assert_eq!(receiver.next().await.unwrap(), expected_count);
    }
}
1956
#[cfg(feature = "deploy")]
#[tokio::test]
async fn key_count_unbounded_value() {
    // Feeds entries from an external input into a per-key counting fold followed by
    // `key_count`, and checks the sampled count after each one.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = process.source_external_bincode(&external);
    let output_port = input
        .into_keyed()
        .fold(q!(|| 0), q!(|acc, _| *acc += 1))
        .key_count()
        .sample_eager(nondet!(/** test */))
        .send_bincode_external(&external);

    let deployed = flow
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut sender = deployed.connect(input_port).await;
    let mut receiver = deployed.connect(output_port).await;

    deployment.start().await.unwrap();

    // Nothing has been sent yet, so the first sample must report zero keys.
    assert_eq!(receiver.next().await.unwrap(), 0);

    // Entries that reuse key 1 leave the count unchanged; keys 2 and 3 each bump it by one.
    let steps = [
        ((1, 1), 1),
        ((1, 2), 1),
        ((2, 2), 2),
        ((1, 1), 2),
        ((3, 1), 3),
    ];
    for (entry, expected_count) in steps {
        sender.send(entry).await.unwrap();
        assert_eq!(receiver.next().await.unwrap(), expected_count);
    }
}
2003
#[cfg(feature = "deploy")]
#[tokio::test]
async fn into_singleton_bounded_value() {
    // Feeds entries from an external input into `.first().into_singleton()` and checks the
    // sampled map after each one.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = process.source_external_bincode(&external);
    let output_port = input
        .into_keyed()
        .first()
        .into_singleton()
        .sample_eager(nondet!(/** test */))
        .send_bincode_external(&external);

    let deployed = flow
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut sender = deployed.connect(input_port).await;
    let mut receiver = deployed.connect(output_port).await;

    deployment.start().await.unwrap();

    // Nothing has been sent yet, so the first sample must be an empty map.
    assert_eq!(
        receiver.next().await.unwrap(),
        std::collections::HashMap::new()
    );

    // Each entry introduces a new key, so the map gains exactly one entry per send.
    let steps = [((1, 1), vec![(1, 1)]), ((2, 2), vec![(1, 1), (2, 2)])];
    for (entry, expected_entries) in steps {
        sender.send(entry).await.unwrap();
        assert_eq!(
            receiver.next().await.unwrap(),
            expected_entries.into_iter().collect()
        );
    }
}
2050
#[cfg(feature = "deploy")]
#[tokio::test]
async fn into_singleton_unbounded_value() {
    // Feeds entries from an external input into a per-key counting fold followed by
    // `into_singleton`, and checks the sampled map after each one.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_port, input) = process.source_external_bincode(&external);
    let output_port = input
        .into_keyed()
        .fold(q!(|| 0), q!(|acc, _| *acc += 1))
        .into_singleton()
        .sample_eager(nondet!(/** test */))
        .send_bincode_external(&external);

    let deployed = flow
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut sender = deployed.connect(input_port).await;
    let mut receiver = deployed.connect(output_port).await;

    deployment.start().await.unwrap();

    // Nothing has been sent yet, so the first sample must be an empty map.
    assert_eq!(
        receiver.next().await.unwrap(),
        std::collections::HashMap::new()
    );

    // The fold ignores the sent value and counts arrivals, so each expected map holds the
    // number of entries received so far for every key.
    let steps = [
        ((1, 1), vec![(1, 1)]),
        ((1, 2), vec![(1, 2)]),
        ((2, 2), vec![(1, 2), (2, 1)]),
        ((1, 1), vec![(1, 3), (2, 1)]),
        ((3, 1), vec![(1, 3), (2, 1), (3, 1)]),
    ];
    for (entry, expected_entries) in steps {
        sender.send(entry).await.unwrap();
        assert_eq!(
            receiver.next().await.unwrap(),
            expected_entries.into_iter().collect()
        );
    }
}
2115
#[cfg(feature = "sim")]
#[test]
fn sim_unbounded_singleton_snapshot() {
    // Snapshots a per-key counting fold on every tick and exhaustively simulates three sends.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (input_port, input) = process.sim_input();
    let output = input
        .into_keyed()
        .fold(q!(|| 0), q!(|acc, _| *acc += 1))
        .snapshot(&process.tick(), nondet!(/** test */))
        .entries()
        .all_ticks()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        for entry in [(1, 123), (1, 456), (2, 123)] {
            input_port.send(entry);
        }

        // After sorting, the largest collected entry must be key 2 with a count of one.
        let all = output.collect_sorted::<Vec<_>>().await;
        assert_eq!(all.last().unwrap(), &(2, 1));
    });

    assert_eq!(explored, 8);
}
2142
#[cfg(feature = "deploy")]
#[tokio::test]
async fn join_keyed_stream() {
    // Joins a per-tick keyed singleton against a keyed stream of requests and checks that
    // only keys present in the singleton produce output.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let external = flow.external::<()>();

    let tick = process.tick();
    let keyed_data = process
        .source_iter(q!(vec![(1, 10), (2, 20)]))
        .into_keyed()
        .batch(&tick, nondet!(/** test */))
        .first();
    let requests = process
        .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
        .into_keyed()
        .batch(&tick, nondet!(/** test */));

    let output_port = keyed_data
        .join_keyed_stream(requests)
        .entries()
        .all_ticks()
        .send_bincode_external(&external);

    let deployed = flow
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut receiver = deployed.connect(output_port).await;

    deployment.start().await.unwrap();

    // Read exactly two results; sort them because the arrival order is not asserted.
    let mut results = vec![
        receiver.next().await.unwrap(),
        receiver.next().await.unwrap(),
    ];
    results.sort();

    // The request for key 3 has no matching entry in `keyed_data`, so it is absent.
    assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
}
2188
#[cfg(feature = "sim")]
#[test]
fn threshold_greater_or_equal_monotonic() {
    // Per-key thresholds applied to a monotonically growing per-key sum.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (input_port, input) = process.sim_input::<(u32, usize), _, _>();
    let (thresh_port, thresh_input) = process.sim_input::<(u32, usize), _, _>();

    // Running sum per key; the fold carries a manual proof that it is monotone.
    let running_totals: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
        input.into_keyed().fold(
            q!(|| 0usize),
            q!(
                |acc, v| *acc += v,
                monotone = crate::properties::manual_proof!(/** += is monotonic */)
            ),
        );

    // Per-key thresholds, fixed by the first value seen for each key.
    let thresholds = thresh_input.into_keyed().first();

    let output = running_totals
        .threshold_greater_or_equal(thresholds)
        .entries()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Key 1 must reach 5 and key 2 must reach 10.
        for threshold in [(1, 5), (2, 10)] {
            thresh_port.send(threshold);
        }

        // Both keys accumulate 3 + 3 = 6: enough for key 1, not enough for key 2.
        for increment in [(1, 3), (1, 3), (2, 3), (2, 3)] {
            input_port.send(increment);
        }

        // The emitted entry carries the threshold (5), not the accumulated total (6).
        let results = output.collect_sorted::<Vec<_>>().await;
        assert_eq!(results, vec![(1, 5)]);
    });

    assert!(explored > 0);
}
2234
#[cfg(feature = "sim")]
#[test]
fn threshold_greater_or_equal_uniform() {
    // A single shared threshold applied to a monotonically growing per-key sum.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (input_port, input) = process.sim_input::<(u32, usize), _, _>();

    // Running sum per key; the fold carries a manual proof that it is monotone.
    let running_totals: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
        input.into_keyed().fold(
            q!(|| 0usize),
            q!(
                |acc, v| *acc += v,
                monotone = crate::properties::manual_proof!(/** += is monotonic */)
            ),
        );

    // Every key is compared against the same threshold of 5.
    let threshold = process.singleton(q!(5usize));

    let output = running_totals
        .threshold_greater_or_equal_uniform(threshold)
        .entries()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Key 1 accumulates 3 + 3 = 6 (passes); key 2 accumulates 2 + 2 = 4 (does not).
        for increment in [(1, 3), (1, 3), (2, 2), (2, 2)] {
            input_port.send(increment);
        }

        // The emitted entry carries the threshold (5), not the accumulated total (6).
        let results = output.collect_sorted::<Vec<_>>().await;
        assert_eq!(results, vec![(1, 5)]);
    });

    assert!(explored > 0);
}
2274
#[cfg(feature = "sim")]
#[test]
fn threshold_greater_or_equal_bounded_value() {
    // Per-key thresholds applied to values that are fixed once per key via `.first()`.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (input_port, input) = process.sim_input::<(u32, usize), _, _>();
    let (thresh_port, thresh_input) = process.sim_input::<(u32, usize), _, _>();

    // Both sides keep only the first value received for each key.
    let values = input.into_keyed().first();
    let thresholds = thresh_input.into_keyed().first();

    let output = values
        .threshold_greater_or_equal(thresholds)
        .entries()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Key 1 must reach 3 and key 2 must reach 10.
        for threshold in [(1, 3), (2, 10)] {
            thresh_port.send(threshold);
        }

        // Key 1 holds 5 (passes its threshold); key 2 holds 4 (does not).
        for value in [(1, 5), (2, 4)] {
            input_port.send(value);
        }

        // The emitted entry carries the threshold (3), not the stored value (5).
        let results = output.collect_sorted::<Vec<_>>().await;
        assert_eq!(results, vec![(1, 3)]);
    });

    assert!(explored > 0);
}
2310
#[cfg(feature = "sim")]
#[test]
fn threshold_greater_or_equal_uniform_bounded_value() {
    // A single shared threshold applied to values that are fixed once per key via `.first()`.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (input_port, input) = process.sim_input::<(u32, usize), _, _>();

    // Keep only the first value received for each key.
    let values = input.into_keyed().first();

    // Every key is compared against the same threshold of 5.
    let threshold = process.singleton(q!(5usize));

    let output = values
        .threshold_greater_or_equal_uniform(threshold)
        .entries()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Key 1 holds 7 (passes); key 2 holds 3 (does not).
        for value in [(1, 7), (2, 3)] {
            input_port.send(value);
        }

        // The emitted entry carries the threshold (5), not the stored value (7).
        let results = output.collect_sorted::<Vec<_>>().await;
        assert_eq!(results, vec![(1, 5)]);
    });

    assert!(explored > 0);
}
2341
#[cfg(feature = "sim")]
#[test]
fn threshold_greater_or_equal_bounded() {
    // Per-key thresholds applied to values that come from a fixed in-memory source.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    // Values are built from a literal vector: key 1 holds 6 and key 2 holds 4.
    let values = process
        .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
        .into_keyed()
        .first();

    // Thresholds arrive through a simulated input and are fixed by the first value per key.
    let (thresh_port, thresh_input) = process.sim_input::<(u32, usize), _, _>();
    let thresholds = thresh_input.into_keyed().first();

    let output = values
        .threshold_greater_or_equal(thresholds)
        .entries()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Key 1 must reach 5 (6 passes); key 2 must reach 10 (4 does not).
        for threshold in [(1, 5), (2, 10)] {
            thresh_port.send(threshold);
        }

        // The emitted entry carries the threshold (5), not the stored value (6).
        let results = output.collect_sorted::<Vec<_>>().await;
        assert_eq!(results, vec![(1, 5)]);
    });

    assert!(explored > 0);
}
2374
#[cfg(feature = "sim")]
#[test]
fn threshold_greater_or_equal_uniform_bounded() {
    // A single shared threshold applied to values that come from a fixed in-memory source.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    // Values are built from a literal vector: key 1 holds 6 and key 2 holds 4.
    let values = process
        .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
        .into_keyed()
        .first();
    // Every key is compared against the same threshold of 5.
    let threshold = process.singleton(q!(5usize));

    let output = values
        .threshold_greater_or_equal_uniform(threshold)
        .entries()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Key 1 (6) passes and key 2 (4) does not; the emitted entry carries the threshold.
        let results = output.collect_sorted::<Vec<_>>().await;
        assert_eq!(results, vec![(1, 5)]);
    });

    assert!(explored > 0);
}
2400}