Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{ApplyMonotoneStream, Proved};
28
/// A marker trait indicating which components of a [`Singleton`] may change.
///
/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
/// includes an additional variant [`Monotonic`], which means that the value will only grow.
pub trait SingletonBound {
    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
    ///
    /// This is the bound that remains once any monotonicity guarantee is dropped (see
    /// [`Singleton::ignore_monotonic`]).
    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;

    /// The bound of a [`Singleton`] that is produced from a [`Stream`] whose boundedness is
    /// `Self`.
    ///
    /// It always shares the same [`SingletonBound::UnderlyingBound`] as `Self`.
    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;

    /// Returns the [`SingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> SingletonBoundKind;
}
43
// `Unbounded` carries no monotonicity guarantee, so it is its own underlying bound.
impl SingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;

    // A singleton produced from an `Unbounded` stream is tracked as `Monotonic`.
    type StreamToMonotone = Monotonic;

    fn bound_kind() -> SingletonBoundKind {
        SingletonBoundKind::Unbounded
    }
}
53
// `Bounded` is its own underlying bound.
impl SingletonBound for Bounded {
    type UnderlyingBound = Bounded;

    // A singleton produced from a `Bounded` stream stays `Bounded`.
    type StreamToMonotone = Bounded;

    fn bound_kind() -> SingletonBoundKind {
        SingletonBoundKind::Bounded
    }
}
63
/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
///
/// This is a field-less marker type that is used as a [`SingletonBound`]; its underlying
/// boundedness is [`Unbounded`].
pub struct Monotonic;
66
// Dropping the monotonicity guarantee from `Monotonic` leaves an `Unbounded` singleton.
impl SingletonBound for Monotonic {
    type UnderlyingBound = Unbounded;

    // Already monotonic, so the stream-to-singleton bound is unchanged.
    type StreamToMonotone = Monotonic;

    fn bound_kind() -> SingletonBoundKind {
        SingletonBoundKind::Monotonic
    }
}
76
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait for bounds that are accepted where a monotonic singleton is required.
///
/// It is implemented for [`Monotonic`] and for every bound that implements [`IsBounded`].
pub trait IsMonotonic: SingletonBound {}
85
// `Monotonic` satisfies the monotonicity requirement directly.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsMonotonic for Monotonic {}
89
// Every bounded bound is also accepted wherever `IsMonotonic` is required.
#[sealed]
#[diagnostic::do_not_recommend]
impl<B: IsBounded> IsMonotonic for B {}
93
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed), [`Unbounded`] (changing
///   asynchronously), or [`Monotonic`] (changing asynchronously, but only growing)
pub struct Singleton<Type, Loc, Bound: SingletonBound> {
    // The location at which this singleton is materialized.
    pub(crate) location: Loc,
    // The IR node that produces this singleton. Operators take the node out of the cell and
    // leave `HydroNode::Placeholder` behind.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Flow state handle; the `Drop` impl uses it to register a root for an unconsumed node.
    pub(crate) flow_state: FlowState,

    // Ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
115
116impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
117    fn drop(&mut self) {
118        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
119        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
120            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
121                input: Box::new(ir_node),
122                op_metadata: HydroIrOpMetadata::new(),
123            });
124        }
125    }
126}
127
128impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
129where
130    T: Clone,
131    L: Location<'a> + NoTick,
132{
133    fn from(value: Singleton<T, L, Bounded>) -> Self {
134        let tick = value.location().tick();
135        value.clone_into_tick(&tick).latest()
136    }
137}
138
139impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
140where
141    L: Location<'a>,
142{
143    type Location = Tick<L>;
144
145    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
146        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
147            location.clone(),
148            HydroNode::DeferTick {
149                input: Box::new(HydroNode::CycleSource {
150                    cycle_id,
151                    metadata: location.new_node_metadata(Self::collection_kind()),
152                }),
153                metadata: location
154                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
155            },
156        );
157
158        from_previous_tick.unwrap_or(initial)
159    }
160}
161
162impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
163where
164    L: Location<'a>,
165{
166    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
167        assert_eq!(
168            Location::id(&self.location),
169            expected_location,
170            "locations do not match"
171        );
172        self.location
173            .flow_state()
174            .borrow_mut()
175            .push_root(HydroRoot::CycleSink {
176                cycle_id,
177                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
178                op_metadata: HydroIrOpMetadata::new(),
179            });
180    }
181}
182
183impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
184where
185    L: Location<'a>,
186{
187    type Location = Tick<L>;
188
189    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
190        Singleton::new(
191            location.clone(),
192            HydroNode::CycleSource {
193                cycle_id,
194                metadata: location.new_node_metadata(Self::collection_kind()),
195            },
196        )
197    }
198}
199
200impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
201where
202    L: Location<'a>,
203{
204    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
205        assert_eq!(
206            Location::id(&self.location),
207            expected_location,
208            "locations do not match"
209        );
210        self.location
211            .flow_state()
212            .borrow_mut()
213            .push_root(HydroRoot::CycleSink {
214                cycle_id,
215                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
216                op_metadata: HydroIrOpMetadata::new(),
217            });
218    }
219}
220
221impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
222where
223    L: Location<'a> + NoTick,
224{
225    type Location = L;
226
227    fn create_source(cycle_id: CycleId, location: L) -> Self {
228        Singleton::new(
229            location.clone(),
230            HydroNode::CycleSource {
231                cycle_id,
232                metadata: location.new_node_metadata(Self::collection_kind()),
233            },
234        )
235    }
236}
237
238impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
239where
240    L: Location<'a> + NoTick,
241{
242    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
243        assert_eq!(
244            Location::id(&self.location),
245            expected_location,
246            "locations do not match"
247        );
248        self.location
249            .flow_state()
250            .borrow_mut()
251            .push_root(HydroRoot::CycleSink {
252                cycle_id,
253                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
254                op_metadata: HydroIrOpMetadata::new(),
255            });
256    }
257}
258
259impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
260where
261    T: Clone,
262    L: Location<'a>,
263{
264    fn clone(&self) -> Self {
265        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
266            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
267            *self.ir_node.borrow_mut() = HydroNode::Tee {
268                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
269                metadata: self.location.new_node_metadata(Self::collection_kind()),
270            };
271        }
272
273        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
274            Singleton {
275                location: self.location.clone(),
276                flow_state: self.flow_state.clone(),
277                ir_node: HydroNode::Tee {
278                    inner: SharedNode(inner.0.clone()),
279                    metadata: metadata.clone(),
280                }
281                .into(),
282                _phantom: PhantomData,
283            }
284        } else {
285            unreachable!()
286        }
287    }
288}
289
#[cfg(stageleft_runtime)]
// Zips a singleton with an optional inside a tick by first converting the singleton into an
// optional and then delegating to the `Optional` implementation.
fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B::UnderlyingBound>,
) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
    super::optional::zip_inside_tick(
        Into::<Optional<T, Tick<L>, B::UnderlyingBound>>::into(me),
        other,
    )
}
298
299impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
300where
301    L: Location<'a>,
302{
303    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
304        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
305        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
306        let flow_state = location.flow_state().clone();
307        Singleton {
308            location,
309            flow_state,
310            ir_node: RefCell::new(ir_node),
311            _phantom: PhantomData,
312        }
313    }
314
315    pub(crate) fn collection_kind() -> CollectionKind {
316        CollectionKind::Singleton {
317            bound: B::bound_kind(),
318            element_type: stageleft::quote_type::<T>().into(),
319        }
320    }
321
322    /// Returns the [`Location`] where this singleton is being materialized.
    pub fn location(&self) -> &L {
        // Hands out a borrow of the stored location; nothing is cloned here.
        &self.location
    }
