hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20use crate::forward_handle::ForwardRef;
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
27use crate::manual_expr::ManualExpr;
28use crate::nondet::{NonDet, nondet};
29
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
/// indicates that entries may be added over time, but once an entry is added it will never be
/// removed and its value will never change.
///
/// Each implementation describes two axes separately: the boundedness of the underlying stream
/// of entries ([`Self::UnderlyingBound`]) and the boundedness of each entry's value
/// ([`Self::ValueBound`]).
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The type of the keyed singleton if the value for each key is immutable.
    ///
    /// The resulting bound keeps the same [`Self::UnderlyingBound`].
    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;

    /// The type of the keyed singleton if the value for each key may change asynchronously.
    ///
    /// The resulting bound keeps the same [`Self::UnderlyingBound`].
    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    ///
    /// This is the value stored in the `bound` field of `CollectionKind::KeyedSingleton`.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
51
// `Unbounded`: both the underlying stream of entries and each entry's value are unbounded.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    // Freezing the values while keeping an unbounded underlying stream gives `BoundedValue`.
    type WithBoundedValue = BoundedValue;
    type WithUnboundedValue = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Unbounded
    }
}
62
// `Bounded`: both the underlying stream of entries and each entry's value are bounded.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    type WithBoundedValue = Bounded;
    // A bounded underlying stream with unbounded values is represented by the hidden
    // `UnreachableBound` placeholder, whose `bound_kind` panics.
    type WithUnboundedValue = UnreachableBound;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Bounded
    }
}
73
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change. The underlying stream of entries is
/// still [`Unbounded`], so new entries may appear asynchronously; a keyed singleton whose set of
/// entries is fixed as well uses [`Bounded`] instead.
pub struct BoundedValue;
78
// `BoundedValue`: the underlying stream of entries is unbounded, but each value is bounded.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    type WithBoundedValue = BoundedValue;
    // Letting the values change as well yields the plain `Unbounded` bound.
    type WithUnboundedValue = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::BoundedValue
    }
}
89
// Placeholder bound pairing a `Bounded` underlying stream with `Unbounded` values. It is named
// as `<Bounded as KeyedSingletonBound>::WithUnboundedValue`; asking it for a bound kind panics.
#[doc(hidden)]
pub struct UnreachableBound;

impl KeyedSingletonBound for UnreachableBound {
    type UnderlyingBound = Bounded;
    type ValueBound = Unbounded;

    type WithBoundedValue = Bounded;
    type WithUnboundedValue = UnreachableBound;

    fn bound_kind() -> KeyedSingletonBoundKind {
        // There is no `KeyedSingletonBoundKind` returned for this bound; the call panics.
        unreachable!("UnreachableBound cannot be instantiated")
    }
}
104
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///   - [`Bounded`] (local and finite)
///   - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///   - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // The location this keyed singleton is materialized at.
    pub(crate) location: Loc,
    // The IR node producing this collection. Held in a `RefCell` because `Clone::clone` rewrites
    // it into a `HydroNode::Tee` through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
127
128impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
129 for KeyedSingleton<K, V, Loc, Bound>
130{
131 fn clone(&self) -> Self {
132 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
133 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
134 *self.ir_node.borrow_mut() = HydroNode::Tee {
135 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
136 metadata: self.location.new_node_metadata(Self::collection_kind()),
137 };
138 }
139
140 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
141 KeyedSingleton {
142 location: self.location.clone(),
143 ir_node: HydroNode::Tee {
144 inner: TeeNode(inner.0.clone()),
145 metadata: metadata.clone(),
146 }
147 .into(),
148 _phantom: PhantomData,
149 }
150 } else {
151 unreachable!()
152 }
153 }
154}
155
156impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
157 for KeyedSingleton<K, V, L, B>
158where
159 L: Location<'a> + NoTick,
160{
161 type Location = L;
162
163 fn create_source(ident: syn::Ident, location: L) -> Self {
164 KeyedSingleton {
165 location: location.clone(),
166 ir_node: RefCell::new(HydroNode::CycleSource {
167 ident,
168 metadata: location.new_node_metadata(Self::collection_kind()),
169 }),
170 _phantom: PhantomData,
171 }
172 }
173}
174
175impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
176 for KeyedSingleton<K, V, L, B>
177where
178 L: Location<'a> + NoTick,
179{
180 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
181 assert_eq!(
182 Location::id(&self.location),
183 expected_location,
184 "locations do not match"
185 );
186 self.location
187 .flow_state()
188 .borrow_mut()
189 .push_root(HydroRoot::CycleSink {
190 ident,
191 input: Box::new(self.ir_node.into_inner()),
192 op_metadata: HydroIrOpMetadata::new(),
193 });
194 }
195}
196
197impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
198 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
199 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
200 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
201
202 KeyedSingleton {
203 location,
204 ir_node: RefCell::new(ir_node),
205 _phantom: PhantomData,
206 }
207 }
208
209 /// Returns the [`Location`] where this keyed singleton is being materialized.
210 pub fn location(&self) -> &L {
211 &self.location
212 }
213}
214
// Counts the entries of a fully bounded keyed singleton by flattening it to its entries.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
221
// Collects the entries of a fully bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let unordered = me.entries();
    let ordered = unordered.assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
