hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::{NonDet, nondet};
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location value this singleton is attached to. Operators in this file pass it on to
    /// their outputs and use it to create the metadata of the IR nodes they build.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton's value. It is held in a [`RefCell`] because
    /// [`Clone::clone`] only receives `&self` yet replaces the node in place with a shared
    /// `HydroNode::Tee`.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker carrying the type parameters; no field stores `Type` or `Bound`
    /// directly.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46    L: Location<'a>,
47{
48    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49        Singleton::new(singleton.location, singleton.ir_node.into_inner())
50    }
51}
52
53impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    type Location = Tick<L>;
58
59    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
60        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
61            location.clone(),
62            HydroNode::DeferTick {
63                input: Box::new(HydroNode::CycleSource {
64                    ident,
65                    metadata: location.new_node_metadata(Self::collection_kind()),
66                }),
67                metadata: location
68                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
69            },
70        );
71
72        from_previous_tick.unwrap_or(initial)
73    }
74}
75
76impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
77where
78    L: Location<'a>,
79{
80    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
81        assert_eq!(
82            Location::id(&self.location),
83            expected_location,
84            "locations do not match"
85        );
86        self.location
87            .flow_state()
88            .borrow_mut()
89            .push_root(HydroRoot::CycleSink {
90                ident,
91                input: Box::new(self.ir_node.into_inner()),
92                op_metadata: HydroIrOpMetadata::new(),
93            });
94    }
95}
96
97impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
98where
99    L: Location<'a>,
100{
101    type Location = Tick<L>;
102
103    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
104        Singleton::new(
105            location.clone(),
106            HydroNode::CycleSource {
107                ident,
108                metadata: location.new_node_metadata(Self::collection_kind()),
109            },
110        )
111    }
112}
113
114impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116    L: Location<'a>,
117{
118    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
119        assert_eq!(
120            Location::id(&self.location),
121            expected_location,
122            "locations do not match"
123        );
124        self.location
125            .flow_state()
126            .borrow_mut()
127            .push_root(HydroRoot::CycleSink {
128                ident,
129                input: Box::new(self.ir_node.into_inner()),
130                op_metadata: HydroIrOpMetadata::new(),
131            });
132    }
133}
134
135impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
136where
137    L: Location<'a> + NoTick,
138{
139    type Location = L;
140
141    fn create_source(ident: syn::Ident, location: L) -> Self {
142        Singleton::new(
143            location.clone(),
144            HydroNode::CycleSource {
145                ident,
146                metadata: location.new_node_metadata(Self::collection_kind()),
147            },
148        )
149    }
150}
151
152impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
153where
154    L: Location<'a> + NoTick,
155{
156    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162        self.location
163            .flow_state()
164            .borrow_mut()
165            .push_root(HydroRoot::CycleSink {
166                ident,
167                input: Box::new(self.ir_node.into_inner()),
168                op_metadata: HydroIrOpMetadata::new(),
169            });
170    }
171}
172
173impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
174where
175    T: Clone,
176    L: Location<'a>,
177{
178    fn clone(&self) -> Self {
179        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
180            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
181            *self.ir_node.borrow_mut() = HydroNode::Tee {
182                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
183                metadata: self.location.new_node_metadata(Self::collection_kind()),
184            };
185        }
186
187        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
188            Singleton {
189                location: self.location.clone(),
190                ir_node: HydroNode::Tee {
191                    inner: TeeNode(inner.0.clone()),
192                    metadata: metadata.clone(),
193                }
194                .into(),
195                _phantom: PhantomData,
196            }
197        } else {
198            unreachable!()
199        }
200    }
201}
202
/// Zips a tick-local singleton with `other` by emitting a `CrossSingleton` node at the
/// singleton's location, after checking that both operands share that location.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    check_matching_location(
        &me.location,
        &Singleton::<T, Tick<L>, B>::other_location(&other),
    );

    let Singleton {
        location, ir_node, ..
    } = me;
    let left = Box::new(ir_node.into_inner());
    let right = Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other));
    let metadata = location.new_node_metadata(CollectionKind::Singleton {
        bound: B::BOUND_KIND,
        element_type: stageleft::quote_type::<
            <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
        >()
        .into(),
    });

    Singleton::<T, Tick<L>, B>::make(
        location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
231
232impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
233where
234    L: Location<'a>,
235{
236    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
237        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
238        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
239        Singleton {
240            location,
241            ir_node: RefCell::new(ir_node),
242            _phantom: PhantomData,
243        }
244    }
245
246    pub(crate) fn collection_kind() -> CollectionKind {
247        CollectionKind::Singleton {
248            bound: B::BOUND_KIND,
249            element_type: stageleft::quote_type::<T>().into(),
250        }
251    }
252
253    /// Transforms the singleton value by applying a function `f` to it,
254    /// continuously as the input is updated.
255    ///
256    /// # Example
257    /// ```rust
258    /// # use hydro_lang::prelude::*;
259    /// # use futures::StreamExt;
260    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
261    /// let tick = process.tick();
262    /// let singleton = tick.singleton(q!(5));
263    /// singleton.map(q!(|v| v * 2)).all_ticks()
264    /// # }, |mut stream| async move {
265    /// // 10
266    /// # assert_eq!(stream.next().await.unwrap(), 10);
267    /// # }));
268    /// ```
269    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
270    where
271        F: Fn(T) -> U + 'a,
272    {
273        let f = f.splice_fn1_ctx(&self.location).into();
274        Singleton::new(
275            self.location.clone(),
276            HydroNode::Map {
277                f,
278                input: Box::new(self.ir_node.into_inner()),
279                metadata: self
280                    .location
281                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
282            },
283        )
284    }
285
286    /// Transforms the singleton value by applying a function `f` to it and then flattening
287    /// the result into a stream, preserving the order of elements.
288    ///
289    /// The function `f` is applied to the singleton value to produce an iterator, and all items
290    /// from that iterator are emitted in the output stream in deterministic order.
291    ///
292    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
293    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
294    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
295    ///
296    /// # Example
297    /// ```rust
298    /// # use hydro_lang::prelude::*;
299    /// # use futures::StreamExt;
300    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
301    /// let tick = process.tick();
302    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
303    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
304    /// # }, |mut stream| async move {
305    /// // 1, 2, 3
306    /// # for w in vec![1, 2, 3] {
307    /// #     assert_eq!(stream.next().await.unwrap(), w);
308    /// # }
309    /// # }));
310    /// ```
311    pub fn flat_map_ordered<U, I, F>(
312        self,
313        f: impl IntoQuotedMut<'a, F, L>,
314    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
315    where
316        I: IntoIterator<Item = U>,
317        F: Fn(T) -> I + 'a,
318    {
319        let f = f.splice_fn1_ctx(&self.location).into();
320        Stream::new(
321            self.location.clone(),
322            HydroNode::FlatMap {
323                f,
324                input: Box::new(self.ir_node.into_inner()),
325                metadata: self.location.new_node_metadata(
326                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
327                ),
328            },
329        )
330    }
331
332    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
333    /// for the output type `I` to produce items in any order.
334    ///
335    /// The function `f` is applied to the singleton value to produce an iterator, and all items
336    /// from that iterator are emitted in the output stream in non-deterministic order.
337    ///
338    /// # Example
339    /// ```rust
340    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
341    /// # use futures::StreamExt;
342    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
343    /// let tick = process.tick();
344    /// let singleton = tick.singleton(q!(
345    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
346    /// ));
347    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
348    /// # }, |mut stream| async move {
349    /// // 1, 2, 3, but in no particular order
350    /// # let mut results = Vec::new();
351    /// # for _ in 0..3 {
352    /// #     results.push(stream.next().await.unwrap());
353    /// # }
354    /// # results.sort();
355    /// # assert_eq!(results, vec![1, 2, 3]);
356    /// # }));
357    /// ```
358    pub fn flat_map_unordered<U, I, F>(
359        self,
360        f: impl IntoQuotedMut<'a, F, L>,
361    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
362    where
363        I: IntoIterator<Item = U>,
364        F: Fn(T) -> I + 'a,
365    {
366        let f = f.splice_fn1_ctx(&self.location).into();
367        Stream::new(
368            self.location.clone(),
369            HydroNode::FlatMap {
370                f,
371                input: Box::new(self.ir_node.into_inner()),
372                metadata: self
373                    .location
374                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
375            },
376        )
377    }
378
379    /// Flattens the singleton value into a stream, preserving the order of elements.
380    ///
381    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
382    /// are emitted in the output stream in deterministic order.
383    ///
384    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
385    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
386    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
387    ///
388    /// # Example
389    /// ```rust
390    /// # use hydro_lang::prelude::*;
391    /// # use futures::StreamExt;
392    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
393    /// let tick = process.tick();
394    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
395    /// singleton.flatten_ordered().all_ticks()
396    /// # }, |mut stream| async move {
397    /// // 1, 2, 3
398    /// # for w in vec![1, 2, 3] {
399    /// #     assert_eq!(stream.next().await.unwrap(), w);
400    /// # }
401    /// # }));
402    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is `flat_map_ordered` with the identity closure: the singleton's value
        // is itself the iterator whose items become the stream elements.
        self.flat_map_ordered(q!(|x| x))
    }
409
410    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
411    /// for the element type `T` to produce items in any order.
412    ///
413    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
414    /// are emitted in the output stream in non-deterministic order.
415    ///
416    /// # Example
417    /// ```rust
418    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
419    /// # use futures::StreamExt;
420    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
421    /// let tick = process.tick();
422    /// let singleton = tick.singleton(q!(
423    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
424    /// ));
425    /// singleton.flatten_unordered().all_ticks()
426    /// # }, |mut stream| async move {
427    /// // 1, 2, 3, but in no particular order
428    /// # let mut results = Vec::new();
429    /// # for _ in 0..3 {
430    /// #     results.push(stream.next().await.unwrap());
431    /// # }
432    /// # results.sort();
433    /// # assert_eq!(results, vec![1, 2, 3]);
434    /// # }));
435    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is `flat_map_unordered` with the identity closure: the singleton's value
        // is itself the iterator whose items become the stream elements.
        self.flat_map_unordered(q!(|x| x))
    }
442
443    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
444    ///
445    /// If the predicate returns `true`, the output optional contains the same value.
446    /// If the predicate returns `false`, the output optional is empty.
447    ///
448    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
449    /// not modify or take ownership of the value. If you need to modify the value while filtering
450    /// use [`Singleton::filter_map`] instead.
451    ///
452    /// # Example
453    /// ```rust
454    /// # use hydro_lang::prelude::*;
455    /// # use futures::StreamExt;
456    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
457    /// let tick = process.tick();
458    /// let singleton = tick.singleton(q!(5));
459    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
460    /// # }, |mut stream| async move {
461    /// // 5
462    /// # assert_eq!(stream.next().await.unwrap(), 5);
463    /// # }));
464    /// ```
465    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
466    where
467        F: Fn(&T) -> bool + 'a,
468    {
469        let f = f.splice_fn1_borrow_ctx(&self.location).into();
470        Optional::new(
471            self.location.clone(),
472            HydroNode::Filter {
473                f,
474                input: Box::new(self.ir_node.into_inner()),
475                metadata: self
476                    .location
477                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
478            },
479        )
480    }
481
482    /// An operator that both filters and maps. It yields the value only if the supplied
483    /// closure `f` returns `Some(value)`.
484    ///
485    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
486    /// If the closure returns `None`, the output optional is empty.
487    ///
488    /// # Example
489    /// ```rust
490    /// # use hydro_lang::prelude::*;
491    /// # use futures::StreamExt;
492    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
493    /// let tick = process.tick();
494    /// let singleton = tick.singleton(q!("42"));
495    /// singleton
496    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
497    ///     .all_ticks()
498    /// # }, |mut stream| async move {
499    /// // 42
500    /// # assert_eq!(stream.next().await.unwrap(), 42);
501    /// # }));
502    /// ```
503    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
504    where
505        F: Fn(T) -> Option<U> + 'a,
506    {
507        let f = f.splice_fn1_ctx(&self.location).into();
508        Optional::new(
509            self.location.clone(),
510            HydroNode::FilterMap {
511                f,
512                input: Box::new(self.ir_node.into_inner()),
513                metadata: self
514                    .location
515                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
516            },
517        )
518    }
519
520    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
521    ///
522    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
523    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
524    /// non-null. This is useful for combining several pieces of state together.
525    ///
526    /// # Example
527    /// ```rust
528    /// # use hydro_lang::prelude::*;
529    /// # use futures::StreamExt;
530    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
531    /// let tick = process.tick();
532    /// let numbers = process
533    ///   .source_iter(q!(vec![123, 456]))
534    ///   .batch(&tick, nondet!(/** test */));
535    /// let count = numbers.clone().count(); // Singleton
536    /// let max = numbers.max(); // Optional
537    /// count.zip(max).all_ticks()
538    /// # }, |mut stream| async move {
539    /// // [(2, 456)]
540    /// # for w in vec![(2, 456)] {
541    /// #     assert_eq!(stream.next().await.unwrap(), w);
542    /// # }
543    /// # }));
544    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
    {
        // Both operands must live at the same location.
        check_matching_location(&self.location, &Self::other_location(&other));

        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            // Top-level location that can provide a tick: snapshot both operands into that
            // tick, zip them there, then call `.latest()` on the result. `other` is wrapped
            // as an `Optional` here regardless of its concrete type.
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    Self::other_location(&other),
                    Self::other_ir_node(other),
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            Self::make(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise emit the `CrossSingleton` node directly at this location.
            // NOTE(review): the metadata here is always `CollectionKind::Optional`, while
            // `zip_inside_tick` always records `CollectionKind::Singleton` — confirm against
            // `ZipResult::make` whether the kind should depend on `O`.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.into_inner()),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
582
    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
585    ///
586    /// Useful for conditionally processing, such as only emitting a singleton's value outside
587    /// a tick if some other condition is satisfied.
588    ///
589    /// # Example
590    /// ```rust
591    /// # use hydro_lang::prelude::*;
592    /// # use futures::StreamExt;
593    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594    /// let tick = process.tick();
595    /// // ticks are lazy by default, forces the second tick to run
596    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
597    ///
598    /// let batch_first_tick = process
599    ///   .source_iter(q!(vec![1]))
600    ///   .batch(&tick, nondet!(/** test */));
601    /// let batch_second_tick = process
602    ///   .source_iter(q!(vec![1, 2, 3]))
603    ///   .batch(&tick, nondet!(/** test */))
604    ///   .defer_tick(); // appears on the second tick
605    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
606    /// batch_first_tick.chain(batch_second_tick).count()
607    ///   .filter_if_some(some_on_first_tick)
608    ///   .all_ticks()
609    /// # }, |mut stream| async move {
610    /// // [1]
611    /// # for w in vec![1] {
612    /// #     assert_eq!(stream.next().await.unwrap(), w);
613    /// # }
614    /// # }));
615    /// ```
616    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
617        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
618            .map(q!(|(d, _signal)| d))
619    }
620
    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
623    ///
624    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
625    /// the condition.
626    ///
627    /// # Example
628    /// ```rust
629    /// # use hydro_lang::prelude::*;
630    /// # use futures::StreamExt;
631    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
632    /// let tick = process.tick();
633    /// // ticks are lazy by default, forces the second tick to run
634    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
635    ///
636    /// let batch_first_tick = process
637    ///   .source_iter(q!(vec![1]))
638    ///   .batch(&tick, nondet!(/** test */));
639    /// let batch_second_tick = process
640    ///   .source_iter(q!(vec![1, 2, 3]))
641    ///   .batch(&tick, nondet!(/** test */))
642    ///   .defer_tick(); // appears on the second tick
643    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
644    /// batch_first_tick.chain(batch_second_tick).count()
645    ///   .filter_if_none(some_on_first_tick)
646    ///   .all_ticks()
647    /// # }, |mut stream| async move {
648    /// // [3]
649    /// # for w in vec![3] {
650    /// #     assert_eq!(stream.next().await.unwrap(), w);
651    /// # }
652    /// # }));
653    /// ```
654    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
655        self.filter_if_some(
656            other
657                .map(q!(|_| ()))
658                .into_singleton()
659                .filter(q!(|o| o.is_none())),
660        )
661    }
662
663    /// An operator which allows you to "name" a `HydroNode`.
664    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
665    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
666        {
667            let mut node = self.ir_node.borrow_mut();
668            let metadata = node.metadata_mut();
669            metadata.tag = Some(name.to_string());
670        }
671        self
672    }
673}
674
675impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
676where
677    L: Location<'a> + NoTick,
678{
679    /// Returns a singleton value corresponding to the latest snapshot of the singleton
680    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
681    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
682    /// all snapshots of this singleton into the atomic-associated tick will observe the
683    /// same value each tick.
684    ///
685    /// # Non-Determinism
686    /// Because this picks a snapshot of a singleton whose value is continuously changing,
687    /// the output singleton has a non-deterministic value since the snapshot can be at an
688    /// arbitrary point in time.
689    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
690        Singleton::new(
691            self.location.clone().tick,
692            HydroNode::Batch {
693                inner: Box::new(self.ir_node.into_inner()),
694                metadata: self
695                    .location
696                    .tick
697                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
698            },
699        )
700    }
701
702    /// Returns this singleton back into a top-level, asynchronous execution context where updates
703    /// to the value will be asynchronously propagated.
704    pub fn end_atomic(self) -> Singleton<T, L, B> {
705        Singleton::new(
706            self.location.tick.l.clone(),
707            HydroNode::EndAtomic {
708                inner: Box::new(self.ir_node.into_inner()),
709                metadata: self
710                    .location
711                    .tick
712                    .l
713                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
714            },
715        )
716    }
717}
718
719impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
720where
721    L: Location<'a>,
722{
    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
    /// will observe the same version of the value and will be executed synchronously before any
    /// outputs are yielded (in [`Singleton::end_atomic`]).
726    ///
727    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
728    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
729    /// a different version).
730    ///
    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
    /// be atomically processed. Snapshotting a singleton into the _same_ [`Tick`] will preserve the
    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
734    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
735        let out_location = Atomic { tick: tick.clone() };
736        Singleton::new(
737            out_location.clone(),
738            HydroNode::BeginAtomic {
739                inner: Box::new(self.ir_node.into_inner()),
740                metadata: out_location
741                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
742            },
743        )
744    }
745
746    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
747    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
748    /// relevant data that contributed to the snapshot at tick `t`.
749    ///
750    /// # Non-Determinism
751    /// Because this picks a snapshot of a singleton whose value is continuously changing,
752    /// the output singleton has a non-deterministic value since the snapshot can be at an
753    /// arbitrary point in time.
754    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
755        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
756        Singleton::new(
757            tick.clone(),
758            HydroNode::Batch {
759                inner: Box::new(self.ir_node.into_inner()),
760                metadata: tick
761                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
762            },
763        )
764    }
765
766    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
767    /// with order corresponding to increasing prefixes of data contributing to the singleton.
768    ///
769    /// # Non-Determinism
770    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
771    /// to non-deterministic batching and arrival of inputs, the output stream is
772    /// non-deterministic.
773    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
774    where
775        L: NoTick,
776    {
777        let tick = self.location.tick();
778        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
779    }
780
781    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
782    /// value taken at various points in time. Because the input singleton may be
783    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
784    /// represent the value of the singleton given some prefix of the streams leading up to
785    /// it.
786    ///
787    /// # Non-Determinism
788    /// The output stream is non-deterministic in which elements are sampled, since this
789    /// is controlled by a clock.
790    pub fn sample_every(
791        self,
792        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
793        nondet: NonDet,
794    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
795    where
796        L: NoTick + NoAtomic,
797    {
798        let samples = self.location.source_interval(interval, nondet);
799        let tick = self.location.tick();
800
801        self.snapshot(&tick, nondet)
802            .filter_if_some(samples.batch(&tick, nondet).first())
803            .all_ticks()
804            .weakest_retries()
805    }
806}
807
808impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
809where
810    L: Location<'a>,
811{
812    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
813    /// which will stream the value computed in _each_ tick as a separate stream element.
814    ///
815    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
816    /// producing one element in the output for each tick. This is useful for batched computations,
817    /// where the results from each tick must be combined together.
818    ///
819    /// # Example
820    /// ```rust
821    /// # use hydro_lang::prelude::*;
822    /// # use futures::StreamExt;
823    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
824    /// let tick = process.tick();
825    /// # // ticks are lazy by default, forces the second tick to run
826    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
827    /// # let batch_first_tick = process
828    /// #   .source_iter(q!(vec![1]))
829    /// #   .batch(&tick, nondet!(/** test */));
830    /// # let batch_second_tick = process
831    /// #   .source_iter(q!(vec![1, 2, 3]))
832    /// #   .batch(&tick, nondet!(/** test */))
833    /// #   .defer_tick(); // appears on the second tick
834    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
835    /// input_batch // first tick: [1], second tick: [1, 2, 3]
836    ///     .count()
837    ///     .all_ticks()
838    /// # }, |mut stream| async move {
839    /// // [1, 3]
840    /// # for w in vec![1, 3] {
841    /// #     assert_eq!(stream.next().await.unwrap(), w);
842    /// # }
843    /// # }));
844    /// ```
845    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
846        self.into_stream().all_ticks()
847    }
848
849    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
850    /// which will stream the value computed in _each_ tick as a separate stream element.
851    ///
852    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
853    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
854    /// singleton's [`Tick`] context.
855    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
856        self.into_stream().all_ticks_atomic()
857    }
858
859    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
860    /// be asynchronously updated with the latest value of the singleton inside the tick.
861    ///
862    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
863    /// tick that tracks the inner value. This is useful for getting the value as of the
864    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
865    ///
866    /// # Example
867    /// ```rust
868    /// # use hydro_lang::prelude::*;
869    /// # use futures::StreamExt;
870    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
871    /// let tick = process.tick();
872    /// # // ticks are lazy by default, forces the second tick to run
873    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
874    /// # let batch_first_tick = process
875    /// #   .source_iter(q!(vec![1]))
876    /// #   .batch(&tick, nondet!(/** test */));
877    /// # let batch_second_tick = process
878    /// #   .source_iter(q!(vec![1, 2, 3]))
879    /// #   .batch(&tick, nondet!(/** test */))
880    /// #   .defer_tick(); // appears on the second tick
881    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
882    /// input_batch // first tick: [1], second tick: [1, 2, 3]
883    ///     .count()
884    ///     .latest()
885    /// # .sample_eager(nondet!(/** test */))
886    /// # }, |mut stream| async move {
887    /// // asynchronously changes from 1 ~> 3
888    /// # for w in vec![1, 3] {
889    /// #     assert_eq!(stream.next().await.unwrap(), w);
890    /// # }
891    /// # }));
892    /// ```
893    pub fn latest(self) -> Singleton<T, L, Unbounded> {
894        Singleton::new(
895            self.location.outer().clone(),
896            HydroNode::YieldConcat {
897                inner: Box::new(self.ir_node.into_inner()),
898                metadata: self
899                    .location
900                    .outer()
901                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
902            },
903        )
904    }
905
906    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
907    /// be updated with the latest value of the singleton inside the tick.
908    ///
909    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
910    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
911    /// singleton's [`Tick`] context.
912    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
913        let out_location = Atomic {
914            tick: self.location.clone(),
915        };
916        Singleton::new(
917            out_location.clone(),
918            HydroNode::YieldConcat {
919                inner: Box::new(self.ir_node.into_inner()),
920                metadata: out_location
921                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
922            },
923        )
924    }
925
926    #[deprecated(note = "use .into_stream().persist()")]
927    #[expect(missing_docs, reason = "deprecated")]
928    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
929        Stream::new(
930            self.location.clone(),
931            HydroNode::Persist {
932                inner: Box::new(self.ir_node.into_inner()),
933                metadata: self.location.new_node_metadata(Stream::<
934                    T,
935                    Tick<L>,
936                    Bounded,
937                    TotalOrder,
938                    ExactlyOnce,
939                >::collection_kind()),
940            },
941        )
942    }
943
944    /// Converts this singleton into a [`Stream`] containing a single element, the value.
945    ///
946    /// # Example
947    /// ```rust
948    /// # use hydro_lang::prelude::*;
949    /// # use futures::StreamExt;
950    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
951    /// let tick = process.tick();
952    /// let batch_input = process
953    ///   .source_iter(q!(vec![123, 456]))
954    ///   .batch(&tick, nondet!(/** test */));
955    /// batch_input.clone().chain(
956    ///   batch_input.count().into_stream()
957    /// ).all_ticks()
958    /// # }, |mut stream| async move {
959    /// // [123, 456, 2]
960    /// # for w in vec![123, 456, 2] {
961    /// #     assert_eq!(stream.next().await.unwrap(), w);
962    /// # }
963    /// # }));
964    /// ```
965    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
966        Stream::new(
967            self.location.clone(),
968            HydroNode::Cast {
969                inner: Box::new(self.ir_node.into_inner()),
970                metadata: self.location.new_node_metadata(Stream::<
971                    T,
972                    Tick<L>,
973                    Bounded,
974                    TotalOrder,
975                    ExactlyOnce,
976                >::collection_kind()),
977            },
978        )
979    }
980}
981
982#[doc(hidden)]
983/// Helper trait that determines the output collection type for [`Singleton::zip`].
984///
985/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
986/// [`Singleton`].
987#[sealed::sealed]
988pub trait ZipResult<'a, Other> {
989    /// The output collection type.
990    type Out;
991    /// The type of the tupled output value.
992    type ElementType;
993    /// The type of the other collection's value.
994    type OtherType;
995    /// The location where the tupled result will be materialized.
996    type Location: Location<'a>;
997
998    /// The location of the second input to the `zip`.
999    fn other_location(other: &Other) -> Self::Location;
1000    /// The IR node of the second input to the `zip`.
1001    fn other_ir_node(other: Other) -> HydroNode;
1002
1003    /// Constructs the output live collection given an IR node containing the zip result.
1004    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1005}
1006
1007#[sealed::sealed]
1008impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1009where
1010    L: Location<'a>,
1011{
1012    type Out = Singleton<(T, U), L, B>;
1013    type ElementType = (T, U);
1014    type OtherType = U;
1015    type Location = L;
1016
1017    fn other_location(other: &Singleton<U, L, B>) -> L {
1018        other.location.clone()
1019    }
1020
1021    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1022        other.ir_node.into_inner()
1023    }
1024
1025    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1026        Singleton::new(
1027            location.clone(),
1028            HydroNode::Cast {
1029                inner: Box::new(ir_node),
1030                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1031            },
1032        )
1033    }
1034}
1035
1036#[sealed::sealed]
1037impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1038where
1039    L: Location<'a>,
1040{
1041    type Out = Optional<(T, U), L, B>;
1042    type ElementType = (T, U);
1043    type OtherType = U;
1044    type Location = L;
1045
1046    fn other_location(other: &Optional<U, L, B>) -> L {
1047        other.location.clone()
1048    }
1049
1050    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1051        other.ir_node.into_inner()
1052    }
1053
1054    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1055        Optional::new(location, ir_node)
1056    }
1057}
1058
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::live_collections::stream::ExactlyOnce;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// A singleton fed back through a tick cycle must still hold exactly one value on every
    /// tick: counting its stream form yields `1` on each externally triggered tick.
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        // External messages are used purely as triggers for the count to be emitted.
        let (trigger_port, triggers) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();
        let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));

        // Number of elements in the cycled singleton, released only on triggered ticks.
        let cardinality = cycled.clone().into_stream().count();
        let tick_gate = triggers.batch(&tick, nondet!(/** testing */)).first();
        let counts_port = cardinality
            .filter_if_some(tick_gate)
            .all_ticks()
            .send_bincode_external(&external);

        // Close the cycle: the singleton feeds itself into the next tick.
        cycle_handle.complete_next_tick(cycled);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_trigger = nodes.connect(trigger_port).await;
        let mut external_out = nodes.connect(counts_port).await;

        deployment.start().await.unwrap();

        // Two triggered ticks: the initial value, then the value carried over by the cycle.
        for _ in 0..2 {
            tick_trigger.send(()).await.unwrap();

            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }
}