hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20use crate::forward_handle::ForwardRef;
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
30/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
31///
32/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
33/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
34/// indicates that entries may be added over time, but once an entry is added it will never be
35/// removed and its value will never change.
36pub trait KeyedSingletonBound {
37 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
38 type UnderlyingBound: Boundedness;
39 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
40 type ValueBound: Boundedness;
41
42 /// The type of the keyed singleton if the value for each key is immutable.
43 type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
44
45 /// The type of the keyed singleton if the value for each key may change asynchronously.
46 type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
47
48 /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
49 fn bound_kind() -> KeyedSingletonBoundKind;
50}
51
52impl KeyedSingletonBound for Unbounded {
53 type UnderlyingBound = Unbounded;
54 type ValueBound = Unbounded;
55 type WithBoundedValue = BoundedValue;
56 type WithUnboundedValue = Unbounded;
57
58 fn bound_kind() -> KeyedSingletonBoundKind {
59 KeyedSingletonBoundKind::Unbounded
60 }
61}
62
63impl KeyedSingletonBound for Bounded {
64 type UnderlyingBound = Bounded;
65 type ValueBound = Bounded;
66 type WithBoundedValue = Bounded;
67 type WithUnboundedValue = UnreachableBound;
68
69 fn bound_kind() -> KeyedSingletonBoundKind {
70 KeyedSingletonBoundKind::Bounded
71 }
72}
73
74/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
75/// its value is bounded and will never change. If the `KeyBound` is [`Bounded`], then the entire set of entries
76/// is bounded, but if it is [`Unbounded`], then new entries may appear asynchronously.
77pub struct BoundedValue;
78
79impl KeyedSingletonBound for BoundedValue {
80 type UnderlyingBound = Unbounded;
81 type ValueBound = Bounded;
82 type WithBoundedValue = BoundedValue;
83 type WithUnboundedValue = Unbounded;
84
85 fn bound_kind() -> KeyedSingletonBoundKind {
86 KeyedSingletonBoundKind::BoundedValue
87 }
88}
89
90#[doc(hidden)]
91pub struct UnreachableBound;
92
93impl KeyedSingletonBound for UnreachableBound {
94 type UnderlyingBound = Bounded;
95 type ValueBound = Unbounded;
96
97 type WithBoundedValue = Bounded;
98 type WithUnboundedValue = UnreachableBound;
99
100 fn bound_kind() -> KeyedSingletonBoundKind {
101 unreachable!("UnreachableBound cannot be instantiated")
102 }
103}
104
105/// Mapping from keys of type `K` to values of type `V`.
106///
107/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
108/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
109/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
110/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
111/// keys cannot be removed and the value for each key is immutable.
112///
113/// Type Parameters:
114/// - `K`: the type of the key for each entry
115/// - `V`: the type of the value for each entry
116/// - `Loc`: the [`Location`] where the keyed singleton is materialized
117/// - `Bound`: tracks whether the entries are:
118/// - [`Bounded`] (local and finite)
119/// - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
120/// - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
121pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
122 pub(crate) location: Loc,
123 pub(crate) ir_node: RefCell<HydroNode>,
124
125 _phantom: PhantomData<(K, V, Loc, Bound)>,
126}
127
128impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
129 for KeyedSingleton<K, V, Loc, Bound>
130{
131 fn clone(&self) -> Self {
132 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
133 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
134 *self.ir_node.borrow_mut() = HydroNode::Tee {
135 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
136 metadata: self.location.new_node_metadata(Self::collection_kind()),
137 };
138 }
139
140 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
141 KeyedSingleton {
142 location: self.location.clone(),
143 ir_node: HydroNode::Tee {
144 inner: TeeNode(inner.0.clone()),
145 metadata: metadata.clone(),
146 }
147 .into(),
148 _phantom: PhantomData,
149 }
150 } else {
151 unreachable!()
152 }
153 }
154}
155
156impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
157 for KeyedSingleton<K, V, L, B>
158where
159 L: Location<'a> + NoTick,
160{
161 type Location = L;
162
163 fn create_source(ident: syn::Ident, location: L) -> Self {
164 KeyedSingleton {
165 location: location.clone(),
166 ir_node: RefCell::new(HydroNode::CycleSource {
167 ident,
168 metadata: location.new_node_metadata(Self::collection_kind()),
169 }),
170 _phantom: PhantomData,
171 }
172 }
173}
174
175impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
176 for KeyedSingleton<K, V, L, B>
177where
178 L: Location<'a> + NoTick,
179{
180 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
181 assert_eq!(
182 Location::id(&self.location),
183 expected_location,
184 "locations do not match"
185 );
186 self.location
187 .flow_state()
188 .borrow_mut()
189 .push_root(HydroRoot::CycleSink {
190 ident,
191 input: Box::new(self.ir_node.into_inner()),
192 op_metadata: HydroIrOpMetadata::new(),
193 });
194 }
195}
196
197impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
198 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
199 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
200 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
201
202 KeyedSingleton {
203 location,
204 ir_node: RefCell::new(ir_node),
205 _phantom: PhantomData,
206 }
207 }
208
209 /// Returns the [`Location`] where this keyed singleton is being materialized.
210 pub fn location(&self) -> &L {
211 &self.location
212 }
213}
214
215#[cfg(stageleft_runtime)]
216fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
217 me: KeyedSingleton<K, V, L, Bounded>,
218) -> Singleton<usize, L, Bounded> {
219 me.entries().count()
220}
221
222#[cfg(stageleft_runtime)]
223fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
224 me: KeyedSingleton<K, V, L, Bounded>,
225) -> Singleton<HashMap<K, V>, L, Bounded>
226where
227 K: Eq + Hash,
228{
229 me.entries()
230 .assume_ordering(nondet!(
231 /// Because this is a keyed singleton, there is only one value per key.
232 ))
233 .fold(
234 q!(|| HashMap::new()),
235 q!(|map, (k, v)| {
236 map.insert(k, v);
237 }),
238 )
239}
240
241impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
242 pub(crate) fn collection_kind() -> CollectionKind {
243 CollectionKind::KeyedSingleton {
244 bound: B::bound_kind(),
245 key_type: stageleft::quote_type::<K>().into(),
246 value_type: stageleft::quote_type::<V>().into(),
247 }
248 }
249
250 /// Transforms each value by invoking `f` on each element, with keys staying the same
251 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
252 ///
253 /// If you do not want to modify the stream and instead only want to view
254 /// each item use [`KeyedSingleton::inspect`] instead.
255 ///
256 /// # Example
257 /// ```rust
258 /// # use hydro_lang::prelude::*;
259 /// # use futures::StreamExt;
260 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
261 /// let keyed_singleton = // { 1: 2, 2: 4 }
262 /// # process
263 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
264 /// # .into_keyed()
265 /// # .first();
266 /// keyed_singleton.map(q!(|v| v + 1))
267 /// # .entries()
268 /// # }, |mut stream| async move {
269 /// // { 1: 3, 2: 5 }
270 /// # let mut results = Vec::new();
271 /// # for _ in 0..2 {
272 /// # results.push(stream.next().await.unwrap());
273 /// # }
274 /// # results.sort();
275 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
276 /// # }));
277 /// ```
278 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
279 where
280 F: Fn(V) -> U + 'a,
281 {
282 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
283 let map_f = q!({
284 let orig = f;
285 move |(k, v)| (k, orig(v))
286 })
287 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
288 .into();
289
290 KeyedSingleton::new(
291 self.location.clone(),
292 HydroNode::Map {
293 f: map_f,
294 input: Box::new(self.ir_node.into_inner()),
295 metadata: self
296 .location
297 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
298 },
299 )
300 }
301
302 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
303 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
304 ///
305 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
306 /// the new value `U`. The key remains unchanged in the output.
307 ///
308 /// # Example
309 /// ```rust
310 /// # use hydro_lang::prelude::*;
311 /// # use futures::StreamExt;
312 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
313 /// let keyed_singleton = // { 1: 2, 2: 4 }
314 /// # process
315 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
316 /// # .into_keyed()
317 /// # .first();
318 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
319 /// # .entries()
320 /// # }, |mut stream| async move {
321 /// // { 1: 3, 2: 6 }
322 /// # let mut results = Vec::new();
323 /// # for _ in 0..2 {
324 /// # results.push(stream.next().await.unwrap());
325 /// # }
326 /// # results.sort();
327 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
328 /// # }));
329 /// ```
330 pub fn map_with_key<U, F>(
331 self,
332 f: impl IntoQuotedMut<'a, F, L> + Copy,
333 ) -> KeyedSingleton<K, U, L, B>
334 where
335 F: Fn((K, V)) -> U + 'a,
336 K: Clone,
337 {
338 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
339 let map_f = q!({
340 let orig = f;
341 move |(k, v)| {
342 let out = orig((Clone::clone(&k), v));
343 (k, out)
344 }
345 })
346 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
347 .into();
348
349 KeyedSingleton::new(
350 self.location.clone(),
351 HydroNode::Map {
352 f: map_f,
353 input: Box::new(self.ir_node.into_inner()),
354 metadata: self
355 .location
356 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
357 },
358 )
359 }
360
361 /// Gets the number of keys in the keyed singleton.
362 ///
363 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
364 /// since keys may be added / removed over time. When the set of keys changes, the count will
365 /// be asynchronously updated.
366 ///
367 /// # Example
368 /// ```rust
369 /// # use hydro_lang::prelude::*;
370 /// # use futures::StreamExt;
371 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
372 /// # let tick = process.tick();
373 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
374 /// # process
375 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
376 /// # .into_keyed()
377 /// # .batch(&tick, nondet!(/** test */))
378 /// # .first();
379 /// keyed_singleton.key_count()
380 /// # .all_ticks()
381 /// # }, |mut stream| async move {
382 /// // 3
383 /// # assert_eq!(stream.next().await.unwrap(), 3);
384 /// # }));
385 /// ```
386 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
387 if B::ValueBound::BOUNDED {
388 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
389 location: self.location,
390 ir_node: self.ir_node,
391 _phantom: PhantomData,
392 };
393
394 me.entries().count()
395 } else if L::is_top_level()
396 && let Some(tick) = self.location.try_tick()
397 {
398 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
399 location: self.location,
400 ir_node: self.ir_node,
401 _phantom: PhantomData,
402 };
403
404 let out =
405 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
406 .latest();
407 Singleton::new(out.location, out.ir_node.into_inner())
408 } else {
409 panic!("Unbounded KeyedSingleton inside a tick");
410 }
411 }
412
413 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
414 ///
415 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
416 /// asynchronously as well.
417 ///
418 /// # Example
419 /// ```rust
420 /// # use hydro_lang::prelude::*;
421 /// # use futures::StreamExt;
422 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
423 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
424 /// # process
425 /// # .source_iter(q!(vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())]))
426 /// # .into_keyed()
427 /// # .batch(&process.tick(), nondet!(/** test */))
428 /// # .first();
429 /// keyed_singleton.into_singleton()
430 /// # .all_ticks()
431 /// # }, |mut stream| async move {
432 /// // { 1: "a", 2: "b", 3: "c" }
433 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())].into_iter().collect());
434 /// # }));
435 /// ```
436 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
437 where
438 K: Eq + Hash,
439 {
440 if B::ValueBound::BOUNDED {
441 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
442 location: self.location,
443 ir_node: self.ir_node,
444 _phantom: PhantomData,
445 };
446
447 me.entries()
448 .assume_ordering(nondet!(
449 /// Because this is a keyed singleton, there is only one value per key.
450 ))
451 .fold(
452 q!(|| HashMap::new()),
453 q!(|map, (k, v)| {
454 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
455 map.insert(k, v);
456 }),
457 )
458 } else if L::is_top_level()
459 && let Some(tick) = self.location.try_tick()
460 {
461 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
462 location: self.location,
463 ir_node: self.ir_node,
464 _phantom: PhantomData,
465 };
466
467 let out = into_singleton_inside_tick(
468 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
469 )
470 .latest();
471 Singleton::new(out.location, out.ir_node.into_inner())
472 } else {
473 panic!("Unbounded KeyedSingleton inside a tick");
474 }
475 }
476
477 /// An operator which allows you to "name" a `HydroNode`.
478 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
479 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
480 {
481 let mut node = self.ir_node.borrow_mut();
482 let metadata = node.metadata_mut();
483 metadata.tag = Some(name.to_string());
484 }
485 self
486 }
487}
488
489impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
490 KeyedSingleton<K, V, L, B>
491{
492 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
493 ///
494 /// The value for each key must be bounded, otherwise the resulting stream elements would be
495 /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
496 /// into the output.
497 ///
498 /// # Example
499 /// ```rust
500 /// # use hydro_lang::prelude::*;
501 /// # use futures::StreamExt;
502 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
503 /// let keyed_singleton = // { 1: 2, 2: 4 }
504 /// # process
505 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
506 /// # .into_keyed()
507 /// # .first();
508 /// keyed_singleton.entries()
509 /// # }, |mut stream| async move {
510 /// // (1, 2), (2, 4) in any order
511 /// # let mut results = Vec::new();
512 /// # for _ in 0..2 {
513 /// # results.push(stream.next().await.unwrap());
514 /// # }
515 /// # results.sort();
516 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
517 /// # }));
518 /// ```
519 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
520 self.into_keyed_stream().entries()
521 }
522
523 /// Flattens the keyed singleton into an unordered stream of just the values.
524 ///
525 /// The value for each key must be bounded, otherwise the resulting stream elements would be
526 /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
527 /// into the output.
528 ///
529 /// # Example
530 /// ```rust
531 /// # use hydro_lang::prelude::*;
532 /// # use futures::StreamExt;
533 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
534 /// let keyed_singleton = // { 1: 2, 2: 4 }
535 /// # process
536 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
537 /// # .into_keyed()
538 /// # .first();
539 /// keyed_singleton.values()
540 /// # }, |mut stream| async move {
541 /// // 2, 4 in any order
542 /// # let mut results = Vec::new();
543 /// # for _ in 0..2 {
544 /// # results.push(stream.next().await.unwrap());
545 /// # }
546 /// # results.sort();
547 /// # assert_eq!(results, vec![2, 4]);
548 /// # }));
549 /// ```
550 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
551 let map_f = q!(|(_, v)| v)
552 .splice_fn1_ctx::<(K, V), V>(&self.location)
553 .into();
554
555 Stream::new(
556 self.location.clone(),
557 HydroNode::Map {
558 f: map_f,
559 input: Box::new(self.ir_node.into_inner()),
560 metadata: self.location.new_node_metadata(Stream::<
561 V,
562 L,
563 B::UnderlyingBound,
564 NoOrder,
565 ExactlyOnce,
566 >::collection_kind()),
567 },
568 )
569 }
570
571 /// Flattens the keyed singleton into an unordered stream of just the keys.
572 ///
573 /// The value for each key must be bounded, otherwise the removal of keys would result in
574 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
575 /// into the output.
576 ///
577 /// # Example
578 /// ```rust
579 /// # use hydro_lang::prelude::*;
580 /// # use futures::StreamExt;
581 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
582 /// let keyed_singleton = // { 1: 2, 2: 4 }
583 /// # process
584 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
585 /// # .into_keyed()
586 /// # .first();
587 /// keyed_singleton.keys()
588 /// # }, |mut stream| async move {
589 /// // 1, 2 in any order
590 /// # let mut results = Vec::new();
591 /// # for _ in 0..2 {
592 /// # results.push(stream.next().await.unwrap());
593 /// # }
594 /// # results.sort();
595 /// # assert_eq!(results, vec![1, 2]);
596 /// # }));
597 /// ```
598 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
599 self.entries().map(q!(|(k, _)| k))
600 }
601
602 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
603 /// entries whose keys are not in the provided stream.
604 ///
605 /// # Example
606 /// ```rust
607 /// # use hydro_lang::prelude::*;
608 /// # use futures::StreamExt;
609 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
610 /// let tick = process.tick();
611 /// let keyed_singleton = // { 1: 2, 2: 4 }
612 /// # process
613 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
614 /// # .into_keyed()
615 /// # .first()
616 /// # .batch(&tick, nondet!(/** test */));
617 /// let keys_to_remove = process
618 /// .source_iter(q!(vec![1]))
619 /// .batch(&tick, nondet!(/** test */));
620 /// keyed_singleton.filter_key_not_in(keys_to_remove)
621 /// # .entries().all_ticks()
622 /// # }, |mut stream| async move {
623 /// // { 2: 4 }
624 /// # for w in vec![(2, 4)] {
625 /// # assert_eq!(stream.next().await.unwrap(), w);
626 /// # }
627 /// # }));
628 /// ```
629 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
630 self,
631 other: Stream<K, L, Bounded, O2, R2>,
632 ) -> Self
633 where
634 K: Hash + Eq,
635 {
636 check_matching_location(&self.location, &other.location);
637
638 KeyedSingleton::new(
639 self.location.clone(),
640 HydroNode::AntiJoin {
641 pos: Box::new(self.ir_node.into_inner()),
642 neg: Box::new(other.ir_node.into_inner()),
643 metadata: self.location.new_node_metadata(Self::collection_kind()),
644 },
645 )
646 }
647
648 /// An operator which allows you to "inspect" each value of a keyed singleton without
649 /// modifying it. The closure `f` is called on a reference to each value. This is
650 /// mainly useful for debugging, and should not be used to generate side-effects.
651 ///
652 /// # Example
653 /// ```rust
654 /// # use hydro_lang::prelude::*;
655 /// # use futures::StreamExt;
656 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
657 /// let keyed_singleton = // { 1: 2, 2: 4 }
658 /// # process
659 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
660 /// # .into_keyed()
661 /// # .first();
662 /// keyed_singleton
663 /// .inspect(q!(|v| println!("{}", v)))
664 /// # .entries()
665 /// # }, |mut stream| async move {
666 /// // { 1: 2, 2: 4 }
667 /// # for w in vec![(1, 2), (2, 4)] {
668 /// # assert_eq!(stream.next().await.unwrap(), w);
669 /// # }
670 /// # }));
671 /// ```
672 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
673 where
674 F: Fn(&V) + 'a,
675 {
676 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
677 let inspect_f = q!({
678 let orig = f;
679 move |t: &(_, _)| orig(&t.1)
680 })
681 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
682 .into();
683
684 KeyedSingleton::new(
685 self.location.clone(),
686 HydroNode::Inspect {
687 f: inspect_f,
688 input: Box::new(self.ir_node.into_inner()),
689 metadata: self.location.new_node_metadata(Self::collection_kind()),
690 },
691 )
692 }
693
694 /// An operator which allows you to "inspect" each entry of a keyed singleton without
695 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
696 /// mainly useful for debugging, and should not be used to generate side-effects.
697 ///
698 /// # Example
699 /// ```rust
700 /// # use hydro_lang::prelude::*;
701 /// # use futures::StreamExt;
702 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
703 /// let keyed_singleton = // { 1: 2, 2: 4 }
704 /// # process
705 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
706 /// # .into_keyed()
707 /// # .first();
708 /// keyed_singleton
709 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
710 /// # .entries()
711 /// # }, |mut stream| async move {
712 /// // { 1: 2, 2: 4 }
713 /// # for w in vec![(1, 2), (2, 4)] {
714 /// # assert_eq!(stream.next().await.unwrap(), w);
715 /// # }
716 /// # }));
717 /// ```
718 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
719 where
720 F: Fn(&(K, V)) + 'a,
721 {
722 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
723
724 KeyedSingleton::new(
725 self.location.clone(),
726 HydroNode::Inspect {
727 f: inspect_f,
728 input: Box::new(self.ir_node.into_inner()),
729 metadata: self.location.new_node_metadata(Self::collection_kind()),
730 },
731 )
732 }
733
734 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
735 ///
736 /// Because this method requires values to be bounded, the output [`Optional`] will only be
737 /// asynchronously updated if a new key is added that is higher than the previous max key.
738 ///
739 /// # Example
740 /// ```rust
741 /// # use hydro_lang::prelude::*;
742 /// # use futures::StreamExt;
743 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
744 /// let tick = process.tick();
745 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
746 /// # process
747 /// # .source_iter(q!(vec![(1, 123), (2, 456), (0, 789)]))
748 /// # .into_keyed()
749 /// # .first();
750 /// keyed_singleton.get_max_key()
751 /// # .sample_eager(nondet!(/** test */))
752 /// # }, |mut stream| async move {
753 /// // (2, 456)
754 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
755 /// # }));
756 /// ```
757 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
758 where
759 K: Ord,
760 {
761 self.entries()
762 .assume_ordering(nondet!(
763 /// There is only one element associated with each key, and the keys are totallly
764 /// ordered so we will produce a deterministic value. We can't call
765 /// `reduce_commutative_idempotent` because the closure technically isn't commutative
766 /// in the case where both passed entries have the same key but different values.
767 ///
768 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
769 /// the two inputs do not have the same key.
770 ))
771 .reduce_idempotent(q!({
772 move |curr, new| {
773 if new.0 > curr.0 {
774 *curr = new;
775 }
776 }
777 }))
778 }
779
780 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
781 /// element, the value.
782 ///
783 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
784 ///
785 /// # Example
786 /// ```rust
787 /// # use hydro_lang::prelude::*;
788 /// # use futures::StreamExt;
789 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
790 /// let keyed_singleton = // { 1: 2, 2: 4 }
791 /// # process
792 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
793 /// # .into_keyed()
794 /// # .first();
795 /// keyed_singleton
796 /// .clone()
797 /// .into_keyed_stream()
798 /// .interleave(
799 /// keyed_singleton.into_keyed_stream()
800 /// )
801 /// # .entries()
802 /// # }, |mut stream| async move {
803 /// /// // { 1: [2, 2], 2: [4, 4] }
804 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
805 /// # assert_eq!(stream.next().await.unwrap(), w);
806 /// # }
807 /// # }));
808 /// ```
809 pub fn into_keyed_stream(
810 self,
811 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
812 KeyedStream::new(
813 self.location.clone(),
814 HydroNode::Cast {
815 inner: Box::new(self.ir_node.into_inner()),
816 metadata: self.location.new_node_metadata(KeyedStream::<
817 K,
818 V,
819 L,
820 B::UnderlyingBound,
821 TotalOrder,
822 ExactlyOnce,
823 >::collection_kind()),
824 },
825 )
826 }
827}
828
829impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
830 /// Gets the value associated with a specific key from the keyed singleton.
831 ///
832 /// # Example
833 /// ```rust
834 /// # use hydro_lang::prelude::*;
835 /// # use futures::StreamExt;
836 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
837 /// let tick = process.tick();
838 /// let keyed_data = process
839 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
840 /// .into_keyed()
841 /// .batch(&tick, nondet!(/** test */))
842 /// .first();
843 /// let key = tick.singleton(q!(1));
844 /// keyed_data.get(key).all_ticks()
845 /// # }, |mut stream| async move {
846 /// // 2
847 /// # assert_eq!(stream.next().await.unwrap(), 2);
848 /// # }));
849 /// ```
850 pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
851 self.entries()
852 .join(key.into_stream().map(q!(|k| (k, ()))))
853 .map(q!(|(_, (v, _))| v))
854 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
855 .first()
856 }
857
858 /// Given a keyed stream of lookup requests, where the key is the lookup and the value
859 /// is some additional metadata, emits a keyed stream of lookup results where the key
860 /// is the same as before, but the value is a tuple of the lookup result and the metadata
861 /// of the request. If the key is not found, no output will be produced.
862 ///
863 /// # Example
864 /// ```rust
865 /// # use hydro_lang::prelude::*;
866 /// # use futures::StreamExt;
867 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
868 /// let tick = process.tick();
869 /// let keyed_data = process
870 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
871 /// .into_keyed()
872 /// .batch(&tick, nondet!(/** test */))
873 /// .first();
874 /// let other_data = process
875 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
876 /// .into_keyed()
877 /// .batch(&tick, nondet!(/** test */));
878 /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
879 /// # }, |mut stream| async move {
880 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
881 /// # let mut results = vec![];
882 /// # for _ in 0..3 {
883 /// # results.push(stream.next().await.unwrap());
884 /// # }
885 /// # results.sort();
886 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
887 /// # }));
888 /// ```
889 pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
890 self,
891 requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
892 ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
893 self.entries()
894 .weaker_retries::<R2>()
895 .join(requests.entries())
896 .into_keyed()
897 }
898
899 /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
900 /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
901 /// containing the value from `self` and an option of the value from `from`. If the key is not
902 /// present in `from`, the option will be [`None`].
903 ///
904 /// # Example
905 /// ```rust
906 /// # use hydro_lang::prelude::*;
907 /// # use futures::StreamExt;
908 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
909 /// # let tick = process.tick();
910 /// let requests = // { 1: 10, 2: 20 }
911 /// # process
912 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
913 /// # .into_keyed()
914 /// # .batch(&tick, nondet!(/** test */))
915 /// # .first();
916 /// let other_data = // { 10: 100, 11: 101 }
917 /// # process
918 /// # .source_iter(q!(vec![(10, 100), (11, 101)]))
919 /// # .into_keyed()
920 /// # .batch(&tick, nondet!(/** test */))
921 /// # .first();
922 /// requests.get_from(other_data)
923 /// # .entries().all_ticks()
924 /// # }, |mut stream| async move {
925 /// // { 1: (10, Some(100)), 2: (20, None) }
926 /// # let mut results = vec![];
927 /// # for _ in 0..2 {
928 /// # results.push(stream.next().await.unwrap());
929 /// # }
930 /// # results.sort();
931 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
932 /// # }));
933 /// ```
934 pub fn get_from<V2: Clone>(
935 self,
936 from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
937 ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
938 where
939 K: Clone,
940 V: Hash + Eq + Clone,
941 {
942 let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
943 let lookup_result = from.get_many_if_present(to_lookup.clone());
944 let missing_values =
945 to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
946 let result_stream = lookup_result
947 .entries()
948 .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
949 .into_keyed()
950 .chain(
951 missing_values
952 .entries()
953 .map(q!(|(v, k)| (k, (v, None))))
954 .into_keyed(),
955 );
956
957 KeyedSingleton::new(
958 result_stream.location.clone(),
959 HydroNode::Cast {
960 inner: Box::new(result_stream.ir_node.into_inner()),
961 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
962 K,
963 (V, Option<V2>),
964 Tick<L>,
965 Bounded,
966 >::collection_kind(
967 )),
968 },
969 )
970 }
971}
972
973impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
974where
975 L: Location<'a>,
976{
977 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
978 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
979 ///
980 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
981 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
982 /// argument that declares where the keyed singleton will be atomically processed. Batching a
983 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
984 /// batching into a different [`Tick`] will introduce asynchrony.
985 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
986 let out_location = Atomic { tick: tick.clone() };
987 KeyedSingleton::new(
988 out_location.clone(),
989 HydroNode::BeginAtomic {
990 inner: Box::new(self.ir_node.into_inner()),
991 metadata: out_location
992 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
993 },
994 )
995 }
996}
997
998impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
999where
1000 L: Location<'a> + NoTick,
1001{
1002 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1003 /// See [`KeyedSingleton::atomic`] for more details.
1004 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1005 KeyedSingleton::new(
1006 self.location.tick.l.clone(),
1007 HydroNode::EndAtomic {
1008 inner: Box::new(self.ir_node.into_inner()),
1009 metadata: self
1010 .location
1011 .tick
1012 .l
1013 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1014 },
1015 )
1016 }
1017}
1018
1019impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1020 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1021 /// tick `T` always has the entries of `self` at tick `T - 1`.
1022 ///
1023 /// At tick `0`, the output has no entries, since there is no previous tick.
1024 ///
1025 /// This operator enables stateful iterative processing with ticks, by sending data from one
1026 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1027 ///
1028 /// # Example
1029 /// ```rust
1030 /// # use hydro_lang::prelude::*;
1031 /// # use futures::StreamExt;
1032 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1033 /// let tick = process.tick();
1034 /// # // ticks are lazy by default, forces the second tick to run
1035 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1036 /// # let batch_first_tick = process
1037 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1038 /// # .batch(&tick, nondet!(/** test */))
1039 /// # .into_keyed();
1040 /// # let batch_second_tick = process
1041 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1042 /// # .batch(&tick, nondet!(/** test */))
1043 /// # .into_keyed()
1044 /// # .defer_tick(); // appears on the second tick
1045 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1046 /// # batch_first_tick.chain(batch_second_tick).first();
1047 /// input_batch.clone().filter_key_not_in(
1048 /// input_batch.defer_tick().keys() // keys present in the previous tick
1049 /// )
1050 /// # .entries().all_ticks()
1051 /// # }, |mut stream| async move {
1052 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1053 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1054 /// # assert_eq!(stream.next().await.unwrap(), w);
1055 /// # }
1056 /// # }));
1057 /// ```
1058 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1059 KeyedSingleton::new(
1060 self.location.clone(),
1061 HydroNode::DeferTick {
1062 input: Box::new(self.ir_node.into_inner()),
1063 metadata: self
1064 .location
1065 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1066 },
1067 )
1068 }
1069}
1070
1071impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1072where
1073 L: Location<'a>,
1074{
1075 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1076 /// point in time.
1077 ///
1078 /// # Non-Determinism
1079 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1080 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1081 pub fn snapshot(
1082 self,
1083 tick: &Tick<L>,
1084 _nondet: NonDet,
1085 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1086 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1087 KeyedSingleton::new(
1088 tick.clone(),
1089 HydroNode::Batch {
1090 inner: Box::new(self.ir_node.into_inner()),
1091 metadata: tick
1092 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1093 },
1094 )
1095 }
1096}
1097
1098impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1099where
1100 L: Location<'a> + NoTick,
1101{
1102 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1103 /// state of the keyed singleton being atomically processed.
1104 ///
1105 /// # Non-Determinism
1106 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1107 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1108 pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1109 KeyedSingleton::new(
1110 self.location.clone().tick,
1111 HydroNode::Batch {
1112 inner: Box::new(self.ir_node.into_inner()),
1113 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1114 K,
1115 V,
1116 Tick<L>,
1117 Bounded,
1118 >::collection_kind(
1119 )),
1120 },
1121 )
1122 }
1123}
1124
1125impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1126where
1127 L: Location<'a>,
1128{
1129 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1130 ///
1131 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1132 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1133 /// is filtered out.
1134 ///
1135 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1136 /// not modify or take ownership of the values. If you need to modify the values while filtering
1137 /// use [`KeyedSingleton::filter_map`] instead.
1138 ///
1139 /// # Example
1140 /// ```rust
1141 /// # use hydro_lang::prelude::*;
1142 /// # use futures::StreamExt;
1143 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1144 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1145 /// # process
1146 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1147 /// # .into_keyed()
1148 /// # .first();
1149 /// keyed_singleton.filter(q!(|&v| v > 1))
1150 /// # .entries()
1151 /// # }, |mut stream| async move {
1152 /// // { 1: 2, 2: 4 }
1153 /// # let mut results = Vec::new();
1154 /// # for _ in 0..2 {
1155 /// # results.push(stream.next().await.unwrap());
1156 /// # }
1157 /// # results.sort();
1158 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1159 /// # }));
1160 /// ```
1161 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1162 where
1163 F: Fn(&V) -> bool + 'a,
1164 {
1165 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1166 let filter_f = q!({
1167 let orig = f;
1168 move |t: &(_, _)| orig(&t.1)
1169 })
1170 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1171 .into();
1172
1173 KeyedSingleton::new(
1174 self.location.clone(),
1175 HydroNode::Filter {
1176 f: filter_f,
1177 input: Box::new(self.ir_node.into_inner()),
1178 metadata: self
1179 .location
1180 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1181 },
1182 )
1183 }
1184
1185 /// An operator that both filters and maps values. It yields only the key-value pairs where
1186 /// the supplied closure `f` returns `Some(value)`.
1187 ///
1188 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1189 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1190 /// If it returns `None`, the key-value pair is filtered out.
1191 ///
1192 /// # Example
1193 /// ```rust
1194 /// # use hydro_lang::prelude::*;
1195 /// # use futures::StreamExt;
1196 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1197 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1198 /// # process
1199 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1200 /// # .into_keyed()
1201 /// # .first();
1202 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1203 /// # .entries()
1204 /// # }, |mut stream| async move {
1205 /// // { 1: 42, 3: 100 }
1206 /// # let mut results = Vec::new();
1207 /// # for _ in 0..2 {
1208 /// # results.push(stream.next().await.unwrap());
1209 /// # }
1210 /// # results.sort();
1211 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1212 /// # }));
1213 /// ```
1214 pub fn filter_map<F, U>(
1215 self,
1216 f: impl IntoQuotedMut<'a, F, L> + Copy,
1217 ) -> KeyedSingleton<K, U, L, B>
1218 where
1219 F: Fn(V) -> Option<U> + 'a,
1220 {
1221 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1222 let filter_map_f = q!({
1223 let orig = f;
1224 move |(k, v)| orig(v).map(|o| (k, o))
1225 })
1226 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1227 .into();
1228
1229 KeyedSingleton::new(
1230 self.location.clone(),
1231 HydroNode::FilterMap {
1232 f: filter_map_f,
1233 input: Box::new(self.ir_node.into_inner()),
1234 metadata: self
1235 .location
1236 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1237 },
1238 )
1239 }
1240
1241 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1242 /// arrived since the previous batch was released.
1243 ///
1244 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1245 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1246 ///
1247 /// # Non-Determinism
1248 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1249 /// has a non-deterministic set of key-value pairs.
1250 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1251 where
1252 L: NoTick,
1253 {
1254 self.atomic(tick).batch_atomic(nondet)
1255 }
1256}
1257
1258impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1259where
1260 L: Location<'a> + NoTick,
1261{
1262 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1263 /// atomically processed.
1264 ///
1265 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1266 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1267 ///
1268 /// # Non-Determinism
1269 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1270 /// has a non-deterministic set of key-value pairs.
1271 pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1272 let _ = nondet;
1273 KeyedSingleton::new(
1274 self.location.clone().tick,
1275 HydroNode::Batch {
1276 inner: Box::new(self.ir_node.into_inner()),
1277 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1278 K,
1279 V,
1280 Tick<L>,
1281 Bounded,
1282 >::collection_kind(
1283 )),
1284 },
1285 )
1286 }
1287}
1288
1289#[cfg(test)]
1290mod tests {
1291 #[cfg(feature = "deploy")]
1292 use std::collections::HashMap;
1293
1294 #[cfg(feature = "deploy")]
1295 use futures::{SinkExt, StreamExt};
1296 #[cfg(feature = "deploy")]
1297 use hydro_deploy::Deployment;
1298 use stageleft::q;
1299
1300 use crate::compile::builder::FlowBuilder;
1301 use crate::location::Location;
1302 use crate::nondet::nondet;
1303
1304 #[cfg(feature = "deploy")]
1305 #[tokio::test]
1306 async fn key_count_bounded_value() {
1307 let mut deployment = Deployment::new();
1308
1309 let flow = FlowBuilder::new();
1310 let node = flow.process::<()>();
1311 let external = flow.external::<()>();
1312
1313 let (input_port, input) = node.source_external_bincode(&external);
1314 let out = input
1315 .into_keyed()
1316 .first()
1317 .key_count()
1318 .sample_eager(nondet!(/** test */))
1319 .send_bincode_external(&external);
1320
1321 let nodes = flow
1322 .with_process(&node, deployment.Localhost())
1323 .with_external(&external, deployment.Localhost())
1324 .deploy(&mut deployment);
1325
1326 deployment.deploy().await.unwrap();
1327
1328 let mut external_in = nodes.connect(input_port).await;
1329 let mut external_out = nodes.connect(out).await;
1330
1331 deployment.start().await.unwrap();
1332
1333 assert_eq!(external_out.next().await.unwrap(), 0);
1334
1335 external_in.send((1, 1)).await.unwrap();
1336 assert_eq!(external_out.next().await.unwrap(), 1);
1337
1338 external_in.send((2, 2)).await.unwrap();
1339 assert_eq!(external_out.next().await.unwrap(), 2);
1340 }
1341
1342 #[cfg(feature = "deploy")]
1343 #[tokio::test]
1344 async fn key_count_unbounded_value() {
1345 let mut deployment = Deployment::new();
1346
1347 let flow = FlowBuilder::new();
1348 let node = flow.process::<()>();
1349 let external = flow.external::<()>();
1350
1351 let (input_port, input) = node.source_external_bincode(&external);
1352 let out = input
1353 .into_keyed()
1354 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1355 .key_count()
1356 .sample_eager(nondet!(/** test */))
1357 .send_bincode_external(&external);
1358
1359 let nodes = flow
1360 .with_process(&node, deployment.Localhost())
1361 .with_external(&external, deployment.Localhost())
1362 .deploy(&mut deployment);
1363
1364 deployment.deploy().await.unwrap();
1365
1366 let mut external_in = nodes.connect(input_port).await;
1367 let mut external_out = nodes.connect(out).await;
1368
1369 deployment.start().await.unwrap();
1370
1371 assert_eq!(external_out.next().await.unwrap(), 0);
1372
1373 external_in.send((1, 1)).await.unwrap();
1374 assert_eq!(external_out.next().await.unwrap(), 1);
1375
1376 external_in.send((1, 2)).await.unwrap();
1377 assert_eq!(external_out.next().await.unwrap(), 1);
1378
1379 external_in.send((2, 2)).await.unwrap();
1380 assert_eq!(external_out.next().await.unwrap(), 2);
1381
1382 external_in.send((1, 1)).await.unwrap();
1383 assert_eq!(external_out.next().await.unwrap(), 2);
1384
1385 external_in.send((3, 1)).await.unwrap();
1386 assert_eq!(external_out.next().await.unwrap(), 3);
1387 }
1388
1389 #[cfg(feature = "deploy")]
1390 #[tokio::test]
1391 async fn into_singleton_bounded_value() {
1392 let mut deployment = Deployment::new();
1393
1394 let flow = FlowBuilder::new();
1395 let node = flow.process::<()>();
1396 let external = flow.external::<()>();
1397
1398 let (input_port, input) = node.source_external_bincode(&external);
1399 let out = input
1400 .into_keyed()
1401 .first()
1402 .into_singleton()
1403 .sample_eager(nondet!(/** test */))
1404 .send_bincode_external(&external);
1405
1406 let nodes = flow
1407 .with_process(&node, deployment.Localhost())
1408 .with_external(&external, deployment.Localhost())
1409 .deploy(&mut deployment);
1410
1411 deployment.deploy().await.unwrap();
1412
1413 let mut external_in = nodes.connect(input_port).await;
1414 let mut external_out = nodes.connect(out).await;
1415
1416 deployment.start().await.unwrap();
1417
1418 assert_eq!(external_out.next().await.unwrap(), HashMap::new());
1419
1420 external_in.send((1, 1)).await.unwrap();
1421 assert_eq!(
1422 external_out.next().await.unwrap(),
1423 vec![(1, 1)].into_iter().collect()
1424 );
1425
1426 external_in.send((2, 2)).await.unwrap();
1427 assert_eq!(
1428 external_out.next().await.unwrap(),
1429 vec![(1, 1), (2, 2)].into_iter().collect()
1430 );
1431 }
1432
1433 #[cfg(feature = "deploy")]
1434 #[tokio::test]
1435 async fn into_singleton_unbounded_value() {
1436 let mut deployment = Deployment::new();
1437
1438 let flow = FlowBuilder::new();
1439 let node = flow.process::<()>();
1440 let external = flow.external::<()>();
1441
1442 let (input_port, input) = node.source_external_bincode(&external);
1443 let out = input
1444 .into_keyed()
1445 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1446 .into_singleton()
1447 .sample_eager(nondet!(/** test */))
1448 .send_bincode_external(&external);
1449
1450 let nodes = flow
1451 .with_process(&node, deployment.Localhost())
1452 .with_external(&external, deployment.Localhost())
1453 .deploy(&mut deployment);
1454
1455 deployment.deploy().await.unwrap();
1456
1457 let mut external_in = nodes.connect(input_port).await;
1458 let mut external_out = nodes.connect(out).await;
1459
1460 deployment.start().await.unwrap();
1461
1462 assert_eq!(external_out.next().await.unwrap(), HashMap::new());
1463
1464 external_in.send((1, 1)).await.unwrap();
1465 assert_eq!(
1466 external_out.next().await.unwrap(),
1467 vec![(1, 1)].into_iter().collect()
1468 );
1469
1470 external_in.send((1, 2)).await.unwrap();
1471 assert_eq!(
1472 external_out.next().await.unwrap(),
1473 vec![(1, 2)].into_iter().collect()
1474 );
1475
1476 external_in.send((2, 2)).await.unwrap();
1477 assert_eq!(
1478 external_out.next().await.unwrap(),
1479 vec![(1, 2), (2, 1)].into_iter().collect()
1480 );
1481
1482 external_in.send((1, 1)).await.unwrap();
1483 assert_eq!(
1484 external_out.next().await.unwrap(),
1485 vec![(1, 3), (2, 1)].into_iter().collect()
1486 );
1487
1488 external_in.send((3, 1)).await.unwrap();
1489 assert_eq!(
1490 external_out.next().await.unwrap(),
1491 vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1492 );
1493 }
1494
1495 #[test]
1496 fn sim_unbounded_singleton_snapshot() {
1497 let flow = FlowBuilder::new();
1498 let node = flow.process::<()>();
1499 let external = flow.external::<()>();
1500
1501 let (input_port, input) = node.source_external_bincode(&external);
1502 let out = input
1503 .into_keyed()
1504 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1505 .snapshot(&node.tick(), nondet!(/** test */))
1506 .entries()
1507 .all_ticks()
1508 .send_bincode_external(&external);
1509
1510 let count = flow.sim().exhaustive(async |mut instance| {
1511 let input = instance.connect(&input_port);
1512 let output = instance.connect(&out);
1513
1514 instance.launch();
1515
1516 input.send((1, 123));
1517 input.send((1, 456));
1518 input.send((2, 123));
1519
1520 let all = output.collect_sorted::<Vec<_>>().await;
1521 assert_eq!(all.last().unwrap(), &(2, 1));
1522 });
1523
1524 assert_eq!(count, 8);
1525 }
1526}