hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41 type UnderlyingBound: Boundedness;
42 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43 type ValueBound: Boundedness;
44
45 /// The type of the keyed singleton if the value for each key is immutable.
46 type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
47
48 /// The type of the keyed singleton if the value for each key may change asynchronously.
49 type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
50
51 /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
52 fn bound_kind() -> KeyedSingletonBoundKind;
53}
54
55impl KeyedSingletonBound for Unbounded {
56 type UnderlyingBound = Unbounded;
57 type ValueBound = Unbounded;
58 type WithBoundedValue = BoundedValue;
59 type WithUnboundedValue = Unbounded;
60
61 fn bound_kind() -> KeyedSingletonBoundKind {
62 KeyedSingletonBoundKind::Unbounded
63 }
64}
65
66impl KeyedSingletonBound for Bounded {
67 type UnderlyingBound = Bounded;
68 type ValueBound = Bounded;
69 type WithBoundedValue = Bounded;
70 type WithUnboundedValue = UnreachableBound;
71
72 fn bound_kind() -> KeyedSingletonBoundKind {
73 KeyedSingletonBoundKind::Bounded
74 }
75}
76
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that new keys may
/// appear asynchronously, but once a key appears its value is bounded and will never change,
/// and the entry will never be removed.
///
/// The set of keys is unbounded (its [`KeyedSingletonBound::UnderlyingBound`] is
/// [`Unbounded`]); use [`Bounded`] instead if the set of keys is also fixed.
pub struct BoundedValue;
81
82impl KeyedSingletonBound for BoundedValue {
83 type UnderlyingBound = Unbounded;
84 type ValueBound = Bounded;
85 type WithBoundedValue = BoundedValue;
86 type WithUnboundedValue = Unbounded;
87
88 fn bound_kind() -> KeyedSingletonBoundKind {
89 KeyedSingletonBoundKind::BoundedValue
90 }
91}
92
// Placeholder bound marker used only as `<Bounded as KeyedSingletonBound>::WithUnboundedValue`;
// its `bound_kind` implementation is `unreachable!`, so it is not expected to tag a real
// keyed singleton.
#[doc(hidden)]
pub struct UnreachableBound;
95
96impl KeyedSingletonBound for UnreachableBound {
97 type UnderlyingBound = Bounded;
98 type ValueBound = Unbounded;
99
100 type WithBoundedValue = Bounded;
101 type WithUnboundedValue = UnreachableBound;
102
103 fn bound_kind() -> KeyedSingletonBoundKind {
104 unreachable!("UnreachableBound cannot be instantiated")
105 }
106}
107
108/// Mapping from keys of type `K` to values of type `V`.
109///
110/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
111/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
112/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
113/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
114/// keys cannot be removed and the value for each key is immutable.
115///
116/// Type Parameters:
117/// - `K`: the type of the key for each entry
118/// - `V`: the type of the value for each entry
119/// - `Loc`: the [`Location`] where the keyed singleton is materialized
120/// - `Bound`: tracks whether the entries are:
121/// - [`Bounded`] (local and finite)
122/// - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
123/// - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
124pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
125 pub(crate) location: Loc,
126 pub(crate) ir_node: RefCell<HydroNode>,
127 pub(crate) flow_state: FlowState,
128
129 _phantom: PhantomData<(K, V, Loc, Bound)>,
130}
131
132impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
133 fn drop(&mut self) {
134 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
135 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
136 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
137 input: Box::new(ir_node),
138 op_metadata: HydroIrOpMetadata::new(),
139 });
140 }
141 }
142}
143
144impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
145 for KeyedSingleton<K, V, Loc, Bound>
146{
147 fn clone(&self) -> Self {
148 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
149 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
150 *self.ir_node.borrow_mut() = HydroNode::Tee {
151 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
152 metadata: self.location.new_node_metadata(Self::collection_kind()),
153 };
154 }
155
156 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
157 KeyedSingleton {
158 location: self.location.clone(),
159 flow_state: self.flow_state.clone(),
160 ir_node: HydroNode::Tee {
161 inner: SharedNode(inner.0.clone()),
162 metadata: metadata.clone(),
163 }
164 .into(),
165 _phantom: PhantomData,
166 }
167 } else {
168 unreachable!()
169 }
170 }
171}
172
173impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
174 for KeyedSingleton<K, V, L, B>
175where
176 L: Location<'a> + NoTick,
177{
178 type Location = L;
179
180 fn create_source(cycle_id: CycleId, location: L) -> Self {
181 KeyedSingleton {
182 flow_state: location.flow_state().clone(),
183 location: location.clone(),
184 ir_node: RefCell::new(HydroNode::CycleSource {
185 cycle_id,
186 metadata: location.new_node_metadata(Self::collection_kind()),
187 }),
188 _phantom: PhantomData,
189 }
190 }
191}
192
193impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
194where
195 L: Location<'a>,
196{
197 type Location = Tick<L>;
198
199 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
200 KeyedSingleton::new(
201 location.clone(),
202 HydroNode::CycleSource {
203 cycle_id,
204 metadata: location.new_node_metadata(Self::collection_kind()),
205 },
206 )
207 }
208}
209
210impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
211where
212 L: Location<'a>,
213{
214 fn defer_tick(self) -> Self {
215 KeyedSingleton::defer_tick(self)
216 }
217}
218
219impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
220 for KeyedSingleton<K, V, L, B>
221where
222 L: Location<'a> + NoTick,
223{
224 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
225 assert_eq!(
226 Location::id(&self.location),
227 expected_location,
228 "locations do not match"
229 );
230 self.location
231 .flow_state()
232 .borrow_mut()
233 .push_root(HydroRoot::CycleSink {
234 cycle_id,
235 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
236 op_metadata: HydroIrOpMetadata::new(),
237 });
238 }
239}
240
241impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
242where
243 L: Location<'a>,
244{
245 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
246 assert_eq!(
247 Location::id(&self.location),
248 expected_location,
249 "locations do not match"
250 );
251 self.location
252 .flow_state()
253 .borrow_mut()
254 .push_root(HydroRoot::CycleSink {
255 cycle_id,
256 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
257 op_metadata: HydroIrOpMetadata::new(),
258 });
259 }
260}
261
262impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
263 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
264 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
265 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
266
267 let flow_state = location.flow_state().clone();
268 KeyedSingleton {
269 location,
270 flow_state,
271 ir_node: RefCell::new(ir_node),
272 _phantom: PhantomData,
273 }
274 }
275
276 /// Returns the [`Location`] where this keyed singleton is being materialized.
277 pub fn location(&self) -> &L {
278 &self.location
279 }
280}
281
/// Counts the entries of a bounded keyed singleton by flattening it into a stream of entries.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let all_entries = me.entries();
    all_entries.count()
}
288
/// Folds every entry of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    // Each key carries exactly one value, so the order in which entries are inserted
    // cannot affect the resulting map.
    let ordered = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered.fold(
        q!(|| HashMap::new()),
        q!(|acc, (key, value)| {
            acc.insert(key, value);
        }),
    )
}
307
308impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
309 pub(crate) fn collection_kind() -> CollectionKind {
310 CollectionKind::KeyedSingleton {
311 bound: B::bound_kind(),
312 key_type: stageleft::quote_type::<K>().into(),
313 value_type: stageleft::quote_type::<V>().into(),
314 }
315 }
316
317 /// Transforms each value by invoking `f` on each element, with keys staying the same
318 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
319 ///
320 /// If you do not want to modify the stream and instead only want to view
321 /// each item use [`KeyedSingleton::inspect`] instead.
322 ///
323 /// # Example
324 /// ```rust
325 /// # #[cfg(feature = "deploy")] {
326 /// # use hydro_lang::prelude::*;
327 /// # use futures::StreamExt;
328 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
329 /// let keyed_singleton = // { 1: 2, 2: 4 }
330 /// # process
331 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
332 /// # .into_keyed()
333 /// # .first();
334 /// keyed_singleton.map(q!(|v| v + 1))
335 /// # .entries()
336 /// # }, |mut stream| async move {
337 /// // { 1: 3, 2: 5 }
338 /// # let mut results = Vec::new();
339 /// # for _ in 0..2 {
340 /// # results.push(stream.next().await.unwrap());
341 /// # }
342 /// # results.sort();
343 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
344 /// # }));
345 /// # }
346 /// ```
347 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
348 where
349 F: Fn(V) -> U + 'a,
350 {
351 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
352 let map_f = q!({
353 let orig = f;
354 move |(k, v)| (k, orig(v))
355 })
356 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
357 .into();
358
359 KeyedSingleton::new(
360 self.location.clone(),
361 HydroNode::Map {
362 f: map_f,
363 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
364 metadata: self
365 .location
366 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
367 },
368 )
369 }
370
371 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
372 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
373 ///
374 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
375 /// the new value `U`. The key remains unchanged in the output.
376 ///
377 /// # Example
378 /// ```rust
379 /// # #[cfg(feature = "deploy")] {
380 /// # use hydro_lang::prelude::*;
381 /// # use futures::StreamExt;
382 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
383 /// let keyed_singleton = // { 1: 2, 2: 4 }
384 /// # process
385 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
386 /// # .into_keyed()
387 /// # .first();
388 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
389 /// # .entries()
390 /// # }, |mut stream| async move {
391 /// // { 1: 3, 2: 6 }
392 /// # let mut results = Vec::new();
393 /// # for _ in 0..2 {
394 /// # results.push(stream.next().await.unwrap());
395 /// # }
396 /// # results.sort();
397 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
398 /// # }));
399 /// # }
400 /// ```
401 pub fn map_with_key<U, F>(
402 self,
403 f: impl IntoQuotedMut<'a, F, L> + Copy,
404 ) -> KeyedSingleton<K, U, L, B>
405 where
406 F: Fn((K, V)) -> U + 'a,
407 K: Clone,
408 {
409 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
410 let map_f = q!({
411 let orig = f;
412 move |(k, v)| {
413 let out = orig((Clone::clone(&k), v));
414 (k, out)
415 }
416 })
417 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
418 .into();
419
420 KeyedSingleton::new(
421 self.location.clone(),
422 HydroNode::Map {
423 f: map_f,
424 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
425 metadata: self
426 .location
427 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
428 },
429 )
430 }
431
432 /// Gets the number of keys in the keyed singleton.
433 ///
434 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
435 /// since keys may be added / removed over time. When the set of keys changes, the count will
436 /// be asynchronously updated.
437 ///
438 /// # Example
439 /// ```rust
440 /// # #[cfg(feature = "deploy")] {
441 /// # use hydro_lang::prelude::*;
442 /// # use futures::StreamExt;
443 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
444 /// # let tick = process.tick();
445 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
446 /// # process
447 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
448 /// # .into_keyed()
449 /// # .batch(&tick, nondet!(/** test */))
450 /// # .first();
451 /// keyed_singleton.key_count()
452 /// # .all_ticks()
453 /// # }, |mut stream| async move {
454 /// // 3
455 /// # assert_eq!(stream.next().await.unwrap(), 3);
456 /// # }));
457 /// # }
458 /// ```
459 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
460 if B::ValueBound::BOUNDED {
461 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
462 location: self.location.clone(),
463 flow_state: self.flow_state.clone(),
464 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
465 _phantom: PhantomData,
466 };
467
468 me.entries().count()
469 } else if L::is_top_level()
470 && let Some(tick) = self.location.try_tick()
471 {
472 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
473 location: self.location.clone(),
474 flow_state: self.flow_state.clone(),
475 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
476 _phantom: PhantomData,
477 };
478
479 let out =
480 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
481 .latest();
482 Singleton::new(
483 out.location.clone(),
484 out.ir_node.replace(HydroNode::Placeholder),
485 )
486 } else {
487 panic!("Unbounded KeyedSingleton inside a tick");
488 }
489 }
490
491 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
492 ///
493 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
494 /// asynchronously as well.
495 ///
496 /// # Example
497 /// ```rust
498 /// # #[cfg(feature = "deploy")] {
499 /// # use hydro_lang::prelude::*;
500 /// # use futures::StreamExt;
501 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
502 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
503 /// # process
504 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
505 /// # .into_keyed()
506 /// # .batch(&process.tick(), nondet!(/** test */))
507 /// # .first();
508 /// keyed_singleton.into_singleton()
509 /// # .all_ticks()
510 /// # }, |mut stream| async move {
511 /// // { 1: "a", 2: "b", 3: "c" }
512 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
513 /// # }));
514 /// # }
515 /// ```
516 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
517 where
518 K: Eq + Hash,
519 {
520 if B::ValueBound::BOUNDED {
521 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
522 location: self.location.clone(),
523 flow_state: self.flow_state.clone(),
524 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
525 _phantom: PhantomData,
526 };
527
528 me.entries()
529 .assume_ordering(nondet!(
530 /// Because this is a keyed singleton, there is only one value per key.
531 ))
532 .fold(
533 q!(|| HashMap::new()),
534 q!(|map, (k, v)| {
535 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
536 map.insert(k, v);
537 }),
538 )
539 } else if L::is_top_level()
540 && let Some(tick) = self.location.try_tick()
541 {
542 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
543 location: self.location.clone(),
544 flow_state: self.flow_state.clone(),
545 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
546 _phantom: PhantomData,
547 };
548
549 let out = into_singleton_inside_tick(
550 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
551 )
552 .latest();
553 Singleton::new(
554 out.location.clone(),
555 out.ir_node.replace(HydroNode::Placeholder),
556 )
557 } else {
558 panic!("Unbounded KeyedSingleton inside a tick");
559 }
560 }
561
562 /// An operator which allows you to "name" a `HydroNode`.
563 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
564 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
565 {
566 let mut node = self.ir_node.borrow_mut();
567 let metadata = node.metadata_mut();
568 metadata.tag = Some(name.to_owned());
569 }
570 self
571 }
572
573 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
574 /// implies that `B == Bounded`.
575 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
576 where
577 B: IsBounded,
578 {
579 KeyedSingleton::new(
580 self.location.clone(),
581 self.ir_node.replace(HydroNode::Placeholder),
582 )
583 }
584
585 /// Gets the value associated with a specific key from the keyed singleton.
586 /// Returns `None` if the key is `None` or there is no associated value.
587 ///
588 /// # Example
589 /// ```rust
590 /// # #[cfg(feature = "deploy")] {
591 /// # use hydro_lang::prelude::*;
592 /// # use futures::StreamExt;
593 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594 /// let tick = process.tick();
595 /// let keyed_data = process
596 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
597 /// .into_keyed()
598 /// .batch(&tick, nondet!(/** test */))
599 /// .first();
600 /// let key = tick.singleton(q!(1));
601 /// keyed_data.get(key).all_ticks()
602 /// # }, |mut stream| async move {
603 /// // 2
604 /// # assert_eq!(stream.next().await.unwrap(), 2);
605 /// # }));
606 /// # }
607 /// ```
608 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
609 where
610 B: IsBounded,
611 K: Hash + Eq,
612 {
613 self.make_bounded()
614 .into_keyed_stream()
615 .get(key)
616 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
617 .first()
618 }
619
620 /// Emit a keyed stream containing keys shared between the keyed singleton and the
621 /// keyed stream, where each value in the output keyed stream is a tuple of
622 /// (the keyed singleton's value, the keyed stream's value).
623 ///
624 /// # Example
625 /// ```rust
626 /// # #[cfg(feature = "deploy")] {
627 /// # use hydro_lang::prelude::*;
628 /// # use futures::StreamExt;
629 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
630 /// let tick = process.tick();
631 /// let keyed_data = process
632 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
633 /// .into_keyed()
634 /// .batch(&tick, nondet!(/** test */))
635 /// .first();
636 /// let other_data = process
637 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
638 /// .into_keyed()
639 /// .batch(&tick, nondet!(/** test */));
640 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
641 /// # }, |mut stream| async move {
642 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
643 /// # let mut results = vec![];
644 /// # for _ in 0..3 {
645 /// # results.push(stream.next().await.unwrap());
646 /// # }
647 /// # results.sort();
648 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
649 /// # }));
650 /// # }
651 /// ```
652 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2>(
653 self,
654 keyed_stream: KeyedStream<K, V2, L, Bounded, O2, R2>,
655 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R2>
656 where
657 B: IsBounded,
658 K: Eq + Hash,
659 {
660 self.make_bounded()
661 .entries()
662 .weaken_retries::<R2>()
663 .join(keyed_stream.entries())
664 .into_keyed()
665 }
666
667 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
668 /// where each value in the output keyed singleton is a tuple of
669 /// (self.value, other.value).
670 ///
671 /// # Example
672 /// ```rust
673 /// # #[cfg(feature = "deploy")] {
674 /// # use hydro_lang::prelude::*;
675 /// # use futures::StreamExt;
676 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
677 /// # let tick = process.tick();
678 /// let requests = // { 1: 10, 2: 20, 3: 30 }
679 /// # process
680 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
681 /// # .into_keyed()
682 /// # .batch(&tick, nondet!(/** test */))
683 /// # .first();
684 /// let other = // { 1: 100, 2: 200, 4: 400 }
685 /// # process
686 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
687 /// # .into_keyed()
688 /// # .batch(&tick, nondet!(/** test */))
689 /// # .first();
690 /// requests.join_keyed_singleton(other)
691 /// # .entries().all_ticks()
692 /// # }, |mut stream| async move {
693 /// // { 1: (10, 100), 2: (20, 200) }
694 /// # let mut results = vec![];
695 /// # for _ in 0..2 {
696 /// # results.push(stream.next().await.unwrap());
697 /// # }
698 /// # results.sort();
699 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
700 /// # }));
701 /// # }
702 /// ```
703 pub fn join_keyed_singleton<V2: Clone>(
704 self,
705 other: KeyedSingleton<K, V2, L, Bounded>,
706 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
707 where
708 B: IsBounded,
709 K: Eq + Hash,
710 {
711 let result_stream = self
712 .make_bounded()
713 .entries()
714 .join(other.entries())
715 .into_keyed();
716
717 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
718 KeyedSingleton::new(
719 result_stream.location.clone(),
720 HydroNode::Cast {
721 inner: Box::new(result_stream.ir_node.replace(HydroNode::Placeholder)),
722 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
723 K,
724 (V, V2),
725 L,
726 Bounded,
727 >::collection_kind(
728 )),
729 },
730 )
731 }
732
733 /// For each value in `self`, find the matching key in `lookup`.
734 /// The output is a keyed singleton with the key from `self`, and a value
735 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
736 /// If the key is not present in `lookup`, the option will be [`None`].
737 ///
738 /// # Example
739 /// ```rust
740 /// # #[cfg(feature = "deploy")] {
741 /// # use hydro_lang::prelude::*;
742 /// # use futures::StreamExt;
743 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
744 /// # let tick = process.tick();
745 /// let requests = // { 1: 10, 2: 20 }
746 /// # process
747 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
748 /// # .into_keyed()
749 /// # .batch(&tick, nondet!(/** test */))
750 /// # .first();
751 /// let other_data = // { 10: 100, 11: 110 }
752 /// # process
753 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
754 /// # .into_keyed()
755 /// # .batch(&tick, nondet!(/** test */))
756 /// # .first();
757 /// requests.lookup_keyed_singleton(other_data)
758 /// # .entries().all_ticks()
759 /// # }, |mut stream| async move {
760 /// // { 1: (10, Some(100)), 2: (20, None) }
761 /// # let mut results = vec![];
762 /// # for _ in 0..2 {
763 /// # results.push(stream.next().await.unwrap());
764 /// # }
765 /// # results.sort();
766 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
767 /// # }));
768 /// # }
769 /// ```
770 pub fn lookup_keyed_singleton<V2>(
771 self,
772 lookup: KeyedSingleton<V, V2, L, Bounded>,
773 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
774 where
775 B: IsBounded,
776 K: Eq + Hash + Clone,
777 V: Eq + Hash + Clone,
778 V2: Clone,
779 {
780 let result_stream = self
781 .make_bounded()
782 .into_keyed_stream()
783 .lookup_keyed_stream(lookup.into_keyed_stream());
784
785 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
786 KeyedSingleton::new(
787 result_stream.location.clone(),
788 HydroNode::Cast {
789 inner: Box::new(result_stream.ir_node.replace(HydroNode::Placeholder)),
790 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
791 K,
792 (V, Option<V2>),
793 L,
794 Bounded,
795 >::collection_kind(
796 )),
797 },
798 )
799 }
800
801 /// For each value in `self`, find the matching key in `lookup`.
802 /// The output is a keyed stream with the key from `self`, and a value
803 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
804 /// If the key is not present in `lookup`, the option will be [`None`].
805 ///
806 /// # Example
807 /// ```rust
808 /// # #[cfg(feature = "deploy")] {
809 /// # use hydro_lang::prelude::*;
810 /// # use futures::StreamExt;
811 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
812 /// # let tick = process.tick();
813 /// let requests = // { 1: 10, 2: 20 }
814 /// # process
815 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
816 /// # .into_keyed()
817 /// # .batch(&tick, nondet!(/** test */))
818 /// # .first();
819 /// let other_data = // { 10: 100, 10: 110 }
820 /// # process
821 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
822 /// # .into_keyed()
823 /// # .batch(&tick, nondet!(/** test */));
824 /// requests.lookup_keyed_stream(other_data)
825 /// # .entries().all_ticks()
826 /// # }, |mut stream| async move {
827 /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
828 /// # let mut results = vec![];
829 /// # for _ in 0..3 {
830 /// # results.push(stream.next().await.unwrap());
831 /// # }
832 /// # results.sort();
833 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
834 /// # }));
835 /// # }
836 /// ```
837 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
838 self,
839 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
840 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
841 where
842 B: IsBounded,
843 K: Eq + Hash + Clone,
844 V: Eq + Hash + Clone,
845 V2: Clone,
846 {
847 self.make_bounded()
848 .entries()
849 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
850 .into_keyed()
851 .lookup_keyed_stream(lookup)
852 }
853}
854
855impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
856 KeyedSingleton<K, V, L, B>
857{
858 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
859 ///
860 /// The value for each key must be bounded, otherwise the resulting stream elements would be
861 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
862 /// into the output.
863 ///
864 /// # Example
865 /// ```rust
866 /// # #[cfg(feature = "deploy")] {
867 /// # use hydro_lang::prelude::*;
868 /// # use futures::StreamExt;
869 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
870 /// let keyed_singleton = // { 1: 2, 2: 4 }
871 /// # process
872 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
873 /// # .into_keyed()
874 /// # .first();
875 /// keyed_singleton.entries()
876 /// # }, |mut stream| async move {
877 /// // (1, 2), (2, 4) in any order
878 /// # let mut results = Vec::new();
879 /// # for _ in 0..2 {
880 /// # results.push(stream.next().await.unwrap());
881 /// # }
882 /// # results.sort();
883 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
884 /// # }));
885 /// # }
886 /// ```
887 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
888 self.into_keyed_stream().entries()
889 }
890
891 /// Flattens the keyed singleton into an unordered stream of just the values.
892 ///
893 /// The value for each key must be bounded, otherwise the resulting stream elements would be
894 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
895 /// into the output.
896 ///
897 /// # Example
898 /// ```rust
899 /// # #[cfg(feature = "deploy")] {
900 /// # use hydro_lang::prelude::*;
901 /// # use futures::StreamExt;
902 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
903 /// let keyed_singleton = // { 1: 2, 2: 4 }
904 /// # process
905 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
906 /// # .into_keyed()
907 /// # .first();
908 /// keyed_singleton.values()
909 /// # }, |mut stream| async move {
910 /// // 2, 4 in any order
911 /// # let mut results = Vec::new();
912 /// # for _ in 0..2 {
913 /// # results.push(stream.next().await.unwrap());
914 /// # }
915 /// # results.sort();
916 /// # assert_eq!(results, vec![2, 4]);
917 /// # }));
918 /// # }
919 /// ```
920 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
921 let map_f = q!(|(_, v)| v)
922 .splice_fn1_ctx::<(K, V), V>(&self.location)
923 .into();
924
925 Stream::new(
926 self.location.clone(),
927 HydroNode::Map {
928 f: map_f,
929 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
930 metadata: self.location.new_node_metadata(Stream::<
931 V,
932 L,
933 B::UnderlyingBound,
934 NoOrder,
935 ExactlyOnce,
936 >::collection_kind()),
937 },
938 )
939 }
940
941 /// Flattens the keyed singleton into an unordered stream of just the keys.
942 ///
943 /// The value for each key must be bounded, otherwise the removal of keys would result in
944 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
945 /// into the output.
946 ///
947 /// # Example
948 /// ```rust
949 /// # #[cfg(feature = "deploy")] {
950 /// # use hydro_lang::prelude::*;
951 /// # use futures::StreamExt;
952 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953 /// let keyed_singleton = // { 1: 2, 2: 4 }
954 /// # process
955 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
956 /// # .into_keyed()
957 /// # .first();
958 /// keyed_singleton.keys()
959 /// # }, |mut stream| async move {
960 /// // 1, 2 in any order
961 /// # let mut results = Vec::new();
962 /// # for _ in 0..2 {
963 /// # results.push(stream.next().await.unwrap());
964 /// # }
965 /// # results.sort();
966 /// # assert_eq!(results, vec![1, 2]);
967 /// # }));
968 /// # }
969 /// ```
970 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
971 self.entries().map(q!(|(k, _)| k))
972 }
973
974 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
975 /// entries whose keys are not in the provided stream.
976 ///
977 /// # Example
978 /// ```rust
979 /// # #[cfg(feature = "deploy")] {
980 /// # use hydro_lang::prelude::*;
981 /// # use futures::StreamExt;
982 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
983 /// let tick = process.tick();
984 /// let keyed_singleton = // { 1: 2, 2: 4 }
985 /// # process
986 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
987 /// # .into_keyed()
988 /// # .first()
989 /// # .batch(&tick, nondet!(/** test */));
990 /// let keys_to_remove = process
991 /// .source_iter(q!(vec![1]))
992 /// .batch(&tick, nondet!(/** test */));
993 /// keyed_singleton.filter_key_not_in(keys_to_remove)
994 /// # .entries().all_ticks()
995 /// # }, |mut stream| async move {
996 /// // { 2: 4 }
997 /// # for w in vec![(2, 4)] {
998 /// # assert_eq!(stream.next().await.unwrap(), w);
999 /// # }
1000 /// # }));
1001 /// # }
1002 /// ```
1003 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1004 self,
1005 other: Stream<K, L, Bounded, O2, R2>,
1006 ) -> Self
1007 where
1008 K: Hash + Eq,
1009 {
1010 check_matching_location(&self.location, &other.location);
1011
1012 KeyedSingleton::new(
1013 self.location.clone(),
1014 HydroNode::AntiJoin {
1015 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1016 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1017 metadata: self.location.new_node_metadata(Self::collection_kind()),
1018 },
1019 )
1020 }
1021
1022 /// An operator which allows you to "inspect" each value of a keyed singleton without
1023 /// modifying it. The closure `f` is called on a reference to each value. This is
1024 /// mainly useful for debugging, and should not be used to generate side-effects.
1025 ///
1026 /// # Example
1027 /// ```rust
1028 /// # #[cfg(feature = "deploy")] {
1029 /// # use hydro_lang::prelude::*;
1030 /// # use futures::StreamExt;
1031 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1032 /// let keyed_singleton = // { 1: 2, 2: 4 }
1033 /// # process
1034 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1035 /// # .into_keyed()
1036 /// # .first();
1037 /// keyed_singleton
1038 /// .inspect(q!(|v| println!("{}", v)))
1039 /// # .entries()
1040 /// # }, |mut stream| async move {
1041 /// // { 1: 2, 2: 4 }
1042 /// # for w in vec![(1, 2), (2, 4)] {
1043 /// # assert_eq!(stream.next().await.unwrap(), w);
1044 /// # }
1045 /// # }));
1046 /// # }
1047 /// ```
1048 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1049 where
1050 F: Fn(&V) + 'a,
1051 {
1052 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1053 let inspect_f = q!({
1054 let orig = f;
1055 move |t: &(_, _)| orig(&t.1)
1056 })
1057 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1058 .into();
1059
1060 KeyedSingleton::new(
1061 self.location.clone(),
1062 HydroNode::Inspect {
1063 f: inspect_f,
1064 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1065 metadata: self.location.new_node_metadata(Self::collection_kind()),
1066 },
1067 )
1068 }
1069
1070 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1071 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1072 /// mainly useful for debugging, and should not be used to generate side-effects.
1073 ///
1074 /// # Example
1075 /// ```rust
1076 /// # #[cfg(feature = "deploy")] {
1077 /// # use hydro_lang::prelude::*;
1078 /// # use futures::StreamExt;
1079 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1080 /// let keyed_singleton = // { 1: 2, 2: 4 }
1081 /// # process
1082 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1083 /// # .into_keyed()
1084 /// # .first();
1085 /// keyed_singleton
1086 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1087 /// # .entries()
1088 /// # }, |mut stream| async move {
1089 /// // { 1: 2, 2: 4 }
1090 /// # for w in vec![(1, 2), (2, 4)] {
1091 /// # assert_eq!(stream.next().await.unwrap(), w);
1092 /// # }
1093 /// # }));
1094 /// # }
1095 /// ```
1096 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1097 where
1098 F: Fn(&(K, V)) + 'a,
1099 {
1100 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1101
1102 KeyedSingleton::new(
1103 self.location.clone(),
1104 HydroNode::Inspect {
1105 f: inspect_f,
1106 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1107 metadata: self.location.new_node_metadata(Self::collection_kind()),
1108 },
1109 )
1110 }
1111
1112 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1113 ///
1114 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1115 /// asynchronously updated if a new key is added that is higher than the previous max key.
1116 ///
1117 /// # Example
1118 /// ```rust
1119 /// # #[cfg(feature = "deploy")] {
1120 /// # use hydro_lang::prelude::*;
1121 /// # use futures::StreamExt;
1122 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123 /// let tick = process.tick();
1124 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1125 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1126 /// # .into_keyed()
1127 /// # .first();
1128 /// keyed_singleton.get_max_key()
1129 /// # .sample_eager(nondet!(/** test */))
1130 /// # }, |mut stream| async move {
1131 /// // (2, 456)
1132 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1133 /// # }));
1134 /// # }
1135 /// ```
1136 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1137 where
1138 K: Ord,
1139 {
1140 self.entries()
1141 .assume_ordering_trusted(nondet!(
1142 /// There is only one element associated with each key, and the keys are totallly
1143 /// ordered so we will produce a deterministic value. The closure technically
1144 /// isn't commutative in the case where both passed entries have the same key
1145 /// but different values.
1146 ///
1147 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1148 /// the two inputs do not have the same key.
1149 ))
1150 .reduce(q!(
1151 move |curr, new| {
1152 if new.0 > curr.0 {
1153 *curr = new;
1154 }
1155 },
1156 idempotent = manual_proof!(/** repeated elements are ignored */)
1157 ))
1158 }
1159
1160 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1161 /// element, the value.
1162 ///
1163 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1164 ///
1165 /// # Example
1166 /// ```rust
1167 /// # #[cfg(feature = "deploy")] {
1168 /// # use hydro_lang::prelude::*;
1169 /// # use futures::StreamExt;
1170 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171 /// let keyed_singleton = // { 1: 2, 2: 4 }
1172 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1173 /// # .into_keyed()
1174 /// # .first();
1175 /// keyed_singleton
1176 /// .clone()
1177 /// .into_keyed_stream()
1178 /// .merge_unordered(
1179 /// keyed_singleton.into_keyed_stream()
1180 /// )
1181 /// # .entries()
1182 /// # }, |mut stream| async move {
1183 /// /// // { 1: [2, 2], 2: [4, 4] }
1184 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1185 /// # assert_eq!(stream.next().await.unwrap(), w);
1186 /// # }
1187 /// # }));
1188 /// # }
1189 /// ```
1190 pub fn into_keyed_stream(
1191 self,
1192 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1193 KeyedStream::new(
1194 self.location.clone(),
1195 HydroNode::Cast {
1196 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1197 metadata: self.location.new_node_metadata(KeyedStream::<
1198 K,
1199 V,
1200 L,
1201 B::UnderlyingBound,
1202 TotalOrder,
1203 ExactlyOnce,
1204 >::collection_kind()),
1205 },
1206 )
1207 }
1208}
1209
1210impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1211where
1212 L: Location<'a>,
1213{
1214 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1215 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1216 ///
1217 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1218 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1219 /// argument that declares where the keyed singleton will be atomically processed. Batching a
1220 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1221 /// batching into a different [`Tick`] will introduce asynchrony.
1222 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1223 let out_location = Atomic { tick: tick.clone() };
1224 KeyedSingleton::new(
1225 out_location.clone(),
1226 HydroNode::BeginAtomic {
1227 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1228 metadata: out_location
1229 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1230 },
1231 )
1232 }
1233}
1234
1235impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1236where
1237 L: Location<'a> + NoTick,
1238{
1239 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1240 /// See [`KeyedSingleton::atomic`] for more details.
1241 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1242 KeyedSingleton::new(
1243 self.location.tick.l.clone(),
1244 HydroNode::EndAtomic {
1245 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1246 metadata: self
1247 .location
1248 .tick
1249 .l
1250 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1251 },
1252 )
1253 }
1254}
1255
1256impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1257 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1258 /// tick `T` always has the entries of `self` at tick `T - 1`.
1259 ///
1260 /// At tick `0`, the output has no entries, since there is no previous tick.
1261 ///
1262 /// This operator enables stateful iterative processing with ticks, by sending data from one
1263 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1264 ///
1265 /// # Example
1266 /// ```rust
1267 /// # #[cfg(feature = "deploy")] {
1268 /// # use hydro_lang::prelude::*;
1269 /// # use futures::StreamExt;
1270 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1271 /// let tick = process.tick();
1272 /// # // ticks are lazy by default, forces the second tick to run
1273 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1274 /// # let batch_first_tick = process
1275 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1276 /// # .batch(&tick, nondet!(/** test */))
1277 /// # .into_keyed();
1278 /// # let batch_second_tick = process
1279 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1280 /// # .batch(&tick, nondet!(/** test */))
1281 /// # .into_keyed()
1282 /// # .defer_tick(); // appears on the second tick
1283 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1284 /// # batch_first_tick.chain(batch_second_tick).first();
1285 /// input_batch.clone().filter_key_not_in(
1286 /// input_batch.defer_tick().keys() // keys present in the previous tick
1287 /// )
1288 /// # .entries().all_ticks()
1289 /// # }, |mut stream| async move {
1290 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1291 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1292 /// # assert_eq!(stream.next().await.unwrap(), w);
1293 /// # }
1294 /// # }));
1295 /// # }
1296 /// ```
1297 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1298 KeyedSingleton::new(
1299 self.location.clone(),
1300 HydroNode::DeferTick {
1301 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1302 metadata: self
1303 .location
1304 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1305 },
1306 )
1307 }
1308}
1309
1310impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1311where
1312 L: Location<'a>,
1313{
1314 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1315 /// point in time.
1316 ///
1317 /// # Non-Determinism
1318 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1319 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1320 pub fn snapshot(
1321 self,
1322 tick: &Tick<L>,
1323 _nondet: NonDet,
1324 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1325 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1326 KeyedSingleton::new(
1327 tick.clone(),
1328 HydroNode::Batch {
1329 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1330 metadata: tick
1331 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1332 },
1333 )
1334 }
1335}
1336
1337impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1338where
1339 L: Location<'a> + NoTick,
1340{
1341 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1342 /// state of the keyed singleton being atomically processed.
1343 ///
1344 /// # Non-Determinism
1345 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1346 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1347 pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1348 KeyedSingleton::new(
1349 self.location.clone().tick,
1350 HydroNode::Batch {
1351 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1352 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1353 K,
1354 V,
1355 Tick<L>,
1356 Bounded,
1357 >::collection_kind(
1358 )),
1359 },
1360 )
1361 }
1362}
1363
1364impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1365where
1366 L: Location<'a>,
1367{
1368 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1369 ///
1370 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1371 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1372 /// is filtered out.
1373 ///
1374 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1375 /// not modify or take ownership of the values. If you need to modify the values while filtering
1376 /// use [`KeyedSingleton::filter_map`] instead.
1377 ///
1378 /// # Example
1379 /// ```rust
1380 /// # #[cfg(feature = "deploy")] {
1381 /// # use hydro_lang::prelude::*;
1382 /// # use futures::StreamExt;
1383 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1384 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1385 /// # process
1386 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1387 /// # .into_keyed()
1388 /// # .first();
1389 /// keyed_singleton.filter(q!(|&v| v > 1))
1390 /// # .entries()
1391 /// # }, |mut stream| async move {
1392 /// // { 1: 2, 2: 4 }
1393 /// # let mut results = Vec::new();
1394 /// # for _ in 0..2 {
1395 /// # results.push(stream.next().await.unwrap());
1396 /// # }
1397 /// # results.sort();
1398 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1399 /// # }));
1400 /// # }
1401 /// ```
1402 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1403 where
1404 F: Fn(&V) -> bool + 'a,
1405 {
1406 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1407 let filter_f = q!({
1408 let orig = f;
1409 move |t: &(_, _)| orig(&t.1)
1410 })
1411 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1412 .into();
1413
1414 KeyedSingleton::new(
1415 self.location.clone(),
1416 HydroNode::Filter {
1417 f: filter_f,
1418 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1419 metadata: self
1420 .location
1421 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1422 },
1423 )
1424 }
1425
1426 /// An operator that both filters and maps values. It yields only the key-value pairs where
1427 /// the supplied closure `f` returns `Some(value)`.
1428 ///
1429 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1430 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1431 /// If it returns `None`, the key-value pair is filtered out.
1432 ///
1433 /// # Example
1434 /// ```rust
1435 /// # #[cfg(feature = "deploy")] {
1436 /// # use hydro_lang::prelude::*;
1437 /// # use futures::StreamExt;
1438 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1439 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1440 /// # process
1441 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1442 /// # .into_keyed()
1443 /// # .first();
1444 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1445 /// # .entries()
1446 /// # }, |mut stream| async move {
1447 /// // { 1: 42, 3: 100 }
1448 /// # let mut results = Vec::new();
1449 /// # for _ in 0..2 {
1450 /// # results.push(stream.next().await.unwrap());
1451 /// # }
1452 /// # results.sort();
1453 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1454 /// # }));
1455 /// # }
1456 /// ```
1457 pub fn filter_map<F, U>(
1458 self,
1459 f: impl IntoQuotedMut<'a, F, L> + Copy,
1460 ) -> KeyedSingleton<K, U, L, B>
1461 where
1462 F: Fn(V) -> Option<U> + 'a,
1463 {
1464 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1465 let filter_map_f = q!({
1466 let orig = f;
1467 move |(k, v)| orig(v).map(|o| (k, o))
1468 })
1469 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1470 .into();
1471
1472 KeyedSingleton::new(
1473 self.location.clone(),
1474 HydroNode::FilterMap {
1475 f: filter_map_f,
1476 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1477 metadata: self
1478 .location
1479 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1480 },
1481 )
1482 }
1483
1484 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1485 /// arrived since the previous batch was released.
1486 ///
1487 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1488 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1489 ///
1490 /// # Non-Determinism
1491 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1492 /// has a non-deterministic set of key-value pairs.
1493 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1494 where
1495 L: NoTick,
1496 {
1497 self.atomic(tick).batch_atomic(nondet)
1498 }
1499}
1500
1501impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1502where
1503 L: Location<'a> + NoTick,
1504{
1505 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1506 /// atomically processed.
1507 ///
1508 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1509 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1510 ///
1511 /// # Non-Determinism
1512 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1513 /// has a non-deterministic set of key-value pairs.
1514 pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1515 let _ = nondet;
1516 KeyedSingleton::new(
1517 self.location.clone().tick,
1518 HydroNode::Batch {
1519 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1520 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1521 K,
1522 V,
1523 Tick<L>,
1524 Bounded,
1525 >::collection_kind(
1526 )),
1527 },
1528 )
1529 }
1530}
1531
// Tests for `KeyedSingleton`. The deployment-backed tests are compiled only with the `deploy`
// feature, and the simulation test only with the `sim` feature.
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    // `first()` keeps one value per key (bounded values); the sampled `key_count` should start
    // at 0 and grow by one for each new key sent in.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // No input yet, so there are no keys.
        assert_eq!(external_out.next().await.unwrap(), 0);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);
    }

    // A per-key counting `fold` gives values that keep changing; `key_count` should only grow
    // when a previously unseen key arrives, not when an existing key is updated.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 0);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        // Repeated key 1: its value changes but the key count does not.
        external_in.send((1, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send((3, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // `into_singleton` on a `first()`-based keyed singleton should expose its entries as a
    // `HashMap` that starts empty and gains one entry per new key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1)].into_iter().collect()
        );

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1), (2, 2)].into_iter().collect()
        );
    }

    // With a per-key counting `fold`, the sampled `HashMap` should reflect updated counts for
    // existing keys as well as newly added keys.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1)].into_iter().collect()
        );

        // Second message for key 1 bumps its count to 2 (the sent value itself is ignored).
        external_in.send((1, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 2)].into_iter().collect()
        );

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 2), (2, 1)].into_iter().collect()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 3), (2, 1)].into_iter().collect()
        );

        external_in.send((3, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
        );
    }

    // Exhaustively simulates snapshotting a per-key counter at non-deterministic points. Each
    // run checks that the largest collected (sorted) entry is `(2, 1)`; `count` is presumably
    // the number of explored executions and is pinned to 8.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input();
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            input_port.send((1, 123));
            input_port.send((1, 456));
            input_port.send((2, 123));

            let all = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(all.last().unwrap(), &(2, 1));
        });

        assert_eq!(count, 8);
    }

    // Joins a keyed singleton with a keyed stream inside the same tick. Key 3 has no singleton
    // entry, so only keys 1 and 2 should appear, each paired with its request.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn join_keyed_stream() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let keyed_data = node
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let requests = node
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out = keyed_data
            .join_keyed_stream(requests)
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Output order is not guaranteed, so collect both results and sort before comparing.
        let mut results = vec![];
        for _ in 0..2 {
            results.push(external_out.next().await.unwrap());
        }
        results.sort();

        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
    }
}