hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::hash::Hash;
4
5use stageleft::{IntoQuotedMut, QuotedWithContext, q};
6
7use super::boundedness::{Bounded, Boundedness, Unbounded};
8use super::keyed_stream::KeyedStream;
9use super::optional::Optional;
10use super::singleton::Singleton;
11use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
12use crate::forward_handle::ForwardRef;
13#[cfg(stageleft_runtime)]
14use crate::forward_handle::{CycleCollection, ReceiverComplete};
15use crate::live_collections::stream::{Ordering, Retries};
16use crate::location::dynamic::LocationId;
17use crate::location::tick::NoAtomic;
18use crate::location::{Atomic, Location, NoTick, Tick};
19use crate::manual_expr::ManualExpr;
20use crate::nondet::{NonDet, nondet};
21
22/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
23///
24/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
25/// removed / changed), this also includes an additional variant [`BoundedValue`], which indicates
26/// that entries may be added over time, but once an entry is added it will never be removed and
27/// its value will never change.
28pub trait KeyedSingletonBound {
29 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
30 type UnderlyingBound: Boundedness;
31 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
32 type ValueBound: Boundedness;
33}
34
35impl KeyedSingletonBound for Unbounded {
36 type UnderlyingBound = Unbounded;
37 type ValueBound = Unbounded;
38}
39
40impl KeyedSingletonBound for Bounded {
41 type UnderlyingBound = Bounded;
42 type ValueBound = Bounded;
43}
44
/// A boundedness variant specific to [`KeyedSingleton`]: once a key appears, its value is
/// bounded and will never change, while new entries may still appear asynchronously.
///
/// Use this marker when keys can be added over time but existing entries are never removed
/// or updated.
pub struct BoundedValue;
49
50impl KeyedSingletonBound for BoundedValue {
51 type UnderlyingBound = Unbounded;
52 type ValueBound = Bounded;
53}
54
55/// Mapping from keys of type `K` to values of type `V`.
56///
57/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
58/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
59/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
60/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
61/// keys cannot be removed and the value for each key is immutable.
62///
63/// Type Parameters:
64/// - `K`: the type of the key for each entry
65/// - `V`: the type of the value for each entry
66/// - `Loc`: the [`Location`] where the keyed singleton is materialized
67/// - `Bound`: tracks whether the entries are:
68/// - [`Bounded`] (local and finite)
69/// - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
70/// - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
71pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
72 pub(crate) underlying: Stream<(K, V), Loc, Bound::UnderlyingBound, NoOrder, ExactlyOnce>,
73}
74
75impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
76 for KeyedSingleton<K, V, Loc, Bound>
77{
78 fn clone(&self) -> Self {
79 KeyedSingleton {
80 underlying: self.underlying.clone(),
81 }
82 }
83}
84
85impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
86 for KeyedSingleton<K, V, L, B>
87where
88 L: Location<'a> + NoTick,
89{
90 type Location = L;
91
92 fn create_source(ident: syn::Ident, location: L) -> Self {
93 KeyedSingleton {
94 underlying: Stream::create_source(ident, location),
95 }
96 }
97}
98
99impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
100 for KeyedSingleton<K, V, L, B>
101where
102 L: Location<'a> + NoTick,
103{
104 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
105 self.underlying.complete(ident, expected_location);
106 }
107}
108
109impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
110 KeyedSingleton<K, V, L, B>
111{
112 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
113 ///
114 /// The value for each key must be bounded, otherwise the resulting stream elements would be
115 /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
116 /// into the output.
117 ///
118 /// # Example
119 /// ```rust
120 /// # use hydro_lang::prelude::*;
121 /// # use futures::StreamExt;
122 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
123 /// let keyed_singleton = // { 1: 2, 2: 4 }
124 /// # process
125 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
126 /// # .into_keyed()
127 /// # .first();
128 /// keyed_singleton.entries()
129 /// # }, |mut stream| async move {
130 /// // (1, 2), (2, 4) in any order
131 /// # let mut results = Vec::new();
132 /// # for _ in 0..2 {
133 /// # results.push(stream.next().await.unwrap());
134 /// # }
135 /// # results.sort();
136 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
137 /// # }));
138 /// ```
139 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
140 self.underlying
141 }
142
143 /// Flattens the keyed singleton into an unordered stream of just the values.
144 ///
145 /// The value for each key must be bounded, otherwise the resulting stream elements would be
146 /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
147 /// into the output.
148 ///
149 /// # Example
150 /// ```rust
151 /// # use hydro_lang::prelude::*;
152 /// # use futures::StreamExt;
153 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
154 /// let keyed_singleton = // { 1: 2, 2: 4 }
155 /// # process
156 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
157 /// # .into_keyed()
158 /// # .first();
159 /// keyed_singleton.values()
160 /// # }, |mut stream| async move {
161 /// // 2, 4 in any order
162 /// # let mut results = Vec::new();
163 /// # for _ in 0..2 {
164 /// # results.push(stream.next().await.unwrap());
165 /// # }
166 /// # results.sort();
167 /// # assert_eq!(results, vec![2, 4]);
168 /// # }));
169 /// ```
170 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
171 self.entries().map(q!(|(_, v)| v))
172 }
173
174 /// Flattens the keyed singleton into an unordered stream of just the keys.
175 ///
176 /// The value for each key must be bounded, otherwise the removal of keys would result in
177 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
178 /// into the output.
179 ///
180 /// # Example
181 /// ```rust
182 /// # use hydro_lang::prelude::*;
183 /// # use futures::StreamExt;
184 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
185 /// let keyed_singleton = // { 1: 2, 2: 4 }
186 /// # process
187 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
188 /// # .into_keyed()
189 /// # .first();
190 /// keyed_singleton.keys()
191 /// # }, |mut stream| async move {
192 /// // 1, 2 in any order
193 /// # let mut results = Vec::new();
194 /// # for _ in 0..2 {
195 /// # results.push(stream.next().await.unwrap());
196 /// # }
197 /// # results.sort();
198 /// # assert_eq!(results, vec![1, 2]);
199 /// # }));
200 /// ```
201 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
202 self.entries().map(q!(|(k, _)| k))
203 }
204
205 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
206 /// entries whose keys are not in the provided stream.
207 ///
208 /// # Example
209 /// ```rust
210 /// # use hydro_lang::prelude::*;
211 /// # use futures::StreamExt;
212 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
213 /// let tick = process.tick();
214 /// let keyed_singleton = // { 1: 2, 2: 4 }
215 /// # process
216 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
217 /// # .into_keyed()
218 /// # .first()
219 /// # .snapshot(&tick, nondet!(/** test */));
220 /// let keys_to_remove = process
221 /// .source_iter(q!(vec![1]))
222 /// .batch(&tick, nondet!(/** test */));
223 /// keyed_singleton.filter_key_not_in(keys_to_remove)
224 /// # .entries().all_ticks()
225 /// # }, |mut stream| async move {
226 /// // { 2: 4 }
227 /// # for w in vec![(2, 4)] {
228 /// # assert_eq!(stream.next().await.unwrap(), w);
229 /// # }
230 /// # }));
231 /// ```
232 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
233 self,
234 other: Stream<K, L, Bounded, O2, R2>,
235 ) -> Self
236 where
237 K: Hash + Eq,
238 {
239 KeyedSingleton {
240 underlying: self.entries().anti_join(other),
241 }
242 }
243
244 /// An operator which allows you to "inspect" each value of a keyed singleton without
245 /// modifying it. The closure `f` is called on a reference to each value. This is
246 /// mainly useful for debugging, and should not be used to generate side-effects.
247 ///
248 /// # Example
249 /// ```rust
250 /// # use hydro_lang::prelude::*;
251 /// # use futures::StreamExt;
252 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
253 /// let keyed_singleton = // { 1: 2, 2: 4 }
254 /// # process
255 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
256 /// # .into_keyed()
257 /// # .first();
258 /// keyed_singleton
259 /// .inspect(q!(|v| println!("{}", v)))
260 /// # .entries()
261 /// # }, |mut stream| async move {
262 /// // { 1: 2, 2: 4 }
263 /// # for w in vec![(1, 2), (2, 4)] {
264 /// # assert_eq!(stream.next().await.unwrap(), w);
265 /// # }
266 /// # }));
267 /// ```
268 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
269 where
270 F: Fn(&V) + 'a,
271 {
272 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
273 KeyedSingleton {
274 underlying: self.underlying.inspect(q!({
275 let orig = f;
276 move |(_k, v)| orig(v)
277 })),
278 }
279 }
280
281 /// An operator which allows you to "inspect" each entry of a keyed singleton without
282 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
283 /// mainly useful for debugging, and should not be used to generate side-effects.
284 ///
285 /// # Example
286 /// ```rust
287 /// # use hydro_lang::prelude::*;
288 /// # use futures::StreamExt;
289 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
290 /// let keyed_singleton = // { 1: 2, 2: 4 }
291 /// # process
292 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
293 /// # .into_keyed()
294 /// # .first();
295 /// keyed_singleton
296 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
297 /// # .entries()
298 /// # }, |mut stream| async move {
299 /// // { 1: 2, 2: 4 }
300 /// # for w in vec![(1, 2), (2, 4)] {
301 /// # assert_eq!(stream.next().await.unwrap(), w);
302 /// # }
303 /// # }));
304 /// ```
305 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> KeyedSingleton<K, V, L, B>
306 where
307 F: Fn(&(K, V)) + 'a,
308 {
309 KeyedSingleton {
310 underlying: self.underlying.inspect(f),
311 }
312 }
313
314 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
315 /// element, the value.
316 ///
317 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
318 ///
319 /// # Example
320 /// ```rust
321 /// # use hydro_lang::prelude::*;
322 /// # use futures::StreamExt;
323 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
324 /// let keyed_singleton = // { 1: 2, 2: 4 }
325 /// # process
326 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
327 /// # .into_keyed()
328 /// # .first();
329 /// keyed_singleton
330 /// .clone()
331 /// .into_keyed_stream()
332 /// .interleave(
333 /// keyed_singleton.into_keyed_stream()
334 /// )
335 /// # .entries()
336 /// # }, |mut stream| async move {
337 /// /// // { 1: [2, 2], 2: [4, 4] }
338 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
339 /// # assert_eq!(stream.next().await.unwrap(), w);
340 /// # }
341 /// # }));
342 /// ```
343 pub fn into_keyed_stream(
344 self,
345 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
346 self.underlying
347 .into_keyed()
348 .assume_ordering(nondet!(/** only one element per key */))
349 }
350}
351
352impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
353 /// Transforms each value by invoking `f` on each element, with keys staying the same
354 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
355 ///
356 /// If you do not want to modify the stream and instead only want to view
357 /// each item use [`KeyedStream::inspect`] instead.
358 ///
359 /// # Example
360 /// ```rust
361 /// # use hydro_lang::prelude::*;
362 /// # use futures::StreamExt;
363 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
364 /// let keyed_singleton = // { 1: 2, 2: 4 }
365 /// # process
366 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
367 /// # .into_keyed()
368 /// # .first();
369 /// keyed_singleton.map(q!(|v| v + 1))
370 /// # .entries()
371 /// # }, |mut stream| async move {
372 /// // { 1: 3, 2: 5 }
373 /// # let mut results = Vec::new();
374 /// # for _ in 0..2 {
375 /// # results.push(stream.next().await.unwrap());
376 /// # }
377 /// # results.sort();
378 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
379 /// # }));
380 /// ```
381 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
382 where
383 F: Fn(V) -> U + 'a,
384 {
385 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
386 KeyedSingleton {
387 underlying: self.underlying.map(q!({
388 let orig = f;
389 move |(k, v)| (k, orig(v))
390 })),
391 }
392 }
393
394 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
395 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
396 ///
397 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
398 /// the new value `U`. The key remains unchanged in the output.
399 ///
400 /// # Example
401 /// ```rust
402 /// # use hydro_lang::prelude::*;
403 /// # use futures::StreamExt;
404 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405 /// let keyed_singleton = // { 1: 2, 2: 4 }
406 /// # process
407 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
408 /// # .into_keyed()
409 /// # .first();
410 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
411 /// # .entries()
412 /// # }, |mut stream| async move {
413 /// // { 1: 3, 2: 6 }
414 /// # let mut results = Vec::new();
415 /// # for _ in 0..2 {
416 /// # results.push(stream.next().await.unwrap());
417 /// # }
418 /// # results.sort();
419 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
420 /// # }));
421 /// ```
422 pub fn map_with_key<U, F>(
423 self,
424 f: impl IntoQuotedMut<'a, F, L> + Copy,
425 ) -> KeyedSingleton<K, U, L, B>
426 where
427 F: Fn((K, V)) -> U + 'a,
428 K: Clone,
429 {
430 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
431 KeyedSingleton {
432 underlying: self.underlying.map(q!({
433 let orig = f;
434 move |(k, v)| {
435 let out = orig((k.clone(), v));
436 (k, out)
437 }
438 })),
439 }
440 }
441
442 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
443 ///
444 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
445 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
446 /// is filtered out.
447 ///
448 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
449 /// not modify or take ownership of the values. If you need to modify the values while filtering
450 /// use [`KeyedSingleton::filter_map`] instead.
451 ///
452 /// # Example
453 /// ```rust
454 /// # use hydro_lang::prelude::*;
455 /// # use futures::StreamExt;
456 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
457 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
458 /// # process
459 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
460 /// # .into_keyed()
461 /// # .first();
462 /// keyed_singleton.filter(q!(|&v| v > 1))
463 /// # .entries()
464 /// # }, |mut stream| async move {
465 /// // { 1: 2, 2: 4 }
466 /// # let mut results = Vec::new();
467 /// # for _ in 0..2 {
468 /// # results.push(stream.next().await.unwrap());
469 /// # }
470 /// # results.sort();
471 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
472 /// # }));
473 /// ```
474 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
475 where
476 F: Fn(&V) -> bool + 'a,
477 {
478 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
479 KeyedSingleton {
480 underlying: self.underlying.filter(q!({
481 let orig = f;
482 move |(_k, v)| orig(v)
483 })),
484 }
485 }
486
487 /// An operator that both filters and maps values. It yields only the key-value pairs where
488 /// the supplied closure `f` returns `Some(value)`.
489 ///
490 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
491 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
492 /// If it returns `None`, the key-value pair is filtered out.
493 ///
494 /// # Example
495 /// ```rust
496 /// # use hydro_lang::prelude::*;
497 /// # use futures::StreamExt;
498 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
499 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
500 /// # process
501 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
502 /// # .into_keyed()
503 /// # .first();
504 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
505 /// # .entries()
506 /// # }, |mut stream| async move {
507 /// // { 1: 42, 3: 100 }
508 /// # let mut results = Vec::new();
509 /// # for _ in 0..2 {
510 /// # results.push(stream.next().await.unwrap());
511 /// # }
512 /// # results.sort();
513 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
514 /// # }));
515 /// ```
516 pub fn filter_map<F, U>(
517 self,
518 f: impl IntoQuotedMut<'a, F, L> + Copy,
519 ) -> KeyedSingleton<K, U, L, B>
520 where
521 F: Fn(V) -> Option<U> + 'a,
522 {
523 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
524 KeyedSingleton {
525 underlying: self.underlying.filter_map(q!({
526 let orig = f;
527 move |(k, v)| orig(v).map(|v| (k, v))
528 })),
529 }
530 }
531
532 /// Gets the number of keys in the keyed singleton.
533 ///
534 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
535 /// since keys may be added / removed over time. When the set of keys changes, the count will
536 /// be asynchronously updated.
537 ///
538 /// # Example
539 /// ```rust
540 /// # use hydro_lang::prelude::*;
541 /// # use futures::StreamExt;
542 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
543 /// # let tick = process.tick();
544 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
545 /// # process
546 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
547 /// # .into_keyed()
548 /// # .batch(&tick, nondet!(/** test */))
549 /// # .first();
550 /// keyed_singleton.key_count()
551 /// # .all_ticks()
552 /// # }, |mut stream| async move {
553 /// // 3
554 /// # assert_eq!(stream.next().await.unwrap(), 3);
555 /// # }));
556 /// ```
557 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
558 self.underlying.count()
559 }
560
561 /// An operator which allows you to "name" a `HydroNode`.
562 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
563 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
564 {
565 let mut node = self.underlying.ir_node.borrow_mut();
566 let metadata = node.metadata_mut();
567 metadata.tag = Some(name.to_string());
568 }
569 self
570 }
571}
572
573impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
574 /// Gets the value associated with a specific key from the keyed singleton.
575 ///
576 /// # Example
577 /// ```rust
578 /// # use hydro_lang::prelude::*;
579 /// # use futures::StreamExt;
580 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
581 /// let tick = process.tick();
582 /// let keyed_data = process
583 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
584 /// .into_keyed()
585 /// .batch(&tick, nondet!(/** test */))
586 /// .first();
587 /// let key = tick.singleton(q!(1));
588 /// keyed_data.get(key).all_ticks()
589 /// # }, |mut stream| async move {
590 /// // 2
591 /// # assert_eq!(stream.next().await.unwrap(), 2);
592 /// # }));
593 /// ```
594 pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
595 self.entries()
596 .join(key.into_stream().map(q!(|k| (k, ()))))
597 .map(q!(|(_, (v, _))| v))
598 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
599 .first()
600 }
601
602 /// Given a keyed stream of lookup requests, where the key is the lookup and the value
603 /// is some additional metadata, emits a keyed stream of lookup results where the key
604 /// is the same as before, but the value is a tuple of the lookup result and the metadata
605 /// of the request. If the key is not found, no output will be produced.
606 ///
607 /// # Example
608 /// ```rust
609 /// # use hydro_lang::prelude::*;
610 /// # use futures::StreamExt;
611 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
612 /// let tick = process.tick();
613 /// let keyed_data = process
614 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
615 /// .into_keyed()
616 /// .batch(&tick, nondet!(/** test */))
617 /// .first();
618 /// let other_data = process
619 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
620 /// .into_keyed()
621 /// .batch(&tick, nondet!(/** test */));
622 /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
623 /// # }, |mut stream| async move {
624 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
625 /// # let mut results = vec![];
626 /// # for _ in 0..3 {
627 /// # results.push(stream.next().await.unwrap());
628 /// # }
629 /// # results.sort();
630 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
631 /// # }));
632 /// ```
633 pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
634 self,
635 requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
636 ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
637 self.entries()
638 .weaker_retries::<R2>()
639 .join(requests.entries())
640 .into_keyed()
641 }
642
643 /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
644 /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
645 /// containing the value from `self` and an option of the value from `from`. If the key is not
646 /// present in `from`, the option will be [`None`].
647 ///
648 /// # Example
649 /// ```rust
650 /// # use hydro_lang::prelude::*;
651 /// # use futures::StreamExt;
652 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
653 /// # let tick = process.tick();
654 /// let requests = // { 1: 10, 2: 20 }
655 /// # process
656 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
657 /// # .into_keyed()
658 /// # .batch(&tick, nondet!(/** test */))
659 /// # .first();
660 /// let other_data = // { 10: 100, 11: 101 }
661 /// # process
662 /// # .source_iter(q!(vec![(10, 100), (11, 101)]))
663 /// # .into_keyed()
664 /// # .batch(&tick, nondet!(/** test */))
665 /// # .first();
666 /// requests.get_from(other_data)
667 /// # .entries().all_ticks()
668 /// # }, |mut stream| async move {
669 /// // { 1: (10, Some(100)), 2: (20, None) }
670 /// # let mut results = vec![];
671 /// # for _ in 0..2 {
672 /// # results.push(stream.next().await.unwrap());
673 /// # }
674 /// # results.sort();
675 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
676 /// # }));
677 /// ```
678 pub fn get_from<V2: Clone>(
679 self,
680 from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
681 ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
682 where
683 K: Clone,
684 V: Hash + Eq + Clone,
685 {
686 let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
687 let lookup_result = from.get_many_if_present(to_lookup.clone());
688 let missing_values =
689 to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
690 KeyedSingleton {
691 underlying: lookup_result
692 .entries()
693 .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
694 .chain(missing_values.entries().map(q!(|(v, k)| (k, (v, None))))),
695 }
696 }
697}
698
699impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
700where
701 L: Location<'a> + NoTick + NoAtomic,
702{
703 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
704 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
705 ///
706 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
707 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
708 /// argument that declares where the keyed singleton will be atomically processed. Batching a
709 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
710 /// batching into a different [`Tick`] will introduce asynchrony.
711 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
712 KeyedSingleton {
713 underlying: self.underlying.atomic(tick),
714 }
715 }
716
717 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
718 /// point in time.
719 ///
720 /// # Non-Determinism
721 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
722 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
723 pub fn snapshot(
724 self,
725 tick: &Tick<L>,
726 nondet: NonDet,
727 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
728 self.atomic(tick).snapshot(nondet)
729 }
730}
731
732impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
733where
734 L: Location<'a> + NoTick + NoAtomic,
735{
736 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
737 /// state of the keyed singleton being atomically processed.
738 ///
739 /// # Non-Determinism
740 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
741 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
742 pub fn snapshot(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
743 KeyedSingleton {
744 underlying: Stream::new(
745 self.underlying.location.tick,
746 // no need to unpersist due to top-level replay
747 self.underlying.ir_node.into_inner(),
748 ),
749 }
750 }
751
752 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
753 /// See [`KeyedSingleton::atomic`] for more details.
754 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
755 KeyedSingleton {
756 underlying: self.underlying.end_atomic(),
757 }
758 }
759}
760
761impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
762 /// Asynchronously yields this keyed singleton outside the tick, which will
763 /// be asynchronously updated with the latest set of entries inside the tick.
764 ///
765 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
766 /// tick that tracks the inner value. This is useful for getting the value as of the
767 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
768 ///
769 /// The entire set of entries are propagated on each tick, which means that if a tick
770 /// does not have a key "XYZ" that was present in the previous tick, the entry for "XYZ" will
771 /// also be removed from the output.
772 ///
773 /// # Example
774 /// ```rust
775 /// # use hydro_lang::prelude::*;
776 /// # use futures::StreamExt;
777 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
778 /// let tick = process.tick();
779 /// # // ticks are lazy by default, forces the second tick to run
780 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
781 /// # let batch_first_tick = process
782 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
783 /// # .batch(&tick, nondet!(/** test */))
784 /// # .into_keyed();
785 /// # let batch_second_tick = process
786 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
787 /// # .batch(&tick, nondet!(/** test */))
788 /// # .into_keyed()
789 /// # .defer_tick(); // appears on the second tick
790 /// # let input_batch = batch_first_tick.chain(batch_second_tick).first();
791 /// input_batch // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
792 /// .latest()
793 /// # .snapshot(&tick, nondet!(/** test */))
794 /// # .entries()
795 /// # .all_ticks()
796 /// # }, |mut stream| async move {
797 /// // asynchronously changes from { 1: 2, 2: 3 } ~> { 2: 4, 3: 5 }
798 /// # for w in vec![(1, 2), (2, 3), (2, 4), (3, 5)] {
799 /// # assert_eq!(stream.next().await.unwrap(), w);
800 /// # }
801 /// # }));
802 /// ```
803 pub fn latest(self) -> KeyedSingleton<K, V, L, Unbounded> {
804 KeyedSingleton {
805 underlying: Stream::new(
806 self.underlying.location.outer().clone(),
807 // no need to persist due to top-level replay
808 self.underlying.ir_node.into_inner(),
809 ),
810 }
811 }
812
813 /// Synchronously yields this keyed singleton outside the tick as an unbounded keyed singleton,
814 /// which will be updated with the latest set of entries inside the tick.
815 ///
816 /// Unlike [`KeyedSingleton::latest`], this preserves synchronous execution, as the output
817 /// keyed singleton is emitted in an [`Atomic`] context that will process elements synchronously
818 /// with the input keyed singleton's [`Tick`] context.
819 pub fn latest_atomic(self) -> KeyedSingleton<K, V, Atomic<L>, Unbounded> {
820 KeyedSingleton {
821 underlying: Stream::new(
822 Atomic {
823 tick: self.underlying.location,
824 },
825 // no need to persist due to top-level replay
826 self.underlying.ir_node.into_inner(),
827 ),
828 }
829 }
830
831 #[expect(missing_docs, reason = "TODO")]
832 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
833 KeyedSingleton {
834 underlying: self.underlying.defer_tick(),
835 }
836 }
837}
838
839impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
840where
841 L: Location<'a> + NoTick + NoAtomic,
842{
843 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
844 /// arrived since the previous batch was released.
845 ///
846 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
847 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
848 ///
849 /// # Non-Determinism
850 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
851 /// has a non-deterministic set of key-value pairs.
852 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
853 self.atomic(tick).batch(nondet)
854 }
855}
856
857impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
858where
859 L: Location<'a> + NoTick + NoAtomic,
860{
861 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
862 /// atomically processed.
863 ///
864 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
865 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
866 ///
867 /// # Non-Determinism
868 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
869 /// has a non-deterministic set of key-value pairs.
870 pub fn batch(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
871 KeyedSingleton {
872 underlying: self.underlying.batch(nondet),
873 }
874 }
875}