hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::{NonDet, nondet};
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location at which this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton's value. It sits behind a [`RefCell`]
    /// because `Clone::clone(&self)` rewrites the node in place into a shared `Tee`.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the `Type` and `Bound` parameters (which no field
    /// stores directly) to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46    L: Location<'a>,
47{
48    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49        Singleton::new(singleton.location, singleton.ir_node.into_inner())
50    }
51}
52
53impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    type Location = Tick<L>;
58
59    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
60        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
61            location.clone(),
62            HydroNode::DeferTick {
63                input: Box::new(HydroNode::CycleSource {
64                    ident,
65                    metadata: location.new_node_metadata(Self::collection_kind()),
66                }),
67                metadata: location
68                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
69            },
70        );
71
72        from_previous_tick.unwrap_or(initial)
73    }
74}
75
76impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
77where
78    L: Location<'a>,
79{
80    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
81        assert_eq!(
82            Location::id(&self.location),
83            expected_location,
84            "locations do not match"
85        );
86        self.location
87            .flow_state()
88            .borrow_mut()
89            .push_root(HydroRoot::CycleSink {
90                ident,
91                input: Box::new(self.ir_node.into_inner()),
92                op_metadata: HydroIrOpMetadata::new(),
93            });
94    }
95}
96
97impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
98where
99    L: Location<'a>,
100{
101    type Location = Tick<L>;
102
103    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
104        Singleton::new(
105            location.clone(),
106            HydroNode::CycleSource {
107                ident,
108                metadata: location.new_node_metadata(Self::collection_kind()),
109            },
110        )
111    }
112}
113
114impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116    L: Location<'a>,
117{
118    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
119        assert_eq!(
120            Location::id(&self.location),
121            expected_location,
122            "locations do not match"
123        );
124        self.location
125            .flow_state()
126            .borrow_mut()
127            .push_root(HydroRoot::CycleSink {
128                ident,
129                input: Box::new(self.ir_node.into_inner()),
130                op_metadata: HydroIrOpMetadata::new(),
131            });
132    }
133}
134
135impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
136where
137    L: Location<'a> + NoTick,
138{
139    type Location = L;
140
141    fn create_source(ident: syn::Ident, location: L) -> Self {
142        Singleton::new(
143            location.clone(),
144            HydroNode::CycleSource {
145                ident,
146                metadata: location.new_node_metadata(Self::collection_kind()),
147            },
148        )
149    }
150}
151
152impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
153where
154    L: Location<'a> + NoTick,
155{
156    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162        self.location
163            .flow_state()
164            .borrow_mut()
165            .push_root(HydroRoot::CycleSink {
166                ident,
167                input: Box::new(self.ir_node.into_inner()),
168                op_metadata: HydroIrOpMetadata::new(),
169            });
170    }
171}
172
173impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
174where
175    T: Clone,
176    L: Location<'a>,
177{
178    fn clone(&self) -> Self {
179        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
180            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
181            *self.ir_node.borrow_mut() = HydroNode::Tee {
182                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
183                metadata: self.location.new_node_metadata(Self::collection_kind()),
184            };
185        }
186
187        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
188            Singleton {
189                location: self.location.clone(),
190                ir_node: HydroNode::Tee {
191                    inner: TeeNode(inner.0.clone()),
192                    metadata: metadata.clone(),
193                }
194                .into(),
195                _phantom: PhantomData,
196            }
197        } else {
198            unreachable!()
199        }
200    }
201}
202
/// Zips a tick-local singleton with `other` by emitting a `CrossSingleton` node at the
/// singleton's tick.
///
/// Panics (via `check_matching_location`) if the two inputs are at different locations.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    let other_location = Singleton::<T, Tick<L>, B>::other_location(&other);
    check_matching_location(&me.location, &other_location);

    let out_location = me.location.clone();
    let left = Box::new(me.ir_node.into_inner());
    let right = Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other));

    // The output element type is whatever the `ZipResult` impl declares for this pairing.
    let element_type =
        stageleft::quote_type::<<Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType>()
            .into();
    let metadata = me.location.new_node_metadata(CollectionKind::Singleton {
        bound: B::BOUND_KIND,
        element_type,
    });

    Singleton::<T, Tick<L>, B>::make(
        out_location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
231
232impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
233where
234    L: Location<'a>,
235{
236    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
237        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
238        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
239        Singleton {
240            location,
241            ir_node: RefCell::new(ir_node),
242            _phantom: PhantomData,
243        }
244    }
245
246    pub(crate) fn collection_kind() -> CollectionKind {
247        CollectionKind::Singleton {
248            bound: B::BOUND_KIND,
249            element_type: stageleft::quote_type::<T>().into(),
250        }
251    }
252
    /// Returns the [`Location`] where this singleton is being materialized.
    ///
    /// The location is returned by reference; clone it if an owned handle is needed.
    pub fn location(&self) -> &L {
        &self.location
    }
257
258    /// Transforms the singleton value by applying a function `f` to it,
259    /// continuously as the input is updated.
260    ///
261    /// # Example
262    /// ```rust
263    /// # #[cfg(feature = "deploy")] {
264    /// # use hydro_lang::prelude::*;
265    /// # use futures::StreamExt;
266    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267    /// let tick = process.tick();
268    /// let singleton = tick.singleton(q!(5));
269    /// singleton.map(q!(|v| v * 2)).all_ticks()
270    /// # }, |mut stream| async move {
271    /// // 10
272    /// # assert_eq!(stream.next().await.unwrap(), 10);
273    /// # }));
274    /// # }
275    /// ```
276    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
277    where
278        F: Fn(T) -> U + 'a,
279    {
280        let f = f.splice_fn1_ctx(&self.location).into();
281        Singleton::new(
282            self.location.clone(),
283            HydroNode::Map {
284                f,
285                input: Box::new(self.ir_node.into_inner()),
286                metadata: self
287                    .location
288                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
289            },
290        )
291    }
292
293    /// Transforms the singleton value by applying a function `f` to it and then flattening
294    /// the result into a stream, preserving the order of elements.
295    ///
296    /// The function `f` is applied to the singleton value to produce an iterator, and all items
297    /// from that iterator are emitted in the output stream in deterministic order.
298    ///
299    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
300    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
301    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
302    ///
303    /// # Example
304    /// ```rust
305    /// # #[cfg(feature = "deploy")] {
306    /// # use hydro_lang::prelude::*;
307    /// # use futures::StreamExt;
308    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309    /// let tick = process.tick();
310    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
311    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
312    /// # }, |mut stream| async move {
313    /// // 1, 2, 3
314    /// # for w in vec![1, 2, 3] {
315    /// #     assert_eq!(stream.next().await.unwrap(), w);
316    /// # }
317    /// # }));
318    /// # }
319    /// ```
320    pub fn flat_map_ordered<U, I, F>(
321        self,
322        f: impl IntoQuotedMut<'a, F, L>,
323    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
324    where
325        I: IntoIterator<Item = U>,
326        F: Fn(T) -> I + 'a,
327    {
328        let f = f.splice_fn1_ctx(&self.location).into();
329        Stream::new(
330            self.location.clone(),
331            HydroNode::FlatMap {
332                f,
333                input: Box::new(self.ir_node.into_inner()),
334                metadata: self.location.new_node_metadata(
335                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
336                ),
337            },
338        )
339    }
340
341    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
342    /// for the output type `I` to produce items in any order.
343    ///
344    /// The function `f` is applied to the singleton value to produce an iterator, and all items
345    /// from that iterator are emitted in the output stream in non-deterministic order.
346    ///
347    /// # Example
348    /// ```rust
349    /// # #[cfg(feature = "deploy")] {
350    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
351    /// # use futures::StreamExt;
352    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
353    /// let tick = process.tick();
354    /// let singleton = tick.singleton(q!(
355    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
356    /// ));
357    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
358    /// # }, |mut stream| async move {
359    /// // 1, 2, 3, but in no particular order
360    /// # let mut results = Vec::new();
361    /// # for _ in 0..3 {
362    /// #     results.push(stream.next().await.unwrap());
363    /// # }
364    /// # results.sort();
365    /// # assert_eq!(results, vec![1, 2, 3]);
366    /// # }));
367    /// # }
368    /// ```
369    pub fn flat_map_unordered<U, I, F>(
370        self,
371        f: impl IntoQuotedMut<'a, F, L>,
372    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
373    where
374        I: IntoIterator<Item = U>,
375        F: Fn(T) -> I + 'a,
376    {
377        let f = f.splice_fn1_ctx(&self.location).into();
378        Stream::new(
379            self.location.clone(),
380            HydroNode::FlatMap {
381                f,
382                input: Box::new(self.ir_node.into_inner()),
383                metadata: self
384                    .location
385                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
386            },
387        )
388    }
389
390    /// Flattens the singleton value into a stream, preserving the order of elements.
391    ///
392    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
393    /// are emitted in the output stream in deterministic order.
394    ///
395    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
396    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
397    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
398    ///
399    /// # Example
400    /// ```rust
401    /// # #[cfg(feature = "deploy")] {
402    /// # use hydro_lang::prelude::*;
403    /// # use futures::StreamExt;
404    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405    /// let tick = process.tick();
406    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
407    /// singleton.flatten_ordered().all_ticks()
408    /// # }, |mut stream| async move {
409    /// // 1, 2, 3
410    /// # for w in vec![1, 2, 3] {
411    /// #     assert_eq!(stream.next().await.unwrap(), w);
412    /// # }
413    /// # }));
414    /// # }
415    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is `flat_map_ordered` with the identity closure: the value itself
        // is the iterable whose items are emitted.
        self.flat_map_ordered(q!(|x| x))
    }
422
423    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
424    /// for the element type `T` to produce items in any order.
425    ///
426    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
427    /// are emitted in the output stream in non-deterministic order.
428    ///
429    /// # Example
430    /// ```rust
431    /// # #[cfg(feature = "deploy")] {
432    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
433    /// # use futures::StreamExt;
434    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
435    /// let tick = process.tick();
436    /// let singleton = tick.singleton(q!(
437    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
438    /// ));
439    /// singleton.flatten_unordered().all_ticks()
440    /// # }, |mut stream| async move {
441    /// // 1, 2, 3, but in no particular order
442    /// # let mut results = Vec::new();
443    /// # for _ in 0..3 {
444    /// #     results.push(stream.next().await.unwrap());
445    /// # }
446    /// # results.sort();
447    /// # assert_eq!(results, vec![1, 2, 3]);
448    /// # }));
449    /// # }
450    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is `flat_map_unordered` with the identity closure: the value itself
        // is the iterable whose items are emitted.
        self.flat_map_unordered(q!(|x| x))
    }
457
458    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
459    ///
460    /// If the predicate returns `true`, the output optional contains the same value.
461    /// If the predicate returns `false`, the output optional is empty.
462    ///
463    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
464    /// not modify or take ownership of the value. If you need to modify the value while filtering
465    /// use [`Singleton::filter_map`] instead.
466    ///
467    /// # Example
468    /// ```rust
469    /// # #[cfg(feature = "deploy")] {
470    /// # use hydro_lang::prelude::*;
471    /// # use futures::StreamExt;
472    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
473    /// let tick = process.tick();
474    /// let singleton = tick.singleton(q!(5));
475    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
476    /// # }, |mut stream| async move {
477    /// // 5
478    /// # assert_eq!(stream.next().await.unwrap(), 5);
479    /// # }));
480    /// # }
481    /// ```
482    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
483    where
484        F: Fn(&T) -> bool + 'a,
485    {
486        let f = f.splice_fn1_borrow_ctx(&self.location).into();
487        Optional::new(
488            self.location.clone(),
489            HydroNode::Filter {
490                f,
491                input: Box::new(self.ir_node.into_inner()),
492                metadata: self
493                    .location
494                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
495            },
496        )
497    }
498
499    /// An operator that both filters and maps. It yields the value only if the supplied
500    /// closure `f` returns `Some(value)`.
501    ///
502    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
503    /// If the closure returns `None`, the output optional is empty.
504    ///
505    /// # Example
506    /// ```rust
507    /// # #[cfg(feature = "deploy")] {
508    /// # use hydro_lang::prelude::*;
509    /// # use futures::StreamExt;
510    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
511    /// let tick = process.tick();
512    /// let singleton = tick.singleton(q!("42"));
513    /// singleton
514    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
515    ///     .all_ticks()
516    /// # }, |mut stream| async move {
517    /// // 42
518    /// # assert_eq!(stream.next().await.unwrap(), 42);
519    /// # }));
520    /// # }
521    /// ```
522    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
523    where
524        F: Fn(T) -> Option<U> + 'a,
525    {
526        let f = f.splice_fn1_ctx(&self.location).into();
527        Optional::new(
528            self.location.clone(),
529            HydroNode::FilterMap {
530                f,
531                input: Box::new(self.ir_node.into_inner()),
532                metadata: self
533                    .location
534                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
535            },
536        )
537    }
538
539    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
540    ///
541    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
542    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
543    /// non-null. This is useful for combining several pieces of state together.
544    ///
545    /// # Example
546    /// ```rust
547    /// # #[cfg(feature = "deploy")] {
548    /// # use hydro_lang::prelude::*;
549    /// # use futures::StreamExt;
550    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
551    /// let tick = process.tick();
552    /// let numbers = process
553    ///   .source_iter(q!(vec![123, 456]))
554    ///   .batch(&tick, nondet!(/** test */));
555    /// let count = numbers.clone().count(); // Singleton
556    /// let max = numbers.max(); // Optional
557    /// count.zip(max).all_ticks()
558    /// # }, |mut stream| async move {
559    /// // [(2, 456)]
560    /// # for w in vec![(2, 456)] {
561    /// #     assert_eq!(stream.next().await.unwrap(), w);
562    /// # }
563    /// # }));
564    /// # }
565    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
    {
        // Both inputs must live at the same location.
        check_matching_location(&self.location, &Self::other_location(&other));

        // At a top-level location that can supply a tick, snapshot both sides into that
        // tick, zip them there, and bring the result back out with `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // The other side is re-wrapped as an `Optional` over its IR node so it can
                // be snapshotted into the same tick.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    Self::other_location(&other),
                    Self::other_ir_node(other),
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Rebuild the result as the `ZipResult::Out` type for the original `other`.
            Self::make(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise emit the `CrossSingleton` node directly at this location.
            // NOTE(review): the node is tagged `CollectionKind::Optional` here, while
            // `zip_inside_tick` tags its node `CollectionKind::Singleton` — confirm which
            // kind is intended for each `ZipResult` output.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.into_inner()),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
603
604    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
606    ///
607    /// Useful for conditionally processing, such as only emitting a singleton's value outside
608    /// a tick if some other condition is satisfied.
609    ///
610    /// # Example
611    /// ```rust
612    /// # #[cfg(feature = "deploy")] {
613    /// # use hydro_lang::prelude::*;
614    /// # use futures::StreamExt;
615    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
616    /// let tick = process.tick();
617    /// // ticks are lazy by default, forces the second tick to run
618    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
619    ///
620    /// let batch_first_tick = process
621    ///   .source_iter(q!(vec![1]))
622    ///   .batch(&tick, nondet!(/** test */));
623    /// let batch_second_tick = process
624    ///   .source_iter(q!(vec![1, 2, 3]))
625    ///   .batch(&tick, nondet!(/** test */))
626    ///   .defer_tick(); // appears on the second tick
627    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
628    /// batch_first_tick.chain(batch_second_tick).count()
629    ///   .filter_if_some(some_on_first_tick)
630    ///   .all_ticks()
631    /// # }, |mut stream| async move {
632    /// // [1]
633    /// # for w in vec![1] {
634    /// #     assert_eq!(stream.next().await.unwrap(), w);
635    /// # }
636    /// # }));
637    /// # }
638    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
        // Erase the signal's payload to `()`, zip it with this value (zipping with an
        // optional yields an optional), then project the pair back to this value.
        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }
643
644    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
646    ///
647    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
648    /// the condition.
649    ///
650    /// # Example
651    /// ```rust
652    /// # #[cfg(feature = "deploy")] {
653    /// # use hydro_lang::prelude::*;
654    /// # use futures::StreamExt;
655    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
656    /// let tick = process.tick();
657    /// // ticks are lazy by default, forces the second tick to run
658    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
659    ///
660    /// let batch_first_tick = process
661    ///   .source_iter(q!(vec![1]))
662    ///   .batch(&tick, nondet!(/** test */));
663    /// let batch_second_tick = process
664    ///   .source_iter(q!(vec![1, 2, 3]))
665    ///   .batch(&tick, nondet!(/** test */))
666    ///   .defer_tick(); // appears on the second tick
667    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
668    /// batch_first_tick.chain(batch_second_tick).count()
669    ///   .filter_if_none(some_on_first_tick)
670    ///   .all_ticks()
671    /// # }, |mut stream| async move {
672    /// // [3]
673    /// # for w in vec![3] {
674    /// #     assert_eq!(stream.next().await.unwrap(), w);
675    /// # }
676    /// # }));
677    /// # }
678    /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
        // Invert the signal and delegate to `filter_if_some`: the payload is erased to
        // `()`, the optional is turned into a singleton (presumably of `Option<()>`, since
        // the predicate calls `is_none()` on it — confirm against `into_singleton`), and
        // only the `None` case is kept.
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }
687
688    /// An operator which allows you to "name" a `HydroNode`.
689    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
690    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
691        {
692            let mut node = self.ir_node.borrow_mut();
693            let metadata = node.metadata_mut();
694            metadata.tag = Some(name.to_string());
695        }
696        self
697    }
698}
699
700impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
701where
702    L: Location<'a> + NoTick,
703{
704    /// Returns a singleton value corresponding to the latest snapshot of the singleton
705    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
706    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
707    /// all snapshots of this singleton into the atomic-associated tick will observe the
708    /// same value each tick.
709    ///
710    /// # Non-Determinism
711    /// Because this picks a snapshot of a singleton whose value is continuously changing,
712    /// the output singleton has a non-deterministic value since the snapshot can be at an
713    /// arbitrary point in time.
714    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
715        Singleton::new(
716            self.location.clone().tick,
717            HydroNode::Batch {
718                inner: Box::new(self.ir_node.into_inner()),
719                metadata: self
720                    .location
721                    .tick
722                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
723            },
724        )
725    }
726
727    /// Returns this singleton back into a top-level, asynchronous execution context where updates
728    /// to the value will be asynchronously propagated.
729    pub fn end_atomic(self) -> Singleton<T, L, B> {
730        Singleton::new(
731            self.location.tick.l.clone(),
732            HydroNode::EndAtomic {
733                inner: Box::new(self.ir_node.into_inner()),
734                metadata: self
735                    .location
736                    .tick
737                    .l
738                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
739            },
740        )
741    }
742}
743
744impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
745where
746    L: Location<'a>,
747{
748    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
749    /// will observe the same version of the value and will be executed synchronously before any
    /// outputs are yielded (in [`Singleton::end_atomic`]).
751    ///
752    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
753    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
754    /// a different version).
755    ///
756    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
    /// be atomically processed. Snapshotting a singleton into the _same_ [`Tick`] will preserve the
758    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
759    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
760        let out_location = Atomic { tick: tick.clone() };
761        Singleton::new(
762            out_location.clone(),
763            HydroNode::BeginAtomic {
764                inner: Box::new(self.ir_node.into_inner()),
765                metadata: out_location
766                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
767            },
768        )
769    }
770
771    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
772    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
773    /// relevant data that contributed to the snapshot at tick `t`.
774    ///
775    /// # Non-Determinism
776    /// Because this picks a snapshot of a singleton whose value is continuously changing,
777    /// the output singleton has a non-deterministic value since the snapshot can be at an
778    /// arbitrary point in time.
779    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
780        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
781        Singleton::new(
782            tick.clone(),
783            HydroNode::Batch {
784                inner: Box::new(self.ir_node.into_inner()),
785                metadata: tick
786                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
787            },
788        )
789    }
790
791    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
792    /// with order corresponding to increasing prefixes of data contributing to the singleton.
793    ///
794    /// # Non-Determinism
795    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
796    /// to non-deterministic batching and arrival of inputs, the output stream is
797    /// non-deterministic.
798    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
799    where
800        L: NoTick,
801    {
802        let tick = self.location.tick();
803        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
804    }
805
806    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
807    /// value taken at various points in time. Because the input singleton may be
808    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
809    /// represent the value of the singleton given some prefix of the streams leading up to
810    /// it.
811    ///
812    /// # Non-Determinism
813    /// The output stream is non-deterministic in which elements are sampled, since this
814    /// is controlled by a clock.
815    pub fn sample_every(
816        self,
817        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
818        nondet: NonDet,
819    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
820    where
821        L: NoTick + NoAtomic,
822    {
823        let samples = self.location.source_interval(interval, nondet);
824        let tick = self.location.tick();
825
826        self.snapshot(&tick, nondet)
827            .filter_if_some(samples.batch(&tick, nondet).first())
828            .all_ticks()
829            .weakest_retries()
830    }
831}
832
833impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
834where
835    L: Location<'a>,
836{
837    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
838    /// which will stream the value computed in _each_ tick as a separate stream element.
839    ///
840    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
841    /// producing one element in the output for each tick. This is useful for batched computations,
842    /// where the results from each tick must be combined together.
843    ///
844    /// # Example
845    /// ```rust
846    /// # #[cfg(feature = "deploy")] {
847    /// # use hydro_lang::prelude::*;
848    /// # use futures::StreamExt;
849    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
850    /// let tick = process.tick();
851    /// # // ticks are lazy by default, forces the second tick to run
852    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
853    /// # let batch_first_tick = process
854    /// #   .source_iter(q!(vec![1]))
855    /// #   .batch(&tick, nondet!(/** test */));
856    /// # let batch_second_tick = process
857    /// #   .source_iter(q!(vec![1, 2, 3]))
858    /// #   .batch(&tick, nondet!(/** test */))
859    /// #   .defer_tick(); // appears on the second tick
860    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
861    /// input_batch // first tick: [1], second tick: [1, 2, 3]
862    ///     .count()
863    ///     .all_ticks()
864    /// # }, |mut stream| async move {
865    /// // [1, 3]
866    /// # for w in vec![1, 3] {
867    /// #     assert_eq!(stream.next().await.unwrap(), w);
868    /// # }
869    /// # }));
870    /// # }
871    /// ```
872    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
873        self.into_stream().all_ticks()
874    }
875
876    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
877    /// which will stream the value computed in _each_ tick as a separate stream element.
878    ///
879    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
880    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
881    /// singleton's [`Tick`] context.
882    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
883        self.into_stream().all_ticks_atomic()
884    }
885
886    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
887    /// be asynchronously updated with the latest value of the singleton inside the tick.
888    ///
889    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
890    /// tick that tracks the inner value. This is useful for getting the value as of the
891    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
892    ///
893    /// # Example
894    /// ```rust
895    /// # #[cfg(feature = "deploy")] {
896    /// # use hydro_lang::prelude::*;
897    /// # use futures::StreamExt;
898    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899    /// let tick = process.tick();
900    /// # // ticks are lazy by default, forces the second tick to run
901    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
902    /// # let batch_first_tick = process
903    /// #   .source_iter(q!(vec![1]))
904    /// #   .batch(&tick, nondet!(/** test */));
905    /// # let batch_second_tick = process
906    /// #   .source_iter(q!(vec![1, 2, 3]))
907    /// #   .batch(&tick, nondet!(/** test */))
908    /// #   .defer_tick(); // appears on the second tick
909    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
910    /// input_batch // first tick: [1], second tick: [1, 2, 3]
911    ///     .count()
912    ///     .latest()
913    /// # .sample_eager(nondet!(/** test */))
914    /// # }, |mut stream| async move {
915    /// // asynchronously changes from 1 ~> 3
916    /// # for w in vec![1, 3] {
917    /// #     assert_eq!(stream.next().await.unwrap(), w);
918    /// # }
919    /// # }));
920    /// # }
921    /// ```
922    pub fn latest(self) -> Singleton<T, L, Unbounded> {
923        Singleton::new(
924            self.location.outer().clone(),
925            HydroNode::YieldConcat {
926                inner: Box::new(self.ir_node.into_inner()),
927                metadata: self
928                    .location
929                    .outer()
930                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
931            },
932        )
933    }
934
935    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
936    /// be updated with the latest value of the singleton inside the tick.
937    ///
938    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
939    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
940    /// singleton's [`Tick`] context.
941    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
942        let out_location = Atomic {
943            tick: self.location.clone(),
944        };
945        Singleton::new(
946            out_location.clone(),
947            HydroNode::YieldConcat {
948                inner: Box::new(self.ir_node.into_inner()),
949                metadata: out_location
950                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
951            },
952        )
953    }
954
955    #[deprecated(note = "use .into_stream().persist()")]
956    #[expect(missing_docs, reason = "deprecated")]
957    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
958    where
959        T: Clone,
960    {
961        self.into_stream().persist()
962    }
963
964    /// Converts this singleton into a [`Stream`] containing a single element, the value.
965    ///
966    /// # Example
967    /// ```rust
968    /// # #[cfg(feature = "deploy")] {
969    /// # use hydro_lang::prelude::*;
970    /// # use futures::StreamExt;
971    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
972    /// let tick = process.tick();
973    /// let batch_input = process
974    ///   .source_iter(q!(vec![123, 456]))
975    ///   .batch(&tick, nondet!(/** test */));
976    /// batch_input.clone().chain(
977    ///   batch_input.count().into_stream()
978    /// ).all_ticks()
979    /// # }, |mut stream| async move {
980    /// // [123, 456, 2]
981    /// # for w in vec![123, 456, 2] {
982    /// #     assert_eq!(stream.next().await.unwrap(), w);
983    /// # }
984    /// # }));
985    /// # }
986    /// ```
987    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
988        Stream::new(
989            self.location.clone(),
990            HydroNode::Cast {
991                inner: Box::new(self.ir_node.into_inner()),
992                metadata: self.location.new_node_metadata(Stream::<
993                    T,
994                    Tick<L>,
995                    Bounded,
996                    TotalOrder,
997                    ExactlyOnce,
998                >::collection_kind()),
999            },
1000        )
1001    }
1002}
1003
1004#[doc(hidden)]
1005/// Helper trait that determines the output collection type for [`Singleton::zip`].
1006///
1007/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1008/// [`Singleton`].
1009#[sealed::sealed]
1010pub trait ZipResult<'a, Other> {
1011    /// The output collection type.
1012    type Out;
1013    /// The type of the tupled output value.
1014    type ElementType;
1015    /// The type of the other collection's value.
1016    type OtherType;
1017    /// The location where the tupled result will be materialized.
1018    type Location: Location<'a>;
1019
1020    /// The location of the second input to the `zip`.
1021    fn other_location(other: &Other) -> Self::Location;
1022    /// The IR node of the second input to the `zip`.
1023    fn other_ir_node(other: Other) -> HydroNode;
1024
1025    /// Constructs the output live collection given an IR node containing the zip result.
1026    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1027}
1028
1029#[sealed::sealed]
1030impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1031where
1032    L: Location<'a>,
1033{
1034    type Out = Singleton<(T, U), L, B>;
1035    type ElementType = (T, U);
1036    type OtherType = U;
1037    type Location = L;
1038
1039    fn other_location(other: &Singleton<U, L, B>) -> L {
1040        other.location.clone()
1041    }
1042
1043    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1044        other.ir_node.into_inner()
1045    }
1046
1047    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1048        Singleton::new(
1049            location.clone(),
1050            HydroNode::Cast {
1051                inner: Box::new(ir_node),
1052                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1053            },
1054        )
1055    }
1056}
1057
1058#[sealed::sealed]
1059impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1060where
1061    L: Location<'a>,
1062{
1063    type Out = Optional<(T, U), L, B>;
1064    type ElementType = (T, U);
1065    type OtherType = U;
1066    type Location = L;
1067
1068    fn other_location(other: &Optional<U, L, B>) -> L {
1069        other.location.clone()
1070    }
1071
1072    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1073        other.ir_node.into_inner()
1074    }
1075
1076    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1077        Optional::new(location, ir_node)
1078    }
1079}
1080
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        // a singleton fed back through a tick cycle is counted on every triggered tick,
        // and the count is expected to be 1 each time

        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (trigger_port, trigger_stream) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();
        let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));

        // only report the cardinality on ticks where an external trigger arrived
        let cardinality = cycled.clone().into_stream().count();
        let tick_has_trigger = trigger_stream
            .batch(&tick, nondet!(/** testing */))
            .first();
        let counts_port = cardinality
            .filter_if_some(tick_has_trigger)
            .all_ticks()
            .send_bincode_external(&external);
        cycle_handle.complete_next_tick(cycled);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_sink = nodes.connect(trigger_port).await;
        let mut counts = nodes.connect(counts_port).await;

        deployment.start().await.unwrap();

        // send two triggers, checking the reported cardinality after each one
        for _ in 0..2 {
            trigger_sink.send(()).await.unwrap();
            assert_eq!(counts.next().await.unwrap(), 1);
        }
    }

    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_intermediate_states() {
        // expected to panic: the assertion insists that the first snapshot is already the
        // final sum

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let outputs = snapshots.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            assert_eq!(outputs.next().await.unwrap(), 10);
        });
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_intermediate_state_count() {
        // every explored instance must end on the final sum; the number of instances is
        // pinned below

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let outputs = snapshots.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let observed = outputs.collect::<Vec<_>>().await;
            assert_eq!(observed.last(), Some(&10));
        });

        // 2^4 possible subsets of intermediates (including initial state)
        assert_eq!(instance_count, 16);
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_no_repeat_initial() {
        // check that we don't repeat the initial state of the fold in autonomous decisions

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (input_port, input_stream) = process.sim_input();
        let running_sum = input_stream.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let outputs = snapshots.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            // the initial state comes out first
            assert_eq!(outputs.next().await.unwrap(), 0);

            input_port.send(123);

            // the next output must already reflect the input just sent
            assert_eq!(outputs.next().await.unwrap(), 123);
        });
    }

    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_repeats_snapshots() {
        // when the tick is driven by a snapshot AND something else, the snapshot can
        // "stutter" and repeat the same state multiple times

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let number_batch = numbers.batch(&tick, nondet!(/** test */));
        let sum_snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
        let paired = number_batch.cross_singleton(sum_snapshot);
        let outputs = paired.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            // the second output is only awaited when the first one matched
            let first = outputs.next().await.unwrap();
            if first == (1, 3) {
                let second = outputs.next().await.unwrap();
                if second == (2, 3) {
                    panic!("repeated snapshot");
                }
            }
        });
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_repeats_snapshots_count() {
        // check the number of instances

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2]));
        let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let number_batch = numbers.batch(&tick, nondet!(/** test */));
        let sum_snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
        let paired = number_batch.cross_singleton(sum_snapshot);
        let outputs = paired.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let _ = outputs.collect::<Vec<_>>().await;
        });

        // don't have a combinatorial explanation for this number yet, but checked via logs
        assert_eq!(instance_count, 52);
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_exhaustive() {
        // ensures that top-level singletons have only one snapshot

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let constant = process.singleton(q!(1));
        let tick = process.tick();
        let snapshots = constant.snapshot(&tick, nondet!(/** test */));
        let outputs = snapshots.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let _ = outputs.collect::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 1);
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_join_count() {
        // if a tick consumes a static snapshot and a stream batch, only the batch requires
        // space exploration

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let tick = process.tick();
        let number_batch = numbers.batch(&tick, nondet!(/** test */));
        let constant_snapshot = process
            .singleton(q!(123))
            .snapshot(&tick, nondet!(/** test */));
        let paired = number_batch.cross_singleton(constant_snapshot);
        let outputs = paired.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let _ = outputs.collect::<Vec<_>>().await;
        });

        // 2^4 ways to split up (including a possibly empty first batch)
        assert_eq!(instance_count, 16);
    }
}