hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41 type UnderlyingBound: Boundedness;
42 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43 type ValueBound: Boundedness;
44
45 /// The type of the keyed singleton if the value for each key is immutable.
46 type WithBoundedValue: KeyedSingletonBound<
47 UnderlyingBound = Self::UnderlyingBound,
48 ValueBound = Bounded,
49 EraseMonotonic = Self::WithBoundedValue,
50 >;
51
52 /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
53 type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
54
55 /// The type of the keyed singleton if the value for each key is no longer monotonic.
56 type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
57
58 /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
59 fn bound_kind() -> KeyedSingletonBoundKind;
60}
61
62impl KeyedSingletonBound for Unbounded {
63 type UnderlyingBound = Unbounded;
64 type ValueBound = Unbounded;
65 type WithBoundedValue = BoundedValue;
66 type KeyedStreamToMonotone = MonotonicValue;
67 type EraseMonotonic = Unbounded;
68
69 fn bound_kind() -> KeyedSingletonBoundKind {
70 KeyedSingletonBoundKind::Unbounded
71 }
72}
73
74impl KeyedSingletonBound for Bounded {
75 type UnderlyingBound = Bounded;
76 type ValueBound = Bounded;
77 type WithBoundedValue = Bounded;
78 type KeyedStreamToMonotone = Bounded;
79 type EraseMonotonic = Bounded;
80
81 fn bound_kind() -> KeyedSingletonBoundKind {
82 KeyedSingletonBoundKind::Bounded
83 }
84}
85
86/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
87/// its value is bounded and will never change, but new entries may appear asynchronously
88pub struct BoundedValue;
89
90impl KeyedSingletonBound for BoundedValue {
91 type UnderlyingBound = Unbounded;
92 type ValueBound = Bounded;
93 type WithBoundedValue = BoundedValue;
94 type KeyedStreamToMonotone = BoundedValue;
95 type EraseMonotonic = BoundedValue;
96
97 fn bound_kind() -> KeyedSingletonBoundKind {
98 KeyedSingletonBoundKind::BoundedValue
99 }
100}
101
102/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
103/// it will never be removed, and the corresponding value will only increase monotonically.
104pub struct MonotonicValue;
105
106impl KeyedSingletonBound for MonotonicValue {
107 type UnderlyingBound = Unbounded;
108 type ValueBound = Unbounded;
109 type WithBoundedValue = BoundedValue;
110 type KeyedStreamToMonotone = MonotonicValue;
111 type EraseMonotonic = Unbounded;
112
113 fn bound_kind() -> KeyedSingletonBoundKind {
114 KeyedSingletonBoundKind::MonotonicValue
115 }
116}
117
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values
/// of type `V`, where the order of keys is non-deterministic. In addition to the standard
/// boundedness variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously
/// changing), keyed singletons can use [`BoundedValue`] to declare that new keys may be added
/// over time, but keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
///     - [`MonotonicValue`] (no removals, and each value only increases monotonically)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // Where the keyed singleton is materialized.
    pub(crate) location: Loc,
    // IR node backing this collection; operators that consume the collection swap it out
    // for `HydroNode::Placeholder`.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow state, used by `Drop` to push a `HydroRoot::Null` root.
    pub(crate) flow_state: FlowState,

    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