326
327    /// Drops the monotonicity property of the [`Singleton`].
328    pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
329        if B::bound_kind() == B::UnderlyingBound::bound_kind() {
330            Singleton::new(
331                self.location.clone(),
332                self.ir_node.replace(HydroNode::Placeholder),
333            )
334        } else {
335            Singleton::new(
336                self.location.clone(),
337                HydroNode::Cast {
338                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
339                    metadata:
340                        self.location.new_node_metadata(
341                            Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
342                        ),
343                },
344            )
345        }
346    }
347
348    /// Transforms the singleton value by applying a function `f` to it,
349    /// continuously as the input is updated.
350    ///
351    /// # Example
352    /// ```rust
353    /// # #[cfg(feature = "deploy")] {
354    /// # use hydro_lang::prelude::*;
355    /// # use futures::StreamExt;
356    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
357    /// let tick = process.tick();
358    /// let singleton = tick.singleton(q!(5));
359    /// singleton.map(q!(|v| v * 2)).all_ticks()
360    /// # }, |mut stream| async move {
361    /// // 10
362    /// # assert_eq!(stream.next().await.unwrap(), 10);
363    /// # }));
364    /// # }
365    /// ```
366    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B::UnderlyingBound>
367    where
368        F: Fn(T) -> U + 'a,
369    {
370        let f = f.splice_fn1_ctx(&self.location).into();
371        Singleton::new(
372            self.location.clone(),
373            HydroNode::Map {
374                f,
375                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376                metadata: self
377                    .location
378                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
379            },
380        )
381    }
382
383    /// Transforms the singleton value by applying a function `f` to it and then flattening
384    /// the result into a stream, preserving the order of elements.
385    ///
386    /// The function `f` is applied to the singleton value to produce an iterator, and all items
387    /// from that iterator are emitted in the output stream in deterministic order.
388    ///
389    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
390    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
391    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
392    ///
393    /// # Example
394    /// ```rust
395    /// # #[cfg(feature = "deploy")] {
396    /// # use hydro_lang::prelude::*;
397    /// # use futures::StreamExt;
398    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
399    /// let tick = process.tick();
400    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
401    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
402    /// # }, |mut stream| async move {
403    /// // 1, 2, 3
404    /// # for w in vec![1, 2, 3] {
405    /// #     assert_eq!(stream.next().await.unwrap(), w);
406    /// # }
407    /// # }));
408    /// # }
409    /// ```
410    pub fn flat_map_ordered<U, I, F>(
411        self,
412        f: impl IntoQuotedMut<'a, F, L>,
413    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
414    where
415        B: IsBounded,
416        I: IntoIterator<Item = U>,
417        F: Fn(T) -> I + 'a,
418    {
419        self.into_stream().flat_map_ordered(f)
420    }
421
422    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
423    /// for the output type `I` to produce items in any order.
424    ///
425    /// The function `f` is applied to the singleton value to produce an iterator, and all items
426    /// from that iterator are emitted in the output stream in non-deterministic order.
427    ///
428    /// # Example
429    /// ```rust
430    /// # #[cfg(feature = "deploy")] {
431    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
432    /// # use futures::StreamExt;
433    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
434    /// let tick = process.tick();
435    /// let singleton = tick.singleton(q!(
436    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
437    /// ));
438    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
439    /// # }, |mut stream| async move {
440    /// // 1, 2, 3, but in no particular order
441    /// # let mut results = Vec::new();
442    /// # for _ in 0..3 {
443    /// #     results.push(stream.next().await.unwrap());
444    /// # }
445    /// # results.sort();
446    /// # assert_eq!(results, vec![1, 2, 3]);
447    /// # }));
448    /// # }
449    /// ```
450    pub fn flat_map_unordered<U, I, F>(
451        self,
452        f: impl IntoQuotedMut<'a, F, L>,
453    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
454    where
455        B: IsBounded,
456        I: IntoIterator<Item = U>,
457        F: Fn(T) -> I + 'a,
458    {
459        self.into_stream().flat_map_unordered(f)
460    }
461
462    /// Flattens the singleton value into a stream, preserving the order of elements.
463    ///
464    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
465    /// are emitted in the output stream in deterministic order.
466    ///
467    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
468    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
469    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
470    ///
471    /// # Example
472    /// ```rust
473    /// # #[cfg(feature = "deploy")] {
474    /// # use hydro_lang::prelude::*;
475    /// # use futures::StreamExt;
476    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
477    /// let tick = process.tick();
478    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
479    /// singleton.flatten_ordered().all_ticks()
480    /// # }, |mut stream| async move {
481    /// // 1, 2, 3
482    /// # for w in vec![1, 2, 3] {
483    /// #     assert_eq!(stream.next().await.unwrap(), w);
484    /// # }
485    /// # }));
486    /// # }
487    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity function.
        self.flat_map_ordered(q!(|x| x))
    }
495
496    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
497    /// for the element type `T` to produce items in any order.
498    ///
499    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
500    /// are emitted in the output stream in non-deterministic order.
501    ///
502    /// # Example
503    /// ```rust
504    /// # #[cfg(feature = "deploy")] {
505    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
506    /// # use futures::StreamExt;
507    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
508    /// let tick = process.tick();
509    /// let singleton = tick.singleton(q!(
510    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
511    /// ));
512    /// singleton.flatten_unordered().all_ticks()
513    /// # }, |mut stream| async move {
514    /// // 1, 2, 3, but in no particular order
515    /// # let mut results = Vec::new();
516    /// # for _ in 0..3 {
517    /// #     results.push(stream.next().await.unwrap());
518    /// # }
519    /// # results.sort();
520    /// # assert_eq!(results, vec![1, 2, 3]);
521    /// # }));
522    /// # }
523    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity function.
        self.flat_map_unordered(q!(|x| x))
    }
