hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20use crate::forward_handle::ForwardRef;
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
31///
32/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
33/// removed / changed), this also includes an additional variant [`BoundedValue`], which indicates
34/// that entries may be added over time, but once an entry is added it will never be removed and
35/// its value will never change.
36pub trait KeyedSingletonBound {
37 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
38 type UnderlyingBound: Boundedness;
39 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
40 type ValueBound: Boundedness;
41
42 /// The type of the keyed singleton if the value for each key is immutable.
43 type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
44
45 /// The type of the keyed singleton if the value for each key may change asynchronously.
46 type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
47
48 /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
49 fn bound_kind() -> KeyedSingletonBoundKind;
50}
51
52impl KeyedSingletonBound for Unbounded {
53 type UnderlyingBound = Unbounded;
54 type ValueBound = Unbounded;
55 type WithBoundedValue = BoundedValue;
56 type WithUnboundedValue = Unbounded;
57
58 fn bound_kind() -> KeyedSingletonBoundKind {
59 KeyedSingletonBoundKind::Unbounded
60 }
61}
62
63impl KeyedSingletonBound for Bounded {
64 type UnderlyingBound = Bounded;
65 type ValueBound = Bounded;
66 type WithBoundedValue = Bounded;
67 type WithUnboundedValue = UnreachableBound;
68
69 fn bound_kind() -> KeyedSingletonBoundKind {
70 KeyedSingletonBoundKind::Bounded
71 }
72}
73
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change.
///
/// The set of keys is still unbounded: the [`KeyedSingletonBound::UnderlyingBound`] of this
/// marker is [`Unbounded`], so new entries may appear asynchronously.
pub struct BoundedValue;
78
79impl KeyedSingletonBound for BoundedValue {
80 type UnderlyingBound = Unbounded;
81 type ValueBound = Bounded;
82 type WithBoundedValue = BoundedValue;
83 type WithUnboundedValue = Unbounded;
84
85 fn bound_kind() -> KeyedSingletonBoundKind {
86 KeyedSingletonBoundKind::BoundedValue
87 }
88}
89
// Marker used as `WithUnboundedValue` for `Bounded`; its `bound_kind` is `unreachable!`.
#[doc(hidden)]
pub struct UnreachableBound;
92
93impl KeyedSingletonBound for UnreachableBound {
94 type UnderlyingBound = Bounded;
95 type ValueBound = Unbounded;
96
97 type WithBoundedValue = Bounded;
98 type WithUnboundedValue = UnreachableBound;
99
100 fn bound_kind() -> KeyedSingletonBoundKind {
101 unreachable!("UnreachableBound cannot be instantiated")
102 }
103}
104
105/// Mapping from keys of type `K` to values of type `V`.
106///
107/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
108/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
109/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
110/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
111/// keys cannot be removed and the value for each key is immutable.
112///
113/// Type Parameters:
114/// - `K`: the type of the key for each entry
115/// - `V`: the type of the value for each entry
116/// - `Loc`: the [`Location`] where the keyed singleton is materialized
117/// - `Bound`: tracks whether the entries are:
118/// - [`Bounded`] (local and finite)
119/// - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
120/// - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
121pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
122 pub(crate) location: Loc,
123 pub(crate) ir_node: RefCell<HydroNode>,
124
125 _phantom: PhantomData<(K, V, Loc, Bound)>,
126}
127
128impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
129 for KeyedSingleton<K, V, Loc, Bound>
130{
131 fn clone(&self) -> Self {
132 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
133 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
134 *self.ir_node.borrow_mut() = HydroNode::Tee {
135 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
136 metadata: self.location.new_node_metadata(Self::collection_kind()),
137 };
138 }
139
140 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
141 KeyedSingleton {
142 location: self.location.clone(),
143 ir_node: HydroNode::Tee {
144 inner: TeeNode(inner.0.clone()),
145 metadata: metadata.clone(),
146 }
147 .into(),
148 _phantom: PhantomData,
149 }
150 } else {
151 unreachable!()
152 }
153 }
154}
155
156impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
157 for KeyedSingleton<K, V, L, B>
158where
159 L: Location<'a> + NoTick,
160{
161 type Location = L;
162
163 fn create_source(ident: syn::Ident, location: L) -> Self {
164 KeyedSingleton {
165 location: location.clone(),
166 ir_node: RefCell::new(HydroNode::CycleSource {
167 ident,
168 metadata: location.new_node_metadata(Self::collection_kind()),
169 }),
170 _phantom: PhantomData,
171 }
172 }
173}
174
175impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
176 for KeyedSingleton<K, V, L, B>
177where
178 L: Location<'a> + NoTick,
179{
180 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
181 assert_eq!(
182 Location::id(&self.location),
183 expected_location,
184 "locations do not match"
185 );
186 self.location
187 .flow_state()
188 .borrow_mut()
189 .push_root(HydroRoot::CycleSink {
190 ident,
191 input: Box::new(self.ir_node.into_inner()),
192 op_metadata: HydroIrOpMetadata::new(),
193 });
194 }
195}
196
197impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
198 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
199 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
200 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
201
202 KeyedSingleton {
203 location,
204 ir_node: RefCell::new(ir_node),
205 _phantom: PhantomData,
206 }
207 }
208
209 /// Returns the [`Location`] where this keyed singleton is being materialized.
210 pub fn location(&self) -> &L {
211 &self.location
212 }
213}
214
// Counts the entries of a fully bounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
221
// Folds the entries of a fully bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
240
241impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
242 pub(crate) fn collection_kind() -> CollectionKind {
243 CollectionKind::KeyedSingleton {
244 bound: B::bound_kind(),
245 key_type: stageleft::quote_type::<K>().into(),
246 value_type: stageleft::quote_type::<V>().into(),
247 }
248 }
249
250 /// Transforms each value by invoking `f` on each element, with keys staying the same
251 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
252 ///
253 /// If you do not want to modify the stream and instead only want to view
254 /// each item use [`KeyedSingleton::inspect`] instead.
255 ///
256 /// # Example
257 /// ```rust
258 /// # use hydro_lang::prelude::*;
259 /// # use futures::StreamExt;
260 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
261 /// let keyed_singleton = // { 1: 2, 2: 4 }
262 /// # process
263 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
264 /// # .into_keyed()
265 /// # .first();
266 /// keyed_singleton.map(q!(|v| v + 1))
267 /// # .entries()
268 /// # }, |mut stream| async move {
269 /// // { 1: 3, 2: 5 }
270 /// # let mut results = Vec::new();
271 /// # for _ in 0..2 {
272 /// # results.push(stream.next().await.unwrap());
273 /// # }
274 /// # results.sort();
275 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
276 /// # }));
277 /// ```
278 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
279 where
280 F: Fn(V) -> U + 'a,
281 {
282 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
283 let map_f = q!({
284 let orig = f;
285 move |(k, v)| (k, orig(v))
286 })
287 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
288 .into();
289
290 KeyedSingleton::new(
291 self.location.clone(),
292 HydroNode::Map {
293 f: map_f,
294 input: Box::new(self.ir_node.into_inner()),
295 metadata: self
296 .location
297 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
298 },
299 )
300 }
301
302 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
303 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
304 ///
305 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
306 /// the new value `U`. The key remains unchanged in the output.
307 ///
308 /// # Example
309 /// ```rust
310 /// # use hydro_lang::prelude::*;
311 /// # use futures::StreamExt;
312 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
313 /// let keyed_singleton = // { 1: 2, 2: 4 }
314 /// # process
315 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
316 /// # .into_keyed()
317 /// # .first();
318 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
319 /// # .entries()
320 /// # }, |mut stream| async move {
321 /// // { 1: 3, 2: 6 }
322 /// # let mut results = Vec::new();
323 /// # for _ in 0..2 {
324 /// # results.push(stream.next().await.unwrap());
325 /// # }
326 /// # results.sort();
327 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
328 /// # }));
329 /// ```
330 pub fn map_with_key<U, F>(
331 self,
332 f: impl IntoQuotedMut<'a, F, L> + Copy,
333 ) -> KeyedSingleton<K, U, L, B>
334 where
335 F: Fn((K, V)) -> U + 'a,
336 K: Clone,
337 {
338 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
339 let map_f = q!({
340 let orig = f;
341 move |(k, v)| {
342 let out = orig((Clone::clone(&k), v));
343 (k, out)
344 }
345 })
346 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
347 .into();
348
349 KeyedSingleton::new(
350 self.location.clone(),
351 HydroNode::Map {
352 f: map_f,
353 input: Box::new(self.ir_node.into_inner()),
354 metadata: self
355 .location
356 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
357 },
358 )
359 }
360
361 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
362 ///
363 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
364 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
365 /// is filtered out.
366 ///
367 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
368 /// not modify or take ownership of the values. If you need to modify the values while filtering
369 /// use [`KeyedSingleton::filter_map`] instead.
370 ///
371 /// # Example
372 /// ```rust
373 /// # use hydro_lang::prelude::*;
374 /// # use futures::StreamExt;
375 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
376 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
377 /// # process
378 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
379 /// # .into_keyed()
380 /// # .first();
381 /// keyed_singleton.filter(q!(|&v| v > 1))
382 /// # .entries()
383 /// # }, |mut stream| async move {
384 /// // { 1: 2, 2: 4 }
385 /// # let mut results = Vec::new();
386 /// # for _ in 0..2 {
387 /// # results.push(stream.next().await.unwrap());
388 /// # }
389 /// # results.sort();
390 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
391 /// # }));
392 /// ```
393 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
394 where
395 F: Fn(&V) -> bool + 'a,
396 {
397 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
398 let filter_f = q!({
399 let orig = f;
400 move |t: &(_, _)| orig(&t.1)
401 })
402 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
403 .into();
404
405 KeyedSingleton::new(
406 self.location.clone(),
407 HydroNode::Filter {
408 f: filter_f,
409 input: Box::new(self.ir_node.into_inner()),
410 metadata: self
411 .location
412 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
413 },
414 )
415 }
416
417 /// An operator that both filters and maps values. It yields only the key-value pairs where
418 /// the supplied closure `f` returns `Some(value)`.
419 ///
420 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
421 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
422 /// If it returns `None`, the key-value pair is filtered out.
423 ///
424 /// # Example
425 /// ```rust
426 /// # use hydro_lang::prelude::*;
427 /// # use futures::StreamExt;
428 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
429 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
430 /// # process
431 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
432 /// # .into_keyed()
433 /// # .first();
434 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
435 /// # .entries()
436 /// # }, |mut stream| async move {
437 /// // { 1: 42, 3: 100 }
438 /// # let mut results = Vec::new();
439 /// # for _ in 0..2 {
440 /// # results.push(stream.next().await.unwrap());
441 /// # }
442 /// # results.sort();
443 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
444 /// # }));
445 /// ```
446 pub fn filter_map<F, U>(
447 self,
448 f: impl IntoQuotedMut<'a, F, L> + Copy,
449 ) -> KeyedSingleton<K, U, L, B>
450 where
451 F: Fn(V) -> Option<U> + 'a,
452 {
453 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
454 let filter_map_f = q!({
455 let orig = f;
456 move |(k, v)| orig(v).map(|o| (k, o))
457 })
458 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
459 .into();
460
461 KeyedSingleton::new(
462 self.location.clone(),
463 HydroNode::FilterMap {
464 f: filter_map_f,
465 input: Box::new(self.ir_node.into_inner()),
466 metadata: self
467 .location
468 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
469 },
470 )
471 }
472
473 /// Gets the number of keys in the keyed singleton.
474 ///
475 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
476 /// since keys may be added / removed over time. When the set of keys changes, the count will
477 /// be asynchronously updated.
478 ///
479 /// # Example
480 /// ```rust
481 /// # use hydro_lang::prelude::*;
482 /// # use futures::StreamExt;
483 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
484 /// # let tick = process.tick();
485 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
486 /// # process
487 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
488 /// # .into_keyed()
489 /// # .batch(&tick, nondet!(/** test */))
490 /// # .first();
491 /// keyed_singleton.key_count()
492 /// # .all_ticks()
493 /// # }, |mut stream| async move {
494 /// // 3
495 /// # assert_eq!(stream.next().await.unwrap(), 3);
496 /// # }));
497 /// ```
498 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
499 if B::ValueBound::BOUNDED {
500 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
501 location: self.location,
502 ir_node: self.ir_node,
503 _phantom: PhantomData,
504 };
505
506 me.entries().count()
507 } else if L::is_top_level()
508 && let Some(tick) = self.location.try_tick()
509 {
510 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
511 location: self.location,
512 ir_node: self.ir_node,
513 _phantom: PhantomData,
514 };
515
516 let out =
517 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
518 .latest();
519 Singleton::new(out.location, out.ir_node.into_inner())
520 } else {
521 panic!("Unbounded KeyedSingleton inside a tick");
522 }
523 }
524
525 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
526 ///
527 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
528 /// asynchronously as well.
529 ///
530 /// # Example
531 /// ```rust
532 /// # use hydro_lang::prelude::*;
533 /// # use futures::StreamExt;
534 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
536 /// # process
537 /// # .source_iter(q!(vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())]))
538 /// # .into_keyed()
539 /// # .batch(&process.tick(), nondet!(/** test */))
540 /// # .first();
541 /// keyed_singleton.into_singleton()
542 /// # .all_ticks()
543 /// # }, |mut stream| async move {
544 /// // { 1: "a", 2: "b", 3: "c" }
545 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())].into_iter().collect());
546 /// # }));
547 /// ```
548 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
549 where
550 K: Eq + Hash,
551 {
552 if B::ValueBound::BOUNDED {
553 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
554 location: self.location,
555 ir_node: self.ir_node,
556 _phantom: PhantomData,
557 };
558
559 me.entries()
560 .assume_ordering(nondet!(
561 /// Because this is a keyed singleton, there is only one value per key.
562 ))
563 .fold(
564 q!(|| HashMap::new()),
565 q!(|map, (k, v)| {
566 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
567 map.insert(k, v);
568 }),
569 )
570 } else if L::is_top_level()
571 && let Some(tick) = self.location.try_tick()
572 {
573 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
574 location: self.location,
575 ir_node: self.ir_node,
576 _phantom: PhantomData,
577 };
578
579 let out = into_singleton_inside_tick(
580 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
581 )
582 .latest();
583 Singleton::new(out.location, out.ir_node.into_inner())
584 } else {
585 panic!("Unbounded KeyedSingleton inside a tick");
586 }
587 }
588
589 /// An operator which allows you to "name" a `HydroNode`.
590 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
591 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
592 {
593 let mut node = self.ir_node.borrow_mut();
594 let metadata = node.metadata_mut();
595 metadata.tag = Some(name.to_string());
596 }
597 self
598 }
599}
600
601impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
602 KeyedSingleton<K, V, L, B>
603{
604 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
605 ///
606 /// The value for each key must be bounded, otherwise the resulting stream elements would be
607 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
608 /// into the output.
609 ///
610 /// # Example
611 /// ```rust
612 /// # use hydro_lang::prelude::*;
613 /// # use futures::StreamExt;
614 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
615 /// let keyed_singleton = // { 1: 2, 2: 4 }
616 /// # process
617 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
618 /// # .into_keyed()
619 /// # .first();
620 /// keyed_singleton.entries()
621 /// # }, |mut stream| async move {
622 /// // (1, 2), (2, 4) in any order
623 /// # let mut results = Vec::new();
624 /// # for _ in 0..2 {
625 /// # results.push(stream.next().await.unwrap());
626 /// # }
627 /// # results.sort();
628 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
629 /// # }));
630 /// ```
631 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
632 self.into_keyed_stream().entries()
633 }
634
635 /// Flattens the keyed singleton into an unordered stream of just the values.
636 ///
637 /// The value for each key must be bounded, otherwise the resulting stream elements would be
638 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
639 /// into the output.
640 ///
641 /// # Example
642 /// ```rust
643 /// # use hydro_lang::prelude::*;
644 /// # use futures::StreamExt;
645 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
646 /// let keyed_singleton = // { 1: 2, 2: 4 }
647 /// # process
648 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
649 /// # .into_keyed()
650 /// # .first();
651 /// keyed_singleton.values()
652 /// # }, |mut stream| async move {
653 /// // 2, 4 in any order
654 /// # let mut results = Vec::new();
655 /// # for _ in 0..2 {
656 /// # results.push(stream.next().await.unwrap());
657 /// # }
658 /// # results.sort();
659 /// # assert_eq!(results, vec![2, 4]);
660 /// # }));
661 /// ```
662 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
663 let map_f = q!(|(_, v)| v)
664 .splice_fn1_ctx::<(K, V), V>(&self.location)
665 .into();
666
667 Stream::new(
668 self.location.clone(),
669 HydroNode::Map {
670 f: map_f,
671 input: Box::new(self.ir_node.into_inner()),
672 metadata: self.location.new_node_metadata(Stream::<
673 V,
674 L,
675 B::UnderlyingBound,
676 NoOrder,
677 ExactlyOnce,
678 >::collection_kind()),
679 },
680 )
681 }
682
683 /// Flattens the keyed singleton into an unordered stream of just the keys.
684 ///
685 /// The value for each key must be bounded, otherwise the removal of keys would result in
686 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
687 /// into the output.
688 ///
689 /// # Example
690 /// ```rust
691 /// # use hydro_lang::prelude::*;
692 /// # use futures::StreamExt;
693 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
694 /// let keyed_singleton = // { 1: 2, 2: 4 }
695 /// # process
696 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
697 /// # .into_keyed()
698 /// # .first();
699 /// keyed_singleton.keys()
700 /// # }, |mut stream| async move {
701 /// // 1, 2 in any order
702 /// # let mut results = Vec::new();
703 /// # for _ in 0..2 {
704 /// # results.push(stream.next().await.unwrap());
705 /// # }
706 /// # results.sort();
707 /// # assert_eq!(results, vec![1, 2]);
708 /// # }));
709 /// ```
710 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
711 self.entries().map(q!(|(k, _)| k))
712 }
713
714 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
715 /// entries whose keys are not in the provided stream.
716 ///
717 /// # Example
718 /// ```rust
719 /// # use hydro_lang::prelude::*;
720 /// # use futures::StreamExt;
721 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
722 /// let tick = process.tick();
723 /// let keyed_singleton = // { 1: 2, 2: 4 }
724 /// # process
725 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
726 /// # .into_keyed()
727 /// # .first()
728 /// # .batch(&tick, nondet!(/** test */));
729 /// let keys_to_remove = process
730 /// .source_iter(q!(vec![1]))
731 /// .batch(&tick, nondet!(/** test */));
732 /// keyed_singleton.filter_key_not_in(keys_to_remove)
733 /// # .entries().all_ticks()
734 /// # }, |mut stream| async move {
735 /// // { 2: 4 }
736 /// # for w in vec![(2, 4)] {
737 /// # assert_eq!(stream.next().await.unwrap(), w);
738 /// # }
739 /// # }));
740 /// ```
741 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
742 self,
743 other: Stream<K, L, Bounded, O2, R2>,
744 ) -> Self
745 where
746 K: Hash + Eq,
747 {
748 check_matching_location(&self.location, &other.location);
749
750 KeyedSingleton::new(
751 self.location.clone(),
752 HydroNode::AntiJoin {
753 pos: Box::new(self.ir_node.into_inner()),
754 neg: Box::new(other.ir_node.into_inner()),
755 metadata: self.location.new_node_metadata(Self::collection_kind()),
756 },
757 )
758 }
759
760 /// An operator which allows you to "inspect" each value of a keyed singleton without
761 /// modifying it. The closure `f` is called on a reference to each value. This is
762 /// mainly useful for debugging, and should not be used to generate side-effects.
763 ///
764 /// # Example
765 /// ```rust
766 /// # use hydro_lang::prelude::*;
767 /// # use futures::StreamExt;
768 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769 /// let keyed_singleton = // { 1: 2, 2: 4 }
770 /// # process
771 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
772 /// # .into_keyed()
773 /// # .first();
774 /// keyed_singleton
775 /// .inspect(q!(|v| println!("{}", v)))
776 /// # .entries()
777 /// # }, |mut stream| async move {
778 /// // { 1: 2, 2: 4 }
779 /// # for w in vec![(1, 2), (2, 4)] {
780 /// # assert_eq!(stream.next().await.unwrap(), w);
781 /// # }
782 /// # }));
783 /// ```
784 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
785 where
786 F: Fn(&V) + 'a,
787 {
788 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
789 let inspect_f = q!({
790 let orig = f;
791 move |t: &(_, _)| orig(&t.1)
792 })
793 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
794 .into();
795
796 KeyedSingleton::new(
797 self.location.clone(),
798 HydroNode::Inspect {
799 f: inspect_f,
800 input: Box::new(self.ir_node.into_inner()),
801 metadata: self.location.new_node_metadata(Self::collection_kind()),
802 },
803 )
804 }
805
806 /// An operator which allows you to "inspect" each entry of a keyed singleton without
807 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
808 /// mainly useful for debugging, and should not be used to generate side-effects.
809 ///
810 /// # Example
811 /// ```rust
812 /// # use hydro_lang::prelude::*;
813 /// # use futures::StreamExt;
814 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
815 /// let keyed_singleton = // { 1: 2, 2: 4 }
816 /// # process
817 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
818 /// # .into_keyed()
819 /// # .first();
820 /// keyed_singleton
821 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
822 /// # .entries()
823 /// # }, |mut stream| async move {
824 /// // { 1: 2, 2: 4 }
825 /// # for w in vec![(1, 2), (2, 4)] {
826 /// # assert_eq!(stream.next().await.unwrap(), w);
827 /// # }
828 /// # }));
829 /// ```
830 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
831 where
832 F: Fn(&(K, V)) + 'a,
833 {
834 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
835
836 KeyedSingleton::new(
837 self.location.clone(),
838 HydroNode::Inspect {
839 f: inspect_f,
840 input: Box::new(self.ir_node.into_inner()),
841 metadata: self.location.new_node_metadata(Self::collection_kind()),
842 },
843 )
844 }
845
846 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
847 ///
848 /// Because this method requires values to be bounded, the output [`Optional`] will only be
849 /// asynchronously updated if a new key is added that is higher than the previous max key.
850 ///
851 /// # Example
852 /// ```rust
853 /// # use hydro_lang::prelude::*;
854 /// # use futures::StreamExt;
855 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
856 /// let tick = process.tick();
857 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
858 /// # process
859 /// # .source_iter(q!(vec![(1, 123), (2, 456), (0, 789)]))
860 /// # .into_keyed()
861 /// # .first();
862 /// keyed_singleton.get_max_key()
863 /// # .sample_eager(nondet!(/** test */))
864 /// # }, |mut stream| async move {
865 /// // (2, 456)
866 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
867 /// # }));
868 /// ```
869 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
870 where
871 K: Ord,
872 {
873 self.entries()
874 .assume_ordering(nondet!(
875 /// There is only one element associated with each key, and the keys are totallly
876 /// ordered so we will produce a deterministic value. We can't call
877 /// `reduce_commutative_idempotent` because the closure technically isn't commutative
878 /// in the case where both passed entries have the same key but different values.
879 ///
880 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
881 /// the two inputs do not have the same key.
882 ))
883 .reduce_idempotent(q!({
884 move |curr, new| {
885 if new.0 > curr.0 {
886 *curr = new;
887 }
888 }
889 }))
890 }
891
892 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
893 /// element, the value.
894 ///
895 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
896 ///
897 /// # Example
898 /// ```rust
899 /// # use hydro_lang::prelude::*;
900 /// # use futures::StreamExt;
901 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
902 /// let keyed_singleton = // { 1: 2, 2: 4 }
903 /// # process
904 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
905 /// # .into_keyed()
906 /// # .first();
907 /// keyed_singleton
908 /// .clone()
909 /// .into_keyed_stream()
910 /// .interleave(
911 /// keyed_singleton.into_keyed_stream()
912 /// )
913 /// # .entries()
914 /// # }, |mut stream| async move {
915 /// // { 1: [2, 2], 2: [4, 4] }
916 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
917 /// # assert_eq!(stream.next().await.unwrap(), w);
918 /// # }
919 /// # }));
920 /// ```
921 pub fn into_keyed_stream(
922 self,
923 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
924 KeyedStream::new(
925 self.location.clone(),
926 HydroNode::Cast {
927 inner: Box::new(self.ir_node.into_inner()),
928 metadata: self.location.new_node_metadata(KeyedStream::<
929 K,
930 V,
931 L,
932 B::UnderlyingBound,
933 TotalOrder,
934 ExactlyOnce,
935 >::collection_kind()),
936 },
937 )
938 }
939}
940
941impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
942 /// Gets the value associated with a specific key from the keyed singleton.
943 ///
944 /// # Example
945 /// ```rust
946 /// # use hydro_lang::prelude::*;
947 /// # use futures::StreamExt;
948 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
949 /// let tick = process.tick();
950 /// let keyed_data = process
951 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
952 /// .into_keyed()
953 /// .batch(&tick, nondet!(/** test */))
954 /// .first();
955 /// let key = tick.singleton(q!(1));
956 /// keyed_data.get(key).all_ticks()
957 /// # }, |mut stream| async move {
958 /// // 2
959 /// # assert_eq!(stream.next().await.unwrap(), 2);
960 /// # }));
961 /// ```
962 pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
963 self.entries()
964 .join(key.into_stream().map(q!(|k| (k, ()))))
965 .map(q!(|(_, (v, _))| v))
966 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
967 .first()
968 }
969
970 /// Given a keyed stream of lookup requests, where the key is the lookup and the value
971 /// is some additional metadata, emits a keyed stream of lookup results where the key
972 /// is the same as before, but the value is a tuple of the lookup result and the metadata
973 /// of the request. If the key is not found, no output will be produced.
974 ///
975 /// # Example
976 /// ```rust
977 /// # use hydro_lang::prelude::*;
978 /// # use futures::StreamExt;
979 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980 /// let tick = process.tick();
981 /// let keyed_data = process
982 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
983 /// .into_keyed()
984 /// .batch(&tick, nondet!(/** test */))
985 /// .first();
986 /// let other_data = process
987 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
988 /// .into_keyed()
989 /// .batch(&tick, nondet!(/** test */));
990 /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
991 /// # }, |mut stream| async move {
992 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
993 /// # let mut results = vec![];
994 /// # for _ in 0..3 {
995 /// # results.push(stream.next().await.unwrap());
996 /// # }
997 /// # results.sort();
998 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
999 /// # }));
1000 /// ```
1001 pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
1002 self,
1003 requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
1004 ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
1005 self.entries()
1006 .weaker_retries::<R2>()
1007 .join(requests.entries())
1008 .into_keyed()
1009 }
1010
1011 /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
1012 /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
1013 /// containing the value from `self` and an option of the value from `from`. If the key is not
1014 /// present in `from`, the option will be [`None`].
1015 ///
1016 /// # Example
1017 /// ```rust
1018 /// # use hydro_lang::prelude::*;
1019 /// # use futures::StreamExt;
1020 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1021 /// # let tick = process.tick();
1022 /// let requests = // { 1: 10, 2: 20 }
1023 /// # process
1024 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
1025 /// # .into_keyed()
1026 /// # .batch(&tick, nondet!(/** test */))
1027 /// # .first();
1028 /// let other_data = // { 10: 100, 11: 101 }
1029 /// # process
1030 /// # .source_iter(q!(vec![(10, 100), (11, 101)]))
1031 /// # .into_keyed()
1032 /// # .batch(&tick, nondet!(/** test */))
1033 /// # .first();
1034 /// requests.get_from(other_data)
1035 /// # .entries().all_ticks()
1036 /// # }, |mut stream| async move {
1037 /// // { 1: (10, Some(100)), 2: (20, None) }
1038 /// # let mut results = vec![];
1039 /// # for _ in 0..2 {
1040 /// # results.push(stream.next().await.unwrap());
1041 /// # }
1042 /// # results.sort();
1043 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
1044 /// # }));
1045 /// ```
1046 pub fn get_from<V2: Clone>(
1047 self,
1048 from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
1049 ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
1050 where
1051 K: Clone,
1052 V: Hash + Eq + Clone,
1053 {
1054 let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
1055 let lookup_result = from.get_many_if_present(to_lookup.clone());
1056 let missing_values =
1057 to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
1058 let result_stream = lookup_result
1059 .entries()
1060 .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
1061 .into_keyed()
1062 .chain(
1063 missing_values
1064 .entries()
1065 .map(q!(|(v, k)| (k, (v, None))))
1066 .into_keyed(),
1067 );
1068
1069 KeyedSingleton::new(
1070 result_stream.location.clone(),
1071 HydroNode::Cast {
1072 inner: Box::new(result_stream.ir_node.into_inner()),
1073 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1074 K,
1075 (V, Option<V2>),
1076 Tick<L>,
1077 Bounded,
1078 >::collection_kind(
1079 )),
1080 },
1081 )
1082 }
1083}
1084
1085impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1086where
1087 L: Location<'a>,
1088{
1089 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1090 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1091 ///
1092 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1093 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1094 /// argument that declares where the keyed singleton will be atomically processed. Batching a
1095 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1096 /// batching into a different [`Tick`] will introduce asynchrony.
1097 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1098 let out_location = Atomic { tick: tick.clone() };
1099 KeyedSingleton::new(
1100 out_location.clone(),
1101 HydroNode::BeginAtomic {
1102 inner: Box::new(self.ir_node.into_inner()),
1103 metadata: out_location
1104 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1105 },
1106 )
1107 }
1108}
1109
1110impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1111where
1112 L: Location<'a> + NoTick,
1113{
1114 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1115 /// See [`KeyedSingleton::atomic`] for more details.
1116 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1117 KeyedSingleton::new(
1118 self.location.tick.l.clone(),
1119 HydroNode::EndAtomic {
1120 inner: Box::new(self.ir_node.into_inner()),
1121 metadata: self
1122 .location
1123 .tick
1124 .l
1125 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1126 },
1127 )
1128 }
1129}
1130
1131impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1132 /// Asynchronously yields this keyed singleton outside the tick, which will
1133 /// be asynchronously updated with the latest set of entries inside the tick.
1134 ///
1135 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1136 /// tick that tracks the inner value. This is useful for getting the value as of the
1137 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1138 ///
1139 /// The entire set of entries are propagated on each tick, which means that if a tick
1140 /// does not have a key "XYZ" that was present in the previous tick, the entry for "XYZ" will
1141 /// also be removed from the output.
1142 ///
1143 /// # Example
1144 /// ```rust
1145 /// # use hydro_lang::prelude::*;
1146 /// # use futures::StreamExt;
1147 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148 /// let tick = process.tick();
1149 /// # // ticks are lazy by default, forces the second tick to run
1150 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1151 /// # let batch_first_tick = process
1152 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1153 /// # .batch(&tick, nondet!(/** test */))
1154 /// # .into_keyed();
1155 /// # let batch_second_tick = process
1156 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1157 /// # .batch(&tick, nondet!(/** test */))
1158 /// # .into_keyed()
1159 /// # .defer_tick(); // appears on the second tick
1160 /// # let input_batch = batch_first_tick.chain(batch_second_tick).first();
1161 /// input_batch // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1162 /// .latest()
1163 /// # .snapshot(&tick, nondet!(/** test */))
1164 /// # .entries()
1165 /// # .all_ticks()
1166 /// # }, |mut stream| async move {
1167 /// // asynchronously changes from { 1: 2, 2: 3 } ~> { 2: 4, 3: 5 }
1168 /// # for w in vec![(1, 2), (2, 3), (2, 4), (3, 5)] {
1169 /// # assert_eq!(stream.next().await.unwrap(), w);
1170 /// # }
1171 /// # }));
1172 /// ```
1173 pub fn latest(self) -> KeyedSingleton<K, V, L, Unbounded> {
1174 KeyedSingleton::new(
1175 self.location.outer().clone(),
1176 HydroNode::YieldConcat {
1177 inner: Box::new(self.ir_node.into_inner()),
1178 metadata: self.location.outer().new_node_metadata(KeyedSingleton::<
1179 K,
1180 V,
1181 L,
1182 Unbounded,
1183 >::collection_kind(
1184 )),
1185 },
1186 )
1187 }
1188
1189 /// Synchronously yields this keyed singleton outside the tick as an unbounded keyed singleton,
1190 /// which will be updated with the latest set of entries inside the tick.
1191 ///
1192 /// Unlike [`KeyedSingleton::latest`], this preserves synchronous execution, as the output
1193 /// keyed singleton is emitted in an [`Atomic`] context that will process elements synchronously
1194 /// with the input keyed singleton's [`Tick`] context.
1195 pub fn latest_atomic(self) -> KeyedSingleton<K, V, Atomic<L>, Unbounded> {
1196 let out_location = Atomic {
1197 tick: self.location.clone(),
1198 };
1199
1200 KeyedSingleton::new(
1201 out_location.clone(),
1202 HydroNode::YieldConcat {
1203 inner: Box::new(self.ir_node.into_inner()),
1204 metadata: out_location.new_node_metadata(KeyedSingleton::<
1205 K,
1206 V,
1207 Atomic<L>,
1208 Unbounded,
1209 >::collection_kind()),
1210 },
1211 )
1212 }
1213
1214 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1215 /// tick `T` always has the entries of `self` at tick `T - 1`.
1216 ///
1217 /// At tick `0`, the output has no entries, since there is no previous tick.
1218 ///
1219 /// This operator enables stateful iterative processing with ticks, by sending data from one
1220 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1221 ///
1222 /// # Example
1223 /// ```rust
1224 /// # use hydro_lang::prelude::*;
1225 /// # use futures::StreamExt;
1226 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1227 /// let tick = process.tick();
1228 /// # // ticks are lazy by default, forces the second tick to run
1229 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1230 /// # let batch_first_tick = process
1231 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1232 /// # .batch(&tick, nondet!(/** test */))
1233 /// # .into_keyed();
1234 /// # let batch_second_tick = process
1235 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1236 /// # .batch(&tick, nondet!(/** test */))
1237 /// # .into_keyed()
1238 /// # .defer_tick(); // appears on the second tick
1239 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1240 /// # batch_first_tick.chain(batch_second_tick).first();
1241 /// input_batch.clone().filter_key_not_in(
1242 /// input_batch.defer_tick().keys() // keys present in the previous tick
1243 /// )
1244 /// # .entries().all_ticks()
1245 /// # }, |mut stream| async move {
1246 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1247 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1248 /// # assert_eq!(stream.next().await.unwrap(), w);
1249 /// # }
1250 /// # }));
1251 /// ```
1252 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1253 KeyedSingleton::new(
1254 self.location.clone(),
1255 HydroNode::DeferTick {
1256 input: Box::new(self.ir_node.into_inner()),
1257 metadata: self
1258 .location
1259 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1260 },
1261 )
1262 }
1263}
1264
1265impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1266where
1267 L: Location<'a>,
1268{
1269 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1270 /// point in time.
1271 ///
1272 /// # Non-Determinism
1273 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1274 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1275 pub fn snapshot(
1276 self,
1277 tick: &Tick<L>,
1278 _nondet: NonDet,
1279 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1280 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1281 KeyedSingleton::new(
1282 tick.clone(),
1283 HydroNode::Batch {
1284 inner: Box::new(self.ir_node.into_inner()),
1285 metadata: tick
1286 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1287 },
1288 )
1289 }
1290}
1291
1292impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1293where
1294 L: Location<'a> + NoTick,
1295{
1296 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1297 /// state of the keyed singleton being atomically processed.
1298 ///
1299 /// # Non-Determinism
1300 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1301 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1302 pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1303 KeyedSingleton::new(
1304 self.location.clone().tick,
1305 HydroNode::Batch {
1306 inner: Box::new(self.ir_node.into_inner()),
1307 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1308 K,
1309 V,
1310 Tick<L>,
1311 Bounded,
1312 >::collection_kind(
1313 )),
1314 },
1315 )
1316 }
1317}
1318
1319impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1320where
1321 L: Location<'a> + NoTick,
1322{
1323 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1324 /// arrived since the previous batch was released.
1325 ///
1326 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1327 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1328 ///
1329 /// # Non-Determinism
1330 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1331 /// has a non-deterministic set of key-value pairs.
1332 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1333 self.atomic(tick).batch_atomic(nondet)
1334 }
1335}
1336
1337impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1338where
1339 L: Location<'a> + NoTick,
1340{
1341 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1342 /// atomically processed.
1343 ///
1344 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1345 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1346 ///
1347 /// # Non-Determinism
1348 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1349 /// has a non-deterministic set of key-value pairs.
1350 pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1351 let _ = nondet;
1352 KeyedSingleton::new(
1353 self.location.clone().tick,
1354 HydroNode::Batch {
1355 inner: Box::new(self.ir_node.into_inner()),
1356 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1357 K,
1358 V,
1359 Tick<L>,
1360 Bounded,
1361 >::collection_kind(
1362 )),
1363 },
1364 )
1365 }
1366}
1367
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// `key_count` on a keyed singleton built with `first()`: the sampled count starts at 0
    /// and is 1, then 2, after sending entries for two distinct keys.
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let first_per_key = input.into_keyed().first();
        let out = first_per_key
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives there are no keys.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // (entry to send, key count expected afterwards)
        for (entry, expected_count) in [((1, 1), 1), ((2, 2), 2)] {
            external_in.send(entry).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected_count);
        }
    }

    /// `key_count` on a keyed singleton built with a per-key `fold`: sending further entries
    /// for an already-seen key leaves the sampled count unchanged.
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let count_per_key = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1));
        let out = count_per_key
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives there are no keys.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // (entry to send, key count expected afterwards)
        let steps = [
            ((1, 1), 1),
            ((1, 2), 1),
            ((2, 2), 2),
            ((1, 1), 2),
            ((3, 1), 3),
        ];
        for (entry, expected_count) in steps {
            external_in.send(entry).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected_count);
        }
    }

    /// `into_singleton` on a keyed singleton built with `first()`: the sampled map starts
    /// empty and gains one entry per new key.
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let first_per_key = input.into_keyed().first();
        let out = first_per_key
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives the map is empty.
        assert_eq!(external_out.next().await.unwrap(), HashMap::new());

        // (entry to send, map contents expected afterwards)
        let steps = [
            ((1, 1), vec![(1, 1)]),
            ((2, 2), vec![(1, 1), (2, 2)]),
        ];
        for (entry, expected_entries) in steps {
            external_in.send(entry).await.unwrap();
            let expected: HashMap<_, _> = expected_entries.into_iter().collect();
            assert_eq!(external_out.next().await.unwrap(), expected);
        }
    }

    /// `into_singleton` on a keyed singleton built with a per-key `fold` that counts
    /// elements: the sampled map reflects the updated count after every input.
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let count_per_key = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1));
        let out = count_per_key
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives the map is empty.
        assert_eq!(external_out.next().await.unwrap(), HashMap::new());

        // (entry to send, map of per-key counts expected afterwards)
        let steps = [
            ((1, 1), vec![(1, 1)]),
            ((1, 2), vec![(1, 2)]),
            ((2, 2), vec![(1, 2), (2, 1)]),
            ((1, 1), vec![(1, 3), (2, 1)]),
            ((3, 1), vec![(1, 3), (2, 1), (3, 1)]),
        ];
        for (entry, expected_entries) in steps {
            external_in.send(entry).await.unwrap();
            let expected: HashMap<_, _> = expected_entries.into_iter().collect();
            assert_eq!(external_out.next().await.unwrap(), expected);
        }
    }
}