141
142impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
143 fn drop(&mut self) {
144 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
145 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
146 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
147 input: Box::new(ir_node),
148 op_metadata: HydroIrOpMetadata::new(),
149 });
150 }
151 }
152}
153
154impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
155 for KeyedSingleton<K, V, Loc, Bound>
156{
157 fn clone(&self) -> Self {
158 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
159 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
160 *self.ir_node.borrow_mut() = HydroNode::Tee {
161 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
162 metadata: self.location.new_node_metadata(Self::collection_kind()),
163 };
164 }
165
166 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
167 KeyedSingleton {
168 location: self.location.clone(),
169 flow_state: self.flow_state.clone(),
170 ir_node: HydroNode::Tee {
171 inner: SharedNode(inner.0.clone()),
172 metadata: metadata.clone(),
173 }
174 .into(),
175 _phantom: PhantomData,
176 }
177 } else {
178 unreachable!()
179 }
180 }
181}
182
183impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
184 for KeyedSingleton<K, V, L, B>
185where
186 L: Location<'a> + NoTick,
187{
188 type Location = L;
189
190 fn create_source(cycle_id: CycleId, location: L) -> Self {
191 KeyedSingleton {
192 flow_state: location.flow_state().clone(),
193 location: location.clone(),
194 ir_node: RefCell::new(HydroNode::CycleSource {
195 cycle_id,
196 metadata: location.new_node_metadata(Self::collection_kind()),
197 }),
198 _phantom: PhantomData,
199 }
200 }
201}
202
203impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
204where
205 L: Location<'a>,
206{
207 type Location = Tick<L>;
208
209 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
210 KeyedSingleton::new(
211 location.clone(),
212 HydroNode::CycleSource {
213 cycle_id,
214 metadata: location.new_node_metadata(Self::collection_kind()),
215 },
216 )
217 }
218}
219
// Trait adapter so a tick-local, bounded keyed singleton can be used where `DeferTick` is
// required.
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Forwards through the explicit `KeyedSingleton::` path.
        // NOTE(review): presumably this resolves to an inherent `defer_tick` defined elsewhere
        // in this file rather than recursing into this trait method — confirm.
        KeyedSingleton::defer_tick(self)
    }
}
228
229impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
230 for KeyedSingleton<K, V, L, B>
231where
232 L: Location<'a> + NoTick,
233{
234 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
235 assert_eq!(
236 Location::id(&self.location),
237 expected_location,
238 "locations do not match"
239 );
240 self.location
241 .flow_state()
242 .borrow_mut()
243 .push_root(HydroRoot::CycleSink {
244 cycle_id,
245 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
246 op_metadata: HydroIrOpMetadata::new(),
247 });
248 }
249}
250
251impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
252where
253 L: Location<'a>,
254{
255 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
256 assert_eq!(
257 Location::id(&self.location),
258 expected_location,
259 "locations do not match"
260 );
261 self.location
262 .flow_state()
263 .borrow_mut()
264 .push_root(HydroRoot::CycleSink {
265 cycle_id,
266 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
267 op_metadata: HydroIrOpMetadata::new(),
268 });
269 }
270}
271
272impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
273 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276
277 let flow_state = location.flow_state().clone();
278 KeyedSingleton {
279 location,
280 flow_state,
281 ir_node: RefCell::new(ir_node),
282 _phantom: PhantomData,
283 }
284 }
285
286 /// Returns the [`Location`] where this keyed singleton is being materialized.
287 pub fn location(&self) -> &L {
288 &self.location
289 }
290}
291
/// Counts the entries of a bounded keyed singleton by counting its flattened entry stream.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let all_entries = me.entries();
    all_entries.count()
}
298
/// Collects a bounded keyed singleton into a single `HashMap` by folding its entry stream.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
317
318impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
319 pub(crate) fn collection_kind() -> CollectionKind {
320 CollectionKind::KeyedSingleton {
321 bound: B::bound_kind(),
322 key_type: stageleft::quote_type::<K>().into(),
323 value_type: stageleft::quote_type::<V>().into(),
324 }
325 }
326
327 /// Transforms each value by invoking `f` on each element, with keys staying the same
328 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
329 ///
330 /// If you do not want to modify the stream and instead only want to view
331 /// each item use [`KeyedSingleton::inspect`] instead.
332 ///
333 /// # Example
334 /// ```rust
335 /// # #[cfg(feature = "deploy")] {
336 /// # use hydro_lang::prelude::*;
337 /// # use futures::StreamExt;
338 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
339 /// let keyed_singleton = // { 1: 2, 2: 4 }
340 /// # process
341 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
342 /// # .into_keyed()
343 /// # .first();
344 /// keyed_singleton.map(q!(|v| v + 1))
345 /// # .entries()
346 /// # }, |mut stream| async move {
347 /// // { 1: 3, 2: 5 }
348 /// # let mut results = Vec::new();
349 /// # for _ in 0..2 {
350 /// # results.push(stream.next().await.unwrap());
351 /// # }
352 /// # results.sort();
353 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
354 /// # }));
355 /// # }
356 /// ```
357 pub fn map<U, F>(
358 self,
359 f: impl IntoQuotedMut<'a, F, L> + Copy,
360 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
361 where
362 F: Fn(V) -> U + 'a,
363 {
364 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
365 let map_f = q!({
366 let orig = f;
367 move |(k, v)| (k, orig(v))
368 })
369 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
370 .into();
371
372 KeyedSingleton::new(
373 self.location.clone(),
374 HydroNode::Map {
375 f: map_f,
376 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
377 metadata: self.location.new_node_metadata(KeyedSingleton::<
378 K,
379 U,
380 L,
381 B::EraseMonotonic,
382 >::collection_kind()),
383 },
384 )
385 }
386
387 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
388 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
389 ///
390 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
391 /// the new value `U`. The key remains unchanged in the output.
392 ///
393 /// # Example
394 /// ```rust
395 /// # #[cfg(feature = "deploy")] {
396 /// # use hydro_lang::prelude::*;
397 /// # use futures::StreamExt;
398 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
399 /// let keyed_singleton = // { 1: 2, 2: 4 }
400 /// # process
401 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
402 /// # .into_keyed()
403 /// # .first();
404 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
405 /// # .entries()
406 /// # }, |mut stream| async move {
407 /// // { 1: 3, 2: 6 }
408 /// # let mut results = Vec::new();
409 /// # for _ in 0..2 {
410 /// # results.push(stream.next().await.unwrap());
411 /// # }
412 /// # results.sort();
413 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
414 /// # }));
415 /// # }
416 /// ```
417 pub fn map_with_key<U, F>(
418 self,
419 f: impl IntoQuotedMut<'a, F, L> + Copy,
420 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
421 where
422 F: Fn((K, V)) -> U + 'a,
423 K: Clone,
424 {
425 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
426 let map_f = q!({
427 let orig = f;
428 move |(k, v)| {
429 let out = orig((Clone::clone(&k), v));
430 (k, out)
431 }
432 })
433 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
434 .into();
435
436 KeyedSingleton::new(
437 self.location.clone(),
438 HydroNode::Map {
439 f: map_f,
440 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
441 metadata: self.location.new_node_metadata(KeyedSingleton::<
442 K,
443 U,
444 L,
445 B::EraseMonotonic,
446 >::collection_kind()),
447 },
448 )
449 }
450
451 /// Gets the number of keys in the keyed singleton.
452 ///
453 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
454 /// since keys may be added / removed over time. When the set of keys changes, the count will
455 /// be asynchronously updated.
456 ///
457 /// # Example
458 /// ```rust
459 /// # #[cfg(feature = "deploy")] {
460 /// # use hydro_lang::prelude::*;
461 /// # use futures::StreamExt;
462 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
463 /// # let tick = process.tick();
464 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
465 /// # process
466 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
467 /// # .into_keyed()
468 /// # .batch(&tick, nondet!(/** test */))
469 /// # .first();
470 /// keyed_singleton.key_count()
471 /// # .all_ticks()
472 /// # }, |mut stream| async move {
473 /// // 3
474 /// # assert_eq!(stream.next().await.unwrap(), 3);
475 /// # }));
476 /// # }
477 /// ```
478 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
479 if B::ValueBound::BOUNDED {
480 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
481 location: self.location.clone(),
482 flow_state: self.flow_state.clone(),
483 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
484 _phantom: PhantomData,
485 };
486
487 me.entries().count().ignore_monotonic()
488 } else if L::is_top_level()
489 && let Some(tick) = self.location.try_tick()
490 && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
491 {
492 let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
493 self.location.clone(),
494 self.ir_node.replace(HydroNode::Placeholder),
495 );
496
497 let out =
498 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
499 .latest();
500 Singleton::new(
501 out.location.clone(),
502 out.ir_node.replace(HydroNode::Placeholder),
503 )
504 } else {
505 panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
506 }
507 }
508
509 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
510 ///
511 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
512 /// asynchronously as well.
513 ///
514 /// # Example
515 /// ```rust
516 /// # #[cfg(feature = "deploy")] {
517 /// # use hydro_lang::prelude::*;
518 /// # use futures::StreamExt;
519 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
520 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
521 /// # process
522 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
523 /// # .into_keyed()
524 /// # .batch(&process.tick(), nondet!(/** test */))
525 /// # .first();
526 /// keyed_singleton.into_singleton()
527 /// # .all_ticks()
528 /// # }, |mut stream| async move {
529 /// // { 1: "a", 2: "b", 3: "c" }
530 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
531 /// # }));
532 /// # }
533 /// ```
534 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
535 where
536 K: Eq + Hash,
537 {
538 if B::ValueBound::BOUNDED {
539 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
540 location: self.location.clone(),
541 flow_state: self.flow_state.clone(),
542 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
543 _phantom: PhantomData,
544 };
545
546 me.entries()
547 .assume_ordering(nondet!(
548 /// Because this is a keyed singleton, there is only one value per key.
549 ))
550 .fold(
551 q!(|| HashMap::new()),
552 q!(|map, (k, v)| {
553 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
554 map.insert(k, v);
555 }),
556 )
557 } else if L::is_top_level()
558 && let Some(tick) = self.location.try_tick()
559 && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
560 {
561 let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
562 self.location.clone(),
563 self.ir_node.replace(HydroNode::Placeholder),
564 );
565
566 let out = into_singleton_inside_tick(
567 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
568 )
569 .latest();
570 Singleton::new(
571 out.location.clone(),
572 out.ir_node.replace(HydroNode::Placeholder),
573 )
574 } else {
575 panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
576 }
577 }
578
579 /// An operator which allows you to "name" a `HydroNode`.
580 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
581 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
582 {
583 let mut node = self.ir_node.borrow_mut();
584 let metadata = node.metadata_mut();
585 metadata.tag = Some(name.to_owned());
586 }
587 self
588 }
589
590 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
591 /// implies that `B == Bounded`.
592 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
593 where
594 B: IsBounded,
595 {
596 KeyedSingleton::new(
597 self.location.clone(),
598 self.ir_node.replace(HydroNode::Placeholder),
599 )
600 }
601
602 /// Gets the value associated with a specific key from the keyed singleton.
603 /// Returns `None` if the key is `None` or there is no associated value.
604 ///
605 /// # Example
606 /// ```rust
607 /// # #[cfg(feature = "deploy")] {
608 /// # use hydro_lang::prelude::*;
609 /// # use futures::StreamExt;
610 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611 /// let tick = process.tick();
612 /// let keyed_data = process
613 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
614 /// .into_keyed()
615 /// .batch(&tick, nondet!(/** test */))
616 /// .first();
617 /// let key = tick.singleton(q!(1));
618 /// keyed_data.get(key).all_ticks()
619 /// # }, |mut stream| async move {
620 /// // 2
621 /// # assert_eq!(stream.next().await.unwrap(), 2);
622 /// # }));
623 /// # }
624 /// ```
625 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
626 where
627 B: IsBounded,
628 K: Hash + Eq,
629 {
630 self.make_bounded()
631 .into_keyed_stream()
632 .get(key)
633 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
634 .first()
635 }
636
637 /// Emit a keyed stream containing keys shared between the keyed singleton and the
638 /// keyed stream, where each value in the output keyed stream is a tuple of
639 /// (the keyed singleton's value, the keyed stream's value).
640 ///
641 /// # Example
642 /// ```rust
643 /// # #[cfg(feature = "deploy")] {
644 /// # use hydro_lang::prelude::*;
645 /// # use futures::StreamExt;
646 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
647 /// let tick = process.tick();
648 /// let keyed_data = process
649 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
650 /// .into_keyed()
651 /// .batch(&tick, nondet!(/** test */))
652 /// .first();
653 /// let other_data = process
654 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
655 /// .into_keyed()
656 /// .batch(&tick, nondet!(/** test */));
657 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
658 /// # }, |mut stream| async move {
659 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
660 /// # let mut results = vec![];
661 /// # for _ in 0..3 {
662 /// # results.push(stream.next().await.unwrap());
663 /// # }
664 /// # results.sort();
665 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
666 /// # }));
667 /// # }
668 /// ```
669 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2>(
670 self,
671 keyed_stream: KeyedStream<K, V2, L, Bounded, O2, R2>,
672 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R2>
673 where
674 B: IsBounded,
675 K: Eq + Hash,
676 {
677 self.make_bounded()
678 .entries()
679 .weaken_retries::<R2>()
680 .join(keyed_stream.entries())
681 .into_keyed()
682 }
683
684 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
685 /// where each value in the output keyed singleton is a tuple of
686 /// (self.value, other.value).
687 ///
688 /// # Example
689 /// ```rust
690 /// # #[cfg(feature = "deploy")] {
691 /// # use hydro_lang::prelude::*;
692 /// # use futures::StreamExt;
693 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
694 /// # let tick = process.tick();
695 /// let requests = // { 1: 10, 2: 20, 3: 30 }
696 /// # process
697 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
698 /// # .into_keyed()
699 /// # .batch(&tick, nondet!(/** test */))
700 /// # .first();
701 /// let other = // { 1: 100, 2: 200, 4: 400 }
702 /// # process
703 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
704 /// # .into_keyed()
705 /// # .batch(&tick, nondet!(/** test */))
706 /// # .first();
707 /// requests.join_keyed_singleton(other)
708 /// # .entries().all_ticks()
709 /// # }, |mut stream| async move {
710 /// // { 1: (10, 100), 2: (20, 200) }
711 /// # let mut results = vec![];
712 /// # for _ in 0..2 {
713 /// # results.push(stream.next().await.unwrap());
714 /// # }
715 /// # results.sort();
716 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
717 /// # }));
718 /// # }
719 /// ```
720 pub fn join_keyed_singleton<V2: Clone>(
721 self,
722 other: KeyedSingleton<K, V2, L, Bounded>,
723 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
724 where
725 B: IsBounded,
726 K: Eq + Hash,
727 {
728 let result_stream = self
729 .make_bounded()
730 .entries()
731 .join(other.entries())
732 .into_keyed();
733
734 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
735 KeyedSingleton::new(
736 result_stream.location.clone(),
737 HydroNode::Cast {
738 inner: Box::new(result_stream.ir_node.replace(HydroNode::Placeholder)),
739 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
740 K,
741 (V, V2),
742 L,
743 Bounded,
744 >::collection_kind(
745 )),
746 },
747 )
748 }
749
750 /// For each value in `self`, find the matching key in `lookup`.
751 /// The output is a keyed singleton with the key from `self`, and a value
752 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
753 /// If the key is not present in `lookup`, the option will be [`None`].
754 ///
755 /// # Example
756 /// ```rust
757 /// # #[cfg(feature = "deploy")] {
758 /// # use hydro_lang::prelude::*;
759 /// # use futures::StreamExt;
760 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
761 /// # let tick = process.tick();
762 /// let requests = // { 1: 10, 2: 20 }
763 /// # process
764 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
765 /// # .into_keyed()
766 /// # .batch(&tick, nondet!(/** test */))
767 /// # .first();
768 /// let other_data = // { 10: 100, 11: 110 }
769 /// # process
770 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
771 /// # .into_keyed()
772 /// # .batch(&tick, nondet!(/** test */))
773 /// # .first();
774 /// requests.lookup_keyed_singleton(other_data)
775 /// # .entries().all_ticks()
776 /// # }, |mut stream| async move {
777 /// // { 1: (10, Some(100)), 2: (20, None) }
778 /// # let mut results = vec![];
779 /// # for _ in 0..2 {
780 /// # results.push(stream.next().await.unwrap());
781 /// # }
782 /// # results.sort();
783 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
784 /// # }));
785 /// # }
786 /// ```
787 pub fn lookup_keyed_singleton<V2>(
788 self,
789 lookup: KeyedSingleton<V, V2, L, Bounded>,
790 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
791 where
792 B: IsBounded,
793 K: Eq + Hash + Clone,
794 V: Eq + Hash + Clone,
795 V2: Clone,
796 {
797 let result_stream = self
798 .make_bounded()
799 .into_keyed_stream()
800 .lookup_keyed_stream(lookup.into_keyed_stream());
801
802 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
803 KeyedSingleton::new(
804 result_stream.location.clone(),
805 HydroNode::Cast {
806 inner: Box::new(result_stream.ir_node.replace(HydroNode::Placeholder)),
807 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
808 K,
809 (V, Option<V2>),
810 L,
811 Bounded,
812 >::collection_kind(
813 )),
814 },
815 )
816 }
817
818 /// For each value in `self`, find the matching key in `lookup`.
819 /// The output is a keyed stream with the key from `self`, and a value
820 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
821 /// If the key is not present in `lookup`, the option will be [`None`].
822 ///
823 /// # Example
824 /// ```rust
825 /// # #[cfg(feature = "deploy")] {
826 /// # use hydro_lang::prelude::*;
827 /// # use futures::StreamExt;
828 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
829 /// # let tick = process.tick();
830 /// let requests = // { 1: 10, 2: 20 }
831 /// # process
832 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
833 /// # .into_keyed()
834 /// # .batch(&tick, nondet!(/** test */))
835 /// # .first();
836 /// let other_data = // { 10: 100, 10: 110 }
837 /// # process
838 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
839 /// # .into_keyed()
840 /// # .batch(&tick, nondet!(/** test */));
841 /// requests.lookup_keyed_stream(other_data)
842 /// # .entries().all_ticks()
843 /// # }, |mut stream| async move {
844 /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
845 /// # let mut results = vec![];
846 /// # for _ in 0..3 {
847 /// # results.push(stream.next().await.unwrap());
848 /// # }
849 /// # results.sort();
850 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
851 /// # }));
852 /// # }
853 /// ```
854 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
855 self,
856 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
857 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
858 where
859 B: IsBounded,
860 K: Eq + Hash + Clone,
861 V: Eq + Hash + Clone,
862 V2: Clone,
863 {
864 self.make_bounded()
865 .entries()
866 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
867 .into_keyed()
868 .lookup_keyed_stream(lookup)
869 }
870}
871
872impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
873 KeyedSingleton<K, V, L, B>
874{
875 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
876 ///
877 /// The value for each key must be bounded, otherwise the resulting stream elements would be
878 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
879 /// into the output.
880 ///
881 /// # Example
882 /// ```rust
883 /// # #[cfg(feature = "deploy")] {
884 /// # use hydro_lang::prelude::*;
885 /// # use futures::StreamExt;
886 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
887 /// let keyed_singleton = // { 1: 2, 2: 4 }
888 /// # process
889 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
890 /// # .into_keyed()
891 /// # .first();
892 /// keyed_singleton.entries()
893 /// # }, |mut stream| async move {
894 /// // (1, 2), (2, 4) in any order
895 /// # let mut results = Vec::new();
896 /// # for _ in 0..2 {
897 /// # results.push(stream.next().await.unwrap());
898 /// # }
899 /// # results.sort();
900 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
901 /// # }));
902 /// # }
903 /// ```
904 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
905 self.into_keyed_stream().entries()
906 }
907
908 /// Flattens the keyed singleton into an unordered stream of just the values.
909 ///
910 /// The value for each key must be bounded, otherwise the resulting stream elements would be
911 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
912 /// into the output.
913 ///
914 /// # Example
915 /// ```rust
916 /// # #[cfg(feature = "deploy")] {
917 /// # use hydro_lang::prelude::*;
918 /// # use futures::StreamExt;
919 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
920 /// let keyed_singleton = // { 1: 2, 2: 4 }
921 /// # process
922 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
923 /// # .into_keyed()
924 /// # .first();
925 /// keyed_singleton.values()
926 /// # }, |mut stream| async move {
927 /// // 2, 4 in any order
928 /// # let mut results = Vec::new();
929 /// # for _ in 0..2 {
930 /// # results.push(stream.next().await.unwrap());
931 /// # }
932 /// # results.sort();
933 /// # assert_eq!(results, vec![2, 4]);
934 /// # }));
935 /// # }
936 /// ```
937 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
938 let map_f = q!(|(_, v)| v)
939 .splice_fn1_ctx::<(K, V), V>(&self.location)
940 .into();
941
942 Stream::new(
943 self.location.clone(),
944 HydroNode::Map {
945 f: map_f,
946 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
947 metadata: self.location.new_node_metadata(Stream::<
948 V,
949 L,
950 B::UnderlyingBound,
951 NoOrder,
952 ExactlyOnce,
953 >::collection_kind()),
954 },
955 )
956 }
957
958 /// Flattens the keyed singleton into an unordered stream of just the keys.
959 ///
960 /// The value for each key must be bounded, otherwise the removal of keys would result in
961 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
962 /// into the output.
963 ///
964 /// # Example
965 /// ```rust
966 /// # #[cfg(feature = "deploy")] {
967 /// # use hydro_lang::prelude::*;
968 /// # use futures::StreamExt;
969 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
970 /// let keyed_singleton = // { 1: 2, 2: 4 }
971 /// # process
972 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
973 /// # .into_keyed()
974 /// # .first();
975 /// keyed_singleton.keys()
976 /// # }, |mut stream| async move {
977 /// // 1, 2 in any order
978 /// # let mut results = Vec::new();
979 /// # for _ in 0..2 {
980 /// # results.push(stream.next().await.unwrap());
981 /// # }
982 /// # results.sort();
983 /// # assert_eq!(results, vec![1, 2]);
984 /// # }));
985 /// # }
986 /// ```
987 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
988 self.entries().map(q!(|(k, _)| k))
989 }
990
991 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
992 /// entries whose keys are not in the provided stream.
993 ///
994 /// # Example
995 /// ```rust
996 /// # #[cfg(feature = "deploy")] {
997 /// # use hydro_lang::prelude::*;
998 /// # use futures::StreamExt;
999 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1000 /// let tick = process.tick();
1001 /// let keyed_singleton = // { 1: 2, 2: 4 }
1002 /// # process
1003 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1004 /// # .into_keyed()
1005 /// # .first()
1006 /// # .batch(&tick, nondet!(/** test */));
1007 /// let keys_to_remove = process
1008 /// .source_iter(q!(vec![1]))
1009 /// .batch(&tick, nondet!(/** test */));
1010 /// keyed_singleton.filter_key_not_in(keys_to_remove)
1011 /// # .entries().all_ticks()
1012 /// # }, |mut stream| async move {
1013 /// // { 2: 4 }
1014 /// # for w in vec![(2, 4)] {
1015 /// # assert_eq!(stream.next().await.unwrap(), w);
1016 /// # }
1017 /// # }));
1018 /// # }
1019 /// ```
1020 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1021 self,
1022 other: Stream<K, L, Bounded, O2, R2>,
1023 ) -> Self
1024 where
1025 K: Hash + Eq,
1026 {
1027 check_matching_location(&self.location, &other.location);
1028
1029 KeyedSingleton::new(
1030 self.location.clone(),
1031 HydroNode::AntiJoin {
1032 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1033 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1034 metadata: self.location.new_node_metadata(Self::collection_kind()),
1035 },
1036 )
1037 }
1038
1039 /// An operator which allows you to "inspect" each value of a keyed singleton without
1040 /// modifying it. The closure `f` is called on a reference to each value. This is
1041 /// mainly useful for debugging, and should not be used to generate side-effects.
1042 ///
1043 /// # Example
1044 /// ```rust
1045 /// # #[cfg(feature = "deploy")] {
1046 /// # use hydro_lang::prelude::*;
1047 /// # use futures::StreamExt;
1048 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049 /// let keyed_singleton = // { 1: 2, 2: 4 }
1050 /// # process
1051 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1052 /// # .into_keyed()
1053 /// # .first();
1054 /// keyed_singleton
1055 /// .inspect(q!(|v| println!("{}", v)))
1056 /// # .entries()
1057 /// # }, |mut stream| async move {
1058 /// // { 1: 2, 2: 4 }
1059 /// # for w in vec![(1, 2), (2, 4)] {
1060 /// # assert_eq!(stream.next().await.unwrap(), w);
1061 /// # }
1062 /// # }));
1063 /// # }
1064 /// ```
1065 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1066 where
1067 F: Fn(&V) + 'a,
1068 {
1069 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1070 let inspect_f = q!({
1071 let orig = f;
1072 move |t: &(_, _)| orig(&t.1)
1073 })
1074 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1075 .into();
1076
1077 KeyedSingleton::new(
1078 self.location.clone(),
1079 HydroNode::Inspect {
1080 f: inspect_f,
1081 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1082 metadata: self.location.new_node_metadata(Self::collection_kind()),
1083 },
1084 )
1085 }
1086
1087 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1088 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1089 /// mainly useful for debugging, and should not be used to generate side-effects.
1090 ///
1091 /// # Example
1092 /// ```rust
1093 /// # #[cfg(feature = "deploy")] {
1094 /// # use hydro_lang::prelude::*;
1095 /// # use futures::StreamExt;
1096 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1097 /// let keyed_singleton = // { 1: 2, 2: 4 }
1098 /// # process
1099 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1100 /// # .into_keyed()
1101 /// # .first();
1102 /// keyed_singleton
1103 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1104 /// # .entries()
1105 /// # }, |mut stream| async move {
1106 /// // { 1: 2, 2: 4 }
1107 /// # for w in vec![(1, 2), (2, 4)] {
1108 /// # assert_eq!(stream.next().await.unwrap(), w);
1109 /// # }
1110 /// # }));
1111 /// # }
1112 /// ```
1113 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1114 where
1115 F: Fn(&(K, V)) + 'a,
1116 {
1117 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1118
1119 KeyedSingleton::new(
1120 self.location.clone(),
1121 HydroNode::Inspect {
1122 f: inspect_f,
1123 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1124 metadata: self.location.new_node_metadata(Self::collection_kind()),
1125 },
1126 )
1127 }
1128
1129 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1130 ///
1131 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1132 /// asynchronously updated if a new key is added that is higher than the previous max key.
1133 ///
1134 /// # Example
1135 /// ```rust
1136 /// # #[cfg(feature = "deploy")] {
1137 /// # use hydro_lang::prelude::*;
1138 /// # use futures::StreamExt;
1139 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1140 /// let tick = process.tick();
1141 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1142 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1143 /// # .into_keyed()
1144 /// # .first();
1145 /// keyed_singleton.get_max_key()
1146 /// # .sample_eager(nondet!(/** test */))
1147 /// # }, |mut stream| async move {
1148 /// // (2, 456)
1149 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1150 /// # }));
1151 /// # }
1152 /// ```
1153 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1154 where
1155 K: Ord,
1156 {
1157 self.entries()
1158 .assume_ordering_trusted(nondet!(
1159 /// There is only one element associated with each key, and the keys are totallly
1160 /// ordered so we will produce a deterministic value. The closure technically
1161 /// isn't commutative in the case where both passed entries have the same key
1162 /// but different values.
1163 ///
1164 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1165 /// the two inputs do not have the same key.
1166 ))
1167 .reduce(q!(
1168 move |curr, new| {
1169 if new.0 > curr.0 {
1170 *curr = new;
1171 }
1172 },
1173 idempotent = manual_proof!(/** repeated elements are ignored */)
1174 ))
1175 }
1176
1177 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1178 /// element, the value.
1179 ///
1180 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1181 ///
1182 /// # Example
1183 /// ```rust
1184 /// # #[cfg(feature = "deploy")] {
1185 /// # use hydro_lang::prelude::*;
1186 /// # use futures::StreamExt;
1187 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1188 /// let keyed_singleton = // { 1: 2, 2: 4 }
1189 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1190 /// # .into_keyed()
1191 /// # .first();
1192 /// keyed_singleton
1193 /// .clone()
1194 /// .into_keyed_stream()
1195 /// .merge_unordered(
1196 /// keyed_singleton.into_keyed_stream()
1197 /// )
1198 /// # .entries()
1199 /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
1201 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1202 /// # assert_eq!(stream.next().await.unwrap(), w);
1203 /// # }
1204 /// # }));
1205 /// # }
1206 /// ```
1207 pub fn into_keyed_stream(
1208 self,
1209 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1210 KeyedStream::new(
1211 self.location.clone(),
1212 HydroNode::Cast {
1213 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1214 metadata: self.location.new_node_metadata(KeyedStream::<
1215 K,
1216 V,
1217 L,
1218 B::UnderlyingBound,
1219 TotalOrder,
1220 ExactlyOnce,
1221 >::collection_kind()),
1222 },
1223 )
1224 }
1225}
1226
1227impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1228where
1229 L: Location<'a>,
1230{
1231 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1232 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1233 ///
1234 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1235 /// processed before an acknowledgement is emitted.
1236 pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1237 let id = self.location.flow_state().borrow_mut().next_clock_id();
1238 let out_location = Atomic {
1239 tick: Tick {
1240 id,
1241 l: self.location.clone(),
1242 },
1243 };
1244 KeyedSingleton::new(
1245 out_location.clone(),
1246 HydroNode::BeginAtomic {
1247 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1248 metadata: out_location
1249 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1250 },
1251 )
1252 }
1253}
1254
1255impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1256where
1257 L: Location<'a> + NoTick,
1258{
1259 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1260 /// See [`KeyedSingleton::atomic`] for more details.
1261 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1262 KeyedSingleton::new(
1263 self.location.tick.l.clone(),
1264 HydroNode::EndAtomic {
1265 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1266 metadata: self
1267 .location
1268 .tick
1269 .l
1270 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1271 },
1272 )
1273 }
1274}
1275
1276impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1277 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1278 /// tick `T` always has the entries of `self` at tick `T - 1`.
1279 ///
1280 /// At tick `0`, the output has no entries, since there is no previous tick.
1281 ///
1282 /// This operator enables stateful iterative processing with ticks, by sending data from one
1283 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1284 ///
1285 /// # Example
1286 /// ```rust
1287 /// # #[cfg(feature = "deploy")] {
1288 /// # use hydro_lang::prelude::*;
1289 /// # use futures::StreamExt;
1290 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1291 /// let tick = process.tick();
1292 /// # // ticks are lazy by default, forces the second tick to run
1293 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1294 /// # let batch_first_tick = process
1295 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1296 /// # .batch(&tick, nondet!(/** test */))
1297 /// # .into_keyed();
1298 /// # let batch_second_tick = process
1299 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1300 /// # .batch(&tick, nondet!(/** test */))
1301 /// # .into_keyed()
1302 /// # .defer_tick(); // appears on the second tick
1303 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1304 /// # batch_first_tick.chain(batch_second_tick).first();
1305 /// input_batch.clone().filter_key_not_in(
1306 /// input_batch.defer_tick().keys() // keys present in the previous tick
1307 /// )
1308 /// # .entries().all_ticks()
1309 /// # }, |mut stream| async move {
1310 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1311 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1312 /// # assert_eq!(stream.next().await.unwrap(), w);
1313 /// # }
1314 /// # }));
1315 /// # }
1316 /// ```
1317 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1318 KeyedSingleton::new(
1319 self.location.clone(),
1320 HydroNode::DeferTick {
1321 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1322 metadata: self
1323 .location
1324 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1325 },
1326 )
1327 }
1328}
1329
1330impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1331where
1332 L: Location<'a>,
1333{
1334 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1335 /// point in time.
1336 ///
1337 /// # Non-Determinism
1338 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1339 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1340 pub fn snapshot(
1341 self,
1342 tick: &Tick<L>,
1343 _nondet: NonDet,
1344 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1345 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1346 KeyedSingleton::new(
1347 tick.clone(),
1348 HydroNode::Batch {
1349 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1350 metadata: tick
1351 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1352 },
1353 )
1354 }
1355}
1356
1357impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1358where
1359 L: Location<'a> + NoTick,
1360{
1361 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1362 /// state of the keyed singleton being atomically processed.
1363 ///
1364 /// # Non-Determinism
1365 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1366 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1367 pub fn snapshot_atomic(
1368 self,
1369 tick: &Tick<L>,
1370 _nondet: NonDet,
1371 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1372 KeyedSingleton::new(
1373 tick.clone(),
1374 HydroNode::Batch {
1375 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1376 metadata: tick
1377 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1378 },
1379 )
1380 }
1381}
1382
1383impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1384where
1385 L: Location<'a>,
1386{
1387 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1388 ///
1389 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1390 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1391 /// is filtered out.
1392 ///
1393 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1394 /// not modify or take ownership of the values. If you need to modify the values while filtering
1395 /// use [`KeyedSingleton::filter_map`] instead.
1396 ///
1397 /// # Example
1398 /// ```rust
1399 /// # #[cfg(feature = "deploy")] {
1400 /// # use hydro_lang::prelude::*;
1401 /// # use futures::StreamExt;
1402 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1403 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1404 /// # process
1405 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1406 /// # .into_keyed()
1407 /// # .first();
1408 /// keyed_singleton.filter(q!(|&v| v > 1))
1409 /// # .entries()
1410 /// # }, |mut stream| async move {
1411 /// // { 1: 2, 2: 4 }
1412 /// # let mut results = Vec::new();
1413 /// # for _ in 0..2 {
1414 /// # results.push(stream.next().await.unwrap());
1415 /// # }
1416 /// # results.sort();
1417 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1418 /// # }));
1419 /// # }
1420 /// ```
1421 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1422 where
1423 F: Fn(&V) -> bool + 'a,
1424 {
1425 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1426 let filter_f = q!({
1427 let orig = f;
1428 move |t: &(_, _)| orig(&t.1)
1429 })
1430 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1431 .into();
1432
1433 KeyedSingleton::new(
1434 self.location.clone(),
1435 HydroNode::Filter {
1436 f: filter_f,
1437 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1438 metadata: self
1439 .location
1440 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1441 },
1442 )
1443 }
1444
1445 /// An operator that both filters and maps values. It yields only the key-value pairs where
1446 /// the supplied closure `f` returns `Some(value)`.
1447 ///
1448 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1449 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1450 /// If it returns `None`, the key-value pair is filtered out.
1451 ///
1452 /// # Example
1453 /// ```rust
1454 /// # #[cfg(feature = "deploy")] {
1455 /// # use hydro_lang::prelude::*;
1456 /// # use futures::StreamExt;
1457 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1458 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1459 /// # process
1460 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1461 /// # .into_keyed()
1462 /// # .first();
1463 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1464 /// # .entries()
1465 /// # }, |mut stream| async move {
1466 /// // { 1: 42, 3: 100 }
1467 /// # let mut results = Vec::new();
1468 /// # for _ in 0..2 {
1469 /// # results.push(stream.next().await.unwrap());
1470 /// # }
1471 /// # results.sort();
1472 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1473 /// # }));
1474 /// # }
1475 /// ```
1476 pub fn filter_map<F, U>(
1477 self,
1478 f: impl IntoQuotedMut<'a, F, L> + Copy,
1479 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1480 where
1481 F: Fn(V) -> Option<U> + 'a,
1482 {
1483 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1484 let filter_map_f = q!({
1485 let orig = f;
1486 move |(k, v)| orig(v).map(|o| (k, o))
1487 })
1488 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1489 .into();
1490
1491 KeyedSingleton::new(
1492 self.location.clone(),
1493 HydroNode::FilterMap {
1494 f: filter_map_f,
1495 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1496 metadata: self.location.new_node_metadata(KeyedSingleton::<
1497 K,
1498 U,
1499 L,
1500 B::EraseMonotonic,
1501 >::collection_kind()),
1502 },
1503 )
1504 }
1505
1506 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1507 /// arrived since the previous batch was released.
1508 ///
1509 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1510 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1511 ///
1512 /// # Non-Determinism
1513 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1514 /// has a non-deterministic set of key-value pairs.
1515 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1516 where
1517 L: NoTick,
1518 {
1519 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1520 KeyedSingleton::new(
1521 tick.clone(),
1522 HydroNode::Batch {
1523 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1524 metadata: tick
1525 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1526 },
1527 )
1528 }
1529}
1530
1531impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1532where
1533 L: Location<'a> + NoTick,
1534{
1535 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1536 /// atomically processed.
1537 ///
1538 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1539 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1540 ///
1541 /// # Non-Determinism
1542 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1543 /// has a non-deterministic set of key-value pairs.
1544 pub fn batch_atomic(
1545 self,
1546 tick: &Tick<L>,
1547 nondet: NonDet,
1548 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1549 let _ = nondet;
1550 KeyedSingleton::new(
1551 tick.clone(),
1552 HydroNode::Batch {
1553 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1554 metadata: tick
1555 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1556 },
1557 )
1558 }
1559}
1560
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    /// `key_count` over a keyed singleton with immutable values grows as new keys arrive.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let (in_port, pairs) = process.source_external_bincode(&client);
        let out_port = pairs
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tx = deployed.connect(in_port).await;
        let mut rx = deployed.connect(out_port).await;

        deployment.start().await.unwrap();

        // Before any input is sent, the count is zero.
        assert_eq!(rx.next().await.unwrap(), 0);

        tx.send((1, 1)).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), 1);

        tx.send((2, 2)).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), 2);
    }

    /// `key_count` over a keyed singleton with changing values only grows on new keys.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let (in_port, pairs) = process.source_external_bincode(&client);
        let out_port = pairs
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tx = deployed.connect(in_port).await;
        let mut rx = deployed.connect(out_port).await;

        deployment.start().await.unwrap();

        // Before any input is sent, the count is zero.
        assert_eq!(rx.next().await.unwrap(), 0);

        tx.send((1, 1)).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), 1);

        // A second element for key 1 does not add a key.
        tx.send((1, 2)).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), 1);

        tx.send((2, 2)).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), 2);

        tx.send((1, 1)).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), 2);

        tx.send((3, 1)).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), 3);
    }

    /// `into_singleton` over immutable values yields a map that gains one entry per new key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let (in_port, pairs) = process.source_external_bincode(&client);
        let out_port = pairs
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tx = deployed.connect(in_port).await;
        let mut rx = deployed.connect(out_port).await;

        deployment.start().await.unwrap();

        // Before any input is sent, the map is empty.
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        tx.send((1, 1)).await.unwrap();
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::from([(1, 1)])
        );

        tx.send((2, 2)).await.unwrap();
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::from([(1, 1), (2, 2)])
        );
    }

    /// `into_singleton` over per-key counters yields a map whose values track the counts.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let (in_port, pairs) = process.source_external_bincode(&client);
        let out_port = pairs
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tx = deployed.connect(in_port).await;
        let mut rx = deployed.connect(out_port).await;

        deployment.start().await.unwrap();

        // Before any input is sent, the map is empty.
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        tx.send((1, 1)).await.unwrap();
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::from([(1, 1)])
        );

        // The fold counts elements per key, so the sent value itself is irrelevant.
        tx.send((1, 2)).await.unwrap();
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::from([(1, 2)])
        );

        tx.send((2, 2)).await.unwrap();
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::from([(1, 2), (2, 1)])
        );

        tx.send((1, 1)).await.unwrap();
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::from([(1, 3), (2, 1)])
        );

        tx.send((3, 1)).await.unwrap();
        assert_eq!(
            rx.next().await.unwrap(),
            std::collections::HashMap::from([(1, 3), (2, 1), (3, 1)])
        );
    }

    /// Exhaustively simulates snapshots of per-key counters and checks the final sorted entry.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();

        let (in_port, pairs) = process.sim_input();
        let snapshots = pairs
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&process.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let explored = builder.sim().exhaustive(async || {
            in_port.send((1, 123));
            in_port.send((1, 456));
            in_port.send((2, 123));

            let sorted_entries = snapshots.collect_sorted::<Vec<_>>().await;
            assert_eq!(sorted_entries.last().unwrap(), &(2, 1));
        });

        assert_eq!(explored, 8);
    }

    /// Joining a keyed singleton with a keyed stream pairs up values for matching keys only.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn join_keyed_stream() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let tick = process.tick();
        let lookup_table = process
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let lookups = process
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out_port = lookup_table
            .join_keyed_stream(lookups)
            .entries()
            .all_ticks()
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut rx = deployed.connect(out_port).await;

        deployment.start().await.unwrap();

        // Key 3 has no entry in the singleton, so exactly two joined results are expected.
        let mut joined = Vec::new();
        for _ in 0..2 {
            joined.push(rx.next().await.unwrap());
        }
        joined.sort();

        assert_eq!(joined, vec![(1, (10, 100)), (2, (20, 200))]);
    }
}