531
532    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
533    ///
534    /// If the predicate returns `true`, the output optional contains the same value.
535    /// If the predicate returns `false`, the output optional is empty.
536    ///
537    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
538    /// not modify or take ownership of the value. If you need to modify the value while filtering
539    /// use [`Singleton::filter_map`] instead.
540    ///
541    /// # Example
542    /// ```rust
543    /// # #[cfg(feature = "deploy")] {
544    /// # use hydro_lang::prelude::*;
545    /// # use futures::StreamExt;
546    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
547    /// let tick = process.tick();
548    /// let singleton = tick.singleton(q!(5));
549    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
550    /// # }, |mut stream| async move {
551    /// // 5
552    /// # assert_eq!(stream.next().await.unwrap(), 5);
553    /// # }));
554    /// # }
555    /// ```
556    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
557    where
558        F: Fn(&T) -> bool + 'a,
559    {
560        let f = f.splice_fn1_borrow_ctx(&self.location).into();
561        Optional::new(
562            self.location.clone(),
563            HydroNode::Filter {
564                f,
565                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
566                metadata: self
567                    .location
568                    .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
569            },
570        )
571    }
572
573    /// An operator that both filters and maps. It yields the value only if the supplied
574    /// closure `f` returns `Some(value)`.
575    ///
576    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
577    /// If the closure returns `None`, the output optional is empty.
578    ///
579    /// # Example
580    /// ```rust
581    /// # #[cfg(feature = "deploy")] {
582    /// # use hydro_lang::prelude::*;
583    /// # use futures::StreamExt;
584    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
585    /// let tick = process.tick();
586    /// let singleton = tick.singleton(q!("42"));
587    /// singleton
588    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
589    ///     .all_ticks()
590    /// # }, |mut stream| async move {
591    /// // 42
592    /// # assert_eq!(stream.next().await.unwrap(), 42);
593    /// # }));
594    /// # }
595    /// ```
596    pub fn filter_map<U, F>(
597        self,
598        f: impl IntoQuotedMut<'a, F, L>,
599    ) -> Optional<U, L, B::UnderlyingBound>
600    where
601        F: Fn(T) -> Option<U> + 'a,
602    {
603        let f = f.splice_fn1_ctx(&self.location).into();
604        Optional::new(
605            self.location.clone(),
606            HydroNode::FilterMap {
607                f,
608                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
609                metadata: self
610                    .location
611                    .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
612            },
613        )
614    }
615
616    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
617    ///
618    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
619    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
620    /// non-null. This is useful for combining several pieces of state together.
621    ///
622    /// # Example
623    /// ```rust
624    /// # #[cfg(feature = "deploy")] {
625    /// # use hydro_lang::prelude::*;
626    /// # use futures::StreamExt;
627    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
628    /// let tick = process.tick();
629    /// let numbers = process
630    ///   .source_iter(q!(vec![123, 456]))
631    ///   .batch(&tick, nondet!(/** test */));
632    /// let count = numbers.clone().count(); // Singleton
633    /// let max = numbers.max(); // Optional
634    /// count.zip(max).all_ticks()
635    /// # }, |mut stream| async move {
636    /// // [(2, 456)]
637    /// # for w in vec![(2, 456)] {
638    /// #     assert_eq!(stream.next().await.unwrap(), w);
639    /// # }
640    /// # }));
641    /// # }
642    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
        B: IsBounded,
    {
        // Both operands must be at the same location.
        check_matching_location(&self.location, &Self::other_location(&other));

        // At a top-level location that can provide a tick, snapshot both sides into that tick,
        // zip them there, and bring the result back out with `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // The other operand is wrapped in a `Cast` node that describes it as an
                // `Optional` before it is snapshotted.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    other_location.clone(),
                    HydroNode::Cast {
                        inner: Box::new(Self::other_ir_node(other)),
                        metadata: other_location.new_node_metadata(Optional::<
                            <Self as ZipResult<'a, O>>::OtherType,
                            Tick<L>,
                            Bounded,
                        >::collection_kind(
                        )),
                    },
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Re-wrap the resulting node as the output type chosen by `ZipResult`.
            Self::make(
                out.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // Otherwise build a `CrossSingleton` node directly from the two input nodes.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
693
694    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
695    /// boolean signal is `true`, otherwise the output is null.
696    ///
697    /// # Example
698    /// ```rust
699    /// # #[cfg(feature = "deploy")] {
700    /// # use hydro_lang::prelude::*;
701    /// # use futures::StreamExt;
702    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
703    /// let tick = process.tick();
704    /// // ticks are lazy by default, forces the second tick to run
705    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
706    ///
707    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
708    /// let batch_first_tick = process
709    ///   .source_iter(q!(vec![1]))
710    ///   .batch(&tick, nondet!(/** test */));
711    /// let batch_second_tick = process
712    ///   .source_iter(q!(vec![1, 2, 3]))
713    ///   .batch(&tick, nondet!(/** test */))
714    ///   .defer_tick();
715    /// batch_first_tick.chain(batch_second_tick).count()
716    ///   .filter_if(signal)
717    ///   .all_ticks()
718    /// # }, |mut stream| async move {
719    /// // [1]
720    /// # for w in vec![1] {
721    /// #     assert_eq!(stream.next().await.unwrap(), w);
722    /// # }
723    /// # }));
724    /// # }
725    /// ```
    pub fn filter_if(
        self,
        signal: Singleton<bool, L, B>,
    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
    where
        B: IsBounded,
    {
        // Keep the signal only when it is `true`, zip this singleton with that optional, and
        // then discard the signal half of the resulting tuple.
        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
    }
735
736    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
738    ///
739    /// Useful for conditionally processing, such as only emitting a singleton's value outside
740    /// a tick if some other condition is satisfied.
741    ///
742    /// # Example
743    /// ```rust
744    /// # #[cfg(feature = "deploy")] {
745    /// # use hydro_lang::prelude::*;
746    /// # use futures::StreamExt;
747    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
748    /// let tick = process.tick();
749    /// // ticks are lazy by default, forces the second tick to run
750    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
751    ///
752    /// let batch_first_tick = process
753    ///   .source_iter(q!(vec![1]))
754    ///   .batch(&tick, nondet!(/** test */));
755    /// let batch_second_tick = process
756    ///   .source_iter(q!(vec![1, 2, 3]))
757    ///   .batch(&tick, nondet!(/** test */))
758    ///   .defer_tick(); // appears on the second tick
759    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
760    /// batch_first_tick.chain(batch_second_tick).count()
761    ///   .filter_if_some(some_on_first_tick)
762    ///   .all_ticks()
763    /// # }, |mut stream| async move {
764    /// // [1]
765    /// # for w in vec![1] {
766    /// #     assert_eq!(stream.next().await.unwrap(), w);
767    /// # }
768    /// # }));
769    /// # }
770    /// ```
771    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
772    pub fn filter_if_some<U>(
773        self,
774        signal: Optional<U, L, B>,
775    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
776    where
777        B: IsBounded,
778    {
779        self.filter_if(signal.is_some())
780    }
781
782    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
784    ///
785    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
786    /// the condition.
787    ///
788    /// # Example
789    /// ```rust
790    /// # #[cfg(feature = "deploy")] {
791    /// # use hydro_lang::prelude::*;
792    /// # use futures::StreamExt;
793    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
794    /// let tick = process.tick();
795    /// // ticks are lazy by default, forces the second tick to run
796    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
797    ///
798    /// let batch_first_tick = process
799    ///   .source_iter(q!(vec![1]))
800    ///   .batch(&tick, nondet!(/** test */));
801    /// let batch_second_tick = process
802    ///   .source_iter(q!(vec![1, 2, 3]))
803    ///   .batch(&tick, nondet!(/** test */))
804    ///   .defer_tick(); // appears on the second tick
805    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
806    /// batch_first_tick.chain(batch_second_tick).count()
807    ///   .filter_if_none(some_on_first_tick)
808    ///   .all_ticks()
809    /// # }, |mut stream| async move {
810    /// // [3]
811    /// # for w in vec![3] {
812    /// #     assert_eq!(stream.next().await.unwrap(), w);
813    /// # }
814    /// # }));
815    /// # }
816    /// ```
817    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
818    pub fn filter_if_none<U>(
819        self,
820        other: Optional<U, L, B>,
821    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
822    where
823        B: IsBounded,
824    {
825        self.filter_if(other.is_none())
826    }
827
828    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
829    ///
830    /// # Example
831    /// ```rust
832    /// # #[cfg(feature = "deploy")] {
833    /// # use hydro_lang::prelude::*;
834    /// # use futures::StreamExt;
835    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
836    /// let tick = process.tick();
837    /// let a = tick.singleton(q!(5));
838    /// let b = tick.singleton(q!(5));
839    /// a.equals(b).all_ticks()
840    /// # }, |mut stream| async move {
841    /// // [true]
842    /// # assert_eq!(stream.next().await.unwrap(), true);
843    /// # }));
844    /// # }
845    /// ```
846    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
847    where
848        T: PartialEq,
849        B: IsBounded,
850    {
851        self.zip(other).map(q!(|(a, b)| a == b))
852    }
853
    /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
    /// greater than or equal to the provided threshold. The event will have the value of the
    /// given threshold.
    ///
    /// This requires the incoming singleton to be monotonic, because otherwise the detection of
    /// the threshold would be non-deterministic.
    ///
    /// The check is implemented as "not `value < threshold`", so with a [`PartialOrd`] type a
    /// value that is incomparable to the threshold also lets the event through.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let a = // singleton 1 ~> 5 ~> 10
    /// # process.singleton(q!(5));
    /// let b = process.singleton(q!(4));
    /// a.threshold_greater_or_equal(b)
    /// # }, |mut stream| async move {
    /// // [4]
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn threshold_greater_or_equal<B2: IsBounded>(
        self,
        threshold: Singleton<T, L, B2>,
    ) -> Stream<T, L, B::UnderlyingBound>
    where
        T: Clone + PartialOrd,
        B: IsMonotonic,
    {
        // `B2: IsBounded`, so this only pins the threshold's static bound to `Bounded`.
        let threshold = threshold.make_bounded();
        match self.try_make_bounded() {
            // Bounded input: there is a single value, so one comparison decides everything.
            Ok(bounded) => {
                // Emit the threshold unless the value is strictly below it.
                let uncasted = threshold
                    .zip(bounded)
                    .into_stream()
                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));

                // Re-wrap the same location and IR node as a stream of the declared return type.
                Stream::new(
                    uncasted.location.clone(),
                    uncasted.ir_node.replace(HydroNode::Placeholder),
                )
            }
            // Unbounded input: observe the value slice by slice and remember whether the
            // threshold is still pending.
            Err(me) => {
                let uncasted = sliced! {
                    let me = use(me, nondet!(/** thresholds are deterministic */));
                    // State starts out as the threshold itself, held as an `Optional`.
                    let mut remaining_threshold = use::state(|l| {
                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
                        as_option
                    });

                    // Split on "value still below threshold" (`not_passed`) versus the rest (`passed`).
                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
                    // Only a not-yet-passed threshold is carried over as state; presumably the
                    // state becomes empty once the threshold has passed, so it fires only once.
                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
                    passed.map(q!(|(t, _)| t))
                };

                // Re-wrap the same location and IR node as a stream of the declared return type.
                Stream::new(
                    uncasted.location.clone(),
                    uncasted.ir_node.replace(HydroNode::Placeholder),
                )
            }
        }
    }
918
919    /// An operator which allows you to "name" a `HydroNode`.
920    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
921    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
922        {
923            let mut node = self.ir_node.borrow_mut();
924            let metadata = node.metadata_mut();
925            metadata.tag = Some(name.to_owned());
926        }
927        self
928    }
929}
930
931impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
932    type Output = Singleton<bool, L, B::UnderlyingBound>;
933
934    fn not(self) -> Self::Output {
935        self.map(q!(|b| !b))
936    }
937}
938
939impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
940where
941    L: Location<'a>,
942{
943    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
944    /// the inner `Option`.
945    ///
946    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
947    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
948    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
949    ///
950    /// # Example
951    /// ```rust
952    /// # #[cfg(feature = "deploy")] {
953    /// # use hydro_lang::prelude::*;
954    /// # use futures::StreamExt;
955    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
956    /// let tick = process.tick();
957    /// let singleton = tick.singleton(q!(Some(42)));
958    /// singleton.into_optional().all_ticks()
959    /// # }, |mut stream| async move {
960    /// // 42
961    /// # assert_eq!(stream.next().await.unwrap(), 42);
962    /// # }));
963    /// # }
964    /// ```
965    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
966        self.filter_map(q!(|v| v))
967    }
968}
969
970impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
971where
972    L: Location<'a>,
973{
974    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
975    ///
976    /// # Example
977    /// ```rust
978    /// # #[cfg(feature = "deploy")] {
979    /// # use hydro_lang::prelude::*;
980    /// # use futures::StreamExt;
981    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
982    /// let tick = process.tick();
983    /// // ticks are lazy by default, forces the second tick to run
984    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
985    ///
986    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
987    /// let b = tick.singleton(q!(true)); // true, true
988    /// a.and(b).all_ticks()
989    /// # }, |mut stream| async move {
990    /// // [true, false]
991    /// # for w in vec![true, false] {
992    /// #     assert_eq!(stream.next().await.unwrap(), w);
993    /// # }
994    /// # }));
995    /// # }
996    /// ```
997    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
998    where
999        B: IsBounded,
1000    {
1001        self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1002    }
1003
1004    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1005    ///
1006    /// # Example
1007    /// ```rust
1008    /// # #[cfg(feature = "deploy")] {
1009    /// # use hydro_lang::prelude::*;
1010    /// # use futures::StreamExt;
1011    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1012    /// let tick = process.tick();
1013    /// // ticks are lazy by default, forces the second tick to run
1014    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1015    ///
1016    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1017    /// let b = tick.singleton(q!(false)); // false, false
1018    /// a.or(b).all_ticks()
1019    /// # }, |mut stream| async move {
1020    /// // [true, false]
1021    /// # for w in vec![true, false] {
1022    /// #     assert_eq!(stream.next().await.unwrap(), w);
1023    /// # }
1024    /// # }));
1025    /// # }
1026    /// ```
1027    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1028    where
1029        B: IsBounded,
1030    {
1031        self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1032    }
1033}
1034
1035impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1036where
1037    L: Location<'a> + NoTick,
1038{
1039    /// Returns a singleton value corresponding to the latest snapshot of the singleton
1040    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1041    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1042    /// all snapshots of this singleton into the atomic-associated tick will observe the
1043    /// same value each tick.
1044    ///
1045    /// # Non-Determinism
1046    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1047    /// the output singleton has a non-deterministic value since the snapshot can be at an
1048    /// arbitrary point in time.
1049    pub fn snapshot_atomic(
1050        self,
1051        tick: &Tick<L>,
1052        _nondet: NonDet,
1053    ) -> Singleton<T, Tick<L>, Bounded> {
1054        Singleton::new(
1055            tick.clone(),
1056            HydroNode::Batch {
1057                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1058                metadata: tick
1059                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1060            },
1061        )
1062    }
1063
1064    /// Returns this singleton back into a top-level, asynchronous execution context where updates
1065    /// to the value will be asynchronously propagated.
1066    pub fn end_atomic(self) -> Singleton<T, L, B> {
1067        Singleton::new(
1068            self.location.tick.l.clone(),
1069            HydroNode::EndAtomic {
1070                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1071                metadata: self
1072                    .location
1073                    .tick
1074                    .l
1075                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1076            },
1077        )
1078    }
1079}
1080
1081impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1082where
1083    L: Location<'a>,
1084{
1085    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1086    /// will observe the same version of the value and will be executed synchronously before any
1087    /// outputs are yielded (in [`Optional::end_atomic`]).
1088    ///
1089    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1090    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1091    /// a different version).
1092    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1093        let id = self.location.flow_state().borrow_mut().next_clock_id();
1094        let out_location = Atomic {
1095            tick: Tick {
1096                id,
1097                l: self.location.clone(),
1098            },
1099        };
1100        Singleton::new(
1101            out_location.clone(),
1102            HydroNode::BeginAtomic {
1103                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1104                metadata: out_location
1105                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1106            },
1107        )
1108    }
1109
1110    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1111    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1112    /// relevant data that contributed to the snapshot at tick `t`.
1113    ///
1114    /// # Non-Determinism
1115    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1116    /// the output singleton has a non-deterministic value since the snapshot can be at an
1117    /// arbitrary point in time.
1118    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
1119        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1120        Singleton::new(
1121            tick.clone(),
1122            HydroNode::Batch {
1123                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1124                metadata: tick
1125                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1126            },
1127        )
1128    }
1129
1130    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1131    /// with order corresponding to increasing prefixes of data contributing to the singleton.
1132    ///
1133    /// # Non-Determinism
1134    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1135    /// to non-deterministic batching and arrival of inputs, the output stream is
1136    /// non-deterministic.
1137    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1138    where
1139        L: NoTick,
1140    {
1141        sliced! {
1142            let snapshot = use(self, nondet);
1143            snapshot.into_stream()
1144        }
1145        .weaken_retries()
1146    }
1147
1148    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1149    /// value taken at various points in time. Because the input singleton may be
1150    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1151    /// represent the value of the singleton given some prefix of the streams leading up to
1152    /// it.
1153    ///
1154    /// # Non-Determinism
1155    /// The output stream is non-deterministic in which elements are sampled, since this
1156    /// is controlled by a clock.
1157    pub fn sample_every(
1158        self,
1159        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1160        nondet: NonDet,
1161    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1162    where
1163        L: NoTick + NoAtomic,
1164    {
1165        let samples = self.location.source_interval(interval, nondet);
1166        sliced! {
1167            let snapshot = use(self, nondet);
1168            let sample_batch = use(samples, nondet);
1169
1170            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1171        }
1172        .weaken_retries()
1173    }
1174
1175    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1176    /// implies that `B == Bounded`.
1177    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1178    where
1179        B: IsBounded,
1180    {
1181        Singleton::new(
1182            self.location.clone(),
1183            self.ir_node.replace(HydroNode::Placeholder),
1184        )
1185    }
1186
1187    #[expect(clippy::result_large_err, reason = "internal use only")]
1188    fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1189        if B::UnderlyingBound::BOUNDED {
1190            Ok(Singleton::new(
1191                self.location.clone(),
1192                self.ir_node.replace(HydroNode::Placeholder),
1193            ))
1194        } else {
1195            Err(self)
1196        }
1197    }
1198
1199    /// Clones this bounded singleton into a tick, returning a singleton that has the
1200    /// same value as the outer singleton. Because the outer singleton is bounded, this
1201    /// is deterministic because there is only a single immutable version.
1202    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1203    where
1204        B: IsBounded,
1205        T: Clone,
1206    {
1207        // TODO(shadaj): avoid printing simulator logs for this snapshot
1208        self.snapshot(
1209            tick,
1210            nondet!(/** bounded top-level singleton so deterministic */),
1211        )
1212    }
1213
1214    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1215    ///
1216    /// # Example
1217    /// ```rust
1218    /// # #[cfg(feature = "deploy")] {
1219    /// # use hydro_lang::prelude::*;
1220    /// # use futures::StreamExt;
1221    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1222    /// let tick = process.tick();
1223    /// let batch_input = process
1224    ///   .source_iter(q!(vec![123, 456]))
1225    ///   .batch(&tick, nondet!(/** test */));
1226    /// batch_input.clone().chain(
1227    ///   batch_input.count().into_stream()
1228    /// ).all_ticks()
1229    /// # }, |mut stream| async move {
1230    /// // [123, 456, 2]
1231    /// # for w in vec![123, 456, 2] {
1232    /// #     assert_eq!(stream.next().await.unwrap(), w);
1233    /// # }
1234    /// # }));
1235    /// # }
1236    /// ```
1237    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1238    where
1239        B: IsBounded,
1240    {
1241        Stream::new(
1242            self.location.clone(),
1243            HydroNode::Cast {
1244                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1245                metadata: self.location.new_node_metadata(Stream::<
1246                    T,
1247                    Tick<L>,
1248                    Bounded,
1249                    TotalOrder,
1250                    ExactlyOnce,
1251                >::collection_kind()),
1252            },
1253        )
1254    }
1255
1256    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1257    /// producing a singleton of the resolved output.
1258    ///
1259    /// This is useful when the singleton contains an async computation that must
1260    /// be awaited before further processing. The future is polled to completion
1261    /// before the output value is emitted.
1262    ///
1263    /// # Example
1264    /// ```rust
1265    /// # #[cfg(feature = "deploy")] {
1266    /// # use hydro_lang::prelude::*;
1267    /// # use futures::StreamExt;
1268    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1269    /// let tick = process.tick();
1270    /// let singleton = tick.singleton(q!(5));
1271    /// singleton
1272    ///     .map(q!(|v| async move { v * 2 }))
1273    ///     .resolve_future_blocking()
1274    ///     .all_ticks()
1275    /// # }, |mut stream| async move {
1276    /// // 10
1277    /// # assert_eq!(stream.next().await.unwrap(), 10);
1278    /// # }));
1279    /// # }
1280    /// ```
1281    pub fn resolve_future_blocking(
1282        self,
1283    ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1284    where
1285        T: Future,
1286        B: IsBounded,
1287    {
1288        Singleton::new(
1289            self.location.clone(),
1290            HydroNode::ResolveFuturesBlocking {
1291                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1292                metadata: self
1293                    .location
1294                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1295            },
1296        )
1297    }
1298}
1299
1300impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1301where
1302    L: Location<'a>,
1303{
1304    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1305    /// which will stream the value computed in _each_ tick as a separate stream element.
1306    ///
1307    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1308    /// producing one element in the output for each tick. This is useful for batched computations,
1309    /// where the results from each tick must be combined together.
1310    ///
1311    /// # Example
1312    /// ```rust
1313    /// # #[cfg(feature = "deploy")] {
1314    /// # use hydro_lang::prelude::*;
1315    /// # use futures::StreamExt;
1316    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1317    /// let tick = process.tick();
1318    /// # // ticks are lazy by default, forces the second tick to run
1319    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1320    /// # let batch_first_tick = process
1321    /// #   .source_iter(q!(vec![1]))
1322    /// #   .batch(&tick, nondet!(/** test */));
1323    /// # let batch_second_tick = process
1324    /// #   .source_iter(q!(vec![1, 2, 3]))
1325    /// #   .batch(&tick, nondet!(/** test */))
1326    /// #   .defer_tick(); // appears on the second tick
1327    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1328    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1329    ///     .count()
1330    ///     .all_ticks()
1331    /// # }, |mut stream| async move {
1332    /// // [1, 3]
1333    /// # for w in vec![1, 3] {
1334    /// #     assert_eq!(stream.next().await.unwrap(), w);
1335    /// # }
1336    /// # }));
1337    /// # }
1338    /// ```
1339    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1340        self.into_stream().all_ticks()
1341    }
1342
1343    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1344    /// which will stream the value computed in _each_ tick as a separate stream element.
1345    ///
1346    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1347    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1348    /// singleton's [`Tick`] context.
1349    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1350        self.into_stream().all_ticks_atomic()
1351    }
1352
1353    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1354    /// be asynchronously updated with the latest value of the singleton inside the tick.
1355    ///
1356    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1357    /// tick that tracks the inner value. This is useful for getting the value as of the
1358    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1359    ///
1360    /// # Example
1361    /// ```rust
1362    /// # #[cfg(feature = "deploy")] {
1363    /// # use hydro_lang::prelude::*;
1364    /// # use futures::StreamExt;
1365    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1366    /// let tick = process.tick();
1367    /// # // ticks are lazy by default, forces the second tick to run
1368    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1369    /// # let batch_first_tick = process
1370    /// #   .source_iter(q!(vec![1]))
1371    /// #   .batch(&tick, nondet!(/** test */));
1372    /// # let batch_second_tick = process
1373    /// #   .source_iter(q!(vec![1, 2, 3]))
1374    /// #   .batch(&tick, nondet!(/** test */))
1375    /// #   .defer_tick(); // appears on the second tick
1376    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1377    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1378    ///     .count()
1379    ///     .latest()
1380    /// # .sample_eager(nondet!(/** test */))
1381    /// # }, |mut stream| async move {
1382    /// // asynchronously changes from 1 ~> 3
1383    /// # for w in vec![1, 3] {
1384    /// #     assert_eq!(stream.next().await.unwrap(), w);
1385    /// # }
1386    /// # }));
1387    /// # }
1388    /// ```
1389    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1390        Singleton::new(
1391            self.location.outer().clone(),
1392            HydroNode::YieldConcat {
1393                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1394                metadata: self
1395                    .location
1396                    .outer()
1397                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1398            },
1399        )
1400    }
1401
1402    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1403    /// be updated with the latest value of the singleton inside the tick.
1404    ///
1405    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1406    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1407    /// singleton's [`Tick`] context.
1408    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1409        let out_location = Atomic {
1410            tick: self.location.clone(),
1411        };
1412        Singleton::new(
1413            out_location.clone(),
1414            HydroNode::YieldConcat {
1415                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1416                metadata: out_location
1417                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1418            },
1419        )
1420    }
1421}
1422
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
/// [`Singleton`].
///
/// `Other` is the type of the second input to the `zip`. The trait is sealed, so it cannot be
/// implemented outside of this crate.
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`.
    ///
    /// Takes `other` by value, so the collection is consumed in the process.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1447
1448#[sealed::sealed]
1449impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1450where
1451    L: Location<'a>,
1452{
1453    type Out = Singleton<(T, U), L, B>;
1454    type ElementType = (T, U);
1455    type OtherType = U;
1456    type Location = L;
1457
1458    fn other_location(other: &Singleton<U, L, B>) -> L {
1459        other.location.clone()
1460    }
1461
1462    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1463        other.ir_node.replace(HydroNode::Placeholder)
1464    }
1465
1466    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1467        Singleton::new(
1468            location.clone(),
1469            HydroNode::Cast {
1470                inner: Box::new(ir_node),
1471                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1472            },
1473        )
1474    }
1475}
1476
/// Zipping a [`Singleton`] with an [`Optional`] (at the singleton's underlying bound) yields an
/// [`Optional`] of the tupled values.
#[sealed::sealed]
impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
    for Singleton<T, L, B>
where
    L: Location<'a>,
{
    type Out = Optional<(T, U), L, B::UnderlyingBound>;
    type ElementType = (T, U);
    type OtherType = U;
    type Location = L;

    /// Returns a copy of the optional's location.
    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
        other.location.clone()
    }

    /// Takes the IR node out of the optional, leaving a placeholder behind.
    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
        other.ir_node.replace(HydroNode::Placeholder)
    }

    /// Builds the output [`Optional`] directly from the zip result's IR node, without adding
    /// any extra node around it.
    fn make(location: L, ir_node: HydroNode) -> Self::Out {
        Optional::new(location, ir_node)
    }
}
1500
1501#[cfg(test)]
1502mod tests {
1503    #[cfg(feature = "deploy")]
1504    use futures::{SinkExt, StreamExt};
1505    #[cfg(feature = "deploy")]
1506    use hydro_deploy::Deployment;
1507    #[cfg(any(feature = "deploy", feature = "sim"))]
1508    use stageleft::q;
1509
1510    #[cfg(any(feature = "deploy", feature = "sim"))]
1511    use crate::compile::builder::FlowBuilder;
1512    #[cfg(feature = "deploy")]
1513    use crate::live_collections::stream::ExactlyOnce;
1514    #[cfg(any(feature = "deploy", feature = "sim"))]
1515    use crate::location::Location;
1516    #[cfg(any(feature = "deploy", feature = "sim"))]
1517    use crate::nondet::nondet;
1518
    /// Checks that a singleton carried across ticks through a tick cycle contains exactly one
    /// element on each tick: the count of its stream form must be `1` on two consecutive
    /// externally triggered ticks.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        // External input whose messages are used below to decide when a count is emitted.
        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // Cycle seeded with the value 0; it is closed further down by feeding the singleton
        // back into itself for the next tick.
        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
        let counts = singleton
            .clone()
            .into_stream()
            .count()
            // Only report the count on ticks in which an input message is present.
            .filter_if(
                input
                    .batch(&node_tick, nondet!(/** testing */))
                    .first()
                    .is_some(),
            )
            .all_ticks()
            .send_bincode_external(&external);
        complete_cycle.complete_next_tick(singleton);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect both external ports before starting the deployment.
        let mut tick_trigger = nodes.connect(input_send).await;
        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        // First triggered tick: the cycled singleton holds one element.
        tick_trigger.send(()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);

        // Second triggered tick: still exactly one element after going around the cycle.
        tick_trigger.send(()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
1566
1567    #[cfg(feature = "sim")]
1568    #[test]
1569    #[should_panic]
1570    fn sim_fold_intermediate_states() {
1571        let mut flow = FlowBuilder::new();
1572        let node = flow.process::<()>();
1573
1574        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1575        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1576
1577        let tick = node.tick();
1578        let batch = folded.snapshot(&tick, nondet!(/** test */));
1579        let out_recv = batch.all_ticks().sim_output();
1580
1581        flow.sim().exhaustive(async || {
1582            assert_eq!(out_recv.next().await.unwrap(), 10);
1583        });
1584    }
1585
1586    #[cfg(feature = "sim")]
1587    #[test]
1588    fn sim_fold_intermediate_state_count() {
1589        let mut flow = FlowBuilder::new();
1590        let node = flow.process::<()>();
1591
1592        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1593        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1594
1595        let tick = node.tick();
1596        let batch = folded.snapshot(&tick, nondet!(/** test */));
1597        let out_recv = batch.all_ticks().sim_output();
1598
1599        let instance_count = flow.sim().exhaustive(async || {
1600            let out = out_recv.collect::<Vec<_>>().await;
1601            assert_eq!(out.last(), Some(&10));
1602        });
1603
1604        assert_eq!(
1605            instance_count,
1606            16 // 2^4 possible subsets of intermediates (including initial state)
1607        )
1608    }
1609
/// Checks that the initial state of a fold is not repeated in autonomous decisions:
/// the first snapshot is `0`, and once `123` is sent the very next snapshot is `123`.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_no_repeat_initial() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, incoming) = process.sim_input();
    let running_sum = incoming.fold(q!(|| 0), q!(|acc, item| *acc += item));

    let tick = process.tick();
    let out_recv = running_sum
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    builder.sim().exhaustive(async || {
        // Before any input arrives, the fold's initial value is observed.
        let initial = out_recv.next().await.unwrap();
        assert_eq!(initial, 0);

        in_port.send(123);

        // The next output must already reflect the input, not the initial value again.
        let after_send = out_recv.next().await.unwrap();
        assert_eq!(after_send, 123);
    });
}
1633
/// When the tick is driven by a snapshot AND something else, the snapshot can "stutter"
/// and repeat the same state multiple times.
///
/// Marked `#[should_panic]`: the test passes only if some explored execution yields
/// `(1, 3)` followed by `(2, 3)`, i.e. the same snapshot value `3` paired with two
/// consecutive batch elements.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_repeats_snapshots() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let items = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = items.clone().fold(q!(|| 0), q!(|acc, item| *acc += item));

    let tick = process.tick();
    // Build the batch first and the snapshot second, matching the original evaluation order.
    let batched = items.batch(&tick, nondet!(/** test */));
    let snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = batched.cross_singleton(snapshot).all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        // The second output is only pulled once the first one matched.
        let first = out_recv.next().await.unwrap();
        if first == (1, 3) {
            let second = out_recv.next().await.unwrap();
            if second == (2, 3) {
                panic!("repeated snapshot");
            }
        }
    });
}
1660
/// Checks the number of instances explored when a batch of `[1, 2]` is crossed with a
/// snapshot of the running sum over the same stream.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_repeats_snapshots_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let items = process.source_stream(q!(tokio_stream::iter(vec![1, 2])));
    let running_sum = items.clone().fold(q!(|| 0), q!(|acc, item| *acc += item));

    let tick = process.tick();
    // Build the batch first and the snapshot second, matching the original evaluation order.
    let batched = items.batch(&tick, nondet!(/** test */));
    let snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = batched.cross_singleton(snapshot).all_ticks().sim_output();

    let instance_count = builder.sim().exhaustive(async || {
        // Drain the output; only the number of explored executions matters here.
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    // don't have a combinatorial explanation for this number yet, but checked via logs
    assert_eq!(instance_count, 52);
}
1684
/// Ensures that top-level singletons have only one snapshot: exhaustively simulating a
/// snapshotted constant singleton must explore exactly one instance.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_exhaustive() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let constant = process.singleton(q!(1));
    let tick = process.tick();
    let out_recv = constant
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let instance_count = builder.sim().exhaustive(async || {
        // Drain the output; only the number of explored executions matters here.
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    assert_eq!(instance_count, 1);
}
1703
/// If a tick consumes a static snapshot and a stream batch, only the batch requires
/// space exploration.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_join_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let items = process.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = process.tick();

    // Build the batch first and the singleton second, matching the original evaluation order.
    let batched = items.batch(&tick, nondet!(/** test */));
    let constant = process.singleton(q!(123)).clone_into_tick(&tick);
    let out_recv = batched.cross_singleton(constant).all_ticks().sim_output();

    let instance_count = builder.sim().exhaustive(async || {
        // Drain the output; only the number of explored executions matters here.
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    // 2^4 ways to split up (including a possibly empty first batch)
    let expected_instances = 16;
    assert_eq!(instance_count, expected_instances);
}
1729
/// Folds `[1, 2, 3, 4]` at the top level, converts the resulting singleton into a stream,
/// and asserts that the stream yields only the value `10` in every explored execution.
#[cfg(feature = "sim")]
#[test]
fn top_level_singleton_into_stream_no_replay() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let total = process
        .source_iter(q!(vec![1, 2, 3, 4]))
        .fold(q!(|| 0), q!(|acc, item| *acc += item));

    let out_recv = total.into_stream().sim_output();

    builder.sim().exhaustive(async || {
        out_recv.assert_yields_only([10]).await;
    });
}
1745
/// Uses a folded singleton inside a `sliced!` block, zips it with a clone of itself, and
/// checks that the last emitted pair is `(3, 3)` across exactly four explored instances.
#[cfg(feature = "sim")]
#[test]
fn inside_tick_singleton_zip() {
    use crate::live_collections::Stream;
    use crate::live_collections::sliced::sliced;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let items: Stream<_, _> = process.source_iter(q!(vec![1, 2])).into();
    let running_sum = items.fold(q!(|| 0), q!(|acc, item| *acc += item));

    let out_recv = sliced! {
        let current = use(running_sum, nondet!(/** test */));
        current.clone().zip(current).into_stream()
    }
    .sim_output();

    let instance_count = builder.sim().exhaustive(async || {
        let pairs = out_recv.collect::<Vec<_>>().await;
        // Both halves of the final pair come from the same slice, so they agree on the total.
        let expected_last = (3, 3);
        assert_eq!(pairs.last(), Some(&expected_last));
    });

    assert_eq!(instance_count, 4);
}
1771}