hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, DeferTick, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::NonDet;
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Singleton<Type, Loc, Bound: Boundedness> {
38    pub(crate) location: Loc,
39    pub(crate) ir_node: RefCell<HydroNode>,
40
41    _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46    L: Location<'a>,
47{
48    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49        Singleton::new(singleton.location, singleton.ir_node.into_inner())
50    }
51}
52
53impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    fn defer_tick(self) -> Self {
58        Singleton::defer_tick(self)
59    }
60}
61
62impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
63where
64    L: Location<'a>,
65{
66    type Location = Tick<L>;
67
68    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
69        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
70            location.clone(),
71            HydroNode::DeferTick {
72                input: Box::new(HydroNode::CycleSource {
73                    ident,
74                    metadata: location.new_node_metadata::<T>(),
75                }),
76                metadata: location.new_node_metadata::<T>(),
77            },
78        );
79
80        from_previous_tick.unwrap_or(initial)
81    }
82}
83
84impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
85where
86    L: Location<'a>,
87{
88    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
89        assert_eq!(
90            Location::id(&self.location),
91            expected_location,
92            "locations do not match"
93        );
94        self.location
95            .flow_state()
96            .borrow_mut()
97            .push_root(HydroRoot::CycleSink {
98                ident,
99                input: Box::new(self.ir_node.into_inner()),
100                out_location: Location::id(&self.location),
101                op_metadata: HydroIrOpMetadata::new(),
102            });
103    }
104}
105
106impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
107where
108    L: Location<'a>,
109{
110    type Location = Tick<L>;
111
112    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
113        Singleton::new(
114            location.clone(),
115            HydroNode::CycleSource {
116                ident,
117                metadata: location.new_node_metadata::<T>(),
118            },
119        )
120    }
121}
122
123impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
124where
125    L: Location<'a>,
126{
127    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
128        assert_eq!(
129            Location::id(&self.location),
130            expected_location,
131            "locations do not match"
132        );
133        self.location
134            .flow_state()
135            .borrow_mut()
136            .push_root(HydroRoot::CycleSink {
137                ident,
138                input: Box::new(self.ir_node.into_inner()),
139                out_location: Location::id(&self.location),
140                op_metadata: HydroIrOpMetadata::new(),
141            });
142    }
143}
144
145impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
146where
147    L: Location<'a> + NoTick,
148{
149    type Location = L;
150
151    fn create_source(ident: syn::Ident, location: L) -> Self {
152        Singleton::new(
153            location.clone(),
154            HydroNode::Persist {
155                inner: Box::new(HydroNode::CycleSource {
156                    ident,
157                    metadata: location.new_node_metadata::<T>(),
158                }),
159                metadata: location.new_node_metadata::<T>(),
160            },
161        )
162    }
163}
164
165impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
166where
167    L: Location<'a> + NoTick,
168{
169    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
170        assert_eq!(
171            Location::id(&self.location),
172            expected_location,
173            "locations do not match"
174        );
175        let metadata = self.location.new_node_metadata::<T>();
176        self.location
177            .flow_state()
178            .borrow_mut()
179            .push_root(HydroRoot::CycleSink {
180                ident,
181                input: Box::new(HydroNode::Unpersist {
182                    inner: Box::new(self.ir_node.into_inner()),
183                    metadata: metadata.clone(),
184                }),
185                out_location: Location::id(&self.location),
186                op_metadata: HydroIrOpMetadata::new(),
187            });
188    }
189}
190
191impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
192where
193    T: Clone,
194    L: Location<'a>,
195{
196    fn clone(&self) -> Self {
197        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
198            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
199            *self.ir_node.borrow_mut() = HydroNode::Tee {
200                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
201                metadata: self.location.new_node_metadata::<T>(),
202            };
203        }
204
205        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
206            Singleton {
207                location: self.location.clone(),
208                ir_node: HydroNode::Tee {
209                    inner: TeeNode(inner.0.clone()),
210                    metadata: metadata.clone(),
211                }
212                .into(),
213                _phantom: PhantomData,
214            }
215        } else {
216            unreachable!()
217        }
218    }
219}
220
221impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
222where
223    L: Location<'a>,
224{
225    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
226        Singleton {
227            location,
228            ir_node: RefCell::new(ir_node),
229            _phantom: PhantomData,
230        }
231    }
232
233    /// Transforms the singleton value by applying a function `f` to it,
234    /// continuously as the input is updated.
235    ///
236    /// # Example
237    /// ```rust
238    /// # use hydro_lang::prelude::*;
239    /// # use futures::StreamExt;
240    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
241    /// let tick = process.tick();
242    /// let singleton = tick.singleton(q!(5));
243    /// singleton.map(q!(|v| v * 2)).all_ticks()
244    /// # }, |mut stream| async move {
245    /// // 10
246    /// # assert_eq!(stream.next().await.unwrap(), 10);
247    /// # }));
248    /// ```
249    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
250    where
251        F: Fn(T) -> U + 'a,
252    {
253        let f = f.splice_fn1_ctx(&self.location).into();
254        Singleton::new(
255            self.location.clone(),
256            HydroNode::Map {
257                f,
258                input: Box::new(self.ir_node.into_inner()),
259                metadata: self.location.new_node_metadata::<U>(),
260            },
261        )
262    }
263
264    /// Transforms the singleton value by applying a function `f` to it and then flattening
265    /// the result into a stream, preserving the order of elements.
266    ///
267    /// The function `f` is applied to the singleton value to produce an iterator, and all items
268    /// from that iterator are emitted in the output stream in deterministic order.
269    ///
270    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
271    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
272    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
273    ///
274    /// # Example
275    /// ```rust
276    /// # use hydro_lang::prelude::*;
277    /// # use futures::StreamExt;
278    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
279    /// let tick = process.tick();
280    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
281    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
282    /// # }, |mut stream| async move {
283    /// // 1, 2, 3
284    /// # for w in vec![1, 2, 3] {
285    /// #     assert_eq!(stream.next().await.unwrap(), w);
286    /// # }
287    /// # }));
288    /// ```
289    pub fn flat_map_ordered<U, I, F>(
290        self,
291        f: impl IntoQuotedMut<'a, F, L>,
292    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
293    where
294        I: IntoIterator<Item = U>,
295        F: Fn(T) -> I + 'a,
296    {
297        let f = f.splice_fn1_ctx(&self.location).into();
298        Stream::new(
299            self.location.clone(),
300            HydroNode::FlatMap {
301                f,
302                input: Box::new(self.ir_node.into_inner()),
303                metadata: self.location.new_node_metadata::<U>(),
304            },
305        )
306    }
307
308    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
309    /// for the output type `I` to produce items in any order.
310    ///
311    /// The function `f` is applied to the singleton value to produce an iterator, and all items
312    /// from that iterator are emitted in the output stream in non-deterministic order.
313    ///
314    /// # Example
315    /// ```rust
316    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
317    /// # use futures::StreamExt;
318    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
319    /// let tick = process.tick();
320    /// let singleton = tick.singleton(q!(
321    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
322    /// ));
323    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
324    /// # }, |mut stream| async move {
325    /// // 1, 2, 3, but in no particular order
326    /// # let mut results = Vec::new();
327    /// # for _ in 0..3 {
328    /// #     results.push(stream.next().await.unwrap());
329    /// # }
330    /// # results.sort();
331    /// # assert_eq!(results, vec![1, 2, 3]);
332    /// # }));
333    /// ```
334    pub fn flat_map_unordered<U, I, F>(
335        self,
336        f: impl IntoQuotedMut<'a, F, L>,
337    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
338    where
339        I: IntoIterator<Item = U>,
340        F: Fn(T) -> I + 'a,
341    {
342        let f = f.splice_fn1_ctx(&self.location).into();
343        Stream::new(
344            self.location.clone(),
345            HydroNode::FlatMap {
346                f,
347                input: Box::new(self.ir_node.into_inner()),
348                metadata: self.location.new_node_metadata::<U>(),
349            },
350        )
351    }
352
353    /// Flattens the singleton value into a stream, preserving the order of elements.
354    ///
355    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
356    /// are emitted in the output stream in deterministic order.
357    ///
358    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
359    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
360    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
361    ///
362    /// # Example
363    /// ```rust
364    /// # use hydro_lang::prelude::*;
365    /// # use futures::StreamExt;
366    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
367    /// let tick = process.tick();
368    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
369    /// singleton.flatten_ordered().all_ticks()
370    /// # }, |mut stream| async move {
371    /// // 1, 2, 3
372    /// # for w in vec![1, 2, 3] {
373    /// #     assert_eq!(stream.next().await.unwrap(), w);
374    /// # }
375    /// # }));
376    /// ```
377    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
378    where
379        T: IntoIterator<Item = U>,
380    {
381        self.flat_map_ordered(q!(|x| x))
382    }
383
384    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
385    /// for the element type `T` to produce items in any order.
386    ///
387    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
388    /// are emitted in the output stream in non-deterministic order.
389    ///
390    /// # Example
391    /// ```rust
392    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
393    /// # use futures::StreamExt;
394    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
395    /// let tick = process.tick();
396    /// let singleton = tick.singleton(q!(
397    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
398    /// ));
399    /// singleton.flatten_unordered().all_ticks()
400    /// # }, |mut stream| async move {
401    /// // 1, 2, 3, but in no particular order
402    /// # let mut results = Vec::new();
403    /// # for _ in 0..3 {
404    /// #     results.push(stream.next().await.unwrap());
405    /// # }
406    /// # results.sort();
407    /// # assert_eq!(results, vec![1, 2, 3]);
408    /// # }));
409    /// ```
410    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
411    where
412        T: IntoIterator<Item = U>,
413    {
414        self.flat_map_unordered(q!(|x| x))
415    }
416
417    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
418    ///
419    /// If the predicate returns `true`, the output optional contains the same value.
420    /// If the predicate returns `false`, the output optional is empty.
421    ///
422    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
423    /// not modify or take ownership of the value. If you need to modify the value while filtering
424    /// use [`Singleton::filter_map`] instead.
425    ///
426    /// # Example
427    /// ```rust
428    /// # use hydro_lang::prelude::*;
429    /// # use futures::StreamExt;
430    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
431    /// let tick = process.tick();
432    /// let singleton = tick.singleton(q!(5));
433    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
434    /// # }, |mut stream| async move {
435    /// // 5
436    /// # assert_eq!(stream.next().await.unwrap(), 5);
437    /// # }));
438    /// ```
439    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
440    where
441        F: Fn(&T) -> bool + 'a,
442    {
443        let f = f.splice_fn1_borrow_ctx(&self.location).into();
444        Optional::new(
445            self.location.clone(),
446            HydroNode::Filter {
447                f,
448                input: Box::new(self.ir_node.into_inner()),
449                metadata: self.location.new_node_metadata::<T>(),
450            },
451        )
452    }
453
454    /// An operator that both filters and maps. It yields the value only if the supplied
455    /// closure `f` returns `Some(value)`.
456    ///
457    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
458    /// If the closure returns `None`, the output optional is empty.
459    ///
460    /// # Example
461    /// ```rust
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465    /// let tick = process.tick();
466    /// let singleton = tick.singleton(q!("42"));
467    /// singleton
468    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
469    ///     .all_ticks()
470    /// # }, |mut stream| async move {
471    /// // 42
472    /// # assert_eq!(stream.next().await.unwrap(), 42);
473    /// # }));
474    /// ```
475    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
476    where
477        F: Fn(T) -> Option<U> + 'a,
478    {
479        let f = f.splice_fn1_ctx(&self.location).into();
480        Optional::new(
481            self.location.clone(),
482            HydroNode::FilterMap {
483                f,
484                input: Box::new(self.ir_node.into_inner()),
485                metadata: self.location.new_node_metadata::<U>(),
486            },
487        )
488    }
489
490    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
491    ///
492    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
493    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
494    /// non-null. This is useful for combining several pieces of state together.
495    ///
496    /// # Example
497    /// ```rust
498    /// # use hydro_lang::prelude::*;
499    /// # use futures::StreamExt;
500    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
501    /// let tick = process.tick();
502    /// let numbers = process
503    ///   .source_iter(q!(vec![123, 456]))
504    ///   .batch(&tick, nondet!(/** test */));
505    /// let count = numbers.clone().count(); // Singleton
506    /// let max = numbers.max(); // Optional
507    /// count.zip(max).all_ticks()
508    /// # }, |mut stream| async move {
509    /// // [(2, 456)]
510    /// # for w in vec![(2, 456)] {
511    /// #     assert_eq!(stream.next().await.unwrap(), w);
512    /// # }
513    /// # }));
514    /// ```
515    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
516    where
517        Self: ZipResult<'a, O, Location = L>,
518    {
519        check_matching_location(&self.location, &Self::other_location(&other));
520
521        if L::is_top_level() {
522            let left_ir_node = self.ir_node.into_inner();
523            let left_ir_node_metadata = left_ir_node.metadata().clone();
524            let right_ir_node = Self::other_ir_node(other);
525            let right_ir_node_metadata = right_ir_node.metadata().clone();
526
527            Self::make(
528                self.location.clone(),
529                HydroNode::Persist {
530                    inner: Box::new(HydroNode::CrossSingleton {
531                        left: Box::new(HydroNode::Unpersist {
532                            inner: Box::new(left_ir_node),
533                            metadata: left_ir_node_metadata,
534                        }),
535                        right: Box::new(HydroNode::Unpersist {
536                            inner: Box::new(right_ir_node),
537                            metadata: right_ir_node_metadata,
538                        }),
539                        metadata: self
540                            .location
541                            .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
542                    }),
543                    metadata: self
544                        .location
545                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
546                },
547            )
548        } else {
549            Self::make(
550                self.location.clone(),
551                HydroNode::CrossSingleton {
552                    left: Box::new(self.ir_node.into_inner()),
553                    right: Box::new(Self::other_ir_node(other)),
554                    metadata: self
555                        .location
556                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
557                },
558            )
559        }
560    }
561
562    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
563    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
564    ///
565    /// Useful for conditionally processing, such as only emitting a singleton's value outside
566    /// a tick if some other condition is satisfied.
567    ///
568    /// # Example
569    /// ```rust
570    /// # use hydro_lang::prelude::*;
571    /// # use futures::StreamExt;
572    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
573    /// let tick = process.tick();
574    /// // ticks are lazy by default, forces the second tick to run
575    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
576    ///
577    /// let batch_first_tick = process
578    ///   .source_iter(q!(vec![1]))
579    ///   .batch(&tick, nondet!(/** test */));
580    /// let batch_second_tick = process
581    ///   .source_iter(q!(vec![1, 2, 3]))
582    ///   .batch(&tick, nondet!(/** test */))
583    ///   .defer_tick(); // appears on the second tick
584    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
585    /// batch_first_tick.chain(batch_second_tick).count()
586    ///   .filter_if_some(some_on_first_tick)
587    ///   .all_ticks()
588    /// # }, |mut stream| async move {
589    /// // [1]
590    /// # for w in vec![1] {
591    /// #     assert_eq!(stream.next().await.unwrap(), w);
592    /// # }
593    /// # }));
594    /// ```
595    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
596        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
597            .map(q!(|(d, _signal)| d))
598    }
599
600    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
601    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
602    ///
603    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
604    /// the condition.
605    ///
606    /// # Example
607    /// ```rust
608    /// # use hydro_lang::prelude::*;
609    /// # use futures::StreamExt;
610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611    /// let tick = process.tick();
612    /// // ticks are lazy by default, forces the second tick to run
613    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
614    ///
615    /// let batch_first_tick = process
616    ///   .source_iter(q!(vec![1]))
617    ///   .batch(&tick, nondet!(/** test */));
618    /// let batch_second_tick = process
619    ///   .source_iter(q!(vec![1, 2, 3]))
620    ///   .batch(&tick, nondet!(/** test */))
621    ///   .defer_tick(); // appears on the second tick
622    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
623    /// batch_first_tick.chain(batch_second_tick).count()
624    ///   .filter_if_none(some_on_first_tick)
625    ///   .all_ticks()
626    /// # }, |mut stream| async move {
627    /// // [3]
628    /// # for w in vec![3] {
629    /// #     assert_eq!(stream.next().await.unwrap(), w);
630    /// # }
631    /// # }));
632    /// ```
633    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
634        self.filter_if_some(
635            other
636                .map(q!(|_| ()))
637                .into_singleton()
638                .filter(q!(|o| o.is_none())),
639        )
640    }
641
642    /// An operator which allows you to "name" a `HydroNode`.
643    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
644    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
645        {
646            let mut node = self.ir_node.borrow_mut();
647            let metadata = node.metadata_mut();
648            metadata.tag = Some(name.to_string());
649        }
650        self
651    }
652}
653
654impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
655where
656    L: Location<'a> + NoTick,
657{
658    /// Returns a singleton value corresponding to the latest snapshot of the singleton
659    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
660    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
661    /// all snapshots of this singleton into the atomic-associated tick will observe the
662    /// same value each tick.
663    ///
664    /// # Non-Determinism
665    /// Because this picks a snapshot of a singleton whose value is continuously changing,
666    /// the output singleton has a non-deterministic value since the snapshot can be at an
667    /// arbitrary point in time.
668    pub fn snapshot(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
669        Singleton::new(
670            self.location.clone().tick,
671            HydroNode::Unpersist {
672                inner: Box::new(self.ir_node.into_inner()),
673                metadata: self.location.new_node_metadata::<T>(),
674            },
675        )
676    }
677
678    /// Returns this singleton back into a top-level, asynchronous execution context where updates
679    /// to the value will be asynchronously propagated.
680    pub fn end_atomic(self) -> Optional<T, L, B> {
681        Optional::new(self.location.tick.l, self.ir_node.into_inner())
682    }
683}
684
685impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
686where
687    L: Location<'a> + NoTick + NoAtomic,
688{
689    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
690    /// will observe the same version of the value and will be executed synchronously before any
691    /// outputs are yielded (in [`Optional::end_atomic`]).
692    ///
693    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
694    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
695    /// a different version).
696    ///
697    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
698    /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
699    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
700    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
701        Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
702    }
703
704    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
705    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
706    /// relevant data that contributed to the snapshot at tick `t`.
707    ///
708    /// # Non-Determinism
709    /// Because this picks a snapshot of a singleton whose value is continuously changing,
710    /// the output singleton has a non-deterministic value since the snapshot can be at an
711    /// arbitrary point in time.
712    pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Singleton<T, Tick<L>, Bounded>
713    where
714        L: NoTick,
715    {
716        self.atomic(tick).snapshot(nondet)
717    }
718
719    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
720    /// with order corresponding to increasing prefixes of data contributing to the singleton.
721    ///
722    /// # Non-Determinism
723    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
724    /// to non-deterministic batching and arrival of inputs, the output stream is
725    /// non-deterministic.
726    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
727        let tick = self.location.tick();
728        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
729    }
730
731    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
732    /// value taken at various points in time. Because the input singleton may be
733    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
734    /// represent the value of the singleton given some prefix of the streams leading up to
735    /// it.
736    ///
737    /// # Non-Determinism
738    /// The output stream is non-deterministic in which elements are sampled, since this
739    /// is controlled by a clock.
740    pub fn sample_every(
741        self,
742        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
743        nondet: NonDet,
744    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
745        let samples = self.location.source_interval(interval, nondet);
746        let tick = self.location.tick();
747
748        self.snapshot(&tick, nondet)
749            .filter_if_some(samples.batch(&tick, nondet).first())
750            .all_ticks()
751            .weakest_retries()
752    }
753}
754
755impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
756where
757    L: Location<'a>,
758{
759    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
760    /// which will stream the value computed in _each_ tick as a separate stream element.
761    ///
762    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
763    /// producing one element in the output for each tick. This is useful for batched computations,
764    /// where the results from each tick must be combined together.
765    ///
766    /// # Example
767    /// ```rust
768    /// # use hydro_lang::prelude::*;
769    /// # use futures::StreamExt;
770    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
771    /// let tick = process.tick();
772    /// # // ticks are lazy by default, forces the second tick to run
773    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
774    /// # let batch_first_tick = process
775    /// #   .source_iter(q!(vec![1]))
776    /// #   .batch(&tick, nondet!(/** test */));
777    /// # let batch_second_tick = process
778    /// #   .source_iter(q!(vec![1, 2, 3]))
779    /// #   .batch(&tick, nondet!(/** test */))
780    /// #   .defer_tick(); // appears on the second tick
781    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
782    /// input_batch // first tick: [1], second tick: [1, 2, 3]
783    ///     .count()
784    ///     .all_ticks()
785    /// # }, |mut stream| async move {
786    /// // [1, 3]
787    /// # for w in vec![1, 3] {
788    /// #     assert_eq!(stream.next().await.unwrap(), w);
789    /// # }
790    /// # }));
791    /// ```
792    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
793        self.into_stream().all_ticks()
794    }
795
796    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
797    /// which will stream the value computed in _each_ tick as a separate stream element.
798    ///
799    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
800    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
801    /// singleton's [`Tick`] context.
802    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
803        self.into_stream().all_ticks_atomic()
804    }
805
806    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
807    /// be asynchronously updated with the latest value of the singleton inside the tick.
808    ///
809    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
810    /// tick that tracks the inner value. This is useful for getting the value as of the
811    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
812    ///
813    /// # Example
814    /// ```rust
815    /// # use hydro_lang::prelude::*;
816    /// # use futures::StreamExt;
817    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818    /// let tick = process.tick();
819    /// # // ticks are lazy by default, forces the second tick to run
820    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
821    /// # let batch_first_tick = process
822    /// #   .source_iter(q!(vec![1]))
823    /// #   .batch(&tick, nondet!(/** test */));
824    /// # let batch_second_tick = process
825    /// #   .source_iter(q!(vec![1, 2, 3]))
826    /// #   .batch(&tick, nondet!(/** test */))
827    /// #   .defer_tick(); // appears on the second tick
828    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
829    /// input_batch // first tick: [1], second tick: [1, 2, 3]
830    ///     .count()
831    ///     .latest()
832    /// # .sample_eager(nondet!(/** test */))
833    /// # }, |mut stream| async move {
834    /// // asynchronously changes from 1 ~> 3
835    /// # for w in vec![1, 3] {
836    /// #     assert_eq!(stream.next().await.unwrap(), w);
837    /// # }
838    /// # }));
839    /// ```
840    pub fn latest(self) -> Singleton<T, L, Unbounded> {
841        Singleton::new(
842            self.location.outer().clone(),
843            HydroNode::Persist {
844                inner: Box::new(self.ir_node.into_inner()),
845                metadata: self.location.new_node_metadata::<T>(),
846            },
847        )
848    }
849
850    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
851    /// be updated with the latest value of the singleton inside the tick.
852    ///
853    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
854    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
855    /// singleton's [`Tick`] context.
856    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
857        Singleton::new(
858            Atomic {
859                tick: self.location.clone(),
860            },
861            HydroNode::Persist {
862                inner: Box::new(self.ir_node.into_inner()),
863                metadata: self.location.new_node_metadata::<T>(),
864            },
865        )
866    }
867
868    #[expect(missing_docs, reason = "TODO")]
869    pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
870        Singleton::new(
871            self.location.clone(),
872            HydroNode::DeferTick {
873                input: Box::new(self.ir_node.into_inner()),
874                metadata: self.location.new_node_metadata::<T>(),
875            },
876        )
877    }
878
879    #[deprecated(note = "use .into_stream().persist()")]
880    #[expect(missing_docs, reason = "deprecated")]
881    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
882        Stream::new(
883            self.location.clone(),
884            HydroNode::Persist {
885                inner: Box::new(self.ir_node.into_inner()),
886                metadata: self.location.new_node_metadata::<T>(),
887            },
888        )
889    }
890
891    /// Converts this singleton into a [`Stream`] containing a single element, the value.
892    ///
893    /// # Example
894    /// ```rust
895    /// # use hydro_lang::prelude::*;
896    /// # use futures::StreamExt;
897    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
898    /// let tick = process.tick();
899    /// let batch_input = process
900    ///   .source_iter(q!(vec![123, 456]))
901    ///   .batch(&tick, nondet!(/** test */));
902    /// batch_input.clone().chain(
903    ///   batch_input.count().into_stream()
904    /// ).all_ticks()
905    /// # }, |mut stream| async move {
906    /// // [123, 456, 2]
907    /// # for w in vec![123, 456, 2] {
908    /// #     assert_eq!(stream.next().await.unwrap(), w);
909    /// # }
910    /// # }));
911    /// ```
912    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
913        Stream::new(self.location, self.ir_node.into_inner())
914    }
915}
916
917#[doc(hidden)]
918/// Helper trait that determines the output collection type for [`Singleton::zip`].
919///
920/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
921/// [`Singleton`].
922#[sealed::sealed]
923pub trait ZipResult<'a, Other> {
924    /// The output collection type.
925    type Out;
926    /// The type of the tupled output value.
927    type ElementType;
928    /// The location where the tupled result will be materialized.
929    type Location: Location<'a>;
930
931    /// The location of the second input to the `zip`.
932    fn other_location(other: &Other) -> Self::Location;
933    /// The IR node of the second input to the `zip`.
934    fn other_ir_node(other: Other) -> HydroNode;
935
936    /// Constructs the output live collection given an IR node containing the zip result.
937    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
938}
939
940#[sealed::sealed]
941impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
942where
943    L: Location<'a>,
944{
945    type Out = Singleton<(T, U), L, B>;
946    type ElementType = (T, U);
947    type Location = L;
948
949    fn other_location(other: &Singleton<U, L, B>) -> L {
950        other.location.clone()
951    }
952
953    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
954        other.ir_node.into_inner()
955    }
956
957    fn make(location: L, ir_node: HydroNode) -> Self::Out {
958        Singleton::new(location, ir_node)
959    }
960}
961
962#[sealed::sealed]
963impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
964where
965    L: Location<'a>,
966{
967    type Out = Optional<(T, U), L, B>;
968    type ElementType = (T, U);
969    type Location = L;
970
971    fn other_location(other: &Optional<U, L, B>) -> L {
972        other.location.clone()
973    }
974
975    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
976        other.ir_node.into_inner()
977    }
978
979    fn make(location: L, ir_node: HydroNode) -> Self::Out {
980        Optional::new(location, ir_node)
981    }
982}
983
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Cycles a tick-local singleton back into itself and checks, for two externally
    /// triggered outputs, that counting the cycled collection reports exactly one element.
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        // Unit messages from the external side gate when a count is emitted.
        let (trigger_port, triggers) = process.source_external_bincode(&external);

        let tick = process.tick();
        let (cycle, state) = tick.cycle_with_initial(tick.singleton(q!(0)));

        // Count the elements of the cycled singleton, releasing the count only when the
        // trigger batch has a first element.
        let cardinality = state.clone().into_stream().count();
        let gate = triggers.batch(&tick, nondet!(/** testing */)).first();
        let counts_port = cardinality
            .filter_if_some(gate)
            .all_ticks()
            .send_bincode_external(&external);

        // Close the cycle: the singleton feeds itself on the next tick.
        cycle.complete_next_tick(state);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_trigger = nodes.connect_sink_bincode(trigger_port).await;
        let mut external_out = nodes.connect_source_bincode(counts_port).await;

        deployment.start().await.unwrap();

        // Two rounds: each trigger must yield a cardinality of exactly one.
        for _ in 0..2 {
            tick_trigger.send(()).await.unwrap();

            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }
}