hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A single Rust value that can asynchronously change over time.
25///
26/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
27/// [`Unbounded`], the value will asynchronously change over time.
28///
29/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
30/// a single number that will asynchronously change as events are processed. Singletons also appear
31/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
32/// such as getting the length of a batch of requests.
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this singleton
36/// - `Loc`: the [`Location`] where the singleton is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
38pub struct Singleton<Type, Loc, Bound: Boundedness> {
39    pub(crate) location: Loc,
40    pub(crate) ir_node: RefCell<HydroNode>,
41
42    _phantom: PhantomData<(Type, Loc, Bound)>,
43}
44
45impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
46where
47    T: Clone,
48    L: Location<'a> + NoTick,
49{
50    fn from(value: Singleton<T, L, Bounded>) -> Self {
51        let tick = value.location().tick();
52        value.clone_into_tick(&tick).latest()
53    }
54}
55
56impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
57where
58    L: Location<'a>,
59{
60    type Location = Tick<L>;
61
62    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
63        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
64            location.clone(),
65            HydroNode::DeferTick {
66                input: Box::new(HydroNode::CycleSource {
67                    ident,
68                    metadata: location.new_node_metadata(Self::collection_kind()),
69                }),
70                metadata: location
71                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
72            },
73        );
74
75        from_previous_tick.unwrap_or(initial)
76    }
77}
78
79impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
80where
81    L: Location<'a>,
82{
83    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
84        assert_eq!(
85            Location::id(&self.location),
86            expected_location,
87            "locations do not match"
88        );
89        self.location
90            .flow_state()
91            .borrow_mut()
92            .push_root(HydroRoot::CycleSink {
93                ident,
94                input: Box::new(self.ir_node.into_inner()),
95                op_metadata: HydroIrOpMetadata::new(),
96            });
97    }
98}
99
100impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
101where
102    L: Location<'a>,
103{
104    type Location = Tick<L>;
105
106    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
107        Singleton::new(
108            location.clone(),
109            HydroNode::CycleSource {
110                ident,
111                metadata: location.new_node_metadata(Self::collection_kind()),
112            },
113        )
114    }
115}
116
117impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
118where
119    L: Location<'a>,
120{
121    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
122        assert_eq!(
123            Location::id(&self.location),
124            expected_location,
125            "locations do not match"
126        );
127        self.location
128            .flow_state()
129            .borrow_mut()
130            .push_root(HydroRoot::CycleSink {
131                ident,
132                input: Box::new(self.ir_node.into_inner()),
133                op_metadata: HydroIrOpMetadata::new(),
134            });
135    }
136}
137
138impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
139where
140    L: Location<'a> + NoTick,
141{
142    type Location = L;
143
144    fn create_source(ident: syn::Ident, location: L) -> Self {
145        Singleton::new(
146            location.clone(),
147            HydroNode::CycleSource {
148                ident,
149                metadata: location.new_node_metadata(Self::collection_kind()),
150            },
151        )
152    }
153}
154
155impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
156where
157    L: Location<'a> + NoTick,
158{
159    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
160        assert_eq!(
161            Location::id(&self.location),
162            expected_location,
163            "locations do not match"
164        );
165        self.location
166            .flow_state()
167            .borrow_mut()
168            .push_root(HydroRoot::CycleSink {
169                ident,
170                input: Box::new(self.ir_node.into_inner()),
171                op_metadata: HydroIrOpMetadata::new(),
172            });
173    }
174}
175
176impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
177where
178    T: Clone,
179    L: Location<'a>,
180{
181    fn clone(&self) -> Self {
182        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
183            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
184            *self.ir_node.borrow_mut() = HydroNode::Tee {
185                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
186                metadata: self.location.new_node_metadata(Self::collection_kind()),
187            };
188        }
189
190        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
191            Singleton {
192                location: self.location.clone(),
193                ir_node: HydroNode::Tee {
194                    inner: TeeNode(inner.0.clone()),
195                    metadata: metadata.clone(),
196                }
197                .into(),
198                _phantom: PhantomData,
199            }
200        } else {
201            unreachable!()
202        }
203    }
204}
205
206#[cfg(stageleft_runtime)]
207fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
208    me: Singleton<T, Tick<L>, B>,
209    other: O,
210) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
211where
212    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
213{
214    check_matching_location(
215        &me.location,
216        &Singleton::<T, Tick<L>, B>::other_location(&other),
217    );
218
219    Singleton::<T, Tick<L>, B>::make(
220        me.location.clone(),
221        HydroNode::CrossSingleton {
222            left: Box::new(me.ir_node.into_inner()),
223            right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
224            metadata: me.location.new_node_metadata(CollectionKind::Singleton {
225                bound: B::BOUND_KIND,
226                element_type: stageleft::quote_type::<
227                    <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
228                >()
229                .into(),
230            }),
231        },
232    )
233}
234
235impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
236where
237    L: Location<'a>,
238{
239    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
240        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
241        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
242        Singleton {
243            location,
244            ir_node: RefCell::new(ir_node),
245            _phantom: PhantomData,
246        }
247    }
248
249    pub(crate) fn collection_kind() -> CollectionKind {
250        CollectionKind::Singleton {
251            bound: B::BOUND_KIND,
252            element_type: stageleft::quote_type::<T>().into(),
253        }
254    }
255
256    /// Returns the [`Location`] where this singleton is being materialized.
257    pub fn location(&self) -> &L {
258        &self.location
259    }
260
261    /// Transforms the singleton value by applying a function `f` to it,
262    /// continuously as the input is updated.
263    ///
264    /// # Example
265    /// ```rust
266    /// # #[cfg(feature = "deploy")] {
267    /// # use hydro_lang::prelude::*;
268    /// # use futures::StreamExt;
269    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
270    /// let tick = process.tick();
271    /// let singleton = tick.singleton(q!(5));
272    /// singleton.map(q!(|v| v * 2)).all_ticks()
273    /// # }, |mut stream| async move {
274    /// // 10
275    /// # assert_eq!(stream.next().await.unwrap(), 10);
276    /// # }));
277    /// # }
278    /// ```
279    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
280    where
281        F: Fn(T) -> U + 'a,
282    {
283        let f = f.splice_fn1_ctx(&self.location).into();
284        Singleton::new(
285            self.location.clone(),
286            HydroNode::Map {
287                f,
288                input: Box::new(self.ir_node.into_inner()),
289                metadata: self
290                    .location
291                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
292            },
293        )
294    }
295
296    /// Transforms the singleton value by applying a function `f` to it and then flattening
297    /// the result into a stream, preserving the order of elements.
298    ///
299    /// The function `f` is applied to the singleton value to produce an iterator, and all items
300    /// from that iterator are emitted in the output stream in deterministic order.
301    ///
302    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
303    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
304    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
305    ///
306    /// # Example
307    /// ```rust
308    /// # #[cfg(feature = "deploy")] {
309    /// # use hydro_lang::prelude::*;
310    /// # use futures::StreamExt;
311    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
312    /// let tick = process.tick();
313    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
314    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
315    /// # }, |mut stream| async move {
316    /// // 1, 2, 3
317    /// # for w in vec![1, 2, 3] {
318    /// #     assert_eq!(stream.next().await.unwrap(), w);
319    /// # }
320    /// # }));
321    /// # }
322    /// ```
323    pub fn flat_map_ordered<U, I, F>(
324        self,
325        f: impl IntoQuotedMut<'a, F, L>,
326    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
327    where
328        I: IntoIterator<Item = U>,
329        F: Fn(T) -> I + 'a,
330    {
331        let f = f.splice_fn1_ctx(&self.location).into();
332        Stream::new(
333            self.location.clone(),
334            HydroNode::FlatMap {
335                f,
336                input: Box::new(self.ir_node.into_inner()),
337                metadata: self.location.new_node_metadata(
338                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
339                ),
340            },
341        )
342    }
343
344    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
345    /// for the output type `I` to produce items in any order.
346    ///
347    /// The function `f` is applied to the singleton value to produce an iterator, and all items
348    /// from that iterator are emitted in the output stream in non-deterministic order.
349    ///
350    /// # Example
351    /// ```rust
352    /// # #[cfg(feature = "deploy")] {
353    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
354    /// # use futures::StreamExt;
355    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
356    /// let tick = process.tick();
357    /// let singleton = tick.singleton(q!(
358    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
359    /// ));
360    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
361    /// # }, |mut stream| async move {
362    /// // 1, 2, 3, but in no particular order
363    /// # let mut results = Vec::new();
364    /// # for _ in 0..3 {
365    /// #     results.push(stream.next().await.unwrap());
366    /// # }
367    /// # results.sort();
368    /// # assert_eq!(results, vec![1, 2, 3]);
369    /// # }));
370    /// # }
371    /// ```
372    pub fn flat_map_unordered<U, I, F>(
373        self,
374        f: impl IntoQuotedMut<'a, F, L>,
375    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
376    where
377        I: IntoIterator<Item = U>,
378        F: Fn(T) -> I + 'a,
379    {
380        let f = f.splice_fn1_ctx(&self.location).into();
381        Stream::new(
382            self.location.clone(),
383            HydroNode::FlatMap {
384                f,
385                input: Box::new(self.ir_node.into_inner()),
386                metadata: self
387                    .location
388                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
389            },
390        )
391    }
392
393    /// Flattens the singleton value into a stream, preserving the order of elements.
394    ///
395    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
396    /// are emitted in the output stream in deterministic order.
397    ///
398    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
399    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
400    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
401    ///
402    /// # Example
403    /// ```rust
404    /// # #[cfg(feature = "deploy")] {
405    /// # use hydro_lang::prelude::*;
406    /// # use futures::StreamExt;
407    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
408    /// let tick = process.tick();
409    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
410    /// singleton.flatten_ordered().all_ticks()
411    /// # }, |mut stream| async move {
412    /// // 1, 2, 3
413    /// # for w in vec![1, 2, 3] {
414    /// #     assert_eq!(stream.next().await.unwrap(), w);
415    /// # }
416    /// # }));
417    /// # }
418    /// ```
419    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
420    where
421        T: IntoIterator<Item = U>,
422    {
423        self.flat_map_ordered(q!(|x| x))
424    }
425
426    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
427    /// for the element type `T` to produce items in any order.
428    ///
429    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
430    /// are emitted in the output stream in non-deterministic order.
431    ///
432    /// # Example
433    /// ```rust
434    /// # #[cfg(feature = "deploy")] {
435    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
436    /// # use futures::StreamExt;
437    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
438    /// let tick = process.tick();
439    /// let singleton = tick.singleton(q!(
440    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
441    /// ));
442    /// singleton.flatten_unordered().all_ticks()
443    /// # }, |mut stream| async move {
444    /// // 1, 2, 3, but in no particular order
445    /// # let mut results = Vec::new();
446    /// # for _ in 0..3 {
447    /// #     results.push(stream.next().await.unwrap());
448    /// # }
449    /// # results.sort();
450    /// # assert_eq!(results, vec![1, 2, 3]);
451    /// # }));
452    /// # }
453    /// ```
454    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
455    where
456        T: IntoIterator<Item = U>,
457    {
458        self.flat_map_unordered(q!(|x| x))
459    }
460
461    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
462    ///
463    /// If the predicate returns `true`, the output optional contains the same value.
464    /// If the predicate returns `false`, the output optional is empty.
465    ///
466    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
467    /// not modify or take ownership of the value. If you need to modify the value while filtering
468    /// use [`Singleton::filter_map`] instead.
469    ///
470    /// # Example
471    /// ```rust
472    /// # #[cfg(feature = "deploy")] {
473    /// # use hydro_lang::prelude::*;
474    /// # use futures::StreamExt;
475    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
476    /// let tick = process.tick();
477    /// let singleton = tick.singleton(q!(5));
478    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
479    /// # }, |mut stream| async move {
480    /// // 5
481    /// # assert_eq!(stream.next().await.unwrap(), 5);
482    /// # }));
483    /// # }
484    /// ```
485    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
486    where
487        F: Fn(&T) -> bool + 'a,
488    {
489        let f = f.splice_fn1_borrow_ctx(&self.location).into();
490        Optional::new(
491            self.location.clone(),
492            HydroNode::Filter {
493                f,
494                input: Box::new(self.ir_node.into_inner()),
495                metadata: self
496                    .location
497                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
498            },
499        )
500    }
501
502    /// An operator that both filters and maps. It yields the value only if the supplied
503    /// closure `f` returns `Some(value)`.
504    ///
505    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
506    /// If the closure returns `None`, the output optional is empty.
507    ///
508    /// # Example
509    /// ```rust
510    /// # #[cfg(feature = "deploy")] {
511    /// # use hydro_lang::prelude::*;
512    /// # use futures::StreamExt;
513    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
514    /// let tick = process.tick();
515    /// let singleton = tick.singleton(q!("42"));
516    /// singleton
517    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
518    ///     .all_ticks()
519    /// # }, |mut stream| async move {
520    /// // 42
521    /// # assert_eq!(stream.next().await.unwrap(), 42);
522    /// # }));
523    /// # }
524    /// ```
525    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
526    where
527        F: Fn(T) -> Option<U> + 'a,
528    {
529        let f = f.splice_fn1_ctx(&self.location).into();
530        Optional::new(
531            self.location.clone(),
532            HydroNode::FilterMap {
533                f,
534                input: Box::new(self.ir_node.into_inner()),
535                metadata: self
536                    .location
537                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
538            },
539        )
540    }
541
542    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
543    ///
544    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
545    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
546    /// non-null. This is useful for combining several pieces of state together.
547    ///
548    /// # Example
549    /// ```rust
550    /// # #[cfg(feature = "deploy")] {
551    /// # use hydro_lang::prelude::*;
552    /// # use futures::StreamExt;
553    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
554    /// let tick = process.tick();
555    /// let numbers = process
556    ///   .source_iter(q!(vec![123, 456]))
557    ///   .batch(&tick, nondet!(/** test */));
558    /// let count = numbers.clone().count(); // Singleton
559    /// let max = numbers.max(); // Optional
560    /// count.zip(max).all_ticks()
561    /// # }, |mut stream| async move {
562    /// // [(2, 456)]
563    /// # for w in vec![(2, 456)] {
564    /// #     assert_eq!(stream.next().await.unwrap(), w);
565    /// # }
566    /// # }));
567    /// # }
568    /// ```
569    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
570    where
571        Self: ZipResult<'a, O, Location = L>,
572    {
573        check_matching_location(&self.location, &Self::other_location(&other));
574
575        if L::is_top_level()
576            && let Some(tick) = self.location.try_tick()
577        {
578            let out = zip_inside_tick(
579                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
580                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
581                    Self::other_location(&other),
582                    Self::other_ir_node(other),
583                )
584                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
585            )
586            .latest();
587
588            Self::make(out.location, out.ir_node.into_inner())
589        } else {
590            Self::make(
591                self.location.clone(),
592                HydroNode::CrossSingleton {
593                    left: Box::new(self.ir_node.into_inner()),
594                    right: Box::new(Self::other_ir_node(other)),
595                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
596                        bound: B::BOUND_KIND,
597                        element_type: stageleft::quote_type::<
598                            <Self as ZipResult<'a, O>>::ElementType,
599                        >()
600                        .into(),
601                    }),
602                },
603            )
604        }
605    }
606
607    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
608    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
609    ///
610    /// Useful for conditionally processing, such as only emitting a singleton's value outside
611    /// a tick if some other condition is satisfied.
612    ///
613    /// # Example
614    /// ```rust
615    /// # #[cfg(feature = "deploy")] {
616    /// # use hydro_lang::prelude::*;
617    /// # use futures::StreamExt;
618    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
619    /// let tick = process.tick();
620    /// // ticks are lazy by default, forces the second tick to run
621    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
622    ///
623    /// let batch_first_tick = process
624    ///   .source_iter(q!(vec![1]))
625    ///   .batch(&tick, nondet!(/** test */));
626    /// let batch_second_tick = process
627    ///   .source_iter(q!(vec![1, 2, 3]))
628    ///   .batch(&tick, nondet!(/** test */))
629    ///   .defer_tick(); // appears on the second tick
630    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
631    /// batch_first_tick.chain(batch_second_tick).count()
632    ///   .filter_if_some(some_on_first_tick)
633    ///   .all_ticks()
634    /// # }, |mut stream| async move {
635    /// // [1]
636    /// # for w in vec![1] {
637    /// #     assert_eq!(stream.next().await.unwrap(), w);
638    /// # }
639    /// # }));
640    /// # }
641    /// ```
642    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
643        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
644            .map(q!(|(d, _signal)| d))
645    }
646
647    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
648    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
649    ///
650    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
651    /// the condition.
652    ///
653    /// # Example
654    /// ```rust
655    /// # #[cfg(feature = "deploy")] {
656    /// # use hydro_lang::prelude::*;
657    /// # use futures::StreamExt;
658    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
659    /// let tick = process.tick();
660    /// // ticks are lazy by default, forces the second tick to run
661    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
662    ///
663    /// let batch_first_tick = process
664    ///   .source_iter(q!(vec![1]))
665    ///   .batch(&tick, nondet!(/** test */));
666    /// let batch_second_tick = process
667    ///   .source_iter(q!(vec![1, 2, 3]))
668    ///   .batch(&tick, nondet!(/** test */))
669    ///   .defer_tick(); // appears on the second tick
670    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
671    /// batch_first_tick.chain(batch_second_tick).count()
672    ///   .filter_if_none(some_on_first_tick)
673    ///   .all_ticks()
674    /// # }, |mut stream| async move {
675    /// // [3]
676    /// # for w in vec![3] {
677    /// #     assert_eq!(stream.next().await.unwrap(), w);
678    /// # }
679    /// # }));
680    /// # }
681    /// ```
682    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
683        self.filter_if_some(
684            other
685                .map(q!(|_| ()))
686                .into_singleton()
687                .filter(q!(|o| o.is_none())),
688        )
689    }
690
691    /// An operator which allows you to "name" a `HydroNode`.
692    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
693    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
694        {
695            let mut node = self.ir_node.borrow_mut();
696            let metadata = node.metadata_mut();
697            metadata.tag = Some(name.to_string());
698        }
699        self
700    }
701}
702
703impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
704where
705    L: Location<'a> + NoTick,
706{
707    /// Returns a singleton value corresponding to the latest snapshot of the singleton
708    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
709    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
710    /// all snapshots of this singleton into the atomic-associated tick will observe the
711    /// same value each tick.
712    ///
713    /// # Non-Determinism
714    /// Because this picks a snapshot of a singleton whose value is continuously changing,
715    /// the output singleton has a non-deterministic value since the snapshot can be at an
716    /// arbitrary point in time.
717    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
718        Singleton::new(
719            self.location.clone().tick,
720            HydroNode::Batch {
721                inner: Box::new(self.ir_node.into_inner()),
722                metadata: self
723                    .location
724                    .tick
725                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
726            },
727        )
728    }
729
730    /// Returns this singleton back into a top-level, asynchronous execution context where updates
731    /// to the value will be asynchronously propagated.
732    pub fn end_atomic(self) -> Singleton<T, L, B> {
733        Singleton::new(
734            self.location.tick.l.clone(),
735            HydroNode::EndAtomic {
736                inner: Box::new(self.ir_node.into_inner()),
737                metadata: self
738                    .location
739                    .tick
740                    .l
741                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
742            },
743        )
744    }
745}
746
747impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
748where
749    L: Location<'a>,
750{
751    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
752    /// will observe the same version of the value and will be executed synchronously before any
753    /// outputs are yielded (in [`Optional::end_atomic`]).
754    ///
755    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
756    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
757    /// a different version).
758    ///
759    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
760    /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
761    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
762    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
763        let out_location = Atomic { tick: tick.clone() };
764        Singleton::new(
765            out_location.clone(),
766            HydroNode::BeginAtomic {
767                inner: Box::new(self.ir_node.into_inner()),
768                metadata: out_location
769                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
770            },
771        )
772    }
773
774    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
775    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
776    /// relevant data that contributed to the snapshot at tick `t`.
777    ///
778    /// # Non-Determinism
779    /// Because this picks a snapshot of a singleton whose value is continuously changing,
780    /// the output singleton has a non-deterministic value since the snapshot can be at an
781    /// arbitrary point in time.
782    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
783        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
784        Singleton::new(
785            tick.clone(),
786            HydroNode::Batch {
787                inner: Box::new(self.ir_node.into_inner()),
788                metadata: tick
789                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
790            },
791        )
792    }
793
794    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
795    /// with order corresponding to increasing prefixes of data contributing to the singleton.
796    ///
797    /// # Non-Determinism
798    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
799    /// to non-deterministic batching and arrival of inputs, the output stream is
800    /// non-deterministic.
801    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
802    where
803        L: NoTick,
804    {
805        sliced! {
806            let snapshot = use(self, nondet);
807            snapshot.into_stream()
808        }
809        .weakest_retries()
810    }
811
812    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
813    /// value taken at various points in time. Because the input singleton may be
814    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
815    /// represent the value of the singleton given some prefix of the streams leading up to
816    /// it.
817    ///
818    /// # Non-Determinism
819    /// The output stream is non-deterministic in which elements are sampled, since this
820    /// is controlled by a clock.
821    pub fn sample_every(
822        self,
823        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
824        nondet: NonDet,
825    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
826    where
827        L: NoTick + NoAtomic,
828    {
829        let samples = self.location.source_interval(interval, nondet);
830        sliced! {
831            let snapshot = use(self, nondet);
832            let sample_batch = use(samples, nondet);
833
834            snapshot.filter_if_some(sample_batch.first()).into_stream()
835        }
836        .weakest_retries()
837    }
838}
839
840impl<'a, T, L> Singleton<T, L, Bounded>
841where
842    L: Location<'a>,
843{
844    /// Clones this bounded singleton into a tick, returning a singleton that has the
845    /// same value as the outer singleton. Because the outer singleton is bounded, this
846    /// is deterministic because there is only a single immutable version.
847    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
848    where
849        T: Clone,
850    {
851        // TODO(shadaj): avoid printing simulator logs for this snapshot
852        self.snapshot(
853            tick,
854            nondet!(/** bounded top-level singleton so deterministic */),
855        )
856    }
857
858    /// Converts this singleton into a [`Stream`] containing a single element, the value.
859    ///
860    /// # Example
861    /// ```rust
862    /// # #[cfg(feature = "deploy")] {
863    /// # use hydro_lang::prelude::*;
864    /// # use futures::StreamExt;
865    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
866    /// let tick = process.tick();
867    /// let batch_input = process
868    ///   .source_iter(q!(vec![123, 456]))
869    ///   .batch(&tick, nondet!(/** test */));
870    /// batch_input.clone().chain(
871    ///   batch_input.count().into_stream()
872    /// ).all_ticks()
873    /// # }, |mut stream| async move {
874    /// // [123, 456, 2]
875    /// # for w in vec![123, 456, 2] {
876    /// #     assert_eq!(stream.next().await.unwrap(), w);
877    /// # }
878    /// # }));
879    /// # }
880    /// ```
881    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
882        Stream::new(
883            self.location.clone(),
884            HydroNode::Cast {
885                inner: Box::new(self.ir_node.into_inner()),
886                metadata: self.location.new_node_metadata(Stream::<
887                    T,
888                    Tick<L>,
889                    Bounded,
890                    TotalOrder,
891                    ExactlyOnce,
892                >::collection_kind()),
893            },
894        )
895    }
896}
897
898impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
899where
900    L: Location<'a>,
901{
902    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
903    /// which will stream the value computed in _each_ tick as a separate stream element.
904    ///
905    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
906    /// producing one element in the output for each tick. This is useful for batched computations,
907    /// where the results from each tick must be combined together.
908    ///
909    /// # Example
910    /// ```rust
911    /// # #[cfg(feature = "deploy")] {
912    /// # use hydro_lang::prelude::*;
913    /// # use futures::StreamExt;
914    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
915    /// let tick = process.tick();
916    /// # // ticks are lazy by default, forces the second tick to run
917    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
918    /// # let batch_first_tick = process
919    /// #   .source_iter(q!(vec![1]))
920    /// #   .batch(&tick, nondet!(/** test */));
921    /// # let batch_second_tick = process
922    /// #   .source_iter(q!(vec![1, 2, 3]))
923    /// #   .batch(&tick, nondet!(/** test */))
924    /// #   .defer_tick(); // appears on the second tick
925    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
926    /// input_batch // first tick: [1], second tick: [1, 2, 3]
927    ///     .count()
928    ///     .all_ticks()
929    /// # }, |mut stream| async move {
930    /// // [1, 3]
931    /// # for w in vec![1, 3] {
932    /// #     assert_eq!(stream.next().await.unwrap(), w);
933    /// # }
934    /// # }));
935    /// # }
936    /// ```
937    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
938        self.into_stream().all_ticks()
939    }
940
941    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
942    /// which will stream the value computed in _each_ tick as a separate stream element.
943    ///
944    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
945    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
946    /// singleton's [`Tick`] context.
947    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
948        self.into_stream().all_ticks_atomic()
949    }
950
951    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
952    /// be asynchronously updated with the latest value of the singleton inside the tick.
953    ///
954    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
955    /// tick that tracks the inner value. This is useful for getting the value as of the
956    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
957    ///
958    /// # Example
959    /// ```rust
960    /// # #[cfg(feature = "deploy")] {
961    /// # use hydro_lang::prelude::*;
962    /// # use futures::StreamExt;
963    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
964    /// let tick = process.tick();
965    /// # // ticks are lazy by default, forces the second tick to run
966    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
967    /// # let batch_first_tick = process
968    /// #   .source_iter(q!(vec![1]))
969    /// #   .batch(&tick, nondet!(/** test */));
970    /// # let batch_second_tick = process
971    /// #   .source_iter(q!(vec![1, 2, 3]))
972    /// #   .batch(&tick, nondet!(/** test */))
973    /// #   .defer_tick(); // appears on the second tick
974    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
975    /// input_batch // first tick: [1], second tick: [1, 2, 3]
976    ///     .count()
977    ///     .latest()
978    /// # .sample_eager(nondet!(/** test */))
979    /// # }, |mut stream| async move {
980    /// // asynchronously changes from 1 ~> 3
981    /// # for w in vec![1, 3] {
982    /// #     assert_eq!(stream.next().await.unwrap(), w);
983    /// # }
984    /// # }));
985    /// # }
986    /// ```
987    pub fn latest(self) -> Singleton<T, L, Unbounded> {
988        Singleton::new(
989            self.location.outer().clone(),
990            HydroNode::YieldConcat {
991                inner: Box::new(self.ir_node.into_inner()),
992                metadata: self
993                    .location
994                    .outer()
995                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
996            },
997        )
998    }
999
1000    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1001    /// be updated with the latest value of the singleton inside the tick.
1002    ///
1003    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1004    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1005    /// singleton's [`Tick`] context.
1006    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1007        let out_location = Atomic {
1008            tick: self.location.clone(),
1009        };
1010        Singleton::new(
1011            out_location.clone(),
1012            HydroNode::YieldConcat {
1013                inner: Box::new(self.ir_node.into_inner()),
1014                metadata: out_location
1015                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1016            },
1017        )
1018    }
1019}
1020
1021#[doc(hidden)]
1022/// Helper trait that determines the output collection type for [`Singleton::zip`].
1023///
1024/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1025/// [`Singleton`].
1026#[sealed::sealed]
1027pub trait ZipResult<'a, Other> {
1028    /// The output collection type.
1029    type Out;
1030    /// The type of the tupled output value.
1031    type ElementType;
1032    /// The type of the other collection's value.
1033    type OtherType;
1034    /// The location where the tupled result will be materialized.
1035    type Location: Location<'a>;
1036
1037    /// The location of the second input to the `zip`.
1038    fn other_location(other: &Other) -> Self::Location;
1039    /// The IR node of the second input to the `zip`.
1040    fn other_ir_node(other: Other) -> HydroNode;
1041
1042    /// Constructs the output live collection given an IR node containing the zip result.
1043    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1044}
1045
1046#[sealed::sealed]
1047impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1048where
1049    L: Location<'a>,
1050{
1051    type Out = Singleton<(T, U), L, B>;
1052    type ElementType = (T, U);
1053    type OtherType = U;
1054    type Location = L;
1055
1056    fn other_location(other: &Singleton<U, L, B>) -> L {
1057        other.location.clone()
1058    }
1059
1060    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1061        other.ir_node.into_inner()
1062    }
1063
1064    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1065        Singleton::new(
1066            location.clone(),
1067            HydroNode::Cast {
1068                inner: Box::new(ir_node),
1069                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1070            },
1071        )
1072    }
1073}
1074
1075#[sealed::sealed]
1076impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1077where
1078    L: Location<'a>,
1079{
1080    type Out = Optional<(T, U), L, B>;
1081    type ElementType = (T, U);
1082    type OtherType = U;
1083    type Location = L;
1084
1085    fn other_location(other: &Optional<U, L, B>) -> L {
1086        other.location.clone()
1087    }
1088
1089    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1090        other.ir_node.into_inner()
1091    }
1092
1093    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1094        Optional::new(location, ir_node)
1095    }
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100    #[cfg(feature = "deploy")]
1101    use futures::{SinkExt, StreamExt};
1102    #[cfg(feature = "deploy")]
1103    use hydro_deploy::Deployment;
1104    #[cfg(any(feature = "deploy", feature = "sim"))]
1105    use stageleft::q;
1106
1107    #[cfg(any(feature = "deploy", feature = "sim"))]
1108    use crate::compile::builder::FlowBuilder;
1109    #[cfg(feature = "deploy")]
1110    use crate::live_collections::stream::ExactlyOnce;
1111    #[cfg(any(feature = "deploy", feature = "sim"))]
1112    use crate::location::Location;
1113    #[cfg(any(feature = "deploy", feature = "sim"))]
1114    use crate::nondet::nondet;
1115
1116    #[cfg(feature = "deploy")]
1117    #[tokio::test]
1118    async fn tick_cycle_cardinality() {
1119        let mut deployment = Deployment::new();
1120
1121        let flow = FlowBuilder::new();
1122        let node = flow.process::<()>();
1123        let external = flow.external::<()>();
1124
1125        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1126
1127        let node_tick = node.tick();
1128        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1129        let counts = singleton
1130            .clone()
1131            .into_stream()
1132            .count()
1133            .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1134            .all_ticks()
1135            .send_bincode_external(&external);
1136        complete_cycle.complete_next_tick(singleton);
1137
1138        let nodes = flow
1139            .with_process(&node, deployment.Localhost())
1140            .with_external(&external, deployment.Localhost())
1141            .deploy(&mut deployment);
1142
1143        deployment.deploy().await.unwrap();
1144
1145        let mut tick_trigger = nodes.connect(input_send).await;
1146        let mut external_out = nodes.connect(counts).await;
1147
1148        deployment.start().await.unwrap();
1149
1150        tick_trigger.send(()).await.unwrap();
1151
1152        assert_eq!(external_out.next().await.unwrap(), 1);
1153
1154        tick_trigger.send(()).await.unwrap();
1155
1156        assert_eq!(external_out.next().await.unwrap(), 1);
1157    }
1158
1159    #[cfg(feature = "sim")]
1160    #[test]
1161    #[should_panic]
1162    fn sim_fold_intermediate_states() {
1163        let flow = FlowBuilder::new();
1164        let node = flow.process::<()>();
1165
1166        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1167        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1168
1169        let tick = node.tick();
1170        let batch = folded.snapshot(&tick, nondet!(/** test */));
1171        let out_recv = batch.all_ticks().sim_output();
1172
1173        flow.sim().exhaustive(async || {
1174            assert_eq!(out_recv.next().await.unwrap(), 10);
1175        });
1176    }
1177
1178    #[cfg(feature = "sim")]
1179    #[test]
1180    fn sim_fold_intermediate_state_count() {
1181        let flow = FlowBuilder::new();
1182        let node = flow.process::<()>();
1183
1184        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1185        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1186
1187        let tick = node.tick();
1188        let batch = folded.snapshot(&tick, nondet!(/** test */));
1189        let out_recv = batch.all_ticks().sim_output();
1190
1191        let instance_count = flow.sim().exhaustive(async || {
1192            let out = out_recv.collect::<Vec<_>>().await;
1193            assert_eq!(out.last(), Some(&10));
1194        });
1195
1196        assert_eq!(
1197            instance_count,
1198            16 // 2^4 possible subsets of intermediates (including initial state)
1199        )
1200    }
1201
1202    #[cfg(feature = "sim")]
1203    #[test]
1204    fn sim_fold_no_repeat_initial() {
1205        // check that we don't repeat the initial state of the fold in autonomous decisions
1206
1207        let flow = FlowBuilder::new();
1208        let node = flow.process::<()>();
1209
1210        let (in_port, input) = node.sim_input();
1211        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1212
1213        let tick = node.tick();
1214        let batch = folded.snapshot(&tick, nondet!(/** test */));
1215        let out_recv = batch.all_ticks().sim_output();
1216
1217        flow.sim().exhaustive(async || {
1218            assert_eq!(out_recv.next().await.unwrap(), 0);
1219
1220            in_port.send(123);
1221
1222            assert_eq!(out_recv.next().await.unwrap(), 123);
1223        });
1224    }
1225
1226    #[cfg(feature = "sim")]
1227    #[test]
1228    #[should_panic]
1229    fn sim_fold_repeats_snapshots() {
1230        // when the tick is driven by a snapshot AND something else, the snapshot can
1231        // "stutter" and repeat the same state multiple times
1232
1233        let flow = FlowBuilder::new();
1234        let node = flow.process::<()>();
1235
1236        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1237        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1238
1239        let tick = node.tick();
1240        let batch = source
1241            .batch(&tick, nondet!(/** test */))
1242            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1243        let out_recv = batch.all_ticks().sim_output();
1244
1245        flow.sim().exhaustive(async || {
1246            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1247            {
1248                panic!("repeated snapshot");
1249            }
1250        });
1251    }
1252
1253    #[cfg(feature = "sim")]
1254    #[test]
1255    fn sim_fold_repeats_snapshots_count() {
1256        // check the number of instances
1257        let flow = FlowBuilder::new();
1258        let node = flow.process::<()>();
1259
1260        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1261        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1262
1263        let tick = node.tick();
1264        let batch = source
1265            .batch(&tick, nondet!(/** test */))
1266            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1267        let out_recv = batch.all_ticks().sim_output();
1268
1269        let count = flow.sim().exhaustive(async || {
1270            let _ = out_recv.collect::<Vec<_>>().await;
1271        });
1272
1273        assert_eq!(count, 52);
1274        // don't have a combinatorial explanation for this number yet, but checked via logs
1275    }
1276
1277    #[cfg(feature = "sim")]
1278    #[test]
1279    fn sim_top_level_singleton_exhaustive() {
1280        // ensures that top-level singletons have only one snapshot
1281        let flow = FlowBuilder::new();
1282        let node = flow.process::<()>();
1283
1284        let singleton = node.singleton(q!(1));
1285        let tick = node.tick();
1286        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1287        let out_recv = batch.all_ticks().sim_output();
1288
1289        let count = flow.sim().exhaustive(async || {
1290            let _ = out_recv.collect::<Vec<_>>().await;
1291        });
1292
1293        assert_eq!(count, 1);
1294    }
1295
1296    #[cfg(feature = "sim")]
1297    #[test]
1298    fn sim_top_level_singleton_join_count() {
1299        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1300        // exploration
1301
1302        let flow = FlowBuilder::new();
1303        let node = flow.process::<()>();
1304
1305        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1306        let tick = node.tick();
1307        let batch = source_iter
1308            .batch(&tick, nondet!(/** test */))
1309            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1310        let out_recv = batch.all_ticks().sim_output();
1311
1312        let instance_count = flow.sim().exhaustive(async || {
1313            let _ = out_recv.collect::<Vec<_>>().await;
1314        });
1315
1316        assert_eq!(
1317            instance_count,
1318            16 // 2^4 ways to split up (including a possibly empty first batch)
1319        )
1320    }
1321
1322    #[cfg(feature = "sim")]
1323    #[test]
1324    fn top_level_singleton_into_stream_no_replay() {
1325        let flow = FlowBuilder::new();
1326        let node = flow.process::<()>();
1327
1328        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1329        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1330
1331        let out_recv = folded.into_stream().sim_output();
1332
1333        flow.sim().exhaustive(async || {
1334            out_recv.assert_yields_only([10]).await;
1335        });
1336    }
1337}