240
241impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
242 pub(crate) fn collection_kind() -> CollectionKind {
243 CollectionKind::KeyedSingleton {
244 bound: B::bound_kind(),
245 key_type: stageleft::quote_type::<K>().into(),
246 value_type: stageleft::quote_type::<V>().into(),
247 }
248 }
249
250 /// Transforms each value by invoking `f` on each element, with keys staying the same
251 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
252 ///
/// If you do not want to modify the values and instead only want to view
/// each one, use [`KeyedSingleton::inspect`] instead.
255 ///
256 /// # Example
257 /// ```rust
258 /// # #[cfg(feature = "deploy")] {
259 /// # use hydro_lang::prelude::*;
260 /// # use futures::StreamExt;
261 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
262 /// let keyed_singleton = // { 1: 2, 2: 4 }
263 /// # process
264 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
265 /// # .into_keyed()
266 /// # .first();
267 /// keyed_singleton.map(q!(|v| v + 1))
268 /// # .entries()
269 /// # }, |mut stream| async move {
270 /// // { 1: 3, 2: 5 }
271 /// # let mut results = Vec::new();
272 /// # for _ in 0..2 {
273 /// # results.push(stream.next().await.unwrap());
274 /// # }
275 /// # results.sort();
276 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
277 /// # }));
278 /// # }
279 /// ```
280 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
281 where
282 F: Fn(V) -> U + 'a,
283 {
284 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
285 let map_f = q!({
286 let orig = f;
287 move |(k, v)| (k, orig(v))
288 })
289 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
290 .into();
291
292 KeyedSingleton::new(
293 self.location.clone(),
294 HydroNode::Map {
295 f: map_f,
296 input: Box::new(self.ir_node.into_inner()),
297 metadata: self
298 .location
299 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
300 },
301 )
302 }
303
304 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
305 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
306 ///
307 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
308 /// the new value `U`. The key remains unchanged in the output.
309 ///
310 /// # Example
311 /// ```rust
312 /// # #[cfg(feature = "deploy")] {
313 /// # use hydro_lang::prelude::*;
314 /// # use futures::StreamExt;
315 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
316 /// let keyed_singleton = // { 1: 2, 2: 4 }
317 /// # process
318 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
319 /// # .into_keyed()
320 /// # .first();
321 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
322 /// # .entries()
323 /// # }, |mut stream| async move {
324 /// // { 1: 3, 2: 6 }
325 /// # let mut results = Vec::new();
326 /// # for _ in 0..2 {
327 /// # results.push(stream.next().await.unwrap());
328 /// # }
329 /// # results.sort();
330 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
331 /// # }));
332 /// # }
333 /// ```
334 pub fn map_with_key<U, F>(
335 self,
336 f: impl IntoQuotedMut<'a, F, L> + Copy,
337 ) -> KeyedSingleton<K, U, L, B>
338 where
339 F: Fn((K, V)) -> U + 'a,
340 K: Clone,
341 {
342 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
343 let map_f = q!({
344 let orig = f;
345 move |(k, v)| {
346 let out = orig((Clone::clone(&k), v));
347 (k, out)
348 }
349 })
350 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
351 .into();
352
353 KeyedSingleton::new(
354 self.location.clone(),
355 HydroNode::Map {
356 f: map_f,
357 input: Box::new(self.ir_node.into_inner()),
358 metadata: self
359 .location
360 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
361 },
362 )
363 }
364
365 /// Gets the number of keys in the keyed singleton.
366 ///
367 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
368 /// since keys may be added / removed over time. When the set of keys changes, the count will
369 /// be asynchronously updated.
370 ///
371 /// # Example
372 /// ```rust
373 /// # #[cfg(feature = "deploy")] {
374 /// # use hydro_lang::prelude::*;
375 /// # use futures::StreamExt;
376 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
377 /// # let tick = process.tick();
378 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
379 /// # process
380 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
381 /// # .into_keyed()
382 /// # .batch(&tick, nondet!(/** test */))
383 /// # .first();
384 /// keyed_singleton.key_count()
385 /// # .all_ticks()
386 /// # }, |mut stream| async move {
387 /// // 3
388 /// # assert_eq!(stream.next().await.unwrap(), 3);
389 /// # }));
390 /// # }
391 /// ```
392 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
393 if B::ValueBound::BOUNDED {
394 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
395 location: self.location,
396 ir_node: self.ir_node,
397 _phantom: PhantomData,
398 };
399
400 me.entries().count()
401 } else if L::is_top_level()
402 && let Some(tick) = self.location.try_tick()
403 {
404 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
405 location: self.location,
406 ir_node: self.ir_node,
407 _phantom: PhantomData,
408 };
409
410 let out =
411 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
412 .latest();
413 Singleton::new(out.location, out.ir_node.into_inner())
414 } else {
415 panic!("Unbounded KeyedSingleton inside a tick");
416 }
417 }
418
419 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
420 ///
421 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
422 /// asynchronously as well.
423 ///
424 /// # Example
425 /// ```rust
426 /// # #[cfg(feature = "deploy")] {
427 /// # use hydro_lang::prelude::*;
428 /// # use futures::StreamExt;
429 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
430 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
431 /// # process
432 /// # .source_iter(q!(vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())]))
433 /// # .into_keyed()
434 /// # .batch(&process.tick(), nondet!(/** test */))
435 /// # .first();
436 /// keyed_singleton.into_singleton()
437 /// # .all_ticks()
438 /// # }, |mut stream| async move {
439 /// // { 1: "a", 2: "b", 3: "c" }
440 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())].into_iter().collect());
441 /// # }));
442 /// # }
443 /// ```
444 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
445 where
446 K: Eq + Hash,
447 {
448 if B::ValueBound::BOUNDED {
449 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
450 location: self.location,
451 ir_node: self.ir_node,
452 _phantom: PhantomData,
453 };
454
455 me.entries()
456 .assume_ordering(nondet!(
457 /// Because this is a keyed singleton, there is only one value per key.
458 ))
459 .fold(
460 q!(|| HashMap::new()),
461 q!(|map, (k, v)| {
462 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
463 map.insert(k, v);
464 }),
465 )
466 } else if L::is_top_level()
467 && let Some(tick) = self.location.try_tick()
468 {
469 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
470 location: self.location,
471 ir_node: self.ir_node,
472 _phantom: PhantomData,
473 };
474
475 let out = into_singleton_inside_tick(
476 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
477 )
478 .latest();
479 Singleton::new(out.location, out.ir_node.into_inner())
480 } else {
481 panic!("Unbounded KeyedSingleton inside a tick");
482 }
483 }
484
485 /// An operator which allows you to "name" a `HydroNode`.
486 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
487 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
488 {
489 let mut node = self.ir_node.borrow_mut();
490 let metadata = node.metadata_mut();
491 metadata.tag = Some(name.to_string());
492 }
493 self
494 }
495}
496
497impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
498 KeyedSingleton<K, V, L, B>
499{
500 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
501 ///
/// The value for each key must be bounded, otherwise the resulting stream elements would be
/// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
504 /// into the output.
505 ///
506 /// # Example
507 /// ```rust
508 /// # #[cfg(feature = "deploy")] {
509 /// # use hydro_lang::prelude::*;
510 /// # use futures::StreamExt;
511 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
512 /// let keyed_singleton = // { 1: 2, 2: 4 }
513 /// # process
514 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
515 /// # .into_keyed()
516 /// # .first();
517 /// keyed_singleton.entries()
518 /// # }, |mut stream| async move {
519 /// // (1, 2), (2, 4) in any order
520 /// # let mut results = Vec::new();
521 /// # for _ in 0..2 {
522 /// # results.push(stream.next().await.unwrap());
523 /// # }
524 /// # results.sort();
525 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
526 /// # }));
527 /// # }
528 /// ```
529 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
530 self.into_keyed_stream().entries()
531 }
532
533 /// Flattens the keyed singleton into an unordered stream of just the values.
534 ///
/// The value for each key must be bounded, otherwise the resulting stream elements would be
/// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
537 /// into the output.
538 ///
539 /// # Example
540 /// ```rust
541 /// # #[cfg(feature = "deploy")] {
542 /// # use hydro_lang::prelude::*;
543 /// # use futures::StreamExt;
544 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
545 /// let keyed_singleton = // { 1: 2, 2: 4 }
546 /// # process
547 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
548 /// # .into_keyed()
549 /// # .first();
550 /// keyed_singleton.values()
551 /// # }, |mut stream| async move {
552 /// // 2, 4 in any order
553 /// # let mut results = Vec::new();
554 /// # for _ in 0..2 {
555 /// # results.push(stream.next().await.unwrap());
556 /// # }
557 /// # results.sort();
558 /// # assert_eq!(results, vec![2, 4]);
559 /// # }));
560 /// # }
561 /// ```
562 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
563 let map_f = q!(|(_, v)| v)
564 .splice_fn1_ctx::<(K, V), V>(&self.location)
565 .into();
566
567 Stream::new(
568 self.location.clone(),
569 HydroNode::Map {
570 f: map_f,
571 input: Box::new(self.ir_node.into_inner()),
572 metadata: self.location.new_node_metadata(Stream::<
573 V,
574 L,
575 B::UnderlyingBound,
576 NoOrder,
577 ExactlyOnce,
578 >::collection_kind()),
579 },
580 )
581 }
582
583 /// Flattens the keyed singleton into an unordered stream of just the keys.
584 ///
585 /// The value for each key must be bounded, otherwise the removal of keys would result in
586 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
587 /// into the output.
588 ///
589 /// # Example
590 /// ```rust
591 /// # #[cfg(feature = "deploy")] {
592 /// # use hydro_lang::prelude::*;
593 /// # use futures::StreamExt;
594 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
595 /// let keyed_singleton = // { 1: 2, 2: 4 }
596 /// # process
597 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
598 /// # .into_keyed()
599 /// # .first();
600 /// keyed_singleton.keys()
601 /// # }, |mut stream| async move {
602 /// // 1, 2 in any order
603 /// # let mut results = Vec::new();
604 /// # for _ in 0..2 {
605 /// # results.push(stream.next().await.unwrap());
606 /// # }
607 /// # results.sort();
608 /// # assert_eq!(results, vec![1, 2]);
609 /// # }));
610 /// # }
611 /// ```
612 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
613 self.entries().map(q!(|(k, _)| k))
614 }
615
616 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
617 /// entries whose keys are not in the provided stream.
618 ///
619 /// # Example
620 /// ```rust
621 /// # #[cfg(feature = "deploy")] {
622 /// # use hydro_lang::prelude::*;
623 /// # use futures::StreamExt;
624 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625 /// let tick = process.tick();
626 /// let keyed_singleton = // { 1: 2, 2: 4 }
627 /// # process
628 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
629 /// # .into_keyed()
630 /// # .first()
631 /// # .batch(&tick, nondet!(/** test */));
632 /// let keys_to_remove = process
633 /// .source_iter(q!(vec![1]))
634 /// .batch(&tick, nondet!(/** test */));
635 /// keyed_singleton.filter_key_not_in(keys_to_remove)
636 /// # .entries().all_ticks()
637 /// # }, |mut stream| async move {
638 /// // { 2: 4 }
639 /// # for w in vec![(2, 4)] {
640 /// # assert_eq!(stream.next().await.unwrap(), w);
641 /// # }
642 /// # }));
643 /// # }
644 /// ```
645 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
646 self,
647 other: Stream<K, L, Bounded, O2, R2>,
648 ) -> Self
649 where
650 K: Hash + Eq,
651 {
652 check_matching_location(&self.location, &other.location);
653
654 KeyedSingleton::new(
655 self.location.clone(),
656 HydroNode::AntiJoin {
657 pos: Box::new(self.ir_node.into_inner()),
658 neg: Box::new(other.ir_node.into_inner()),
659 metadata: self.location.new_node_metadata(Self::collection_kind()),
660 },
661 )
662 }
663
664 /// An operator which allows you to "inspect" each value of a keyed singleton without
665 /// modifying it. The closure `f` is called on a reference to each value. This is
666 /// mainly useful for debugging, and should not be used to generate side-effects.
667 ///
668 /// # Example
669 /// ```rust
670 /// # #[cfg(feature = "deploy")] {
671 /// # use hydro_lang::prelude::*;
672 /// # use futures::StreamExt;
673 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674 /// let keyed_singleton = // { 1: 2, 2: 4 }
675 /// # process
676 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
677 /// # .into_keyed()
678 /// # .first();
679 /// keyed_singleton
680 /// .inspect(q!(|v| println!("{}", v)))
681 /// # .entries()
682 /// # }, |mut stream| async move {
683 /// // { 1: 2, 2: 4 }
684 /// # for w in vec![(1, 2), (2, 4)] {
685 /// # assert_eq!(stream.next().await.unwrap(), w);
686 /// # }
687 /// # }));
688 /// # }
689 /// ```
690 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
691 where
692 F: Fn(&V) + 'a,
693 {
694 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
695 let inspect_f = q!({
696 let orig = f;
697 move |t: &(_, _)| orig(&t.1)
698 })
699 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
700 .into();
701
702 KeyedSingleton::new(
703 self.location.clone(),
704 HydroNode::Inspect {
705 f: inspect_f,
706 input: Box::new(self.ir_node.into_inner()),
707 metadata: self.location.new_node_metadata(Self::collection_kind()),
708 },
709 )
710 }
711
712 /// An operator which allows you to "inspect" each entry of a keyed singleton without
713 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
714 /// mainly useful for debugging, and should not be used to generate side-effects.
715 ///
716 /// # Example
717 /// ```rust
718 /// # #[cfg(feature = "deploy")] {
719 /// # use hydro_lang::prelude::*;
720 /// # use futures::StreamExt;
721 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
722 /// let keyed_singleton = // { 1: 2, 2: 4 }
723 /// # process
724 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
725 /// # .into_keyed()
726 /// # .first();
727 /// keyed_singleton
728 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
729 /// # .entries()
730 /// # }, |mut stream| async move {
731 /// // { 1: 2, 2: 4 }
732 /// # for w in vec![(1, 2), (2, 4)] {
733 /// # assert_eq!(stream.next().await.unwrap(), w);
734 /// # }
735 /// # }));
736 /// # }
737 /// ```
738 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
739 where
740 F: Fn(&(K, V)) + 'a,
741 {
742 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
743
744 KeyedSingleton::new(
745 self.location.clone(),
746 HydroNode::Inspect {
747 f: inspect_f,
748 input: Box::new(self.ir_node.into_inner()),
749 metadata: self.location.new_node_metadata(Self::collection_kind()),
750 },
751 )
752 }
753
754 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
755 ///
756 /// Because this method requires values to be bounded, the output [`Optional`] will only be
757 /// asynchronously updated if a new key is added that is higher than the previous max key.
758 ///
759 /// # Example
760 /// ```rust
761 /// # #[cfg(feature = "deploy")] {
762 /// # use hydro_lang::prelude::*;
763 /// # use futures::StreamExt;
764 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
765 /// let tick = process.tick();
766 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
767 /// # process
768 /// # .source_iter(q!(vec![(1, 123), (2, 456), (0, 789)]))
769 /// # .into_keyed()
770 /// # .first();
771 /// keyed_singleton.get_max_key()
772 /// # .sample_eager(nondet!(/** test */))
773 /// # }, |mut stream| async move {
774 /// // (2, 456)
775 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
776 /// # }));
777 /// # }
778 /// ```
779 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
780 where
781 K: Ord,
782 {
783 self.entries()
784 .assume_ordering(nondet!(
785 /// There is only one element associated with each key, and the keys are totallly
786 /// ordered so we will produce a deterministic value. We can't call
787 /// `reduce_commutative_idempotent` because the closure technically isn't commutative
788 /// in the case where both passed entries have the same key but different values.
789 ///
790 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
791 /// the two inputs do not have the same key.
792 ))
793 .reduce_idempotent(q!({
794 move |curr, new| {
795 if new.0 > curr.0 {
796 *curr = new;
797 }
798 }
799 }))
800 }
801
802 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
803 /// element, the value.
804 ///
805 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
806 ///
807 /// # Example
808 /// ```rust
809 /// # #[cfg(feature = "deploy")] {
810 /// # use hydro_lang::prelude::*;
811 /// # use futures::StreamExt;
812 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813 /// let keyed_singleton = // { 1: 2, 2: 4 }
814 /// # process
815 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
816 /// # .into_keyed()
817 /// # .first();
818 /// keyed_singleton
819 /// .clone()
820 /// .into_keyed_stream()
821 /// .interleave(
822 /// keyed_singleton.into_keyed_stream()
823 /// )
824 /// # .entries()
825 /// # }, |mut stream| async move {
/// // { 1: [2, 2], 2: [4, 4] }
827 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
828 /// # assert_eq!(stream.next().await.unwrap(), w);
829 /// # }
830 /// # }));
831 /// # }
832 /// ```
833 pub fn into_keyed_stream(
834 self,
835 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
836 KeyedStream::new(
837 self.location.clone(),
838 HydroNode::Cast {
839 inner: Box::new(self.ir_node.into_inner()),
840 metadata: self.location.new_node_metadata(KeyedStream::<
841 K,
842 V,
843 L,
844 B::UnderlyingBound,
845 TotalOrder,
846 ExactlyOnce,
847 >::collection_kind()),
848 },
849 )
850 }
851}
852
853impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
854 /// Gets the value associated with a specific key from the keyed singleton.
855 ///
856 /// # Example
857 /// ```rust
858 /// # #[cfg(feature = "deploy")] {
859 /// # use hydro_lang::prelude::*;
860 /// # use futures::StreamExt;
861 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862 /// let tick = process.tick();
863 /// let keyed_data = process
864 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
865 /// .into_keyed()
866 /// .batch(&tick, nondet!(/** test */))
867 /// .first();
868 /// let key = tick.singleton(q!(1));
869 /// keyed_data.get(key).all_ticks()
870 /// # }, |mut stream| async move {
871 /// // 2
872 /// # assert_eq!(stream.next().await.unwrap(), 2);
873 /// # }));
874 /// # }
875 /// ```
876 pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
877 self.entries()
878 .join(key.into_stream().map(q!(|k| (k, ()))))
879 .map(q!(|(_, (v, _))| v))
880 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
881 .first()
882 }
883
884 /// Given a keyed stream of lookup requests, where the key is the lookup and the value
885 /// is some additional metadata, emits a keyed stream of lookup results where the key
886 /// is the same as before, but the value is a tuple of the lookup result and the metadata
887 /// of the request. If the key is not found, no output will be produced.
888 ///
889 /// # Example
890 /// ```rust
891 /// # #[cfg(feature = "deploy")] {
892 /// # use hydro_lang::prelude::*;
893 /// # use futures::StreamExt;
894 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
895 /// let tick = process.tick();
896 /// let keyed_data = process
897 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
898 /// .into_keyed()
899 /// .batch(&tick, nondet!(/** test */))
900 /// .first();
901 /// let other_data = process
902 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
903 /// .into_keyed()
904 /// .batch(&tick, nondet!(/** test */));
905 /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
906 /// # }, |mut stream| async move {
907 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
908 /// # let mut results = vec![];
909 /// # for _ in 0..3 {
910 /// # results.push(stream.next().await.unwrap());
911 /// # }
912 /// # results.sort();
913 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
914 /// # }));
915 /// # }
916 /// ```
917 pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
918 self,
919 requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
920 ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
921 self.entries()
922 .weaker_retries::<R2>()
923 .join(requests.entries())
924 .into_keyed()
925 }
926
927 /// Given a keyed stream of lookup requests, where the key is the lookup and the value
928 /// is some additional metadata, emits a keyed stream of lookup results where the key
929 /// is the same as before, but the value is a tuple of the lookup result (as `Option<V>`)
930 /// and the metadata of the request. Unlike `get_many_if_present`, this returns all request
931 /// keys, with `None` for keys that are not found.
932 ///
933 /// # Example
934 /// ```rust
935 /// # #[cfg(feature = "deploy")] {
936 /// # use hydro_lang::prelude::*;
937 /// # use futures::StreamExt;
938 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
939 /// let tick = process.tick();
940 /// let keyed_data = process
941 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
942 /// .into_keyed()
943 /// .batch(&tick, nondet!(/** test */))
944 /// .first();
945 /// let other_data = process
946 /// .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
947 /// .into_keyed()
948 /// .batch(&tick, nondet!(/** test */));
949 /// keyed_data.get_many(other_data).entries().all_ticks()
950 /// # }, |mut stream| async move {
951 /// // { 1: [(Some(10), 100)], 2: [(Some(20), 200)], 3: [(None, 300)] } in any order
952 /// # let mut results = vec![];
953 /// # for _ in 0..3 {
954 /// # results.push(stream.next().await.unwrap());
955 /// # }
956 /// # results.sort();
957 /// # assert_eq!(results, vec![(1, (Some(10), 100)), (2, (Some(20), 200)), (3, (None, 300))]);
958 /// # }));
959 /// # }
960 /// ```
961 #[expect(clippy::type_complexity, reason = "stream types")]
962 pub fn get_many<O2: Ordering, R2: Retries, V2>(
963 self,
964 requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
965 ) -> KeyedStream<K, (Option<V>, V2), Tick<L>, Bounded, NoOrder, R2>
966 where
967 K: Clone,
968 V: Clone,
969 V2: Clone,
970 {
971 let lookup_result = self.clone().get_many_if_present(requests.clone());
972 let missing_keys = requests.filter_key_not_in(self.keys()).weakest_ordering();
973
974 lookup_result
975 .map(q!(|(v, v2)| (Some(v), v2)))
976 .chain(missing_keys.map(q!(|v2| (None, v2))))
977 }
978
979 /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
980 /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
981 /// containing the value from `self` and an option of the value from `from`. If the key is not
982 /// present in `from`, the option will be [`None`].
983 ///
984 /// # Example
985 /// ```rust
986 /// # #[cfg(feature = "deploy")] {
987 /// # use hydro_lang::prelude::*;
988 /// # use futures::StreamExt;
989 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
990 /// # let tick = process.tick();
991 /// let requests = // { 1: 10, 2: 20 }
992 /// # process
993 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
994 /// # .into_keyed()
995 /// # .batch(&tick, nondet!(/** test */))
996 /// # .first();
997 /// let other_data = // { 10: 100, 11: 101 }
998 /// # process
999 /// # .source_iter(q!(vec![(10, 100), (11, 101)]))
1000 /// # .into_keyed()
1001 /// # .batch(&tick, nondet!(/** test */))
1002 /// # .first();
1003 /// requests.get_from(other_data)
1004 /// # .entries().all_ticks()
1005 /// # }, |mut stream| async move {
1006 /// // { 1: (10, Some(100)), 2: (20, None) }
1007 /// # let mut results = vec![];
1008 /// # for _ in 0..2 {
1009 /// # results.push(stream.next().await.unwrap());
1010 /// # }
1011 /// # results.sort();
1012 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
1013 /// # }));
1014 /// # }
1015 /// ```
1016 pub fn get_from<V2: Clone>(
1017 self,
1018 from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
1019 ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
1020 where
1021 K: Clone,
1022 V: Hash + Eq + Clone,
1023 {
1024 let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
1025 let lookup_result = from.get_many_if_present(to_lookup.clone());
1026 let missing_values =
1027 to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
1028 let result_stream = lookup_result
1029 .entries()
1030 .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
1031 .into_keyed()
1032 .chain(
1033 missing_values
1034 .entries()
1035 .map(q!(|(v, k)| (k, (v, None))))
1036 .into_keyed(),
1037 );
1038
1039 KeyedSingleton::new(
1040 result_stream.location.clone(),
1041 HydroNode::Cast {
1042 inner: Box::new(result_stream.ir_node.into_inner()),
1043 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1044 K,
1045 (V, Option<V2>),
1046 Tick<L>,
1047 Bounded,
1048 >::collection_kind(
1049 )),
1050 },
1051 )
1052 }
1053}
1054
1055impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1056where
1057 L: Location<'a>,
1058{
1059 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1060 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1061 ///
1062 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1063 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1064 /// argument that declares where the keyed singleton will be atomically processed. Batching a
1065 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1066 /// batching into a different [`Tick`] will introduce asynchrony.
1067 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1068 let out_location = Atomic { tick: tick.clone() };
1069 KeyedSingleton::new(
1070 out_location.clone(),
1071 HydroNode::BeginAtomic {
1072 inner: Box::new(self.ir_node.into_inner()),
1073 metadata: out_location
1074 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1075 },
1076 )
1077 }
1078}
1079
1080impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1081where
1082 L: Location<'a> + NoTick,
1083{
1084 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1085 /// See [`KeyedSingleton::atomic`] for more details.
1086 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1087 KeyedSingleton::new(
1088 self.location.tick.l.clone(),
1089 HydroNode::EndAtomic {
1090 inner: Box::new(self.ir_node.into_inner()),
1091 metadata: self
1092 .location
1093 .tick
1094 .l
1095 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1096 },
1097 )
1098 }
1099}
1100
1101impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1102 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1103 /// tick `T` always has the entries of `self` at tick `T - 1`.
1104 ///
1105 /// At tick `0`, the output has no entries, since there is no previous tick.
1106 ///
1107 /// This operator enables stateful iterative processing with ticks, by sending data from one
1108 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1109 ///
1110 /// # Example
1111 /// ```rust
1112 /// # #[cfg(feature = "deploy")] {
1113 /// # use hydro_lang::prelude::*;
1114 /// # use futures::StreamExt;
1115 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1116 /// let tick = process.tick();
1117 /// # // ticks are lazy by default, forces the second tick to run
1118 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1119 /// # let batch_first_tick = process
1120 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1121 /// # .batch(&tick, nondet!(/** test */))
1122 /// # .into_keyed();
1123 /// # let batch_second_tick = process
1124 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1125 /// # .batch(&tick, nondet!(/** test */))
1126 /// # .into_keyed()
1127 /// # .defer_tick(); // appears on the second tick
1128 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1129 /// # batch_first_tick.chain(batch_second_tick).first();
1130 /// input_batch.clone().filter_key_not_in(
1131 /// input_batch.defer_tick().keys() // keys present in the previous tick
1132 /// )
1133 /// # .entries().all_ticks()
1134 /// # }, |mut stream| async move {
1135 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1136 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1137 /// # assert_eq!(stream.next().await.unwrap(), w);
1138 /// # }
1139 /// # }));
1140 /// # }
1141 /// ```
1142 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1143 KeyedSingleton::new(
1144 self.location.clone(),
1145 HydroNode::DeferTick {
1146 input: Box::new(self.ir_node.into_inner()),
1147 metadata: self
1148 .location
1149 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1150 },
1151 )
1152 }
1153}
1154
1155impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1156where
1157 L: Location<'a>,
1158{
1159 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1160 /// point in time.
1161 ///
1162 /// # Non-Determinism
1163 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1164 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1165 pub fn snapshot(
1166 self,
1167 tick: &Tick<L>,
1168 _nondet: NonDet,
1169 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1170 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1171 KeyedSingleton::new(
1172 tick.clone(),
1173 HydroNode::Batch {
1174 inner: Box::new(self.ir_node.into_inner()),
1175 metadata: tick
1176 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1177 },
1178 )
1179 }
1180}
1181
1182impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1183where
1184 L: Location<'a> + NoTick,
1185{
1186 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1187 /// state of the keyed singleton being atomically processed.
1188 ///
1189 /// # Non-Determinism
1190 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1191 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1192 pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1193 KeyedSingleton::new(
1194 self.location.clone().tick,
1195 HydroNode::Batch {
1196 inner: Box::new(self.ir_node.into_inner()),
1197 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1198 K,
1199 V,
1200 Tick<L>,
1201 Bounded,
1202 >::collection_kind(
1203 )),
1204 },
1205 )
1206 }
1207}
1208
1209impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1210where
1211 L: Location<'a>,
1212{
1213 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1214 ///
1215 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1216 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1217 /// is filtered out.
1218 ///
1219 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1220 /// not modify or take ownership of the values. If you need to modify the values while filtering
1221 /// use [`KeyedSingleton::filter_map`] instead.
1222 ///
1223 /// # Example
1224 /// ```rust
1225 /// # #[cfg(feature = "deploy")] {
1226 /// # use hydro_lang::prelude::*;
1227 /// # use futures::StreamExt;
1228 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1229 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1230 /// # process
1231 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1232 /// # .into_keyed()
1233 /// # .first();
1234 /// keyed_singleton.filter(q!(|&v| v > 1))
1235 /// # .entries()
1236 /// # }, |mut stream| async move {
1237 /// // { 1: 2, 2: 4 }
1238 /// # let mut results = Vec::new();
1239 /// # for _ in 0..2 {
1240 /// # results.push(stream.next().await.unwrap());
1241 /// # }
1242 /// # results.sort();
1243 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1244 /// # }));
1245 /// # }
1246 /// ```
1247 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1248 where
1249 F: Fn(&V) -> bool + 'a,
1250 {
1251 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1252 let filter_f = q!({
1253 let orig = f;
1254 move |t: &(_, _)| orig(&t.1)
1255 })
1256 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1257 .into();
1258
1259 KeyedSingleton::new(
1260 self.location.clone(),
1261 HydroNode::Filter {
1262 f: filter_f,
1263 input: Box::new(self.ir_node.into_inner()),
1264 metadata: self
1265 .location
1266 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1267 },
1268 )
1269 }
1270
1271 /// An operator that both filters and maps values. It yields only the key-value pairs where
1272 /// the supplied closure `f` returns `Some(value)`.
1273 ///
1274 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1275 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1276 /// If it returns `None`, the key-value pair is filtered out.
1277 ///
1278 /// # Example
1279 /// ```rust
1280 /// # #[cfg(feature = "deploy")] {
1281 /// # use hydro_lang::prelude::*;
1282 /// # use futures::StreamExt;
1283 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1284 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1285 /// # process
1286 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1287 /// # .into_keyed()
1288 /// # .first();
1289 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1290 /// # .entries()
1291 /// # }, |mut stream| async move {
1292 /// // { 1: 42, 3: 100 }
1293 /// # let mut results = Vec::new();
1294 /// # for _ in 0..2 {
1295 /// # results.push(stream.next().await.unwrap());
1296 /// # }
1297 /// # results.sort();
1298 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1299 /// # }));
1300 /// # }
1301 /// ```
1302 pub fn filter_map<F, U>(
1303 self,
1304 f: impl IntoQuotedMut<'a, F, L> + Copy,
1305 ) -> KeyedSingleton<K, U, L, B>
1306 where
1307 F: Fn(V) -> Option<U> + 'a,
1308 {
1309 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1310 let filter_map_f = q!({
1311 let orig = f;
1312 move |(k, v)| orig(v).map(|o| (k, o))
1313 })
1314 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1315 .into();
1316
1317 KeyedSingleton::new(
1318 self.location.clone(),
1319 HydroNode::FilterMap {
1320 f: filter_map_f,
1321 input: Box::new(self.ir_node.into_inner()),
1322 metadata: self
1323 .location
1324 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1325 },
1326 )
1327 }
1328
1329 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1330 /// arrived since the previous batch was released.
1331 ///
1332 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1333 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1334 ///
1335 /// # Non-Determinism
1336 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1337 /// has a non-deterministic set of key-value pairs.
1338 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1339 where
1340 L: NoTick,
1341 {
1342 self.atomic(tick).batch_atomic(nondet)
1343 }
1344}
1345
1346impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1347where
1348 L: Location<'a> + NoTick,
1349{
1350 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1351 /// atomically processed.
1352 ///
1353 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1354 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1355 ///
1356 /// # Non-Determinism
1357 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1358 /// has a non-deterministic set of key-value pairs.
1359 pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1360 let _ = nondet;
1361 KeyedSingleton::new(
1362 self.location.clone().tick,
1363 HydroNode::Batch {
1364 inner: Box::new(self.ir_node.into_inner()),
1365 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1366 K,
1367 V,
1368 Tick<L>,
1369 Bounded,
1370 >::collection_kind(
1371 )),
1372 },
1373 )
1374 }
1375}
1376
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    /// `key_count` over `first()` per key: the sampled count starts at 0 and grows by one for
    /// each new key that is sent.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let (input_port, input) = process.source_external_bincode(&client);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_process = deployed.connect(input_port).await;
        let mut from_process = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet, so there are no keys.
        assert_eq!(from_process.next().await.unwrap(), 0);

        // Each step sends one entry and checks the key count observed afterwards.
        for (entry, expected_count) in [((1, 1), 1), ((2, 2), 2)] {
            to_process.send(entry).await.unwrap();
            assert_eq!(from_process.next().await.unwrap(), expected_count);
        }
    }

    /// `key_count` over a per-key `fold`: the sampled count only changes when a previously
    /// unseen key is sent, not when an existing key receives another element.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let (input_port, input) = process.source_external_bincode(&client);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_process = deployed.connect(input_port).await;
        let mut from_process = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet, so there are no keys.
        assert_eq!(from_process.next().await.unwrap(), 0);

        // Each step sends one entry and checks the key count observed afterwards; entries for
        // keys 1 that repeat leave the count unchanged.
        let steps = [
            ((1, 1), 1),
            ((1, 2), 1),
            ((2, 2), 2),
            ((1, 1), 2),
            ((3, 1), 3),
        ];
        for (entry, expected_count) in steps {
            to_process.send(entry).await.unwrap();
            assert_eq!(from_process.next().await.unwrap(), expected_count);
        }
    }

    /// `into_singleton` over `first()` per key: the sampled map starts empty and gains one
    /// entry for each new key that is sent.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let (input_port, input) = process.source_external_bincode(&client);
        let out = input
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_process = deployed.connect(input_port).await;
        let mut from_process = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet, so the map is empty.
        assert_eq!(
            from_process.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        // Each step sends one entry and checks the full map observed afterwards.
        let steps = [
            ((1, 1), std::collections::HashMap::from([(1, 1)])),
            ((2, 2), std::collections::HashMap::from([(1, 1), (2, 2)])),
        ];
        for (entry, expected_map) in steps {
            to_process.send(entry).await.unwrap();
            assert_eq!(from_process.next().await.unwrap(), expected_map);
        }
    }

    /// `into_singleton` over a per-key `fold` that counts elements: the sampled map reflects the
    /// updated count for a key each time another element for that key is sent.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let (input_port, input) = process.source_external_bincode(&client);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_process = deployed.connect(input_port).await;
        let mut from_process = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet, so the map is empty.
        assert_eq!(
            from_process.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        // Each step sends one entry and checks the per-key element counts observed afterwards.
        let steps = [
            ((1, 1), std::collections::HashMap::from([(1, 1)])),
            ((1, 2), std::collections::HashMap::from([(1, 2)])),
            ((2, 2), std::collections::HashMap::from([(1, 2), (2, 1)])),
            ((1, 1), std::collections::HashMap::from([(1, 3), (2, 1)])),
            (
                (3, 1),
                std::collections::HashMap::from([(1, 3), (2, 1), (3, 1)]),
            ),
        ];
        for (entry, expected_map) in steps {
            to_process.send(entry).await.unwrap();
            assert_eq!(from_process.next().await.unwrap(), expected_map);
        }
    }

    /// Exhaustively simulates snapshots of a per-key counting `fold` and checks both the last
    /// sorted output element and the number of explored executions.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let builder = FlowBuilder::new();
        let process = builder.process::<()>();

        let (input_port, input) = process.sim_input();
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&process.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let explored = builder.sim().exhaustive(async || {
            for entry in [(1, 123), (1, 456), (2, 123)] {
                input_port.send(entry);
            }

            let sorted = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(sorted.last().unwrap(), &(2, 1));
        });

        assert_eq!(explored, 8);
    }

    /// `get_many` keeps every request: keys present in the keyed data yield `Some(value)` and
    /// the missing key 3 yields `None`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn get_many_outer_join() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let client = builder.external::<()>();

        let tick = process.tick();
        let keyed_data = process
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let requests = process
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out = keyed_data
            .get_many(requests)
            .entries()
            .all_ticks()
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut from_process = deployed.connect(out).await;

        deployment.start().await.unwrap();

        // Results are sorted before comparing, so their arrival order does not matter.
        let mut received = vec![];
        for _ in 0..3 {
            received.push(from_process.next().await.unwrap());
        }
        received.sort();

        assert_eq!(
            received,
            vec![(1, (Some(10), 100)), (2, (Some(20), 200)), (3, (None, 300))]
        );
    }
}