hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, DeferTick, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::NonDet;
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Singleton<Type, Loc, Bound: Boundedness> {
38 pub(crate) location: Loc,
39 pub(crate) ir_node: RefCell<HydroNode>,
40
41 _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46 L: Location<'a>,
47{
48 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49 Singleton::new(singleton.location, singleton.ir_node.into_inner())
50 }
51}
52
53impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 fn defer_tick(self) -> Self {
58 Singleton::defer_tick(self)
59 }
60}
61
62impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
63where
64 L: Location<'a>,
65{
66 type Location = Tick<L>;
67
68 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
69 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
70 location.clone(),
71 HydroNode::DeferTick {
72 input: Box::new(HydroNode::CycleSource {
73 ident,
74 metadata: location.new_node_metadata::<T>(),
75 }),
76 metadata: location.new_node_metadata::<T>(),
77 },
78 );
79
80 from_previous_tick.unwrap_or(initial)
81 }
82}
83
84impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
85where
86 L: Location<'a>,
87{
88 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
89 assert_eq!(
90 Location::id(&self.location),
91 expected_location,
92 "locations do not match"
93 );
94 self.location
95 .flow_state()
96 .borrow_mut()
97 .push_root(HydroRoot::CycleSink {
98 ident,
99 input: Box::new(self.ir_node.into_inner()),
100 out_location: Location::id(&self.location),
101 op_metadata: HydroIrOpMetadata::new(),
102 });
103 }
104}
105
106impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
107where
108 L: Location<'a>,
109{
110 type Location = Tick<L>;
111
112 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
113 Singleton::new(
114 location.clone(),
115 HydroNode::CycleSource {
116 ident,
117 metadata: location.new_node_metadata::<T>(),
118 },
119 )
120 }
121}
122
123impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
124where
125 L: Location<'a>,
126{
127 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
128 assert_eq!(
129 Location::id(&self.location),
130 expected_location,
131 "locations do not match"
132 );
133 self.location
134 .flow_state()
135 .borrow_mut()
136 .push_root(HydroRoot::CycleSink {
137 ident,
138 input: Box::new(self.ir_node.into_inner()),
139 out_location: Location::id(&self.location),
140 op_metadata: HydroIrOpMetadata::new(),
141 });
142 }
143}
144
145impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
146where
147 L: Location<'a> + NoTick,
148{
149 type Location = L;
150
151 fn create_source(ident: syn::Ident, location: L) -> Self {
152 Singleton::new(
153 location.clone(),
154 HydroNode::Persist {
155 inner: Box::new(HydroNode::CycleSource {
156 ident,
157 metadata: location.new_node_metadata::<T>(),
158 }),
159 metadata: location.new_node_metadata::<T>(),
160 },
161 )
162 }
163}
164
165impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
166where
167 L: Location<'a> + NoTick,
168{
169 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
170 assert_eq!(
171 Location::id(&self.location),
172 expected_location,
173 "locations do not match"
174 );
175 let metadata = self.location.new_node_metadata::<T>();
176 self.location
177 .flow_state()
178 .borrow_mut()
179 .push_root(HydroRoot::CycleSink {
180 ident,
181 input: Box::new(HydroNode::Unpersist {
182 inner: Box::new(self.ir_node.into_inner()),
183 metadata: metadata.clone(),
184 }),
185 out_location: Location::id(&self.location),
186 op_metadata: HydroIrOpMetadata::new(),
187 });
188 }
189}
190
191impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
192where
193 T: Clone,
194 L: Location<'a>,
195{
196 fn clone(&self) -> Self {
197 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
198 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
199 *self.ir_node.borrow_mut() = HydroNode::Tee {
200 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
201 metadata: self.location.new_node_metadata::<T>(),
202 };
203 }
204
205 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
206 Singleton {
207 location: self.location.clone(),
208 ir_node: HydroNode::Tee {
209 inner: TeeNode(inner.0.clone()),
210 metadata: metadata.clone(),
211 }
212 .into(),
213 _phantom: PhantomData,
214 }
215 } else {
216 unreachable!()
217 }
218 }
219}
220
221impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
222where
223 L: Location<'a>,
224{
225 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
226 Singleton {
227 location,
228 ir_node: RefCell::new(ir_node),
229 _phantom: PhantomData,
230 }
231 }
232
233 /// Transforms the singleton value by applying a function `f` to it,
234 /// continuously as the input is updated.
235 ///
236 /// # Example
237 /// ```rust
238 /// # use hydro_lang::prelude::*;
239 /// # use futures::StreamExt;
240 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
241 /// let tick = process.tick();
242 /// let singleton = tick.singleton(q!(5));
243 /// singleton.map(q!(|v| v * 2)).all_ticks()
244 /// # }, |mut stream| async move {
245 /// // 10
246 /// # assert_eq!(stream.next().await.unwrap(), 10);
247 /// # }));
248 /// ```
249 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
250 where
251 F: Fn(T) -> U + 'a,
252 {
253 let f = f.splice_fn1_ctx(&self.location).into();
254 Singleton::new(
255 self.location.clone(),
256 HydroNode::Map {
257 f,
258 input: Box::new(self.ir_node.into_inner()),
259 metadata: self.location.new_node_metadata::<U>(),
260 },
261 )
262 }
263
264 /// Transforms the singleton value by applying a function `f` to it and then flattening
265 /// the result into a stream, preserving the order of elements.
266 ///
267 /// The function `f` is applied to the singleton value to produce an iterator, and all items
268 /// from that iterator are emitted in the output stream in deterministic order.
269 ///
270 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
271 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
272 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
273 ///
274 /// # Example
275 /// ```rust
276 /// # use hydro_lang::prelude::*;
277 /// # use futures::StreamExt;
278 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
279 /// let tick = process.tick();
280 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
281 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
282 /// # }, |mut stream| async move {
283 /// // 1, 2, 3
284 /// # for w in vec![1, 2, 3] {
285 /// # assert_eq!(stream.next().await.unwrap(), w);
286 /// # }
287 /// # }));
288 /// ```
289 pub fn flat_map_ordered<U, I, F>(
290 self,
291 f: impl IntoQuotedMut<'a, F, L>,
292 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
293 where
294 I: IntoIterator<Item = U>,
295 F: Fn(T) -> I + 'a,
296 {
297 let f = f.splice_fn1_ctx(&self.location).into();
298 Stream::new(
299 self.location.clone(),
300 HydroNode::FlatMap {
301 f,
302 input: Box::new(self.ir_node.into_inner()),
303 metadata: self.location.new_node_metadata::<U>(),
304 },
305 )
306 }
307
308 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
309 /// for the output type `I` to produce items in any order.
310 ///
311 /// The function `f` is applied to the singleton value to produce an iterator, and all items
312 /// from that iterator are emitted in the output stream in non-deterministic order.
313 ///
314 /// # Example
315 /// ```rust
316 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
317 /// # use futures::StreamExt;
318 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
319 /// let tick = process.tick();
320 /// let singleton = tick.singleton(q!(
321 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
322 /// ));
323 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
324 /// # }, |mut stream| async move {
325 /// // 1, 2, 3, but in no particular order
326 /// # let mut results = Vec::new();
327 /// # for _ in 0..3 {
328 /// # results.push(stream.next().await.unwrap());
329 /// # }
330 /// # results.sort();
331 /// # assert_eq!(results, vec![1, 2, 3]);
332 /// # }));
333 /// ```
334 pub fn flat_map_unordered<U, I, F>(
335 self,
336 f: impl IntoQuotedMut<'a, F, L>,
337 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
338 where
339 I: IntoIterator<Item = U>,
340 F: Fn(T) -> I + 'a,
341 {
342 let f = f.splice_fn1_ctx(&self.location).into();
343 Stream::new(
344 self.location.clone(),
345 HydroNode::FlatMap {
346 f,
347 input: Box::new(self.ir_node.into_inner()),
348 metadata: self.location.new_node_metadata::<U>(),
349 },
350 )
351 }
352
353 /// Flattens the singleton value into a stream, preserving the order of elements.
354 ///
355 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
356 /// are emitted in the output stream in deterministic order.
357 ///
358 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
359 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
360 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
361 ///
362 /// # Example
363 /// ```rust
364 /// # use hydro_lang::prelude::*;
365 /// # use futures::StreamExt;
366 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
367 /// let tick = process.tick();
368 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
369 /// singleton.flatten_ordered().all_ticks()
370 /// # }, |mut stream| async move {
371 /// // 1, 2, 3
372 /// # for w in vec![1, 2, 3] {
373 /// # assert_eq!(stream.next().await.unwrap(), w);
374 /// # }
375 /// # }));
376 /// ```
377 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
378 where
379 T: IntoIterator<Item = U>,
380 {
381 self.flat_map_ordered(q!(|x| x))
382 }
383
384 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
385 /// for the element type `T` to produce items in any order.
386 ///
387 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
388 /// are emitted in the output stream in non-deterministic order.
389 ///
390 /// # Example
391 /// ```rust
392 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
393 /// # use futures::StreamExt;
394 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
395 /// let tick = process.tick();
396 /// let singleton = tick.singleton(q!(
397 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
398 /// ));
399 /// singleton.flatten_unordered().all_ticks()
400 /// # }, |mut stream| async move {
401 /// // 1, 2, 3, but in no particular order
402 /// # let mut results = Vec::new();
403 /// # for _ in 0..3 {
404 /// # results.push(stream.next().await.unwrap());
405 /// # }
406 /// # results.sort();
407 /// # assert_eq!(results, vec![1, 2, 3]);
408 /// # }));
409 /// ```
410 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
411 where
412 T: IntoIterator<Item = U>,
413 {
414 self.flat_map_unordered(q!(|x| x))
415 }
416
417 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
418 ///
419 /// If the predicate returns `true`, the output optional contains the same value.
420 /// If the predicate returns `false`, the output optional is empty.
421 ///
422 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
423 /// not modify or take ownership of the value. If you need to modify the value while filtering
424 /// use [`Singleton::filter_map`] instead.
425 ///
426 /// # Example
427 /// ```rust
428 /// # use hydro_lang::prelude::*;
429 /// # use futures::StreamExt;
430 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
431 /// let tick = process.tick();
432 /// let singleton = tick.singleton(q!(5));
433 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
434 /// # }, |mut stream| async move {
435 /// // 5
436 /// # assert_eq!(stream.next().await.unwrap(), 5);
437 /// # }));
438 /// ```
439 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
440 where
441 F: Fn(&T) -> bool + 'a,
442 {
443 let f = f.splice_fn1_borrow_ctx(&self.location).into();
444 Optional::new(
445 self.location.clone(),
446 HydroNode::Filter {
447 f,
448 input: Box::new(self.ir_node.into_inner()),
449 metadata: self.location.new_node_metadata::<T>(),
450 },
451 )
452 }
453
454 /// An operator that both filters and maps. It yields the value only if the supplied
455 /// closure `f` returns `Some(value)`.
456 ///
457 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
458 /// If the closure returns `None`, the output optional is empty.
459 ///
460 /// # Example
461 /// ```rust
462 /// # use hydro_lang::prelude::*;
463 /// # use futures::StreamExt;
464 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465 /// let tick = process.tick();
466 /// let singleton = tick.singleton(q!("42"));
467 /// singleton
468 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
469 /// .all_ticks()
470 /// # }, |mut stream| async move {
471 /// // 42
472 /// # assert_eq!(stream.next().await.unwrap(), 42);
473 /// # }));
474 /// ```
475 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
476 where
477 F: Fn(T) -> Option<U> + 'a,
478 {
479 let f = f.splice_fn1_ctx(&self.location).into();
480 Optional::new(
481 self.location.clone(),
482 HydroNode::FilterMap {
483 f,
484 input: Box::new(self.ir_node.into_inner()),
485 metadata: self.location.new_node_metadata::<U>(),
486 },
487 )
488 }
489
490 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
491 ///
492 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
493 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
494 /// non-null. This is useful for combining several pieces of state together.
495 ///
496 /// # Example
497 /// ```rust
498 /// # use hydro_lang::prelude::*;
499 /// # use futures::StreamExt;
500 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
501 /// let tick = process.tick();
502 /// let numbers = process
503 /// .source_iter(q!(vec![123, 456]))
504 /// .batch(&tick, nondet!(/** test */));
505 /// let count = numbers.clone().count(); // Singleton
506 /// let max = numbers.max(); // Optional
507 /// count.zip(max).all_ticks()
508 /// # }, |mut stream| async move {
509 /// // [(2, 456)]
510 /// # for w in vec![(2, 456)] {
511 /// # assert_eq!(stream.next().await.unwrap(), w);
512 /// # }
513 /// # }));
514 /// ```
515 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
516 where
517 Self: ZipResult<'a, O, Location = L>,
518 {
519 check_matching_location(&self.location, &Self::other_location(&other));
520
521 if L::is_top_level() {
522 let left_ir_node = self.ir_node.into_inner();
523 let left_ir_node_metadata = left_ir_node.metadata().clone();
524 let right_ir_node = Self::other_ir_node(other);
525 let right_ir_node_metadata = right_ir_node.metadata().clone();
526
527 Self::make(
528 self.location.clone(),
529 HydroNode::Persist {
530 inner: Box::new(HydroNode::CrossSingleton {
531 left: Box::new(HydroNode::Unpersist {
532 inner: Box::new(left_ir_node),
533 metadata: left_ir_node_metadata,
534 }),
535 right: Box::new(HydroNode::Unpersist {
536 inner: Box::new(right_ir_node),
537 metadata: right_ir_node_metadata,
538 }),
539 metadata: self
540 .location
541 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
542 }),
543 metadata: self
544 .location
545 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
546 },
547 )
548 } else {
549 Self::make(
550 self.location.clone(),
551 HydroNode::CrossSingleton {
552 left: Box::new(self.ir_node.into_inner()),
553 right: Box::new(Self::other_ir_node(other)),
554 metadata: self
555 .location
556 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
557 },
558 )
559 }
560 }
561
562 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
563 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
564 ///
565 /// Useful for conditionally processing, such as only emitting a singleton's value outside
566 /// a tick if some other condition is satisfied.
567 ///
568 /// # Example
569 /// ```rust
570 /// # use hydro_lang::prelude::*;
571 /// # use futures::StreamExt;
572 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
573 /// let tick = process.tick();
574 /// // ticks are lazy by default, forces the second tick to run
575 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
576 ///
577 /// let batch_first_tick = process
578 /// .source_iter(q!(vec![1]))
579 /// .batch(&tick, nondet!(/** test */));
580 /// let batch_second_tick = process
581 /// .source_iter(q!(vec![1, 2, 3]))
582 /// .batch(&tick, nondet!(/** test */))
583 /// .defer_tick(); // appears on the second tick
584 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
585 /// batch_first_tick.chain(batch_second_tick).count()
586 /// .filter_if_some(some_on_first_tick)
587 /// .all_ticks()
588 /// # }, |mut stream| async move {
589 /// // [1]
590 /// # for w in vec![1] {
591 /// # assert_eq!(stream.next().await.unwrap(), w);
592 /// # }
593 /// # }));
594 /// ```
595 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
596 self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
597 .map(q!(|(d, _signal)| d))
598 }
599
600 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
601 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
602 ///
603 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
604 /// the condition.
605 ///
606 /// # Example
607 /// ```rust
608 /// # use hydro_lang::prelude::*;
609 /// # use futures::StreamExt;
610 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611 /// let tick = process.tick();
612 /// // ticks are lazy by default, forces the second tick to run
613 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
614 ///
615 /// let batch_first_tick = process
616 /// .source_iter(q!(vec![1]))
617 /// .batch(&tick, nondet!(/** test */));
618 /// let batch_second_tick = process
619 /// .source_iter(q!(vec![1, 2, 3]))
620 /// .batch(&tick, nondet!(/** test */))
621 /// .defer_tick(); // appears on the second tick
622 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
623 /// batch_first_tick.chain(batch_second_tick).count()
624 /// .filter_if_none(some_on_first_tick)
625 /// .all_ticks()
626 /// # }, |mut stream| async move {
627 /// // [3]
628 /// # for w in vec![3] {
629 /// # assert_eq!(stream.next().await.unwrap(), w);
630 /// # }
631 /// # }));
632 /// ```
633 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
634 self.filter_if_some(
635 other
636 .map(q!(|_| ()))
637 .into_singleton()
638 .filter(q!(|o| o.is_none())),
639 )
640 }
641
642 /// An operator which allows you to "name" a `HydroNode`.
643 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
644 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
645 {
646 let mut node = self.ir_node.borrow_mut();
647 let metadata = node.metadata_mut();
648 metadata.tag = Some(name.to_string());
649 }
650 self
651 }
652}
653
654impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
655where
656 L: Location<'a> + NoTick,
657{
658 /// Returns a singleton value corresponding to the latest snapshot of the singleton
659 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
660 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
661 /// all snapshots of this singleton into the atomic-associated tick will observe the
662 /// same value each tick.
663 ///
664 /// # Non-Determinism
665 /// Because this picks a snapshot of a singleton whose value is continuously changing,
666 /// the output singleton has a non-deterministic value since the snapshot can be at an
667 /// arbitrary point in time.
668 pub fn snapshot(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
669 Singleton::new(
670 self.location.clone().tick,
671 HydroNode::Unpersist {
672 inner: Box::new(self.ir_node.into_inner()),
673 metadata: self.location.new_node_metadata::<T>(),
674 },
675 )
676 }
677
678 /// Returns this singleton back into a top-level, asynchronous execution context where updates
679 /// to the value will be asynchronously propagated.
680 pub fn end_atomic(self) -> Optional<T, L, B> {
681 Optional::new(self.location.tick.l, self.ir_node.into_inner())
682 }
683}
684
685impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
686where
687 L: Location<'a> + NoTick + NoAtomic,
688{
689 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
690 /// will observe the same version of the value and will be executed synchronously before any
691 /// outputs are yielded (in [`Optional::end_atomic`]).
692 ///
693 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
694 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
695 /// a different version).
696 ///
697 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
698 /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
699 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
700 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
701 Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
702 }
703
704 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
705 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
706 /// relevant data that contributed to the snapshot at tick `t`.
707 ///
708 /// # Non-Determinism
709 /// Because this picks a snapshot of a singleton whose value is continuously changing,
710 /// the output singleton has a non-deterministic value since the snapshot can be at an
711 /// arbitrary point in time.
712 pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Singleton<T, Tick<L>, Bounded>
713 where
714 L: NoTick,
715 {
716 self.atomic(tick).snapshot(nondet)
717 }
718
719 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
720 /// with order corresponding to increasing prefixes of data contributing to the singleton.
721 ///
722 /// # Non-Determinism
723 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
724 /// to non-deterministic batching and arrival of inputs, the output stream is
725 /// non-deterministic.
726 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
727 let tick = self.location.tick();
728 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
729 }
730
731 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
732 /// value taken at various points in time. Because the input singleton may be
733 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
734 /// represent the value of the singleton given some prefix of the streams leading up to
735 /// it.
736 ///
737 /// # Non-Determinism
738 /// The output stream is non-deterministic in which elements are sampled, since this
739 /// is controlled by a clock.
740 pub fn sample_every(
741 self,
742 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
743 nondet: NonDet,
744 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
745 let samples = self.location.source_interval(interval, nondet);
746 let tick = self.location.tick();
747
748 self.snapshot(&tick, nondet)
749 .filter_if_some(samples.batch(&tick, nondet).first())
750 .all_ticks()
751 .weakest_retries()
752 }
753}
754
755impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
756where
757 L: Location<'a>,
758{
759 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
760 /// which will stream the value computed in _each_ tick as a separate stream element.
761 ///
762 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
763 /// producing one element in the output for each tick. This is useful for batched computations,
764 /// where the results from each tick must be combined together.
765 ///
766 /// # Example
767 /// ```rust
768 /// # use hydro_lang::prelude::*;
769 /// # use futures::StreamExt;
770 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
771 /// let tick = process.tick();
772 /// # // ticks are lazy by default, forces the second tick to run
773 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
774 /// # let batch_first_tick = process
775 /// # .source_iter(q!(vec![1]))
776 /// # .batch(&tick, nondet!(/** test */));
777 /// # let batch_second_tick = process
778 /// # .source_iter(q!(vec![1, 2, 3]))
779 /// # .batch(&tick, nondet!(/** test */))
780 /// # .defer_tick(); // appears on the second tick
781 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
782 /// input_batch // first tick: [1], second tick: [1, 2, 3]
783 /// .count()
784 /// .all_ticks()
785 /// # }, |mut stream| async move {
786 /// // [1, 3]
787 /// # for w in vec![1, 3] {
788 /// # assert_eq!(stream.next().await.unwrap(), w);
789 /// # }
790 /// # }));
791 /// ```
792 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
793 self.into_stream().all_ticks()
794 }
795
796 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
797 /// which will stream the value computed in _each_ tick as a separate stream element.
798 ///
799 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
800 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
801 /// singleton's [`Tick`] context.
802 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
803 self.into_stream().all_ticks_atomic()
804 }
805
806 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
807 /// be asynchronously updated with the latest value of the singleton inside the tick.
808 ///
809 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
810 /// tick that tracks the inner value. This is useful for getting the value as of the
811 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
812 ///
813 /// # Example
814 /// ```rust
815 /// # use hydro_lang::prelude::*;
816 /// # use futures::StreamExt;
817 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818 /// let tick = process.tick();
819 /// # // ticks are lazy by default, forces the second tick to run
820 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
821 /// # let batch_first_tick = process
822 /// # .source_iter(q!(vec![1]))
823 /// # .batch(&tick, nondet!(/** test */));
824 /// # let batch_second_tick = process
825 /// # .source_iter(q!(vec![1, 2, 3]))
826 /// # .batch(&tick, nondet!(/** test */))
827 /// # .defer_tick(); // appears on the second tick
828 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
829 /// input_batch // first tick: [1], second tick: [1, 2, 3]
830 /// .count()
831 /// .latest()
832 /// # .sample_eager(nondet!(/** test */))
833 /// # }, |mut stream| async move {
834 /// // asynchronously changes from 1 ~> 3
835 /// # for w in vec![1, 3] {
836 /// # assert_eq!(stream.next().await.unwrap(), w);
837 /// # }
838 /// # }));
839 /// ```
840 pub fn latest(self) -> Singleton<T, L, Unbounded> {
841 Singleton::new(
842 self.location.outer().clone(),
843 HydroNode::Persist {
844 inner: Box::new(self.ir_node.into_inner()),
845 metadata: self.location.new_node_metadata::<T>(),
846 },
847 )
848 }
849
850 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
851 /// be updated with the latest value of the singleton inside the tick.
852 ///
853 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
854 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
855 /// singleton's [`Tick`] context.
856 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
857 Singleton::new(
858 Atomic {
859 tick: self.location.clone(),
860 },
861 HydroNode::Persist {
862 inner: Box::new(self.ir_node.into_inner()),
863 metadata: self.location.new_node_metadata::<T>(),
864 },
865 )
866 }
867
868 #[expect(missing_docs, reason = "TODO")]
869 pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
870 Singleton::new(
871 self.location.clone(),
872 HydroNode::DeferTick {
873 input: Box::new(self.ir_node.into_inner()),
874 metadata: self.location.new_node_metadata::<T>(),
875 },
876 )
877 }
878
879 #[deprecated(note = "use .into_stream().persist()")]
880 #[expect(missing_docs, reason = "deprecated")]
881 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
882 Stream::new(
883 self.location.clone(),
884 HydroNode::Persist {
885 inner: Box::new(self.ir_node.into_inner()),
886 metadata: self.location.new_node_metadata::<T>(),
887 },
888 )
889 }
890
891 /// Converts this singleton into a [`Stream`] containing a single element, the value.
892 ///
893 /// # Example
894 /// ```rust
895 /// # use hydro_lang::prelude::*;
896 /// # use futures::StreamExt;
897 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
898 /// let tick = process.tick();
899 /// let batch_input = process
900 /// .source_iter(q!(vec![123, 456]))
901 /// .batch(&tick, nondet!(/** test */));
902 /// batch_input.clone().chain(
903 /// batch_input.count().into_stream()
904 /// ).all_ticks()
905 /// # }, |mut stream| async move {
906 /// // [123, 456, 2]
907 /// # for w in vec![123, 456, 2] {
908 /// # assert_eq!(stream.next().await.unwrap(), w);
909 /// # }
910 /// # }));
911 /// ```
912 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
913 Stream::new(self.location, self.ir_node.into_inner())
914 }
915}
916
917#[doc(hidden)]
918/// Helper trait that determines the output collection type for [`Singleton::zip`].
919///
920/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
921/// [`Singleton`].
922#[sealed::sealed]
923pub trait ZipResult<'a, Other> {
924 /// The output collection type.
925 type Out;
926 /// The type of the tupled output value.
927 type ElementType;
928 /// The location where the tupled result will be materialized.
929 type Location: Location<'a>;
930
931 /// The location of the second input to the `zip`.
932 fn other_location(other: &Other) -> Self::Location;
933 /// The IR node of the second input to the `zip`.
934 fn other_ir_node(other: Other) -> HydroNode;
935
936 /// Constructs the output live collection given an IR node containing the zip result.
937 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
938}
939
940#[sealed::sealed]
941impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
942where
943 L: Location<'a>,
944{
945 type Out = Singleton<(T, U), L, B>;
946 type ElementType = (T, U);
947 type Location = L;
948
949 fn other_location(other: &Singleton<U, L, B>) -> L {
950 other.location.clone()
951 }
952
953 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
954 other.ir_node.into_inner()
955 }
956
957 fn make(location: L, ir_node: HydroNode) -> Self::Out {
958 Singleton::new(location, ir_node)
959 }
960}
961
962#[sealed::sealed]
963impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
964where
965 L: Location<'a>,
966{
967 type Out = Optional<(T, U), L, B>;
968 type ElementType = (T, U);
969 type Location = L;
970
971 fn other_location(other: &Optional<U, L, B>) -> L {
972 other.location.clone()
973 }
974
975 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
976 other.ir_node.into_inner()
977 }
978
979 fn make(location: L, ir_node: HydroNode) -> Self::Out {
980 Optional::new(location, ir_node)
981 }
982}
983
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// A singleton fed back through a tick cycle (with an initial value) must contain exactly
    /// one element on every tick: each trigger message is expected to produce a count of `1`.
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let external_process = builder.external::<()>();

        let (trigger_port, triggers) = process.source_external_bincode(&external_process);

        let tick = process.tick();
        let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));

        // Number of elements in the cycled singleton during the current tick.
        let cardinality = cycled.clone().into_stream().count();
        // Only report the count on ticks in which a trigger message arrived.
        let trigger = triggers.batch(&tick, nondet!(/** testing */)).first();
        let counts_port = cardinality
            .filter_if_some(trigger)
            .all_ticks()
            .send_bincode_external(&external_process);

        // Close the cycle by feeding the singleton back into itself for the next tick.
        cycle_handle.complete_next_tick(cycled);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&external_process, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_sink = deployed.connect_sink_bincode(trigger_port).await;
        let mut counts_stream = deployed.connect_source_bincode(counts_port).await;

        deployment.start().await.unwrap();

        // First trigger: the singleton holds its initial value.
        trigger_sink.send(()).await.unwrap();
        assert_eq!(counts_stream.next().await.unwrap(), 1);

        // Second trigger: the value has gone around the cycle once and the count is still 1.
        trigger_sink.send(()).await.unwrap();
        assert_eq!(counts_stream.next().await.unwrap(), 1);
    }
}