Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::CycleId;
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A single Rust value that can asynchronously change over time.
26///
27/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
28/// [`Unbounded`], the value will asynchronously change over time.
29///
30/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
31/// a single number that will asynchronously change as events are processed. Singletons also appear
32/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
33/// such as getting the length of a batch of requests.
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this singleton
37/// - `Loc`: the [`Location`] where the singleton is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
39pub struct Singleton<Type, Loc, Bound: Boundedness> {
40    pub(crate) location: Loc,
41    pub(crate) ir_node: RefCell<HydroNode>,
42
43    _phantom: PhantomData<(Type, Loc, Bound)>,
44}
45
46impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
47where
48    T: Clone,
49    L: Location<'a> + NoTick,
50{
51    fn from(value: Singleton<T, L, Bounded>) -> Self {
52        let tick = value.location().tick();
53        value.clone_into_tick(&tick).latest()
54    }
55}
56
57impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
58where
59    L: Location<'a>,
60{
61    type Location = Tick<L>;
62
63    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
64        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
65            location.clone(),
66            HydroNode::DeferTick {
67                input: Box::new(HydroNode::CycleSource {
68                    cycle_id,
69                    metadata: location.new_node_metadata(Self::collection_kind()),
70                }),
71                metadata: location
72                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
73            },
74        );
75
76        from_previous_tick.unwrap_or(initial)
77    }
78}
79
80impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
81where
82    L: Location<'a>,
83{
84    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
85        assert_eq!(
86            Location::id(&self.location),
87            expected_location,
88            "locations do not match"
89        );
90        self.location
91            .flow_state()
92            .borrow_mut()
93            .push_root(HydroRoot::CycleSink {
94                cycle_id,
95                input: Box::new(self.ir_node.into_inner()),
96                op_metadata: HydroIrOpMetadata::new(),
97            });
98    }
99}
100
101impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
102where
103    L: Location<'a>,
104{
105    type Location = Tick<L>;
106
107    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
108        Singleton::new(
109            location.clone(),
110            HydroNode::CycleSource {
111                cycle_id,
112                metadata: location.new_node_metadata(Self::collection_kind()),
113            },
114        )
115    }
116}
117
118impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
119where
120    L: Location<'a>,
121{
122    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
123        assert_eq!(
124            Location::id(&self.location),
125            expected_location,
126            "locations do not match"
127        );
128        self.location
129            .flow_state()
130            .borrow_mut()
131            .push_root(HydroRoot::CycleSink {
132                cycle_id,
133                input: Box::new(self.ir_node.into_inner()),
134                op_metadata: HydroIrOpMetadata::new(),
135            });
136    }
137}
138
139impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
140where
141    L: Location<'a> + NoTick,
142{
143    type Location = L;
144
145    fn create_source(cycle_id: CycleId, location: L) -> Self {
146        Singleton::new(
147            location.clone(),
148            HydroNode::CycleSource {
149                cycle_id,
150                metadata: location.new_node_metadata(Self::collection_kind()),
151            },
152        )
153    }
154}
155
156impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
157where
158    L: Location<'a> + NoTick,
159{
160    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
161        assert_eq!(
162            Location::id(&self.location),
163            expected_location,
164            "locations do not match"
165        );
166        self.location
167            .flow_state()
168            .borrow_mut()
169            .push_root(HydroRoot::CycleSink {
170                cycle_id,
171                input: Box::new(self.ir_node.into_inner()),
172                op_metadata: HydroIrOpMetadata::new(),
173            });
174    }
175}
176
177impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
178where
179    T: Clone,
180    L: Location<'a>,
181{
182    fn clone(&self) -> Self {
183        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
184            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
185            *self.ir_node.borrow_mut() = HydroNode::Tee {
186                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
187                metadata: self.location.new_node_metadata(Self::collection_kind()),
188            };
189        }
190
191        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
192            Singleton {
193                location: self.location.clone(),
194                ir_node: HydroNode::Tee {
195                    inner: TeeNode(inner.0.clone()),
196                    metadata: metadata.clone(),
197                }
198                .into(),
199                _phantom: PhantomData,
200            }
201        } else {
202            unreachable!()
203        }
204    }
205}
206
207#[cfg(stageleft_runtime)]
208fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
209    me: Singleton<T, Tick<L>, B>,
210    other: Optional<O, Tick<L>, B>,
211) -> Optional<(T, O), Tick<L>, B> {
212    let me_as_optional: Optional<T, Tick<L>, B> = me.into();
213    super::optional::zip_inside_tick(me_as_optional, other)
214}
215
216impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
217where
218    L: Location<'a>,
219{
220    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
221        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
222        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
223        Singleton {
224            location,
225            ir_node: RefCell::new(ir_node),
226            _phantom: PhantomData,
227        }
228    }
229
230    pub(crate) fn collection_kind() -> CollectionKind {
231        CollectionKind::Singleton {
232            bound: B::BOUND_KIND,
233            element_type: stageleft::quote_type::<T>().into(),
234        }
235    }
236
237    /// Returns the [`Location`] where this singleton is being materialized.
238    pub fn location(&self) -> &L {
239        &self.location
240    }
241
242    /// Transforms the singleton value by applying a function `f` to it,
243    /// continuously as the input is updated.
244    ///
245    /// # Example
246    /// ```rust
247    /// # #[cfg(feature = "deploy")] {
248    /// # use hydro_lang::prelude::*;
249    /// # use futures::StreamExt;
250    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
251    /// let tick = process.tick();
252    /// let singleton = tick.singleton(q!(5));
253    /// singleton.map(q!(|v| v * 2)).all_ticks()
254    /// # }, |mut stream| async move {
255    /// // 10
256    /// # assert_eq!(stream.next().await.unwrap(), 10);
257    /// # }));
258    /// # }
259    /// ```
260    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
261    where
262        F: Fn(T) -> U + 'a,
263    {
264        let f = f.splice_fn1_ctx(&self.location).into();
265        Singleton::new(
266            self.location.clone(),
267            HydroNode::Map {
268                f,
269                input: Box::new(self.ir_node.into_inner()),
270                metadata: self
271                    .location
272                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
273            },
274        )
275    }
276
277    /// Transforms the singleton value by applying a function `f` to it and then flattening
278    /// the result into a stream, preserving the order of elements.
279    ///
280    /// The function `f` is applied to the singleton value to produce an iterator, and all items
281    /// from that iterator are emitted in the output stream in deterministic order.
282    ///
283    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
284    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
285    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
286    ///
287    /// # Example
288    /// ```rust
289    /// # #[cfg(feature = "deploy")] {
290    /// # use hydro_lang::prelude::*;
291    /// # use futures::StreamExt;
292    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
293    /// let tick = process.tick();
294    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
295    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
296    /// # }, |mut stream| async move {
297    /// // 1, 2, 3
298    /// # for w in vec![1, 2, 3] {
299    /// #     assert_eq!(stream.next().await.unwrap(), w);
300    /// # }
301    /// # }));
302    /// # }
303    /// ```
304    pub fn flat_map_ordered<U, I, F>(
305        self,
306        f: impl IntoQuotedMut<'a, F, L>,
307    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
308    where
309        I: IntoIterator<Item = U>,
310        F: Fn(T) -> I + 'a,
311    {
312        let f = f.splice_fn1_ctx(&self.location).into();
313        Stream::new(
314            self.location.clone(),
315            HydroNode::FlatMap {
316                f,
317                input: Box::new(self.ir_node.into_inner()),
318                metadata: self.location.new_node_metadata(
319                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
320                ),
321            },
322        )
323    }
324
325    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
326    /// for the output type `I` to produce items in any order.
327    ///
328    /// The function `f` is applied to the singleton value to produce an iterator, and all items
329    /// from that iterator are emitted in the output stream in non-deterministic order.
330    ///
331    /// # Example
332    /// ```rust
333    /// # #[cfg(feature = "deploy")] {
334    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
335    /// # use futures::StreamExt;
336    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
337    /// let tick = process.tick();
338    /// let singleton = tick.singleton(q!(
339    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
340    /// ));
341    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
342    /// # }, |mut stream| async move {
343    /// // 1, 2, 3, but in no particular order
344    /// # let mut results = Vec::new();
345    /// # for _ in 0..3 {
346    /// #     results.push(stream.next().await.unwrap());
347    /// # }
348    /// # results.sort();
349    /// # assert_eq!(results, vec![1, 2, 3]);
350    /// # }));
351    /// # }
352    /// ```
353    pub fn flat_map_unordered<U, I, F>(
354        self,
355        f: impl IntoQuotedMut<'a, F, L>,
356    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
357    where
358        I: IntoIterator<Item = U>,
359        F: Fn(T) -> I + 'a,
360    {
361        let f = f.splice_fn1_ctx(&self.location).into();
362        Stream::new(
363            self.location.clone(),
364            HydroNode::FlatMap {
365                f,
366                input: Box::new(self.ir_node.into_inner()),
367                metadata: self
368                    .location
369                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
370            },
371        )
372    }
373
374    /// Flattens the singleton value into a stream, preserving the order of elements.
375    ///
376    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
377    /// are emitted in the output stream in deterministic order.
378    ///
379    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
380    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
381    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
382    ///
383    /// # Example
384    /// ```rust
385    /// # #[cfg(feature = "deploy")] {
386    /// # use hydro_lang::prelude::*;
387    /// # use futures::StreamExt;
388    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
389    /// let tick = process.tick();
390    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
391    /// singleton.flatten_ordered().all_ticks()
392    /// # }, |mut stream| async move {
393    /// // 1, 2, 3
394    /// # for w in vec![1, 2, 3] {
395    /// #     assert_eq!(stream.next().await.unwrap(), w);
396    /// # }
397    /// # }));
398    /// # }
399    /// ```
400    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
401    where
402        T: IntoIterator<Item = U>,
403    {
404        self.flat_map_ordered(q!(|x| x))
405    }
406
407    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
408    /// for the element type `T` to produce items in any order.
409    ///
410    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
411    /// are emitted in the output stream in non-deterministic order.
412    ///
413    /// # Example
414    /// ```rust
415    /// # #[cfg(feature = "deploy")] {
416    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
417    /// # use futures::StreamExt;
418    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
419    /// let tick = process.tick();
420    /// let singleton = tick.singleton(q!(
421    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
422    /// ));
423    /// singleton.flatten_unordered().all_ticks()
424    /// # }, |mut stream| async move {
425    /// // 1, 2, 3, but in no particular order
426    /// # let mut results = Vec::new();
427    /// # for _ in 0..3 {
428    /// #     results.push(stream.next().await.unwrap());
429    /// # }
430    /// # results.sort();
431    /// # assert_eq!(results, vec![1, 2, 3]);
432    /// # }));
433    /// # }
434    /// ```
435    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
436    where
437        T: IntoIterator<Item = U>,
438    {
439        self.flat_map_unordered(q!(|x| x))
440    }
441
442    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
443    ///
444    /// If the predicate returns `true`, the output optional contains the same value.
445    /// If the predicate returns `false`, the output optional is empty.
446    ///
447    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
448    /// not modify or take ownership of the value. If you need to modify the value while filtering
449    /// use [`Singleton::filter_map`] instead.
450    ///
451    /// # Example
452    /// ```rust
453    /// # #[cfg(feature = "deploy")] {
454    /// # use hydro_lang::prelude::*;
455    /// # use futures::StreamExt;
456    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
457    /// let tick = process.tick();
458    /// let singleton = tick.singleton(q!(5));
459    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
460    /// # }, |mut stream| async move {
461    /// // 5
462    /// # assert_eq!(stream.next().await.unwrap(), 5);
463    /// # }));
464    /// # }
465    /// ```
466    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
467    where
468        F: Fn(&T) -> bool + 'a,
469    {
470        let f = f.splice_fn1_borrow_ctx(&self.location).into();
471        Optional::new(
472            self.location.clone(),
473            HydroNode::Filter {
474                f,
475                input: Box::new(self.ir_node.into_inner()),
476                metadata: self
477                    .location
478                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
479            },
480        )
481    }
482
483    /// An operator that both filters and maps. It yields the value only if the supplied
484    /// closure `f` returns `Some(value)`.
485    ///
486    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
487    /// If the closure returns `None`, the output optional is empty.
488    ///
489    /// # Example
490    /// ```rust
491    /// # #[cfg(feature = "deploy")] {
492    /// # use hydro_lang::prelude::*;
493    /// # use futures::StreamExt;
494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
495    /// let tick = process.tick();
496    /// let singleton = tick.singleton(q!("42"));
497    /// singleton
498    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
499    ///     .all_ticks()
500    /// # }, |mut stream| async move {
501    /// // 42
502    /// # assert_eq!(stream.next().await.unwrap(), 42);
503    /// # }));
504    /// # }
505    /// ```
506    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
507    where
508        F: Fn(T) -> Option<U> + 'a,
509    {
510        let f = f.splice_fn1_ctx(&self.location).into();
511        Optional::new(
512            self.location.clone(),
513            HydroNode::FilterMap {
514                f,
515                input: Box::new(self.ir_node.into_inner()),
516                metadata: self
517                    .location
518                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
519            },
520        )
521    }
522
523    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
524    ///
525    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
526    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
527    /// non-null. This is useful for combining several pieces of state together.
528    ///
529    /// # Example
530    /// ```rust
531    /// # #[cfg(feature = "deploy")] {
532    /// # use hydro_lang::prelude::*;
533    /// # use futures::StreamExt;
534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535    /// let tick = process.tick();
536    /// let numbers = process
537    ///   .source_iter(q!(vec![123, 456]))
538    ///   .batch(&tick, nondet!(/** test */));
539    /// let count = numbers.clone().count(); // Singleton
540    /// let max = numbers.max(); // Optional
541    /// count.zip(max).all_ticks()
542    /// # }, |mut stream| async move {
543    /// // [(2, 456)]
544    /// # for w in vec![(2, 456)] {
545    /// #     assert_eq!(stream.next().await.unwrap(), w);
546    /// # }
547    /// # }));
548    /// # }
549    /// ```
550    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
551    where
552        Self: ZipResult<'a, O, Location = L>,
553        B: IsBounded,
554    {
555        check_matching_location(&self.location, &Self::other_location(&other));
556
557        if L::is_top_level()
558            && let Some(tick) = self.location.try_tick()
559        {
560            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
561            let out = zip_inside_tick(
562                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
563                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
564                    other_location.clone(),
565                    HydroNode::Cast {
566                        inner: Box::new(Self::other_ir_node(other)),
567                        metadata: other_location.new_node_metadata(Optional::<
568                            <Self as ZipResult<'a, O>>::OtherType,
569                            Tick<L>,
570                            Bounded,
571                        >::collection_kind(
572                        )),
573                    },
574                )
575                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
576            )
577            .latest();
578
579            Self::make(out.location, out.ir_node.into_inner())
580        } else {
581            Self::make(
582                self.location.clone(),
583                HydroNode::CrossSingleton {
584                    left: Box::new(self.ir_node.into_inner()),
585                    right: Box::new(Self::other_ir_node(other)),
586                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
587                        bound: B::BOUND_KIND,
588                        element_type: stageleft::quote_type::<
589                            <Self as ZipResult<'a, O>>::ElementType,
590                        >()
591                        .into(),
592                    }),
593                },
594            )
595        }
596    }
597
598    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
599    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
600    ///
601    /// Useful for conditionally processing, such as only emitting a singleton's value outside
602    /// a tick if some other condition is satisfied.
603    ///
604    /// # Example
605    /// ```rust
606    /// # #[cfg(feature = "deploy")] {
607    /// # use hydro_lang::prelude::*;
608    /// # use futures::StreamExt;
609    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
610    /// let tick = process.tick();
611    /// // ticks are lazy by default, forces the second tick to run
612    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
613    ///
614    /// let batch_first_tick = process
615    ///   .source_iter(q!(vec![1]))
616    ///   .batch(&tick, nondet!(/** test */));
617    /// let batch_second_tick = process
618    ///   .source_iter(q!(vec![1, 2, 3]))
619    ///   .batch(&tick, nondet!(/** test */))
620    ///   .defer_tick(); // appears on the second tick
621    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
622    /// batch_first_tick.chain(batch_second_tick).count()
623    ///   .filter_if_some(some_on_first_tick)
624    ///   .all_ticks()
625    /// # }, |mut stream| async move {
626    /// // [1]
627    /// # for w in vec![1] {
628    /// #     assert_eq!(stream.next().await.unwrap(), w);
629    /// # }
630    /// # }));
631    /// # }
632    /// ```
633    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
634    where
635        B: IsBounded,
636    {
637        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
638            .map(q!(|(d, _signal)| d))
639    }
640
641    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
642    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
643    ///
644    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
645    /// the condition.
646    ///
647    /// # Example
648    /// ```rust
649    /// # #[cfg(feature = "deploy")] {
650    /// # use hydro_lang::prelude::*;
651    /// # use futures::StreamExt;
652    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
653    /// let tick = process.tick();
654    /// // ticks are lazy by default, forces the second tick to run
655    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
656    ///
657    /// let batch_first_tick = process
658    ///   .source_iter(q!(vec![1]))
659    ///   .batch(&tick, nondet!(/** test */));
660    /// let batch_second_tick = process
661    ///   .source_iter(q!(vec![1, 2, 3]))
662    ///   .batch(&tick, nondet!(/** test */))
663    ///   .defer_tick(); // appears on the second tick
664    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
665    /// batch_first_tick.chain(batch_second_tick).count()
666    ///   .filter_if_none(some_on_first_tick)
667    ///   .all_ticks()
668    /// # }, |mut stream| async move {
669    /// // [3]
670    /// # for w in vec![3] {
671    /// #     assert_eq!(stream.next().await.unwrap(), w);
672    /// # }
673    /// # }));
674    /// # }
675    /// ```
676    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
677    where
678        B: IsBounded,
679    {
680        self.filter_if_some(
681            other
682                .map(q!(|_| ()))
683                .into_singleton()
684                .filter(q!(|o| o.is_none())),
685        )
686    }
687
688    /// An operator which allows you to "name" a `HydroNode`.
689    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
690    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
691        {
692            let mut node = self.ir_node.borrow_mut();
693            let metadata = node.metadata_mut();
694            metadata.tag = Some(name.to_owned());
695        }
696        self
697    }
698}
699
700impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
701where
702    L: Location<'a>,
703{
704    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
705    /// the inner `Option`.
706    ///
707    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
708    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
709    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
710    ///
711    /// # Example
712    /// ```rust
713    /// # #[cfg(feature = "deploy")] {
714    /// # use hydro_lang::prelude::*;
715    /// # use futures::StreamExt;
716    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
717    /// let tick = process.tick();
718    /// let singleton = tick.singleton(q!(Some(42)));
719    /// singleton.into_optional().all_ticks()
720    /// # }, |mut stream| async move {
721    /// // 42
722    /// # assert_eq!(stream.next().await.unwrap(), 42);
723    /// # }));
724    /// # }
725    /// ```
726    pub fn into_optional(self) -> Optional<T, L, B> {
727        self.filter_map(q!(|v| v))
728    }
729}
730
731impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
732where
733    L: Location<'a> + NoTick,
734{
735    /// Returns a singleton value corresponding to the latest snapshot of the singleton
736    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
737    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
738    /// all snapshots of this singleton into the atomic-associated tick will observe the
739    /// same value each tick.
740    ///
741    /// # Non-Determinism
742    /// Because this picks a snapshot of a singleton whose value is continuously changing,
743    /// the output singleton has a non-deterministic value since the snapshot can be at an
744    /// arbitrary point in time.
745    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
746        Singleton::new(
747            self.location.clone().tick,
748            HydroNode::Batch {
749                inner: Box::new(self.ir_node.into_inner()),
750                metadata: self
751                    .location
752                    .tick
753                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
754            },
755        )
756    }
757
758    /// Returns this singleton back into a top-level, asynchronous execution context where updates
759    /// to the value will be asynchronously propagated.
760    pub fn end_atomic(self) -> Singleton<T, L, B> {
761        Singleton::new(
762            self.location.tick.l.clone(),
763            HydroNode::EndAtomic {
764                inner: Box::new(self.ir_node.into_inner()),
765                metadata: self
766                    .location
767                    .tick
768                    .l
769                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
770            },
771        )
772    }
773}
774
775impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
776where
777    L: Location<'a>,
778{
779    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
780    /// will observe the same version of the value and will be executed synchronously before any
781    /// outputs are yielded (in [`Optional::end_atomic`]).
782    ///
783    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
784    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
785    /// a different version).
786    ///
787    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
788    /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
789    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
790    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
791        let out_location = Atomic { tick: tick.clone() };
792        Singleton::new(
793            out_location.clone(),
794            HydroNode::BeginAtomic {
795                inner: Box::new(self.ir_node.into_inner()),
796                metadata: out_location
797                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
798            },
799        )
800    }
801
802    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
803    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
804    /// relevant data that contributed to the snapshot at tick `t`.
805    ///
806    /// # Non-Determinism
807    /// Because this picks a snapshot of a singleton whose value is continuously changing,
808    /// the output singleton has a non-deterministic value since the snapshot can be at an
809    /// arbitrary point in time.
810    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
811        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
812        Singleton::new(
813            tick.clone(),
814            HydroNode::Batch {
815                inner: Box::new(self.ir_node.into_inner()),
816                metadata: tick
817                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
818            },
819        )
820    }
821
822    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
823    /// with order corresponding to increasing prefixes of data contributing to the singleton.
824    ///
825    /// # Non-Determinism
826    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
827    /// to non-deterministic batching and arrival of inputs, the output stream is
828    /// non-deterministic.
829    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
830    where
831        L: NoTick,
832    {
833        sliced! {
834            let snapshot = use(self, nondet);
835            snapshot.into_stream()
836        }
837        .weaken_retries()
838    }
839
840    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
841    /// value taken at various points in time. Because the input singleton may be
842    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
843    /// represent the value of the singleton given some prefix of the streams leading up to
844    /// it.
845    ///
846    /// # Non-Determinism
847    /// The output stream is non-deterministic in which elements are sampled, since this
848    /// is controlled by a clock.
849    pub fn sample_every(
850        self,
851        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
852        nondet: NonDet,
853    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
854    where
855        L: NoTick + NoAtomic,
856    {
857        let samples = self.location.source_interval(interval, nondet);
858        sliced! {
859            let snapshot = use(self, nondet);
860            let sample_batch = use(samples, nondet);
861
862            snapshot.filter_if_some(sample_batch.first()).into_stream()
863        }
864        .weaken_retries()
865    }
866
867    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
868    /// implies that `B == Bounded`.
869    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
870    where
871        B: IsBounded,
872    {
873        Singleton::new(self.location, self.ir_node.into_inner())
874    }
875
876    /// Clones this bounded singleton into a tick, returning a singleton that has the
877    /// same value as the outer singleton. Because the outer singleton is bounded, this
878    /// is deterministic because there is only a single immutable version.
879    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
880    where
881        B: IsBounded,
882        T: Clone,
883    {
884        // TODO(shadaj): avoid printing simulator logs for this snapshot
885        self.snapshot(
886            tick,
887            nondet!(/** bounded top-level singleton so deterministic */),
888        )
889    }
890
891    /// Converts this singleton into a [`Stream`] containing a single element, the value.
892    ///
893    /// # Example
894    /// ```rust
895    /// # #[cfg(feature = "deploy")] {
896    /// # use hydro_lang::prelude::*;
897    /// # use futures::StreamExt;
898    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899    /// let tick = process.tick();
900    /// let batch_input = process
901    ///   .source_iter(q!(vec![123, 456]))
902    ///   .batch(&tick, nondet!(/** test */));
903    /// batch_input.clone().chain(
904    ///   batch_input.count().into_stream()
905    /// ).all_ticks()
906    /// # }, |mut stream| async move {
907    /// // [123, 456, 2]
908    /// # for w in vec![123, 456, 2] {
909    /// #     assert_eq!(stream.next().await.unwrap(), w);
910    /// # }
911    /// # }));
912    /// # }
913    /// ```
914    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
915    where
916        B: IsBounded,
917    {
918        Stream::new(
919            self.location.clone(),
920            HydroNode::Cast {
921                inner: Box::new(self.ir_node.into_inner()),
922                metadata: self.location.new_node_metadata(Stream::<
923                    T,
924                    Tick<L>,
925                    Bounded,
926                    TotalOrder,
927                    ExactlyOnce,
928                >::collection_kind()),
929            },
930        )
931    }
932}
933
934impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
935where
936    L: Location<'a>,
937{
938    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
939    /// which will stream the value computed in _each_ tick as a separate stream element.
940    ///
941    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
942    /// producing one element in the output for each tick. This is useful for batched computations,
943    /// where the results from each tick must be combined together.
944    ///
945    /// # Example
946    /// ```rust
947    /// # #[cfg(feature = "deploy")] {
948    /// # use hydro_lang::prelude::*;
949    /// # use futures::StreamExt;
950    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
951    /// let tick = process.tick();
952    /// # // ticks are lazy by default, forces the second tick to run
953    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
954    /// # let batch_first_tick = process
955    /// #   .source_iter(q!(vec![1]))
956    /// #   .batch(&tick, nondet!(/** test */));
957    /// # let batch_second_tick = process
958    /// #   .source_iter(q!(vec![1, 2, 3]))
959    /// #   .batch(&tick, nondet!(/** test */))
960    /// #   .defer_tick(); // appears on the second tick
961    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
962    /// input_batch // first tick: [1], second tick: [1, 2, 3]
963    ///     .count()
964    ///     .all_ticks()
965    /// # }, |mut stream| async move {
966    /// // [1, 3]
967    /// # for w in vec![1, 3] {
968    /// #     assert_eq!(stream.next().await.unwrap(), w);
969    /// # }
970    /// # }));
971    /// # }
972    /// ```
973    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
974        self.into_stream().all_ticks()
975    }
976
977    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
978    /// which will stream the value computed in _each_ tick as a separate stream element.
979    ///
980    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
981    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
982    /// singleton's [`Tick`] context.
983    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
984        self.into_stream().all_ticks_atomic()
985    }
986
987    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
988    /// be asynchronously updated with the latest value of the singleton inside the tick.
989    ///
990    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
991    /// tick that tracks the inner value. This is useful for getting the value as of the
992    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
993    ///
994    /// # Example
995    /// ```rust
996    /// # #[cfg(feature = "deploy")] {
997    /// # use hydro_lang::prelude::*;
998    /// # use futures::StreamExt;
999    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1000    /// let tick = process.tick();
1001    /// # // ticks are lazy by default, forces the second tick to run
1002    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1003    /// # let batch_first_tick = process
1004    /// #   .source_iter(q!(vec![1]))
1005    /// #   .batch(&tick, nondet!(/** test */));
1006    /// # let batch_second_tick = process
1007    /// #   .source_iter(q!(vec![1, 2, 3]))
1008    /// #   .batch(&tick, nondet!(/** test */))
1009    /// #   .defer_tick(); // appears on the second tick
1010    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1011    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1012    ///     .count()
1013    ///     .latest()
1014    /// # .sample_eager(nondet!(/** test */))
1015    /// # }, |mut stream| async move {
1016    /// // asynchronously changes from 1 ~> 3
1017    /// # for w in vec![1, 3] {
1018    /// #     assert_eq!(stream.next().await.unwrap(), w);
1019    /// # }
1020    /// # }));
1021    /// # }
1022    /// ```
1023    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1024        Singleton::new(
1025            self.location.outer().clone(),
1026            HydroNode::YieldConcat {
1027                inner: Box::new(self.ir_node.into_inner()),
1028                metadata: self
1029                    .location
1030                    .outer()
1031                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1032            },
1033        )
1034    }
1035
1036    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1037    /// be updated with the latest value of the singleton inside the tick.
1038    ///
1039    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1040    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1041    /// singleton's [`Tick`] context.
1042    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1043        let out_location = Atomic {
1044            tick: self.location.clone(),
1045        };
1046        Singleton::new(
1047            out_location.clone(),
1048            HydroNode::YieldConcat {
1049                inner: Box::new(self.ir_node.into_inner()),
1050                metadata: out_location
1051                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1052            },
1053        )
1054    }
1055}
1056
1057#[doc(hidden)]
1058/// Helper trait that determines the output collection type for [`Singleton::zip`].
1059///
1060/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1061/// [`Singleton`].
1062#[sealed::sealed]
1063pub trait ZipResult<'a, Other> {
1064    /// The output collection type.
1065    type Out;
1066    /// The type of the tupled output value.
1067    type ElementType;
1068    /// The type of the other collection's value.
1069    type OtherType;
1070    /// The location where the tupled result will be materialized.
1071    type Location: Location<'a>;
1072
1073    /// The location of the second input to the `zip`.
1074    fn other_location(other: &Other) -> Self::Location;
1075    /// The IR node of the second input to the `zip`.
1076    fn other_ir_node(other: Other) -> HydroNode;
1077
1078    /// Constructs the output live collection given an IR node containing the zip result.
1079    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1080}
1081
1082#[sealed::sealed]
1083impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1084where
1085    L: Location<'a>,
1086{
1087    type Out = Singleton<(T, U), L, B>;
1088    type ElementType = (T, U);
1089    type OtherType = U;
1090    type Location = L;
1091
1092    fn other_location(other: &Singleton<U, L, B>) -> L {
1093        other.location.clone()
1094    }
1095
1096    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1097        other.ir_node.into_inner()
1098    }
1099
1100    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1101        Singleton::new(
1102            location.clone(),
1103            HydroNode::Cast {
1104                inner: Box::new(ir_node),
1105                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1106            },
1107        )
1108    }
1109}
1110
1111#[sealed::sealed]
1112impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1113where
1114    L: Location<'a>,
1115{
1116    type Out = Optional<(T, U), L, B>;
1117    type ElementType = (T, U);
1118    type OtherType = U;
1119    type Location = L;
1120
1121    fn other_location(other: &Optional<U, L, B>) -> L {
1122        other.location.clone()
1123    }
1124
1125    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1126        other.ir_node.into_inner()
1127    }
1128
1129    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1130        Optional::new(location, ir_node)
1131    }
1132}
1133
1134#[cfg(test)]
1135mod tests {
1136    #[cfg(feature = "deploy")]
1137    use futures::{SinkExt, StreamExt};
1138    #[cfg(feature = "deploy")]
1139    use hydro_deploy::Deployment;
1140    #[cfg(any(feature = "deploy", feature = "sim"))]
1141    use stageleft::q;
1142
1143    #[cfg(any(feature = "deploy", feature = "sim"))]
1144    use crate::compile::builder::FlowBuilder;
1145    #[cfg(feature = "deploy")]
1146    use crate::live_collections::stream::ExactlyOnce;
1147    #[cfg(any(feature = "deploy", feature = "sim"))]
1148    use crate::location::Location;
1149    #[cfg(any(feature = "deploy", feature = "sim"))]
1150    use crate::nondet::nondet;
1151
1152    #[cfg(feature = "deploy")]
1153    #[tokio::test]
1154    async fn tick_cycle_cardinality() {
1155        let mut deployment = Deployment::new();
1156
1157        let mut flow = FlowBuilder::new();
1158        let node = flow.process::<()>();
1159        let external = flow.external::<()>();
1160
1161        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1162
1163        let node_tick = node.tick();
1164        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1165        let counts = singleton
1166            .clone()
1167            .into_stream()
1168            .count()
1169            .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1170            .all_ticks()
1171            .send_bincode_external(&external);
1172        complete_cycle.complete_next_tick(singleton);
1173
1174        let nodes = flow
1175            .with_process(&node, deployment.Localhost())
1176            .with_external(&external, deployment.Localhost())
1177            .deploy(&mut deployment);
1178
1179        deployment.deploy().await.unwrap();
1180
1181        let mut tick_trigger = nodes.connect(input_send).await;
1182        let mut external_out = nodes.connect(counts).await;
1183
1184        deployment.start().await.unwrap();
1185
1186        tick_trigger.send(()).await.unwrap();
1187
1188        assert_eq!(external_out.next().await.unwrap(), 1);
1189
1190        tick_trigger.send(()).await.unwrap();
1191
1192        assert_eq!(external_out.next().await.unwrap(), 1);
1193    }
1194
1195    #[cfg(feature = "sim")]
1196    #[test]
1197    #[should_panic]
1198    fn sim_fold_intermediate_states() {
1199        let mut flow = FlowBuilder::new();
1200        let node = flow.process::<()>();
1201
1202        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1203        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1204
1205        let tick = node.tick();
1206        let batch = folded.snapshot(&tick, nondet!(/** test */));
1207        let out_recv = batch.all_ticks().sim_output();
1208
1209        flow.sim().exhaustive(async || {
1210            assert_eq!(out_recv.next().await.unwrap(), 10);
1211        });
1212    }
1213
1214    #[cfg(feature = "sim")]
1215    #[test]
1216    fn sim_fold_intermediate_state_count() {
1217        let mut flow = FlowBuilder::new();
1218        let node = flow.process::<()>();
1219
1220        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1221        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1222
1223        let tick = node.tick();
1224        let batch = folded.snapshot(&tick, nondet!(/** test */));
1225        let out_recv = batch.all_ticks().sim_output();
1226
1227        let instance_count = flow.sim().exhaustive(async || {
1228            let out = out_recv.collect::<Vec<_>>().await;
1229            assert_eq!(out.last(), Some(&10));
1230        });
1231
1232        assert_eq!(
1233            instance_count,
1234            16 // 2^4 possible subsets of intermediates (including initial state)
1235        )
1236    }
1237
1238    #[cfg(feature = "sim")]
1239    #[test]
1240    fn sim_fold_no_repeat_initial() {
1241        // check that we don't repeat the initial state of the fold in autonomous decisions
1242
1243        let mut flow = FlowBuilder::new();
1244        let node = flow.process::<()>();
1245
1246        let (in_port, input) = node.sim_input();
1247        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1248
1249        let tick = node.tick();
1250        let batch = folded.snapshot(&tick, nondet!(/** test */));
1251        let out_recv = batch.all_ticks().sim_output();
1252
1253        flow.sim().exhaustive(async || {
1254            assert_eq!(out_recv.next().await.unwrap(), 0);
1255
1256            in_port.send(123);
1257
1258            assert_eq!(out_recv.next().await.unwrap(), 123);
1259        });
1260    }
1261
1262    #[cfg(feature = "sim")]
1263    #[test]
1264    #[should_panic]
1265    fn sim_fold_repeats_snapshots() {
1266        // when the tick is driven by a snapshot AND something else, the snapshot can
1267        // "stutter" and repeat the same state multiple times
1268
1269        let mut flow = FlowBuilder::new();
1270        let node = flow.process::<()>();
1271
1272        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1273        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1274
1275        let tick = node.tick();
1276        let batch = source
1277            .batch(&tick, nondet!(/** test */))
1278            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1279        let out_recv = batch.all_ticks().sim_output();
1280
1281        flow.sim().exhaustive(async || {
1282            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1283            {
1284                panic!("repeated snapshot");
1285            }
1286        });
1287    }
1288
1289    #[cfg(feature = "sim")]
1290    #[test]
1291    fn sim_fold_repeats_snapshots_count() {
1292        // check the number of instances
1293        let mut flow = FlowBuilder::new();
1294        let node = flow.process::<()>();
1295
1296        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1297        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1298
1299        let tick = node.tick();
1300        let batch = source
1301            .batch(&tick, nondet!(/** test */))
1302            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1303        let out_recv = batch.all_ticks().sim_output();
1304
1305        let count = flow.sim().exhaustive(async || {
1306            let _ = out_recv.collect::<Vec<_>>().await;
1307        });
1308
1309        assert_eq!(count, 52);
1310        // don't have a combinatorial explanation for this number yet, but checked via logs
1311    }
1312
1313    #[cfg(feature = "sim")]
1314    #[test]
1315    fn sim_top_level_singleton_exhaustive() {
1316        // ensures that top-level singletons have only one snapshot
1317        let mut flow = FlowBuilder::new();
1318        let node = flow.process::<()>();
1319
1320        let singleton = node.singleton(q!(1));
1321        let tick = node.tick();
1322        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1323        let out_recv = batch.all_ticks().sim_output();
1324
1325        let count = flow.sim().exhaustive(async || {
1326            let _ = out_recv.collect::<Vec<_>>().await;
1327        });
1328
1329        assert_eq!(count, 1);
1330    }
1331
1332    #[cfg(feature = "sim")]
1333    #[test]
1334    fn sim_top_level_singleton_join_count() {
1335        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1336        // exploration
1337
1338        let mut flow = FlowBuilder::new();
1339        let node = flow.process::<()>();
1340
1341        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1342        let tick = node.tick();
1343        let batch = source_iter
1344            .batch(&tick, nondet!(/** test */))
1345            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1346        let out_recv = batch.all_ticks().sim_output();
1347
1348        let instance_count = flow.sim().exhaustive(async || {
1349            let _ = out_recv.collect::<Vec<_>>().await;
1350        });
1351
1352        assert_eq!(
1353            instance_count,
1354            16 // 2^4 ways to split up (including a possibly empty first batch)
1355        )
1356    }
1357
1358    #[cfg(feature = "sim")]
1359    #[test]
1360    fn top_level_singleton_into_stream_no_replay() {
1361        let mut flow = FlowBuilder::new();
1362        let node = flow.process::<()>();
1363
1364        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1365        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1366
1367        let out_recv = folded.into_stream().sim_output();
1368
1369        flow.sim().exhaustive(async || {
1370            out_recv.assert_yields_only([10]).await;
1371        });
1372    }
1373
1374    #[cfg(feature = "sim")]
1375    #[test]
1376    fn inside_tick_singleton_zip() {
1377        use crate::live_collections::Stream;
1378        use crate::live_collections::sliced::sliced;
1379
1380        let mut flow = FlowBuilder::new();
1381        let node = flow.process::<()>();
1382
1383        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1384        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1385
1386        let out_recv = sliced! {
1387            let v = use(folded, nondet!(/** test */));
1388            v.clone().zip(v).into_stream()
1389        }
1390        .sim_output();
1391
1392        let count = flow.sim().exhaustive(async || {
1393            let out = out_recv.collect::<Vec<_>>().await;
1394            assert_eq!(out.last(), Some(&(3, 3)));
1395        });
1396
1397        assert_eq!(count, 4);
1398    }
1399}