Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::Atomic;
25use crate::location::{Location, Tick, TopLevel, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28    ApplyMonotoneStream, ApplyOrderPreservingSingleton, Proved, SingletonMapFuncAlgebra,
29    StreamMapFuncAlgebra, ValidMutCommutativityFor, ValidMutIdempotenceFor,
30};
31
32/// A marker trait indicating which components of a [`Singleton`] may change.
33///
34/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
35/// includes an additional variant [`Monotonic`], which means that the value will only grow.
36pub trait SingletonBound {
37    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
38    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
39
40    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
41    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
42
43    /// Returns the [`SingletonBoundKind`] corresponding to this type.
44    fn bound_kind() -> SingletonBoundKind;
45}
46
47impl SingletonBound for Unbounded {
48    type UnderlyingBound = Unbounded;
49
50    type StreamToMonotone = Monotonic;
51
52    fn bound_kind() -> SingletonBoundKind {
53        SingletonBoundKind::Unbounded
54    }
55}
56
57impl SingletonBound for Bounded {
58    type UnderlyingBound = Bounded;
59
60    type StreamToMonotone = Bounded;
61
62    fn bound_kind() -> SingletonBoundKind {
63        SingletonBoundKind::Bounded
64    }
65}
66
67/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
68pub struct Monotonic;
69
70impl SingletonBound for Monotonic {
71    type UnderlyingBound = Unbounded;
72
73    type StreamToMonotone = Monotonic;
74
75    fn bound_kind() -> SingletonBoundKind {
76        SingletonBoundKind::Monotonic
77    }
78}
79
80#[sealed]
81#[diagnostic::on_unimplemented(
82    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
83    label = "required here",
84    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
85)]
86/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
87pub trait IsMonotonic: SingletonBound {}
88
89#[sealed]
90#[diagnostic::do_not_recommend]
91impl IsMonotonic for Monotonic {}
92
93#[sealed]
94#[diagnostic::do_not_recommend]
95impl<B: IsBounded> IsMonotonic for B {}
96
97/// A single Rust value that can asynchronously change over time.
98///
99/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
100/// [`Unbounded`], the value will asynchronously change over time.
101///
102/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
103/// a single number that will asynchronously change as events are processed. Singletons also appear
104/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
105/// such as getting the length of a batch of requests.
106///
107/// Type Parameters:
108/// - `Type`: the type of the value in this singleton
109/// - `Loc`: the [`Location`] where the singleton is materialized
110/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
111pub struct Singleton<Type, Loc, Bound: SingletonBound> {
112    pub(crate) location: Loc,
113    pub(crate) ir_node: RefCell<HydroNode>,
114    pub(crate) flow_state: FlowState,
115
116    _phantom: PhantomData<(Type, Loc, Bound)>,
117}
118
119impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
120    fn drop(&mut self) {
121        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
122        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
123            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
124                input: Box::new(ir_node),
125                op_metadata: HydroIrOpMetadata::new(),
126            });
127        }
128    }
129}
130
131impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
132where
133    T: Clone,
134    L: Location<'a>,
135{
136    fn from(value: Singleton<T, L, Bounded>) -> Self {
137        let location = value.location().clone();
138        Singleton::new(
139            location.clone(),
140            HydroNode::UnboundSingleton {
141                inner: Box::new(value.ir_node.replace(HydroNode::Placeholder)),
142                metadata: location
143                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
144            },
145        )
146    }
147}
148
149impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
150where
151    L: Location<'a>,
152{
153    type Location = Tick<L>;
154
155    fn location(&self) -> &Self::Location {
156        self.location()
157    }
158
159    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
160        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
161            location.clone(),
162            HydroNode::DeferTick {
163                input: Box::new(HydroNode::CycleSource {
164                    cycle_id,
165                    metadata: location.new_node_metadata(Self::collection_kind()),
166                }),
167                metadata: location
168                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
169            },
170        );
171
172        from_previous_tick.unwrap_or(initial)
173    }
174}
175
176impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
177where
178    L: Location<'a>,
179{
180    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
181        assert_eq!(
182            Location::id(&self.location),
183            expected_location,
184            "locations do not match"
185        );
186        self.location
187            .flow_state()
188            .borrow_mut()
189            .push_root(HydroRoot::CycleSink {
190                cycle_id,
191                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
192                op_metadata: HydroIrOpMetadata::new(),
193            });
194    }
195}
196
197impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
198where
199    L: Location<'a>,
200{
201    type Location = L;
202
203    fn create_source(cycle_id: CycleId, location: L) -> Self {
204        Singleton::new(
205            location.clone(),
206            HydroNode::CycleSource {
207                cycle_id,
208                metadata: location.new_node_metadata(Self::collection_kind()),
209            },
210        )
211    }
212}
213
214impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
215where
216    L: Location<'a>,
217{
218    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
219        assert_eq!(
220            Location::id(&self.location),
221            expected_location,
222            "locations do not match"
223        );
224        self.location
225            .flow_state()
226            .borrow_mut()
227            .push_root(HydroRoot::CycleSink {
228                cycle_id,
229                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
230                op_metadata: HydroIrOpMetadata::new(),
231            });
232    }
233}
234
235impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
236where
237    T: Clone,
238    L: Location<'a>,
239{
240    fn clone(&self) -> Self {
241        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
242            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
243            *self.ir_node.borrow_mut() = HydroNode::Tee {
244                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
245                metadata: self.location.new_node_metadata(Self::collection_kind()),
246            };
247        }
248
249        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
250            Singleton {
251                location: self.location.clone(),
252                flow_state: self.flow_state.clone(),
253                ir_node: HydroNode::Tee {
254                    inner: SharedNode(inner.0.clone()),
255                    metadata: metadata.clone(),
256                }
257                .into(),
258                _phantom: PhantomData,
259            }
260        } else {
261            unreachable!()
262        }
263    }
264}
265
266#[cfg(stageleft_runtime)]
267fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
268    me: Singleton<T, Tick<L>, B>,
269    other: Optional<O, Tick<L>, B::UnderlyingBound>,
270) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
271    let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
272    super::optional::zip_inside_tick(me_as_optional, other)
273}
274
275impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
276where
277    L: Location<'a>,
278{
279    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
280        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
281        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
282        let flow_state = location.flow_state().clone();
283        Singleton {
284            location,
285            flow_state,
286            ir_node: RefCell::new(ir_node),
287            _phantom: PhantomData,
288        }
289    }
290
291    pub(crate) fn collection_kind() -> CollectionKind {
292        CollectionKind::Singleton {
293            bound: B::bound_kind(),
294            element_type: stageleft::quote_type::<T>().into(),
295        }
296    }
297
298    /// Returns the [`Location`] where this singleton is being materialized.
299    pub fn location(&self) -> &L {
300        &self.location
301    }
302
303    /// Creates a lightweight reference handle to this singleton that can be captured
304    /// inside `q!()` closures. The handle resolves to `&T` at runtime.
305    ///
306    /// The singleton must be bounded, otherwise reading it would be non-deterministic.
307    ///
308    /// ```rust
309    /// # #[cfg(feature = "deploy")] {
310    /// # use hydro_lang::prelude::*;
311    /// # use futures::StreamExt;
312    /// # tokio_test::block_on(async {
313    /// # let mut deployment = hydro_deploy::Deployment::new();
314    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
315    /// # let process = builder.process::<()>();
316    /// # let external = builder.external::<()>();
317    /// let my_count = process
318    ///     .source_iter(q!(0..5i32))
319    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
320    /// let count_ref = my_count.by_ref();
321    /// let out_port = process
322    ///     .source_iter(q!(1..=3i32))
323    ///     .map(q!(|x| x + *count_ref))
324    ///     .send_bincode_external(&external);
325    /// # let nodes = builder
326    /// #     .with_default_optimize()
327    /// #     .with_process(&process, deployment.Localhost())
328    /// #     .with_external(&external, deployment.Localhost())
329    /// #     .deploy(&mut deployment);
330    /// # deployment.deploy().await.unwrap();
331    /// # let mut out_recv = nodes.connect(out_port).await;
332    /// # deployment.start().await.unwrap();
333    /// # let mut results = Vec::new();
334    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
335    /// # results.sort();
336    /// // fold(0..5) = 10, so results are 11, 12, 13
337    /// # assert_eq!(results, vec![11, 12, 13]);
338    /// # });
339    /// # }
340    /// ```
341    pub fn by_ref(&self) -> crate::handoff_ref::SingletonRef<'a, '_, T, L>
342    where
343        B: IsBounded,
344    {
345        crate::handoff_ref::SingletonRef::new(&self.ir_node)
346    }
347
348    /// Returns a mutable reference handle to this singleton that can be captured inside `q!()`
349    /// closures. The handle resolves to `&mut T` at runtime.
350    ///
351    /// Mutable references are ordered via access groups in the generated DFIR code, ensuring
352    /// exclusive access at each point in the execution order.
353    ///
354    /// ```rust
355    /// # #[cfg(feature = "deploy")] {
356    /// # use hydro_lang::prelude::*;
357    /// # use futures::StreamExt;
358    /// # tokio_test::block_on(async {
359    /// # let mut deployment = hydro_deploy::Deployment::new();
360    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
361    /// # let process = builder.process::<()>();
362    /// # let external = builder.external::<()>();
363    /// let my_count = process
364    ///     .source_iter(q!(0..5i32))
365    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
366    /// let count_mut = my_count.by_mut();
367    /// let out_port = process
368    ///     .source_iter(q!(1..=3i32))
369    ///     .map(q!(|x| {
370    ///         *count_mut += x;
371    ///         *count_mut
372    ///     }))
373    ///     .send_bincode_external(&external);
374    /// # let nodes = builder
375    /// #     .with_default_optimize()
376    /// #     .with_process(&process, deployment.Localhost())
377    /// #     .with_external(&external, deployment.Localhost())
378    /// #     .deploy(&mut deployment);
379    /// # deployment.deploy().await.unwrap();
380    /// # let mut out_recv = nodes.connect(out_port).await;
381    /// # deployment.start().await.unwrap();
382    /// # let mut results = Vec::new();
383    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
384    /// # results.sort();
385    /// // fold(0..5) = 10, then each map adds x: results are 11, 13, 16
386    /// # assert_eq!(results, vec![11, 13, 16]);
387    /// # });
388    /// # }
389    /// ```
390    pub fn by_mut(&self) -> crate::handoff_ref::SingletonMut<'a, '_, T, L>
391    where
392        B: IsBounded,
393    {
394        crate::handoff_ref::SingletonMut::new(&self.ir_node)
395    }
396
397    /// Weakens the consistency of this live collection to not guarantee any consistency across
398    /// cluster members (if this collection is on a cluster).
399    pub fn weaken_consistency(self) -> Singleton<T, L::DropConsistency, B>
400    where
401        L: Location<'a>,
402    {
403        if L::consistency()
404            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
405        {
406            // already no consistency
407            Singleton::new(
408                self.location.drop_consistency(),
409                self.ir_node.replace(HydroNode::Placeholder),
410            )
411        } else {
412            Singleton::new(
413                self.location.drop_consistency(),
414                HydroNode::Cast {
415                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
416                    metadata:
417                        self.location
418                            .clone()
419                            .drop_consistency()
420                            .new_node_metadata(
421                                Singleton::<T, L::DropConsistency, B>::collection_kind(),
422                            ),
423                },
424            )
425        }
426    }
427
428    /// Casts this live collection to have the consistency guarantees specified in the given
429    /// location type parameter. The developer must ensure that the strengthened consistency
430    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
431    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
432        self,
433        _proof: impl crate::properties::ConsistencyProof,
434    ) -> Singleton<T, L2, B>
435    where
436        L: Location<'a>,
437    {
438        if L::consistency() == L2::consistency() {
439            Singleton::new(
440                self.location.with_consistency_of(),
441                self.ir_node.replace(HydroNode::Placeholder),
442            )
443        } else {
444            Singleton::new(
445                self.location.with_consistency_of(),
446                HydroNode::AssertIsConsistent {
447                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
448                    trusted: false,
449                    metadata: self
450                        .location
451                        .clone()
452                        .with_consistency_of::<L2>()
453                        .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
454                },
455            )
456        }
457    }
458
459    /// Drops the monotonicity property of the [`Singleton`].
460    pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
461        if B::bound_kind() == B::UnderlyingBound::bound_kind() {
462            Singleton::new(
463                self.location.clone(),
464                self.ir_node.replace(HydroNode::Placeholder),
465            )
466        } else {
467            Singleton::new(
468                self.location.clone(),
469                HydroNode::Cast {
470                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
471                    metadata:
472                        self.location.new_node_metadata(
473                            Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
474                        ),
475                },
476            )
477        }
478    }
479
480    /// Transforms the singleton value by applying a function `f` to it,
481    /// continuously as the input is updated.
482    ///
483    /// # Example
484    /// ```rust
485    /// # #[cfg(feature = "deploy")] {
486    /// # use hydro_lang::prelude::*;
487    /// # use futures::StreamExt;
488    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
489    /// let tick = process.tick();
490    /// let singleton = tick.singleton(q!(5));
491    /// singleton.map(q!(|v| v * 2)).all_ticks()
492    /// # }, |mut stream| async move {
493    /// // 10
494    /// # assert_eq!(stream.next().await.unwrap(), 10);
495    /// # }));
496    /// # }
497    /// ```
498    pub fn map<U, F, OP, B2: SingletonBound>(
499        self,
500        f: impl IntoQuotedMut<'a, F, L, SingletonMapFuncAlgebra<OP>>,
501    ) -> Singleton<U, L, B2>
502    where
503        F: Fn(T) -> U + 'a,
504        B: ApplyOrderPreservingSingleton<OP, B2>,
505    {
506        let (f, proof) = f.splice_fn1_ctx_props(&self.location);
507        proof.register_proof(&f);
508        let f = f.into();
509        Singleton::new(
510            self.location.clone(),
511            HydroNode::Map {
512                f,
513                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
514                metadata: self
515                    .location
516                    .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
517            },
518        )
519    }
520
521    /// Transforms the singleton value by applying a function `f` to it and then flattening
522    /// the result into a stream, preserving the order of elements.
523    ///
524    /// The function `f` is applied to the singleton value to produce an iterator, and all items
525    /// from that iterator are emitted in the output stream in deterministic order.
526    ///
527    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
528    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
529    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
530    ///
531    /// # Example
532    /// ```rust
533    /// # #[cfg(feature = "deploy")] {
534    /// # use hydro_lang::prelude::*;
535    /// # use futures::StreamExt;
536    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
537    /// let tick = process.tick();
538    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
539    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
540    /// # }, |mut stream| async move {
541    /// // 1, 2, 3
542    /// # for w in vec![1, 2, 3] {
543    /// #     assert_eq!(stream.next().await.unwrap(), w);
544    /// # }
545    /// # }));
546    /// # }
547    /// ```
548    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
549        self,
550        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
551    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
552    where
553        B: IsBounded,
554        I: IntoIterator<Item = U>,
555        F: FnMut(T) -> I + 'a,
556        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
557        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
558    {
559        self.into_stream().flat_map_ordered(f)
560    }
561
562    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
563    /// for the output type `I` to produce items in any order.
564    ///
565    /// The function `f` is applied to the singleton value to produce an iterator, and all items
566    /// from that iterator are emitted in the output stream in non-deterministic order.
567    ///
568    /// # Example
569    /// ```rust
570    /// # #[cfg(feature = "deploy")] {
571    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
572    /// # use futures::StreamExt;
573    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
574    /// let tick = process.tick();
575    /// let singleton = tick.singleton(q!(
576    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
577    /// ));
578    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
579    /// # }, |mut stream| async move {
580    /// // 1, 2, 3, but in no particular order
581    /// # let mut results = Vec::new();
582    /// # for _ in 0..3 {
583    /// #     results.push(stream.next().await.unwrap());
584    /// # }
585    /// # results.sort();
586    /// # assert_eq!(results, vec![1, 2, 3]);
587    /// # }));
588    /// # }
589    /// ```
590    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
591        self,
592        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
593    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
594    where
595        B: IsBounded,
596        I: IntoIterator<Item = U>,
597        F: FnMut(T) -> I + 'a,
598        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
599        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
600    {
601        self.into_stream().flat_map_unordered(f)
602    }
603
604    /// Flattens the singleton value into a stream, preserving the order of elements.
605    ///
606    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
607    /// are emitted in the output stream in deterministic order.
608    ///
609    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
610    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
611    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
612    ///
613    /// # Example
614    /// ```rust
615    /// # #[cfg(feature = "deploy")] {
616    /// # use hydro_lang::prelude::*;
617    /// # use futures::StreamExt;
618    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
619    /// let tick = process.tick();
620    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
621    /// singleton.flatten_ordered().all_ticks()
622    /// # }, |mut stream| async move {
623    /// // 1, 2, 3
624    /// # for w in vec![1, 2, 3] {
625    /// #     assert_eq!(stream.next().await.unwrap(), w);
626    /// # }
627    /// # }));
628    /// # }
629    /// ```
630    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
631    where
632        B: IsBounded,
633        T: IntoIterator<Item = U>,
634    {
635        self.flat_map_ordered(q!(|x| x))
636    }
637
638    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
639    /// for the element type `T` to produce items in any order.
640    ///
641    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
642    /// are emitted in the output stream in non-deterministic order.
643    ///
644    /// # Example
645    /// ```rust
646    /// # #[cfg(feature = "deploy")] {
647    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
648    /// # use futures::StreamExt;
649    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
650    /// let tick = process.tick();
651    /// let singleton = tick.singleton(q!(
652    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
653    /// ));
654    /// singleton.flatten_unordered().all_ticks()
655    /// # }, |mut stream| async move {
656    /// // 1, 2, 3, but in no particular order
657    /// # let mut results = Vec::new();
658    /// # for _ in 0..3 {
659    /// #     results.push(stream.next().await.unwrap());
660    /// # }
661    /// # results.sort();
662    /// # assert_eq!(results, vec![1, 2, 3]);
663    /// # }));
664    /// # }
665    /// ```
666    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
667    where
668        B: IsBounded,
669        T: IntoIterator<Item = U>,
670    {
671        self.flat_map_unordered(q!(|x| x))
672    }
673
674    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
675    ///
676    /// If the predicate returns `true`, the output optional contains the same value.
677    /// If the predicate returns `false`, the output optional is empty.
678    ///
679    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
680    /// not modify or take ownership of the value. If you need to modify the value while filtering
681    /// use [`Singleton::filter_map`] instead.
682    ///
683    /// # Example
684    /// ```rust
685    /// # #[cfg(feature = "deploy")] {
686    /// # use hydro_lang::prelude::*;
687    /// # use futures::StreamExt;
688    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
689    /// let tick = process.tick();
690    /// let singleton = tick.singleton(q!(5));
691    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
692    /// # }, |mut stream| async move {
693    /// // 5
694    /// # assert_eq!(stream.next().await.unwrap(), 5);
695    /// # }));
696    /// # }
697    /// ```
698    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
699    where
700        F: Fn(&T) -> bool + 'a,
701    {
702        let f = f.splice_fn1_borrow_ctx(&self.location).into();
703        Optional::new(
704            self.location.clone(),
705            HydroNode::Filter {
706                f,
707                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
708                metadata: self
709                    .location
710                    .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
711            },
712        )
713    }
714
715    /// An operator that both filters and maps. It yields the value only if the supplied
716    /// closure `f` returns `Some(value)`.
717    ///
718    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
719    /// If the closure returns `None`, the output optional is empty.
720    ///
721    /// # Example
722    /// ```rust
723    /// # #[cfg(feature = "deploy")] {
724    /// # use hydro_lang::prelude::*;
725    /// # use futures::StreamExt;
726    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727    /// let tick = process.tick();
728    /// let singleton = tick.singleton(q!("42"));
729    /// singleton
730    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
731    ///     .all_ticks()
732    /// # }, |mut stream| async move {
733    /// // 42
734    /// # assert_eq!(stream.next().await.unwrap(), 42);
735    /// # }));
736    /// # }
737    /// ```
738    pub fn filter_map<U, F>(
739        self,
740        f: impl IntoQuotedMut<'a, F, L>,
741    ) -> Optional<U, L, B::UnderlyingBound>
742    where
743        F: Fn(T) -> Option<U> + 'a,
744    {
745        let f = f.splice_fn1_ctx(&self.location).into();
746        Optional::new(
747            self.location.clone(),
748            HydroNode::FilterMap {
749                f,
750                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
751                metadata: self
752                    .location
753                    .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
754            },
755        )
756    }
757
758    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
759    ///
760    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
761    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
762    /// non-null. This is useful for combining several pieces of state together.
763    ///
764    /// # Example
765    /// ```rust
766    /// # #[cfg(feature = "deploy")] {
767    /// # use hydro_lang::prelude::*;
768    /// # use futures::StreamExt;
769    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770    /// let tick = process.tick();
771    /// let numbers = process
772    ///   .source_iter(q!(vec![123, 456]))
773    ///   .batch(&tick, nondet!(/** test */));
774    /// let count = numbers.clone().count(); // Singleton
775    /// let max = numbers.max(); // Optional
776    /// count.zip(max).all_ticks()
777    /// # }, |mut stream| async move {
778    /// // [(2, 456)]
779    /// # for w in vec![(2, 456)] {
780    /// #     assert_eq!(stream.next().await.unwrap(), w);
781    /// # }
782    /// # }));
783    /// # }
784    /// ```
785    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
786    where
787        Self: ZipResult<'a, O, Location = L>,
788        B: IsBounded,
789    {
790        check_matching_location(&self.location, &Self::other_location(&other));
791
792        if L::is_top_level()
793            && let Some(tick) = self.location.try_tick()
794        {
795            let self_location = self.location().clone();
796            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
797            let out = zip_inside_tick(
798                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
799                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
800                    other_location.clone(),
801                    HydroNode::Cast {
802                        inner: Box::new(Self::other_ir_node(other)),
803                        metadata: other_location.new_node_metadata(Optional::<
804                            <Self as ZipResult<'a, O>>::OtherType,
805                            Tick<L>,
806                            Bounded,
807                        >::collection_kind(
808                        )),
809                    },
810                )
811                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
812            )
813            .latest();
814
815            Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
816        } else {
817            Self::make(
818                self.location.clone(),
819                HydroNode::CrossSingleton {
820                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
821                    right: Box::new(Self::other_ir_node(other)),
822                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
823                        bound: B::BOUND_KIND,
824                        element_type: stageleft::quote_type::<
825                            <Self as ZipResult<'a, O>>::ElementType,
826                        >()
827                        .into(),
828                    }),
829                },
830            )
831        }
832    }
833
834    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
835    /// boolean signal is `true`, otherwise the output is null.
836    ///
837    /// # Example
838    /// ```rust
839    /// # #[cfg(feature = "deploy")] {
840    /// # use hydro_lang::prelude::*;
841    /// # use futures::StreamExt;
842    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
843    /// let tick = process.tick();
844    /// // ticks are lazy by default, forces the second tick to run
845    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
846    ///
847    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
848    /// let batch_first_tick = process
849    ///   .source_iter(q!(vec![1]))
850    ///   .batch(&tick, nondet!(/** test */));
851    /// let batch_second_tick = process
852    ///   .source_iter(q!(vec![1, 2, 3]))
853    ///   .batch(&tick, nondet!(/** test */))
854    ///   .defer_tick();
855    /// batch_first_tick.chain(batch_second_tick).count()
856    ///   .filter_if(signal)
857    ///   .all_ticks()
858    /// # }, |mut stream| async move {
859    /// // [1]
860    /// # for w in vec![1] {
861    /// #     assert_eq!(stream.next().await.unwrap(), w);
862    /// # }
863    /// # }));
864    /// # }
865    /// ```
866    pub fn filter_if(
867        self,
868        signal: Singleton<bool, L, B>,
869    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
870    where
871        B: IsBounded,
872    {
873        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
874    }
875
876    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
877    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
878    ///
879    /// Useful for conditionally processing, such as only emitting a singleton's value outside
880    /// a tick if some other condition is satisfied.
881    ///
882    /// # Example
883    /// ```rust
884    /// # #[cfg(feature = "deploy")] {
885    /// # use hydro_lang::prelude::*;
886    /// # use futures::StreamExt;
887    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
888    /// let tick = process.tick();
889    /// // ticks are lazy by default, forces the second tick to run
890    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
891    ///
892    /// let batch_first_tick = process
893    ///   .source_iter(q!(vec![1]))
894    ///   .batch(&tick, nondet!(/** test */));
895    /// let batch_second_tick = process
896    ///   .source_iter(q!(vec![1, 2, 3]))
897    ///   .batch(&tick, nondet!(/** test */))
898    ///   .defer_tick(); // appears on the second tick
899    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
900    /// batch_first_tick.chain(batch_second_tick).count()
901    ///   .filter_if_some(some_on_first_tick)
902    ///   .all_ticks()
903    /// # }, |mut stream| async move {
904    /// // [1]
905    /// # for w in vec![1] {
906    /// #     assert_eq!(stream.next().await.unwrap(), w);
907    /// # }
908    /// # }));
909    /// # }
910    /// ```
911    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
912    pub fn filter_if_some<U>(
913        self,
914        signal: Optional<U, L, B>,
915    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
916    where
917        B: IsBounded,
918    {
919        self.filter_if(signal.is_some())
920    }
921
922    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
923    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
924    ///
925    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
926    /// the condition.
927    ///
928    /// # Example
929    /// ```rust
930    /// # #[cfg(feature = "deploy")] {
931    /// # use hydro_lang::prelude::*;
932    /// # use futures::StreamExt;
933    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
934    /// let tick = process.tick();
935    /// // ticks are lazy by default, forces the second tick to run
936    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
937    ///
938    /// let batch_first_tick = process
939    ///   .source_iter(q!(vec![1]))
940    ///   .batch(&tick, nondet!(/** test */));
941    /// let batch_second_tick = process
942    ///   .source_iter(q!(vec![1, 2, 3]))
943    ///   .batch(&tick, nondet!(/** test */))
944    ///   .defer_tick(); // appears on the second tick
945    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
946    /// batch_first_tick.chain(batch_second_tick).count()
947    ///   .filter_if_none(some_on_first_tick)
948    ///   .all_ticks()
949    /// # }, |mut stream| async move {
950    /// // [3]
951    /// # for w in vec![3] {
952    /// #     assert_eq!(stream.next().await.unwrap(), w);
953    /// # }
954    /// # }));
955    /// # }
956    /// ```
957    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
958    pub fn filter_if_none<U>(
959        self,
960        other: Optional<U, L, B>,
961    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
962    where
963        B: IsBounded,
964    {
965        self.filter_if(other.is_none())
966    }
967
968    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
969    ///
970    /// # Example
971    /// ```rust
972    /// # #[cfg(feature = "deploy")] {
973    /// # use hydro_lang::prelude::*;
974    /// # use futures::StreamExt;
975    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
976    /// let tick = process.tick();
977    /// let a = tick.singleton(q!(5));
978    /// let b = tick.singleton(q!(5));
979    /// a.equals(b).all_ticks()
980    /// # }, |mut stream| async move {
981    /// // [true]
982    /// # assert_eq!(stream.next().await.unwrap(), true);
983    /// # }));
984    /// # }
985    /// ```
986    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
987    where
988        T: PartialEq,
989        B: IsBounded,
990    {
991        self.zip(other).map(q!(|(a, b)| a == b))
992    }
993
994    /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
995    /// greater than or equal to the provided threshold. The event will have the value of the
996    /// given threshold.
997    ///
998    /// This requires the incoming singleton to be monotonic, because otherwise the detection of
999    /// the threshold would be non-deterministic.
1000    ///
1001    /// # Example
1002    /// ```rust
1003    /// # #[cfg(feature = "deploy")] {
1004    /// # use hydro_lang::prelude::*;
1005    /// # use futures::StreamExt;
1006    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1007    /// let a = // singleton 1 ~> 5 ~> 10
1008    /// # process.singleton(q!(5));
1009    /// let b = process.singleton(q!(4));
1010    /// a.threshold_greater_or_equal(b)
1011    /// # }, |mut stream| async move {
1012    /// // [4]
1013    /// # assert_eq!(stream.next().await.unwrap(), 4);
1014    /// # }));
1015    /// # }
1016    /// ```
1017    pub fn threshold_greater_or_equal<B2: IsBounded>(
1018        self,
1019        threshold: Singleton<T, L, B2>,
1020    ) -> Stream<T, L, B::UnderlyingBound>
1021    where
1022        T: Clone + PartialOrd,
1023        B: IsMonotonic,
1024    {
1025        let threshold = threshold.make_bounded();
1026        let self_location = self.location().clone();
1027        match self.try_make_bounded() {
1028            Ok(bounded) => {
1029                let uncasted = threshold
1030                    .zip(bounded)
1031                    .into_stream()
1032                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
1033
1034                Stream::new(
1035                    uncasted.location.clone(),
1036                    uncasted.ir_node.replace(HydroNode::Placeholder),
1037                )
1038            }
1039            Err(me) => {
1040                let uncasted = sliced! {
1041                    let me = use(me, nondet!(/** thresholds are deterministic */));
1042                    let mut remaining_threshold = use::state(|l| {
1043                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
1044                        as_option
1045                    });
1046
1047                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
1048                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
1049                    passed.map(q!(|(t, _)| t))
1050                };
1051
1052                Stream::new(
1053                    self_location,
1054                    uncasted.ir_node.replace(HydroNode::Placeholder),
1055                )
1056            }
1057        }
1058    }
1059
1060    /// An operator which allows you to "name" a `HydroNode`.
1061    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1062    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
1063        {
1064            let mut node = self.ir_node.borrow_mut();
1065            let metadata = node.metadata_mut();
1066            metadata.tag = Some(name.to_owned());
1067        }
1068        self
1069    }
1070}
1071
1072impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
1073    type Output = Singleton<bool, L, B::UnderlyingBound>;
1074
1075    fn not(self) -> Self::Output {
1076        self.map(q!(|b| !b))
1077    }
1078}
1079
1080impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1081where
1082    L: Location<'a>,
1083{
1084    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1085    /// the inner `Option`.
1086    ///
1087    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1088    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1089    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1090    ///
1091    /// # Example
1092    /// ```rust
1093    /// # #[cfg(feature = "deploy")] {
1094    /// # use hydro_lang::prelude::*;
1095    /// # use futures::StreamExt;
1096    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1097    /// let tick = process.tick();
1098    /// let singleton = tick.singleton(q!(Some(42)));
1099    /// singleton.into_optional().all_ticks()
1100    /// # }, |mut stream| async move {
1101    /// // 42
1102    /// # assert_eq!(stream.next().await.unwrap(), 42);
1103    /// # }));
1104    /// # }
1105    /// ```
1106    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1107        self.filter_map(q!(|v| v))
1108    }
1109}
1110
1111impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1112where
1113    L: Location<'a>,
1114{
1115    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1116    ///
1117    /// # Example
1118    /// ```rust
1119    /// # #[cfg(feature = "deploy")] {
1120    /// # use hydro_lang::prelude::*;
1121    /// # use futures::StreamExt;
1122    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123    /// let tick = process.tick();
1124    /// // ticks are lazy by default, forces the second tick to run
1125    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1126    ///
1127    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1128    /// let b = tick.singleton(q!(true)); // true, true
1129    /// a.and(b).all_ticks()
1130    /// # }, |mut stream| async move {
1131    /// // [true, false]
1132    /// # for w in vec![true, false] {
1133    /// #     assert_eq!(stream.next().await.unwrap(), w);
1134    /// # }
1135    /// # }));
1136    /// # }
1137    /// ```
1138    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1139    where
1140        B: IsBounded,
1141    {
1142        self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1143    }
1144
1145    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1146    ///
1147    /// # Example
1148    /// ```rust
1149    /// # #[cfg(feature = "deploy")] {
1150    /// # use hydro_lang::prelude::*;
1151    /// # use futures::StreamExt;
1152    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1153    /// let tick = process.tick();
1154    /// // ticks are lazy by default, forces the second tick to run
1155    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1156    ///
1157    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1158    /// let b = tick.singleton(q!(false)); // false, false
1159    /// a.or(b).all_ticks()
1160    /// # }, |mut stream| async move {
1161    /// // [true, false]
1162    /// # for w in vec![true, false] {
1163    /// #     assert_eq!(stream.next().await.unwrap(), w);
1164    /// # }
1165    /// # }));
1166    /// # }
1167    /// ```
1168    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1169    where
1170        B: IsBounded,
1171    {
1172        self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1173    }
1174}
1175
1176impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1177where
1178    L: Location<'a>,
1179{
1180    /// Returns a singleton value corresponding to the latest snapshot of the singleton
1181    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1182    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1183    /// all snapshots of this singleton into the atomic-associated tick will observe the
1184    /// same value each tick.
1185    ///
1186    /// # Non-Determinism
1187    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1188    /// the output singleton has a non-deterministic value since the snapshot can be at an
1189    /// arbitrary point in time.
1190    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1191        self,
1192        tick: &Tick<L2>,
1193        _nondet: NonDet,
1194    ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1195        Singleton::new(
1196            tick.drop_consistency(),
1197            HydroNode::Batch {
1198                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199                metadata: tick
1200                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1201            },
1202        )
1203    }
1204
1205    /// Returns this singleton back into a top-level, asynchronous execution context where updates
1206    /// to the value will be asynchronously propagated.
1207    pub fn end_atomic(self) -> Singleton<T, L, B> {
1208        Singleton::new(
1209            self.location.tick.l.clone(),
1210            HydroNode::EndAtomic {
1211                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1212                metadata: self
1213                    .location
1214                    .tick
1215                    .l
1216                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1217            },
1218        )
1219    }
1220}
1221
1222impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1223where
1224    L: Location<'a>,
1225{
1226    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1227    /// will observe the same version of the value and will be executed synchronously before any
1228    /// outputs are yielded (in [`Optional::end_atomic`]).
1229    ///
1230    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1231    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1232    /// a different version).
1233    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1234        let id = self.location.flow_state().borrow_mut().next_clock_id();
1235        let out_location = Atomic {
1236            tick: Tick {
1237                id,
1238                l: self.location.clone(),
1239            },
1240        };
1241        Singleton::new(
1242            out_location.clone(),
1243            HydroNode::BeginAtomic {
1244                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1245                metadata: out_location
1246                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1247            },
1248        )
1249    }
1250
1251    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1252    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1253    /// relevant data that contributed to the snapshot at tick `t`.
1254    ///
1255    /// # Non-Determinism
1256    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1257    /// the output singleton has a non-deterministic value since the snapshot can be at an
1258    /// arbitrary point in time.
1259    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1260        self,
1261        tick: &Tick<L2>,
1262        _nondet: NonDet,
1263    ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1264        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1265        Singleton::new(
1266            tick.drop_consistency(),
1267            HydroNode::Batch {
1268                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1269                metadata: tick
1270                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1271            },
1272        )
1273    }
1274
1275    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1276    /// with order corresponding to increasing prefixes of data contributing to the singleton.
1277    ///
1278    /// # Non-Determinism
1279    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1280    /// to non-deterministic batching and arrival of inputs, the output stream is
1281    /// non-deterministic.
1282    pub fn sample_eager(
1283        self,
1284        nondet: NonDet,
1285    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1286        sliced! {
1287            let snapshot = use(self, nondet);
1288            snapshot.into_stream()
1289        }
1290        .weaken_retries()
1291    }
1292
1293    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1294    /// value taken at various points in time. Because the input singleton may be
1295    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1296    /// represent the value of the singleton given some prefix of the streams leading up to
1297    /// it.
1298    ///
1299    /// # Non-Determinism
1300    /// The output stream is non-deterministic in which elements are sampled, since this
1301    /// is controlled by a clock.
1302    pub fn sample_every(
1303        self,
1304        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1305        nondet: NonDet,
1306    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1307    where
1308        L: TopLevel<'a>,
1309    {
1310        let samples = self.location.source_interval(interval);
1311        sliced! {
1312            let snapshot = use(self, nondet);
1313            let sample_batch = use(samples, nondet);
1314
1315            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1316        }
1317        .weaken_retries()
1318    }
1319
1320    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1321    /// implies that `B == Bounded`.
1322    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1323    where
1324        B: IsBounded,
1325    {
1326        Singleton::new(
1327            self.location.clone(),
1328            self.ir_node.replace(HydroNode::Placeholder),
1329        )
1330    }
1331
1332    #[expect(clippy::result_large_err, reason = "internal use only")]
1333    fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1334        if B::UnderlyingBound::BOUNDED {
1335            Ok(Singleton::new(
1336                self.location.clone(),
1337                self.ir_node.replace(HydroNode::Placeholder),
1338            ))
1339        } else {
1340            Err(self)
1341        }
1342    }
1343
1344    /// Clones this bounded singleton into a tick, returning a singleton that has the
1345    /// same value as the outer singleton. Because the outer singleton is bounded, this
1346    /// is deterministic because there is only a single immutable version.
1347    pub fn clone_into_tick<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1348        self,
1349        tick: &Tick<L2>,
1350    ) -> Singleton<T, Tick<L2>, Bounded>
1351    where
1352        B: IsBounded,
1353        T: Clone,
1354    {
1355        // TODO(shadaj): avoid printing simulator logs for this snapshot
1356        let inner = self.snapshot(
1357            tick,
1358            nondet!(/** bounded top-level singleton so deterministic */),
1359        );
1360        Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1361    }
1362
1363    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1364    ///
1365    /// # Example
1366    /// ```rust
1367    /// # #[cfg(feature = "deploy")] {
1368    /// # use hydro_lang::prelude::*;
1369    /// # use futures::StreamExt;
1370    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1371    /// let tick = process.tick();
1372    /// let batch_input = process
1373    ///   .source_iter(q!(vec![123, 456]))
1374    ///   .batch(&tick, nondet!(/** test */));
1375    /// batch_input.clone().chain(
1376    ///   batch_input.count().into_stream()
1377    /// ).all_ticks()
1378    /// # }, |mut stream| async move {
1379    /// // [123, 456, 2]
1380    /// # for w in vec![123, 456, 2] {
1381    /// #     assert_eq!(stream.next().await.unwrap(), w);
1382    /// # }
1383    /// # }));
1384    /// # }
1385    /// ```
1386    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1387    where
1388        B: IsBounded,
1389    {
1390        Stream::new(
1391            self.location.clone(),
1392            HydroNode::Cast {
1393                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1394                metadata: self.location.new_node_metadata(Stream::<
1395                    T,
1396                    Tick<L>,
1397                    Bounded,
1398                    TotalOrder,
1399                    ExactlyOnce,
1400                >::collection_kind()),
1401            },
1402        )
1403    }
1404
1405    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1406    /// producing a singleton of the resolved output.
1407    ///
1408    /// This is useful when the singleton contains an async computation that must
1409    /// be awaited before further processing. The future is polled to completion
1410    /// before the output value is emitted.
1411    ///
1412    /// # Example
1413    /// ```rust
1414    /// # #[cfg(feature = "deploy")] {
1415    /// # use hydro_lang::prelude::*;
1416    /// # use futures::StreamExt;
1417    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1418    /// let tick = process.tick();
1419    /// let singleton = tick.singleton(q!(5));
1420    /// singleton
1421    ///     .map(q!(|v| async move { v * 2 }))
1422    ///     .resolve_future_blocking()
1423    ///     .all_ticks()
1424    /// # }, |mut stream| async move {
1425    /// // 10
1426    /// # assert_eq!(stream.next().await.unwrap(), 10);
1427    /// # }));
1428    /// # }
1429    /// ```
1430    pub fn resolve_future_blocking(
1431        self,
1432    ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1433    where
1434        T: Future,
1435        B: IsBounded,
1436    {
1437        Singleton::new(
1438            self.location.clone(),
1439            HydroNode::ResolveFuturesBlocking {
1440                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1441                metadata: self
1442                    .location
1443                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1444            },
1445        )
1446    }
1447}
1448
1449impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1450where
1451    L: Location<'a>,
1452{
1453    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1454    /// which will stream the value computed in _each_ tick as a separate stream element.
1455    ///
1456    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1457    /// producing one element in the output for each tick. This is useful for batched computations,
1458    /// where the results from each tick must be combined together.
1459    ///
1460    /// # Example
1461    /// ```rust
1462    /// # #[cfg(feature = "deploy")] {
1463    /// # use hydro_lang::prelude::*;
1464    /// # use futures::StreamExt;
1465    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1466    /// let tick = process.tick();
1467    /// # // ticks are lazy by default, forces the second tick to run
1468    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1469    /// # let batch_first_tick = process
1470    /// #   .source_iter(q!(vec![1]))
1471    /// #   .batch(&tick, nondet!(/** test */));
1472    /// # let batch_second_tick = process
1473    /// #   .source_iter(q!(vec![1, 2, 3]))
1474    /// #   .batch(&tick, nondet!(/** test */))
1475    /// #   .defer_tick(); // appears on the second tick
1476    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1477    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1478    ///     .count()
1479    ///     .all_ticks()
1480    /// # }, |mut stream| async move {
1481    /// // [1, 3]
1482    /// # for w in vec![1, 3] {
1483    /// #     assert_eq!(stream.next().await.unwrap(), w);
1484    /// # }
1485    /// # }));
1486    /// # }
1487    /// ```
1488    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1489        self.into_stream().all_ticks()
1490    }
1491
1492    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1493    /// which will stream the value computed in _each_ tick as a separate stream element.
1494    ///
1495    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1496    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1497    /// singleton's [`Tick`] context.
1498    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1499        self.into_stream().all_ticks_atomic()
1500    }
1501
1502    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1503    /// be asynchronously updated with the latest value of the singleton inside the tick.
1504    ///
1505    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1506    /// tick that tracks the inner value. This is useful for getting the value as of the
1507    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1508    ///
1509    /// # Example
1510    /// ```rust
1511    /// # #[cfg(feature = "deploy")] {
1512    /// # use hydro_lang::prelude::*;
1513    /// # use futures::StreamExt;
1514    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1515    /// let tick = process.tick();
1516    /// # // ticks are lazy by default, forces the second tick to run
1517    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1518    /// # let batch_first_tick = process
1519    /// #   .source_iter(q!(vec![1]))
1520    /// #   .batch(&tick, nondet!(/** test */));
1521    /// # let batch_second_tick = process
1522    /// #   .source_iter(q!(vec![1, 2, 3]))
1523    /// #   .batch(&tick, nondet!(/** test */))
1524    /// #   .defer_tick(); // appears on the second tick
1525    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1526    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1527    ///     .count()
1528    ///     .latest()
1529    /// # .sample_eager(nondet!(/** test */))
1530    /// # }, |mut stream| async move {
1531    /// // asynchronously changes from 1 ~> 3
1532    /// # for w in vec![1, 3] {
1533    /// #     assert_eq!(stream.next().await.unwrap(), w);
1534    /// # }
1535    /// # }));
1536    /// # }
1537    /// ```
1538    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1539        Singleton::new(
1540            self.location.outer().clone(),
1541            HydroNode::YieldConcat {
1542                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1543                metadata: self
1544                    .location
1545                    .outer()
1546                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1547            },
1548        )
1549    }
1550
1551    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1552    /// be updated with the latest value of the singleton inside the tick.
1553    ///
1554    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1555    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1556    /// singleton's [`Tick`] context.
1557    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1558        let out_location = Atomic {
1559            tick: self.location.clone(),
1560        };
1561        Singleton::new(
1562            out_location.clone(),
1563            HydroNode::YieldConcat {
1564                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1565                metadata: out_location
1566                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1567            },
1568        )
1569    }
1570}
1571
1572#[doc(hidden)]
1573/// Helper trait that determines the output collection type for [`Singleton::zip`].
1574///
1575/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1576/// [`Singleton`].
1577#[sealed::sealed]
1578pub trait ZipResult<'a, Other> {
1579    /// The output collection type.
1580    type Out;
1581    /// The type of the tupled output value.
1582    type ElementType;
1583    /// The type of the other collection's value.
1584    type OtherType;
1585    /// The location where the tupled result will be materialized.
1586    type Location: Location<'a>;
1587
1588    /// The location of the second input to the `zip`.
1589    fn other_location(other: &Other) -> Self::Location;
1590    /// The IR node of the second input to the `zip`.
1591    fn other_ir_node(other: Other) -> HydroNode;
1592
1593    /// Constructs the output live collection given an IR node containing the zip result.
1594    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1595}
1596
1597#[sealed::sealed]
1598impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1599where
1600    L: Location<'a>,
1601{
1602    type Out = Singleton<(T, U), L, B>;
1603    type ElementType = (T, U);
1604    type OtherType = U;
1605    type Location = L;
1606
1607    fn other_location(other: &Singleton<U, L, B>) -> L {
1608        other.location.clone()
1609    }
1610
1611    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1612        other.ir_node.replace(HydroNode::Placeholder)
1613    }
1614
1615    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1616        Singleton::new(
1617            location.clone(),
1618            HydroNode::Cast {
1619                inner: Box::new(ir_node),
1620                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1621            },
1622        )
1623    }
1624}
1625
1626#[sealed::sealed]
1627impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1628    for Singleton<T, L, B>
1629where
1630    L: Location<'a>,
1631{
1632    type Out = Optional<(T, U), L, B::UnderlyingBound>;
1633    type ElementType = (T, U);
1634    type OtherType = U;
1635    type Location = L;
1636
1637    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1638        other.location.clone()
1639    }
1640
1641    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1642        other.ir_node.replace(HydroNode::Placeholder)
1643    }
1644
1645    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1646        Optional::new(location, ir_node)
1647    }
1648}
1649
1650#[cfg(test)]
1651mod tests {
1652    #[cfg(feature = "deploy")]
1653    use futures::{SinkExt, StreamExt};
1654    #[cfg(feature = "deploy")]
1655    use hydro_deploy::Deployment;
1656    #[cfg(any(feature = "deploy", feature = "sim"))]
1657    use stageleft::q;
1658
1659    #[cfg(any(feature = "deploy", feature = "sim"))]
1660    use crate::compile::builder::FlowBuilder;
1661    #[cfg(feature = "deploy")]
1662    use crate::live_collections::stream::ExactlyOnce;
1663    #[cfg(any(feature = "deploy", feature = "sim"))]
1664    use crate::location::Location;
1665    #[cfg(any(feature = "deploy", feature = "sim"))]
1666    use crate::nondet::nondet;
1667
1668    #[cfg(feature = "deploy")]
1669    #[tokio::test]
1670    async fn tick_cycle_cardinality() {
1671        let mut deployment = Deployment::new();
1672
1673        let mut flow = FlowBuilder::new();
1674        let node = flow.process::<()>();
1675        let external = flow.external::<()>();
1676
1677        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1678
1679        let node_tick = node.tick();
1680        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1681        let counts = singleton
1682            .clone()
1683            .into_stream()
1684            .count()
1685            .filter_if(
1686                input
1687                    .batch(&node_tick, nondet!(/** testing */))
1688                    .first()
1689                    .is_some(),
1690            )
1691            .all_ticks()
1692            .send_bincode_external(&external);
1693        complete_cycle.complete_next_tick(singleton);
1694
1695        let nodes = flow
1696            .with_process(&node, deployment.Localhost())
1697            .with_external(&external, deployment.Localhost())
1698            .deploy(&mut deployment);
1699
1700        deployment.deploy().await.unwrap();
1701
1702        let mut tick_trigger = nodes.connect(input_send).await;
1703        let mut external_out = nodes.connect(counts).await;
1704
1705        deployment.start().await.unwrap();
1706
1707        tick_trigger.send(()).await.unwrap();
1708
1709        assert_eq!(external_out.next().await.unwrap(), 1);
1710
1711        tick_trigger.send(()).await.unwrap();
1712
1713        assert_eq!(external_out.next().await.unwrap(), 1);
1714    }
1715
1716    #[cfg(feature = "sim")]
1717    #[test]
1718    #[should_panic]
1719    fn sim_fold_intermediate_states() {
1720        let mut flow = FlowBuilder::new();
1721        let node = flow.process::<()>();
1722
1723        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1724        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1725
1726        let tick = node.tick();
1727        let batch = folded.snapshot(&tick, nondet!(/** test */));
1728        let out_recv = batch.all_ticks().sim_output();
1729
1730        flow.sim().exhaustive(async || {
1731            assert_eq!(out_recv.next().await.unwrap(), 10);
1732        });
1733    }
1734
1735    #[cfg(feature = "sim")]
1736    #[test]
1737    fn sim_fold_intermediate_state_count() {
1738        let mut flow = FlowBuilder::new();
1739        let node = flow.process::<()>();
1740
1741        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1742        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1743
1744        let tick = node.tick();
1745        let batch = folded.snapshot(&tick, nondet!(/** test */));
1746        let out_recv = batch.all_ticks().sim_output();
1747
1748        let instance_count = flow.sim().exhaustive(async || {
1749            let out = out_recv.collect::<Vec<_>>().await;
1750            assert_eq!(out.last(), Some(&10));
1751        });
1752
1753        assert_eq!(
1754            instance_count,
1755            16 // 2^4 possible subsets of intermediates (including initial state)
1756        )
1757    }
1758
1759    #[cfg(feature = "sim")]
1760    #[test]
1761    fn sim_fold_no_repeat_initial() {
1762        // check that we don't repeat the initial state of the fold in autonomous decisions
1763
1764        let mut flow = FlowBuilder::new();
1765        let node = flow.process::<()>();
1766
1767        let (in_port, input) = node.sim_input();
1768        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1769
1770        let tick = node.tick();
1771        let batch = folded.snapshot(&tick, nondet!(/** test */));
1772        let out_recv = batch.all_ticks().sim_output();
1773
1774        flow.sim().exhaustive(async || {
1775            assert_eq!(out_recv.next().await.unwrap(), 0);
1776
1777            in_port.send(123);
1778
1779            assert_eq!(out_recv.next().await.unwrap(), 123);
1780        });
1781    }
1782
1783    #[cfg(feature = "sim")]
1784    #[test]
1785    #[should_panic]
1786    fn sim_fold_repeats_snapshots() {
1787        // when the tick is driven by a snapshot AND something else, the snapshot can
1788        // "stutter" and repeat the same state multiple times
1789
1790        let mut flow = FlowBuilder::new();
1791        let node = flow.process::<()>();
1792
1793        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1794        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1795
1796        let tick = node.tick();
1797        let batch = source
1798            .batch(&tick, nondet!(/** test */))
1799            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1800        let out_recv = batch.all_ticks().sim_output();
1801
1802        flow.sim().exhaustive(async || {
1803            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1804            {
1805                panic!("repeated snapshot");
1806            }
1807        });
1808    }
1809
1810    #[cfg(feature = "sim")]
1811    #[test]
1812    fn sim_fold_repeats_snapshots_count() {
1813        // check the number of instances
1814        let mut flow = FlowBuilder::new();
1815        let node = flow.process::<()>();
1816
1817        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1818        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1819
1820        let tick = node.tick();
1821        let batch = source
1822            .batch(&tick, nondet!(/** test */))
1823            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1824        let out_recv = batch.all_ticks().sim_output();
1825
1826        let count = flow.sim().exhaustive(async || {
1827            let _ = out_recv.collect::<Vec<_>>().await;
1828        });
1829
1830        assert_eq!(count, 52);
1831        // don't have a combinatorial explanation for this number yet, but checked via logs
1832    }
1833
1834    #[cfg(feature = "sim")]
1835    #[test]
1836    fn sim_top_level_singleton_exhaustive() {
1837        // ensures that top-level singletons have only one snapshot
1838        let mut flow = FlowBuilder::new();
1839        let node = flow.process::<()>();
1840
1841        let singleton = node.singleton(q!(1));
1842        let tick = node.tick();
1843        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1844        let out_recv = batch.all_ticks().sim_output();
1845
1846        let count = flow.sim().exhaustive(async || {
1847            let _ = out_recv.collect::<Vec<_>>().await;
1848        });
1849
1850        assert_eq!(count, 1);
1851    }
1852
1853    #[cfg(feature = "sim")]
1854    #[test]
1855    fn sim_top_level_singleton_join_count() {
1856        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1857        // exploration
1858
1859        let mut flow = FlowBuilder::new();
1860        let node = flow.process::<()>();
1861
1862        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1863        let tick = node.tick();
1864        let batch = source_iter
1865            .batch(&tick, nondet!(/** test */))
1866            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1867        let out_recv = batch.all_ticks().sim_output();
1868
1869        let instance_count = flow.sim().exhaustive(async || {
1870            let _ = out_recv.collect::<Vec<_>>().await;
1871        });
1872
1873        assert_eq!(
1874            instance_count,
1875            16 // 2^4 ways to split up (including a possibly empty first batch)
1876        )
1877    }
1878
1879    #[cfg(feature = "sim")]
1880    #[test]
1881    fn top_level_singleton_into_stream_no_replay() {
1882        let mut flow = FlowBuilder::new();
1883        let node = flow.process::<()>();
1884
1885        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1886        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1887
1888        let out_recv = folded.into_stream().sim_output();
1889
1890        flow.sim().exhaustive(async || {
1891            out_recv.assert_yields_only([10]).await;
1892        });
1893    }
1894
1895    #[cfg(feature = "sim")]
1896    #[test]
1897    fn inside_tick_singleton_zip() {
1898        use crate::live_collections::Stream;
1899        use crate::live_collections::sliced::sliced;
1900
1901        let mut flow = FlowBuilder::new();
1902        let node = flow.process::<()>();
1903
1904        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1905        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1906
1907        let out_recv = sliced! {
1908            let v = use(folded, nondet!(/** test */));
1909            v.clone().zip(v).into_stream()
1910        }
1911        .sim_output();
1912
1913        let count = flow.sim().exhaustive(async || {
1914            let out = out_recv.collect::<Vec<_>>().await;
1915            assert_eq!(out.last(), Some(&(3, 3)));
1916        });
1917
1918        assert_eq!(count, 4);
1919    }
1920
1921    /// Reproducer for simulator hang when using cross_singleton on a top-level
1922    /// unbounded stream (not inside sliced!). The exhaustive simulator hangs
1923    /// after the first iteration.
1924    #[cfg(feature = "sim")]
1925    #[test]
1926    fn sim_cross_singleton_top_level_unbounded_hang() {
1927        let mut flow = FlowBuilder::new();
1928        let node = flow.process::<()>();
1929
1930        let (cmd_port, input) = node.sim_input::<String, _, _>();
1931
1932        let top_level_singleton = node.singleton(q!(123));
1933
1934        // cross_singleton on a top-level stream - bug trigger
1935        let crossed = input.cross_singleton(top_level_singleton);
1936
1937        // Output directly
1938        let resp_port = crossed.sim_output();
1939
1940        let count = flow.sim().exhaustive(async || {
1941            cmd_port.send("abc".to_owned());
1942
1943            let responses: Vec<_> = resp_port.collect().await;
1944            assert!(!responses.is_empty());
1945        });
1946
1947        assert_eq!(count, 1);
1948    }
1949
1950    #[cfg(feature = "sim")]
1951    #[test]
1952    fn sim_top_level_singleton_state_count() {
1953        let mut flow = FlowBuilder::new();
1954        let process = flow.process::<()>();
1955
1956        let (cmd_port, input) = process.sim_input();
1957        {
1958            // increases exhaustive inputs from 1 to 2 before we optimized `From`
1959            use super::Singleton;
1960            use crate::live_collections::boundedness::Unbounded;
1961            let _singleton: Singleton<_, _, Unbounded> = process.singleton(q!(false)).into();
1962        }
1963        let tick = process.tick();
1964        let batched_unbatched = input.batch(&tick, nondet!(/** */)).all_ticks();
1965        let resp_port = batched_unbatched.sim_output();
1966
1967        let count = flow.sim().exhaustive(async || {
1968            cmd_port.send(());
1969            let _responses: Vec<_> = resp_port.collect().await;
1970        });
1971
1972        assert_eq!(count, 1);
1973    }
1974}