Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A single Rust value that can asynchronously change over time.
25///
26/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
27/// [`Unbounded`], the value will asynchronously change over time.
28///
29/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
30/// a single number that will asynchronously change as events are processed. Singletons also appear
31/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
32/// such as getting the length of a batch of requests.
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this singleton
36/// - `Loc`: the [`Location`] where the singleton is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
38pub struct Singleton<Type, Loc, Bound: Boundedness> {
39    pub(crate) location: Loc,
40    pub(crate) ir_node: RefCell<HydroNode>,
41
42    _phantom: PhantomData<(Type, Loc, Bound)>,
43}
44
45impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
46where
47    T: Clone,
48    L: Location<'a> + NoTick,
49{
50    fn from(value: Singleton<T, L, Bounded>) -> Self {
51        let tick = value.location().tick();
52        value.clone_into_tick(&tick).latest()
53    }
54}
55
56impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
57where
58    L: Location<'a>,
59{
60    type Location = Tick<L>;
61
62    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
63        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
64            location.clone(),
65            HydroNode::DeferTick {
66                input: Box::new(HydroNode::CycleSource {
67                    ident,
68                    metadata: location.new_node_metadata(Self::collection_kind()),
69                }),
70                metadata: location
71                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
72            },
73        );
74
75        from_previous_tick.unwrap_or(initial)
76    }
77}
78
79impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
80where
81    L: Location<'a>,
82{
83    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
84        assert_eq!(
85            Location::id(&self.location),
86            expected_location,
87            "locations do not match"
88        );
89        self.location
90            .flow_state()
91            .borrow_mut()
92            .push_root(HydroRoot::CycleSink {
93                ident,
94                input: Box::new(self.ir_node.into_inner()),
95                op_metadata: HydroIrOpMetadata::new(),
96            });
97    }
98}
99
100impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
101where
102    L: Location<'a>,
103{
104    type Location = Tick<L>;
105
106    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
107        Singleton::new(
108            location.clone(),
109            HydroNode::CycleSource {
110                ident,
111                metadata: location.new_node_metadata(Self::collection_kind()),
112            },
113        )
114    }
115}
116
117impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
118where
119    L: Location<'a>,
120{
121    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
122        assert_eq!(
123            Location::id(&self.location),
124            expected_location,
125            "locations do not match"
126        );
127        self.location
128            .flow_state()
129            .borrow_mut()
130            .push_root(HydroRoot::CycleSink {
131                ident,
132                input: Box::new(self.ir_node.into_inner()),
133                op_metadata: HydroIrOpMetadata::new(),
134            });
135    }
136}
137
138impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
139where
140    L: Location<'a> + NoTick,
141{
142    type Location = L;
143
144    fn create_source(ident: syn::Ident, location: L) -> Self {
145        Singleton::new(
146            location.clone(),
147            HydroNode::CycleSource {
148                ident,
149                metadata: location.new_node_metadata(Self::collection_kind()),
150            },
151        )
152    }
153}
154
155impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
156where
157    L: Location<'a> + NoTick,
158{
159    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
160        assert_eq!(
161            Location::id(&self.location),
162            expected_location,
163            "locations do not match"
164        );
165        self.location
166            .flow_state()
167            .borrow_mut()
168            .push_root(HydroRoot::CycleSink {
169                ident,
170                input: Box::new(self.ir_node.into_inner()),
171                op_metadata: HydroIrOpMetadata::new(),
172            });
173    }
174}
175
176impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
177where
178    T: Clone,
179    L: Location<'a>,
180{
181    fn clone(&self) -> Self {
182        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
183            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
184            *self.ir_node.borrow_mut() = HydroNode::Tee {
185                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
186                metadata: self.location.new_node_metadata(Self::collection_kind()),
187            };
188        }
189
190        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
191            Singleton {
192                location: self.location.clone(),
193                ir_node: HydroNode::Tee {
194                    inner: TeeNode(inner.0.clone()),
195                    metadata: metadata.clone(),
196                }
197                .into(),
198                _phantom: PhantomData,
199            }
200        } else {
201            unreachable!()
202        }
203    }
204}
205
206#[cfg(stageleft_runtime)]
207fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
208    me: Singleton<T, Tick<L>, B>,
209    other: O,
210) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
211where
212    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
213{
214    check_matching_location(
215        &me.location,
216        &Singleton::<T, Tick<L>, B>::other_location(&other),
217    );
218
219    Singleton::<T, Tick<L>, B>::make(
220        me.location.clone(),
221        HydroNode::CrossSingleton {
222            left: Box::new(me.ir_node.into_inner()),
223            right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
224            metadata: me.location.new_node_metadata(CollectionKind::Singleton {
225                bound: B::BOUND_KIND,
226                element_type: stageleft::quote_type::<
227                    <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
228                >()
229                .into(),
230            }),
231        },
232    )
233}
234
235impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
236where
237    L: Location<'a>,
238{
239    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
240        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
241        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
242        Singleton {
243            location,
244            ir_node: RefCell::new(ir_node),
245            _phantom: PhantomData,
246        }
247    }
248
249    pub(crate) fn collection_kind() -> CollectionKind {
250        CollectionKind::Singleton {
251            bound: B::BOUND_KIND,
252            element_type: stageleft::quote_type::<T>().into(),
253        }
254    }
255
256    /// Returns the [`Location`] where this singleton is being materialized.
257    pub fn location(&self) -> &L {
258        &self.location
259    }
260
261    /// Transforms the singleton value by applying a function `f` to it,
262    /// continuously as the input is updated.
263    ///
264    /// # Example
265    /// ```rust
266    /// # #[cfg(feature = "deploy")] {
267    /// # use hydro_lang::prelude::*;
268    /// # use futures::StreamExt;
269    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
270    /// let tick = process.tick();
271    /// let singleton = tick.singleton(q!(5));
272    /// singleton.map(q!(|v| v * 2)).all_ticks()
273    /// # }, |mut stream| async move {
274    /// // 10
275    /// # assert_eq!(stream.next().await.unwrap(), 10);
276    /// # }));
277    /// # }
278    /// ```
279    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
280    where
281        F: Fn(T) -> U + 'a,
282    {
283        let f = f.splice_fn1_ctx(&self.location).into();
284        Singleton::new(
285            self.location.clone(),
286            HydroNode::Map {
287                f,
288                input: Box::new(self.ir_node.into_inner()),
289                metadata: self
290                    .location
291                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
292            },
293        )
294    }
295
296    /// Transforms the singleton value by applying a function `f` to it and then flattening
297    /// the result into a stream, preserving the order of elements.
298    ///
299    /// The function `f` is applied to the singleton value to produce an iterator, and all items
300    /// from that iterator are emitted in the output stream in deterministic order.
301    ///
302    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
303    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
304    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
305    ///
306    /// # Example
307    /// ```rust
308    /// # #[cfg(feature = "deploy")] {
309    /// # use hydro_lang::prelude::*;
310    /// # use futures::StreamExt;
311    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
312    /// let tick = process.tick();
313    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
314    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
315    /// # }, |mut stream| async move {
316    /// // 1, 2, 3
317    /// # for w in vec![1, 2, 3] {
318    /// #     assert_eq!(stream.next().await.unwrap(), w);
319    /// # }
320    /// # }));
321    /// # }
322    /// ```
323    pub fn flat_map_ordered<U, I, F>(
324        self,
325        f: impl IntoQuotedMut<'a, F, L>,
326    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
327    where
328        I: IntoIterator<Item = U>,
329        F: Fn(T) -> I + 'a,
330    {
331        let f = f.splice_fn1_ctx(&self.location).into();
332        Stream::new(
333            self.location.clone(),
334            HydroNode::FlatMap {
335                f,
336                input: Box::new(self.ir_node.into_inner()),
337                metadata: self.location.new_node_metadata(
338                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
339                ),
340            },
341        )
342    }
343
344    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
345    /// for the output type `I` to produce items in any order.
346    ///
347    /// The function `f` is applied to the singleton value to produce an iterator, and all items
348    /// from that iterator are emitted in the output stream in non-deterministic order.
349    ///
350    /// # Example
351    /// ```rust
352    /// # #[cfg(feature = "deploy")] {
353    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
354    /// # use futures::StreamExt;
355    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
356    /// let tick = process.tick();
357    /// let singleton = tick.singleton(q!(
358    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
359    /// ));
360    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
361    /// # }, |mut stream| async move {
362    /// // 1, 2, 3, but in no particular order
363    /// # let mut results = Vec::new();
364    /// # for _ in 0..3 {
365    /// #     results.push(stream.next().await.unwrap());
366    /// # }
367    /// # results.sort();
368    /// # assert_eq!(results, vec![1, 2, 3]);
369    /// # }));
370    /// # }
371    /// ```
372    pub fn flat_map_unordered<U, I, F>(
373        self,
374        f: impl IntoQuotedMut<'a, F, L>,
375    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
376    where
377        I: IntoIterator<Item = U>,
378        F: Fn(T) -> I + 'a,
379    {
380        let f = f.splice_fn1_ctx(&self.location).into();
381        Stream::new(
382            self.location.clone(),
383            HydroNode::FlatMap {
384                f,
385                input: Box::new(self.ir_node.into_inner()),
386                metadata: self
387                    .location
388                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
389            },
390        )
391    }
392
393    /// Flattens the singleton value into a stream, preserving the order of elements.
394    ///
395    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
396    /// are emitted in the output stream in deterministic order.
397    ///
398    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
399    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
400    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
401    ///
402    /// # Example
403    /// ```rust
404    /// # #[cfg(feature = "deploy")] {
405    /// # use hydro_lang::prelude::*;
406    /// # use futures::StreamExt;
407    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
408    /// let tick = process.tick();
409    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
410    /// singleton.flatten_ordered().all_ticks()
411    /// # }, |mut stream| async move {
412    /// // 1, 2, 3
413    /// # for w in vec![1, 2, 3] {
414    /// #     assert_eq!(stream.next().await.unwrap(), w);
415    /// # }
416    /// # }));
417    /// # }
418    /// ```
419    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
420    where
421        T: IntoIterator<Item = U>,
422    {
423        self.flat_map_ordered(q!(|x| x))
424    }
425
426    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
427    /// for the element type `T` to produce items in any order.
428    ///
429    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
430    /// are emitted in the output stream in non-deterministic order.
431    ///
432    /// # Example
433    /// ```rust
434    /// # #[cfg(feature = "deploy")] {
435    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
436    /// # use futures::StreamExt;
437    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
438    /// let tick = process.tick();
439    /// let singleton = tick.singleton(q!(
440    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
441    /// ));
442    /// singleton.flatten_unordered().all_ticks()
443    /// # }, |mut stream| async move {
444    /// // 1, 2, 3, but in no particular order
445    /// # let mut results = Vec::new();
446    /// # for _ in 0..3 {
447    /// #     results.push(stream.next().await.unwrap());
448    /// # }
449    /// # results.sort();
450    /// # assert_eq!(results, vec![1, 2, 3]);
451    /// # }));
452    /// # }
453    /// ```
454    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
455    where
456        T: IntoIterator<Item = U>,
457    {
458        self.flat_map_unordered(q!(|x| x))
459    }
460
461    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
462    ///
463    /// If the predicate returns `true`, the output optional contains the same value.
464    /// If the predicate returns `false`, the output optional is empty.
465    ///
466    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
467    /// not modify or take ownership of the value. If you need to modify the value while filtering
468    /// use [`Singleton::filter_map`] instead.
469    ///
470    /// # Example
471    /// ```rust
472    /// # #[cfg(feature = "deploy")] {
473    /// # use hydro_lang::prelude::*;
474    /// # use futures::StreamExt;
475    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
476    /// let tick = process.tick();
477    /// let singleton = tick.singleton(q!(5));
478    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
479    /// # }, |mut stream| async move {
480    /// // 5
481    /// # assert_eq!(stream.next().await.unwrap(), 5);
482    /// # }));
483    /// # }
484    /// ```
485    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
486    where
487        F: Fn(&T) -> bool + 'a,
488    {
489        let f = f.splice_fn1_borrow_ctx(&self.location).into();
490        Optional::new(
491            self.location.clone(),
492            HydroNode::Filter {
493                f,
494                input: Box::new(self.ir_node.into_inner()),
495                metadata: self
496                    .location
497                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
498            },
499        )
500    }
501
502    /// An operator that both filters and maps. It yields the value only if the supplied
503    /// closure `f` returns `Some(value)`.
504    ///
505    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
506    /// If the closure returns `None`, the output optional is empty.
507    ///
508    /// # Example
509    /// ```rust
510    /// # #[cfg(feature = "deploy")] {
511    /// # use hydro_lang::prelude::*;
512    /// # use futures::StreamExt;
513    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
514    /// let tick = process.tick();
515    /// let singleton = tick.singleton(q!("42"));
516    /// singleton
517    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
518    ///     .all_ticks()
519    /// # }, |mut stream| async move {
520    /// // 42
521    /// # assert_eq!(stream.next().await.unwrap(), 42);
522    /// # }));
523    /// # }
524    /// ```
525    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
526    where
527        F: Fn(T) -> Option<U> + 'a,
528    {
529        let f = f.splice_fn1_ctx(&self.location).into();
530        Optional::new(
531            self.location.clone(),
532            HydroNode::FilterMap {
533                f,
534                input: Box::new(self.ir_node.into_inner()),
535                metadata: self
536                    .location
537                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
538            },
539        )
540    }
541
542    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
543    ///
544    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
545    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
546    /// non-null. This is useful for combining several pieces of state together.
547    ///
548    /// # Example
549    /// ```rust
550    /// # #[cfg(feature = "deploy")] {
551    /// # use hydro_lang::prelude::*;
552    /// # use futures::StreamExt;
553    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
554    /// let tick = process.tick();
555    /// let numbers = process
556    ///   .source_iter(q!(vec![123, 456]))
557    ///   .batch(&tick, nondet!(/** test */));
558    /// let count = numbers.clone().count(); // Singleton
559    /// let max = numbers.max(); // Optional
560    /// count.zip(max).all_ticks()
561    /// # }, |mut stream| async move {
562    /// // [(2, 456)]
563    /// # for w in vec![(2, 456)] {
564    /// #     assert_eq!(stream.next().await.unwrap(), w);
565    /// # }
566    /// # }));
567    /// # }
568    /// ```
569    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
570    where
571        Self: ZipResult<'a, O, Location = L>,
572    {
573        check_matching_location(&self.location, &Self::other_location(&other));
574
575        if L::is_top_level()
576            && let Some(tick) = self.location.try_tick()
577        {
578            let out = zip_inside_tick(
579                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
580                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
581                    Self::other_location(&other),
582                    Self::other_ir_node(other),
583                )
584                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
585            )
586            .latest();
587
588            Self::make(out.location, out.ir_node.into_inner())
589        } else {
590            Self::make(
591                self.location.clone(),
592                HydroNode::CrossSingleton {
593                    left: Box::new(self.ir_node.into_inner()),
594                    right: Box::new(Self::other_ir_node(other)),
595                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
596                        bound: B::BOUND_KIND,
597                        element_type: stageleft::quote_type::<
598                            <Self as ZipResult<'a, O>>::ElementType,
599                        >()
600                        .into(),
601                    }),
602                },
603            )
604        }
605    }
606
607    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
608    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
609    ///
610    /// Useful for conditionally processing, such as only emitting a singleton's value outside
611    /// a tick if some other condition is satisfied.
612    ///
613    /// # Example
614    /// ```rust
615    /// # #[cfg(feature = "deploy")] {
616    /// # use hydro_lang::prelude::*;
617    /// # use futures::StreamExt;
618    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
619    /// let tick = process.tick();
620    /// // ticks are lazy by default, forces the second tick to run
621    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
622    ///
623    /// let batch_first_tick = process
624    ///   .source_iter(q!(vec![1]))
625    ///   .batch(&tick, nondet!(/** test */));
626    /// let batch_second_tick = process
627    ///   .source_iter(q!(vec![1, 2, 3]))
628    ///   .batch(&tick, nondet!(/** test */))
629    ///   .defer_tick(); // appears on the second tick
630    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
631    /// batch_first_tick.chain(batch_second_tick).count()
632    ///   .filter_if_some(some_on_first_tick)
633    ///   .all_ticks()
634    /// # }, |mut stream| async move {
635    /// // [1]
636    /// # for w in vec![1] {
637    /// #     assert_eq!(stream.next().await.unwrap(), w);
638    /// # }
639    /// # }));
640    /// # }
641    /// ```
642    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
643        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
644            .map(q!(|(d, _signal)| d))
645    }
646
647    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
648    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
649    ///
650    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
651    /// the condition.
652    ///
653    /// # Example
654    /// ```rust
655    /// # #[cfg(feature = "deploy")] {
656    /// # use hydro_lang::prelude::*;
657    /// # use futures::StreamExt;
658    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
659    /// let tick = process.tick();
660    /// // ticks are lazy by default, forces the second tick to run
661    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
662    ///
663    /// let batch_first_tick = process
664    ///   .source_iter(q!(vec![1]))
665    ///   .batch(&tick, nondet!(/** test */));
666    /// let batch_second_tick = process
667    ///   .source_iter(q!(vec![1, 2, 3]))
668    ///   .batch(&tick, nondet!(/** test */))
669    ///   .defer_tick(); // appears on the second tick
670    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
671    /// batch_first_tick.chain(batch_second_tick).count()
672    ///   .filter_if_none(some_on_first_tick)
673    ///   .all_ticks()
674    /// # }, |mut stream| async move {
675    /// // [3]
676    /// # for w in vec![3] {
677    /// #     assert_eq!(stream.next().await.unwrap(), w);
678    /// # }
679    /// # }));
680    /// # }
681    /// ```
682    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
683        self.filter_if_some(
684            other
685                .map(q!(|_| ()))
686                .into_singleton()
687                .filter(q!(|o| o.is_none())),
688        )
689    }
690
691    /// An operator which allows you to "name" a `HydroNode`.
692    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
693    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
694        {
695            let mut node = self.ir_node.borrow_mut();
696            let metadata = node.metadata_mut();
697            metadata.tag = Some(name.to_owned());
698        }
699        self
700    }
701}
702
703impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
704where
705    L: Location<'a>,
706{
707    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
708    /// the inner `Option`.
709    ///
710    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
711    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
712    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
713    ///
714    /// # Example
715    /// ```rust
716    /// # #[cfg(feature = "deploy")] {
717    /// # use hydro_lang::prelude::*;
718    /// # use futures::StreamExt;
719    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
720    /// let tick = process.tick();
721    /// let singleton = tick.singleton(q!(Some(42)));
722    /// singleton.into_optional().all_ticks()
723    /// # }, |mut stream| async move {
724    /// // 42
725    /// # assert_eq!(stream.next().await.unwrap(), 42);
726    /// # }));
727    /// # }
728    /// ```
729    pub fn into_optional(self) -> Optional<T, L, B> {
730        self.filter_map(q!(|v| v))
731    }
732}
733
734impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
735where
736    L: Location<'a> + NoTick,
737{
738    /// Returns a singleton value corresponding to the latest snapshot of the singleton
739    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
740    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
741    /// all snapshots of this singleton into the atomic-associated tick will observe the
742    /// same value each tick.
743    ///
744    /// # Non-Determinism
745    /// Because this picks a snapshot of a singleton whose value is continuously changing,
746    /// the output singleton has a non-deterministic value since the snapshot can be at an
747    /// arbitrary point in time.
748    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
749        Singleton::new(
750            self.location.clone().tick,
751            HydroNode::Batch {
752                inner: Box::new(self.ir_node.into_inner()),
753                metadata: self
754                    .location
755                    .tick
756                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
757            },
758        )
759    }
760
761    /// Returns this singleton back into a top-level, asynchronous execution context where updates
762    /// to the value will be asynchronously propagated.
763    pub fn end_atomic(self) -> Singleton<T, L, B> {
764        Singleton::new(
765            self.location.tick.l.clone(),
766            HydroNode::EndAtomic {
767                inner: Box::new(self.ir_node.into_inner()),
768                metadata: self
769                    .location
770                    .tick
771                    .l
772                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
773            },
774        )
775    }
776}
777
778impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
779where
780    L: Location<'a>,
781{
782    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
783    /// will observe the same version of the value and will be executed synchronously before any
784    /// outputs are yielded (in [`Optional::end_atomic`]).
785    ///
786    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
787    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
788    /// a different version).
789    ///
790    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
791    /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
792    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
793    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
794        let out_location = Atomic { tick: tick.clone() };
795        Singleton::new(
796            out_location.clone(),
797            HydroNode::BeginAtomic {
798                inner: Box::new(self.ir_node.into_inner()),
799                metadata: out_location
800                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
801            },
802        )
803    }
804
805    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
806    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
807    /// relevant data that contributed to the snapshot at tick `t`.
808    ///
809    /// # Non-Determinism
810    /// Because this picks a snapshot of a singleton whose value is continuously changing,
811    /// the output singleton has a non-deterministic value since the snapshot can be at an
812    /// arbitrary point in time.
813    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
814        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
815        Singleton::new(
816            tick.clone(),
817            HydroNode::Batch {
818                inner: Box::new(self.ir_node.into_inner()),
819                metadata: tick
820                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
821            },
822        )
823    }
824
825    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
826    /// with order corresponding to increasing prefixes of data contributing to the singleton.
827    ///
828    /// # Non-Determinism
829    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
830    /// to non-deterministic batching and arrival of inputs, the output stream is
831    /// non-deterministic.
832    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
833    where
834        L: NoTick,
835    {
836        sliced! {
837            let snapshot = use(self, nondet);
838            snapshot.into_stream()
839        }
840        .weaken_retries()
841    }
842
843    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
844    /// value taken at various points in time. Because the input singleton may be
845    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
846    /// represent the value of the singleton given some prefix of the streams leading up to
847    /// it.
848    ///
849    /// # Non-Determinism
850    /// The output stream is non-deterministic in which elements are sampled, since this
851    /// is controlled by a clock.
852    pub fn sample_every(
853        self,
854        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
855        nondet: NonDet,
856    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
857    where
858        L: NoTick + NoAtomic,
859    {
860        let samples = self.location.source_interval(interval, nondet);
861        sliced! {
862            let snapshot = use(self, nondet);
863            let sample_batch = use(samples, nondet);
864
865            snapshot.filter_if_some(sample_batch.first()).into_stream()
866        }
867        .weaken_retries()
868    }
869}
870
871impl<'a, T, L> Singleton<T, L, Bounded>
872where
873    L: Location<'a>,
874{
875    /// Clones this bounded singleton into a tick, returning a singleton that has the
876    /// same value as the outer singleton. Because the outer singleton is bounded, this
877    /// is deterministic because there is only a single immutable version.
878    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
879    where
880        T: Clone,
881    {
882        // TODO(shadaj): avoid printing simulator logs for this snapshot
883        self.snapshot(
884            tick,
885            nondet!(/** bounded top-level singleton so deterministic */),
886        )
887    }
888
889    /// Converts this singleton into a [`Stream`] containing a single element, the value.
890    ///
891    /// # Example
892    /// ```rust
893    /// # #[cfg(feature = "deploy")] {
894    /// # use hydro_lang::prelude::*;
895    /// # use futures::StreamExt;
896    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
897    /// let tick = process.tick();
898    /// let batch_input = process
899    ///   .source_iter(q!(vec![123, 456]))
900    ///   .batch(&tick, nondet!(/** test */));
901    /// batch_input.clone().chain(
902    ///   batch_input.count().into_stream()
903    /// ).all_ticks()
904    /// # }, |mut stream| async move {
905    /// // [123, 456, 2]
906    /// # for w in vec![123, 456, 2] {
907    /// #     assert_eq!(stream.next().await.unwrap(), w);
908    /// # }
909    /// # }));
910    /// # }
911    /// ```
912    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
913        Stream::new(
914            self.location.clone(),
915            HydroNode::Cast {
916                inner: Box::new(self.ir_node.into_inner()),
917                metadata: self.location.new_node_metadata(Stream::<
918                    T,
919                    Tick<L>,
920                    Bounded,
921                    TotalOrder,
922                    ExactlyOnce,
923                >::collection_kind()),
924            },
925        )
926    }
927}
928
929impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
930where
931    L: Location<'a>,
932{
933    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
934    /// which will stream the value computed in _each_ tick as a separate stream element.
935    ///
936    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
937    /// producing one element in the output for each tick. This is useful for batched computations,
938    /// where the results from each tick must be combined together.
939    ///
940    /// # Example
941    /// ```rust
942    /// # #[cfg(feature = "deploy")] {
943    /// # use hydro_lang::prelude::*;
944    /// # use futures::StreamExt;
945    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
946    /// let tick = process.tick();
947    /// # // ticks are lazy by default, forces the second tick to run
948    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
949    /// # let batch_first_tick = process
950    /// #   .source_iter(q!(vec![1]))
951    /// #   .batch(&tick, nondet!(/** test */));
952    /// # let batch_second_tick = process
953    /// #   .source_iter(q!(vec![1, 2, 3]))
954    /// #   .batch(&tick, nondet!(/** test */))
955    /// #   .defer_tick(); // appears on the second tick
956    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
957    /// input_batch // first tick: [1], second tick: [1, 2, 3]
958    ///     .count()
959    ///     .all_ticks()
960    /// # }, |mut stream| async move {
961    /// // [1, 3]
962    /// # for w in vec![1, 3] {
963    /// #     assert_eq!(stream.next().await.unwrap(), w);
964    /// # }
965    /// # }));
966    /// # }
967    /// ```
968    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
969        self.into_stream().all_ticks()
970    }
971
972    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
973    /// which will stream the value computed in _each_ tick as a separate stream element.
974    ///
975    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
976    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
977    /// singleton's [`Tick`] context.
978    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
979        self.into_stream().all_ticks_atomic()
980    }
981
982    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
983    /// be asynchronously updated with the latest value of the singleton inside the tick.
984    ///
985    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
986    /// tick that tracks the inner value. This is useful for getting the value as of the
987    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
988    ///
989    /// # Example
990    /// ```rust
991    /// # #[cfg(feature = "deploy")] {
992    /// # use hydro_lang::prelude::*;
993    /// # use futures::StreamExt;
994    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
995    /// let tick = process.tick();
996    /// # // ticks are lazy by default, forces the second tick to run
997    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
998    /// # let batch_first_tick = process
999    /// #   .source_iter(q!(vec![1]))
1000    /// #   .batch(&tick, nondet!(/** test */));
1001    /// # let batch_second_tick = process
1002    /// #   .source_iter(q!(vec![1, 2, 3]))
1003    /// #   .batch(&tick, nondet!(/** test */))
1004    /// #   .defer_tick(); // appears on the second tick
1005    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1006    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1007    ///     .count()
1008    ///     .latest()
1009    /// # .sample_eager(nondet!(/** test */))
1010    /// # }, |mut stream| async move {
1011    /// // asynchronously changes from 1 ~> 3
1012    /// # for w in vec![1, 3] {
1013    /// #     assert_eq!(stream.next().await.unwrap(), w);
1014    /// # }
1015    /// # }));
1016    /// # }
1017    /// ```
1018    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1019        Singleton::new(
1020            self.location.outer().clone(),
1021            HydroNode::YieldConcat {
1022                inner: Box::new(self.ir_node.into_inner()),
1023                metadata: self
1024                    .location
1025                    .outer()
1026                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1027            },
1028        )
1029    }
1030
1031    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1032    /// be updated with the latest value of the singleton inside the tick.
1033    ///
1034    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1035    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1036    /// singleton's [`Tick`] context.
1037    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1038        let out_location = Atomic {
1039            tick: self.location.clone(),
1040        };
1041        Singleton::new(
1042            out_location.clone(),
1043            HydroNode::YieldConcat {
1044                inner: Box::new(self.ir_node.into_inner()),
1045                metadata: out_location
1046                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1047            },
1048        )
1049    }
1050}
1051
1052#[doc(hidden)]
1053/// Helper trait that determines the output collection type for [`Singleton::zip`].
1054///
1055/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1056/// [`Singleton`].
1057#[sealed::sealed]
1058pub trait ZipResult<'a, Other> {
1059    /// The output collection type.
1060    type Out;
1061    /// The type of the tupled output value.
1062    type ElementType;
1063    /// The type of the other collection's value.
1064    type OtherType;
1065    /// The location where the tupled result will be materialized.
1066    type Location: Location<'a>;
1067
1068    /// The location of the second input to the `zip`.
1069    fn other_location(other: &Other) -> Self::Location;
1070    /// The IR node of the second input to the `zip`.
1071    fn other_ir_node(other: Other) -> HydroNode;
1072
1073    /// Constructs the output live collection given an IR node containing the zip result.
1074    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1075}
1076
1077#[sealed::sealed]
1078impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1079where
1080    L: Location<'a>,
1081{
1082    type Out = Singleton<(T, U), L, B>;
1083    type ElementType = (T, U);
1084    type OtherType = U;
1085    type Location = L;
1086
1087    fn other_location(other: &Singleton<U, L, B>) -> L {
1088        other.location.clone()
1089    }
1090
1091    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1092        other.ir_node.into_inner()
1093    }
1094
1095    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1096        Singleton::new(
1097            location.clone(),
1098            HydroNode::Cast {
1099                inner: Box::new(ir_node),
1100                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1101            },
1102        )
1103    }
1104}
1105
1106#[sealed::sealed]
1107impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1108where
1109    L: Location<'a>,
1110{
1111    type Out = Optional<(T, U), L, B>;
1112    type ElementType = (T, U);
1113    type OtherType = U;
1114    type Location = L;
1115
1116    fn other_location(other: &Optional<U, L, B>) -> L {
1117        other.location.clone()
1118    }
1119
1120    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1121        other.ir_node.into_inner()
1122    }
1123
1124    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1125        Optional::new(location, ir_node)
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    #[cfg(feature = "deploy")]
1132    use futures::{SinkExt, StreamExt};
1133    #[cfg(feature = "deploy")]
1134    use hydro_deploy::Deployment;
1135    #[cfg(any(feature = "deploy", feature = "sim"))]
1136    use stageleft::q;
1137
1138    #[cfg(any(feature = "deploy", feature = "sim"))]
1139    use crate::compile::builder::FlowBuilder;
1140    #[cfg(feature = "deploy")]
1141    use crate::live_collections::stream::ExactlyOnce;
1142    #[cfg(any(feature = "deploy", feature = "sim"))]
1143    use crate::location::Location;
1144    #[cfg(any(feature = "deploy", feature = "sim"))]
1145    use crate::nondet::nondet;
1146
1147    #[cfg(feature = "deploy")]
1148    #[tokio::test]
1149    async fn tick_cycle_cardinality() {
1150        let mut deployment = Deployment::new();
1151
1152        let mut flow = FlowBuilder::new();
1153        let node = flow.process::<()>();
1154        let external = flow.external::<()>();
1155
1156        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1157
1158        let node_tick = node.tick();
1159        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1160        let counts = singleton
1161            .clone()
1162            .into_stream()
1163            .count()
1164            .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1165            .all_ticks()
1166            .send_bincode_external(&external);
1167        complete_cycle.complete_next_tick(singleton);
1168
1169        let nodes = flow
1170            .with_process(&node, deployment.Localhost())
1171            .with_external(&external, deployment.Localhost())
1172            .deploy(&mut deployment);
1173
1174        deployment.deploy().await.unwrap();
1175
1176        let mut tick_trigger = nodes.connect(input_send).await;
1177        let mut external_out = nodes.connect(counts).await;
1178
1179        deployment.start().await.unwrap();
1180
1181        tick_trigger.send(()).await.unwrap();
1182
1183        assert_eq!(external_out.next().await.unwrap(), 1);
1184
1185        tick_trigger.send(()).await.unwrap();
1186
1187        assert_eq!(external_out.next().await.unwrap(), 1);
1188    }
1189
1190    #[cfg(feature = "sim")]
1191    #[test]
1192    #[should_panic]
1193    fn sim_fold_intermediate_states() {
1194        let mut flow = FlowBuilder::new();
1195        let node = flow.process::<()>();
1196
1197        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1198        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1199
1200        let tick = node.tick();
1201        let batch = folded.snapshot(&tick, nondet!(/** test */));
1202        let out_recv = batch.all_ticks().sim_output();
1203
1204        flow.sim().exhaustive(async || {
1205            assert_eq!(out_recv.next().await.unwrap(), 10);
1206        });
1207    }
1208
1209    #[cfg(feature = "sim")]
1210    #[test]
1211    fn sim_fold_intermediate_state_count() {
1212        let mut flow = FlowBuilder::new();
1213        let node = flow.process::<()>();
1214
1215        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1216        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1217
1218        let tick = node.tick();
1219        let batch = folded.snapshot(&tick, nondet!(/** test */));
1220        let out_recv = batch.all_ticks().sim_output();
1221
1222        let instance_count = flow.sim().exhaustive(async || {
1223            let out = out_recv.collect::<Vec<_>>().await;
1224            assert_eq!(out.last(), Some(&10));
1225        });
1226
1227        assert_eq!(
1228            instance_count,
1229            16 // 2^4 possible subsets of intermediates (including initial state)
1230        )
1231    }
1232
1233    #[cfg(feature = "sim")]
1234    #[test]
1235    fn sim_fold_no_repeat_initial() {
1236        // check that we don't repeat the initial state of the fold in autonomous decisions
1237
1238        let mut flow = FlowBuilder::new();
1239        let node = flow.process::<()>();
1240
1241        let (in_port, input) = node.sim_input();
1242        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1243
1244        let tick = node.tick();
1245        let batch = folded.snapshot(&tick, nondet!(/** test */));
1246        let out_recv = batch.all_ticks().sim_output();
1247
1248        flow.sim().exhaustive(async || {
1249            assert_eq!(out_recv.next().await.unwrap(), 0);
1250
1251            in_port.send(123);
1252
1253            assert_eq!(out_recv.next().await.unwrap(), 123);
1254        });
1255    }
1256
1257    #[cfg(feature = "sim")]
1258    #[test]
1259    #[should_panic]
1260    fn sim_fold_repeats_snapshots() {
1261        // when the tick is driven by a snapshot AND something else, the snapshot can
1262        // "stutter" and repeat the same state multiple times
1263
1264        let mut flow = FlowBuilder::new();
1265        let node = flow.process::<()>();
1266
1267        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1268        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1269
1270        let tick = node.tick();
1271        let batch = source
1272            .batch(&tick, nondet!(/** test */))
1273            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1274        let out_recv = batch.all_ticks().sim_output();
1275
1276        flow.sim().exhaustive(async || {
1277            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1278            {
1279                panic!("repeated snapshot");
1280            }
1281        });
1282    }
1283
1284    #[cfg(feature = "sim")]
1285    #[test]
1286    fn sim_fold_repeats_snapshots_count() {
1287        // check the number of instances
1288        let mut flow = FlowBuilder::new();
1289        let node = flow.process::<()>();
1290
1291        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1292        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1293
1294        let tick = node.tick();
1295        let batch = source
1296            .batch(&tick, nondet!(/** test */))
1297            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1298        let out_recv = batch.all_ticks().sim_output();
1299
1300        let count = flow.sim().exhaustive(async || {
1301            let _ = out_recv.collect::<Vec<_>>().await;
1302        });
1303
1304        assert_eq!(count, 52);
1305        // don't have a combinatorial explanation for this number yet, but checked via logs
1306    }
1307
1308    #[cfg(feature = "sim")]
1309    #[test]
1310    fn sim_top_level_singleton_exhaustive() {
1311        // ensures that top-level singletons have only one snapshot
1312        let mut flow = FlowBuilder::new();
1313        let node = flow.process::<()>();
1314
1315        let singleton = node.singleton(q!(1));
1316        let tick = node.tick();
1317        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1318        let out_recv = batch.all_ticks().sim_output();
1319
1320        let count = flow.sim().exhaustive(async || {
1321            let _ = out_recv.collect::<Vec<_>>().await;
1322        });
1323
1324        assert_eq!(count, 1);
1325    }
1326
1327    #[cfg(feature = "sim")]
1328    #[test]
1329    fn sim_top_level_singleton_join_count() {
1330        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1331        // exploration
1332
1333        let mut flow = FlowBuilder::new();
1334        let node = flow.process::<()>();
1335
1336        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1337        let tick = node.tick();
1338        let batch = source_iter
1339            .batch(&tick, nondet!(/** test */))
1340            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1341        let out_recv = batch.all_ticks().sim_output();
1342
1343        let instance_count = flow.sim().exhaustive(async || {
1344            let _ = out_recv.collect::<Vec<_>>().await;
1345        });
1346
1347        assert_eq!(
1348            instance_count,
1349            16 // 2^4 ways to split up (including a possibly empty first batch)
1350        )
1351    }
1352
1353    #[cfg(feature = "sim")]
1354    #[test]
1355    fn top_level_singleton_into_stream_no_replay() {
1356        let mut flow = FlowBuilder::new();
1357        let node = flow.process::<()>();
1358
1359        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1360        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1361
1362        let out_recv = folded.into_stream().sim_output();
1363
1364        flow.sim().exhaustive(async || {
1365            out_recv.assert_yields_only([10]).await;
1366        });
1367    }
1368}