hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::{NonDet, nondet};
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Singleton<Type, Loc, Bound: Boundedness> {
38 pub(crate) location: Loc,
39 pub(crate) ir_node: RefCell<HydroNode>,
40
41 _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46 L: Location<'a>,
47{
48 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49 Singleton::new(singleton.location, singleton.ir_node.into_inner())
50 }
51}
52
53impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 type Location = Tick<L>;
58
59 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
60 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
61 location.clone(),
62 HydroNode::DeferTick {
63 input: Box::new(HydroNode::CycleSource {
64 ident,
65 metadata: location.new_node_metadata(Self::collection_kind()),
66 }),
67 metadata: location
68 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
69 },
70 );
71
72 from_previous_tick.unwrap_or(initial)
73 }
74}
75
76impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
77where
78 L: Location<'a>,
79{
80 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
81 assert_eq!(
82 Location::id(&self.location),
83 expected_location,
84 "locations do not match"
85 );
86 self.location
87 .flow_state()
88 .borrow_mut()
89 .push_root(HydroRoot::CycleSink {
90 ident,
91 input: Box::new(self.ir_node.into_inner()),
92 op_metadata: HydroIrOpMetadata::new(),
93 });
94 }
95}
96
97impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
98where
99 L: Location<'a>,
100{
101 type Location = Tick<L>;
102
103 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
104 Singleton::new(
105 location.clone(),
106 HydroNode::CycleSource {
107 ident,
108 metadata: location.new_node_metadata(Self::collection_kind()),
109 },
110 )
111 }
112}
113
114impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116 L: Location<'a>,
117{
118 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
119 assert_eq!(
120 Location::id(&self.location),
121 expected_location,
122 "locations do not match"
123 );
124 self.location
125 .flow_state()
126 .borrow_mut()
127 .push_root(HydroRoot::CycleSink {
128 ident,
129 input: Box::new(self.ir_node.into_inner()),
130 op_metadata: HydroIrOpMetadata::new(),
131 });
132 }
133}
134
135impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
136where
137 L: Location<'a> + NoTick,
138{
139 type Location = L;
140
141 fn create_source(ident: syn::Ident, location: L) -> Self {
142 Singleton::new(
143 location.clone(),
144 HydroNode::CycleSource {
145 ident,
146 metadata: location.new_node_metadata(Self::collection_kind()),
147 },
148 )
149 }
150}
151
152impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
153where
154 L: Location<'a> + NoTick,
155{
156 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(self.ir_node.into_inner()),
168 op_metadata: HydroIrOpMetadata::new(),
169 });
170 }
171}
172
173impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
174where
175 T: Clone,
176 L: Location<'a>,
177{
178 fn clone(&self) -> Self {
179 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
180 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
181 *self.ir_node.borrow_mut() = HydroNode::Tee {
182 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
183 metadata: self.location.new_node_metadata(Self::collection_kind()),
184 };
185 }
186
187 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
188 Singleton {
189 location: self.location.clone(),
190 ir_node: HydroNode::Tee {
191 inner: TeeNode(inner.0.clone()),
192 metadata: metadata.clone(),
193 }
194 .into(),
195 _phantom: PhantomData,
196 }
197 } else {
198 unreachable!()
199 }
200 }
201}
202
/// Builds the `CrossSingleton` node that pairs `me` with `other` inside a tick.
///
/// `other` is whatever [`ZipResult`] accepts for a tick-local singleton; its location and IR
/// node are obtained through the trait's `other_location` / `other_ir_node` hooks, and the
/// final collection is produced by the trait's `make`.
///
/// The raw `CrossSingleton` node is tagged as `CollectionKind::Optional`, matching the tag
/// [`Singleton::zip`] uses for the same node shape. The caller in `zip` always passes an
/// `Optional` here, so the pairing may be empty and must not be advertised as a singleton.
///
/// # Panics
/// `check_matching_location` is invoked on the two locations; presumably it panics when they
/// differ (its definition is outside this file's visible portion).
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    check_matching_location(
        &me.location,
        &Singleton::<T, Tick<L>, B>::other_location(&other),
    );

    Singleton::<T, Tick<L>, B>::make(
        me.location.clone(),
        HydroNode::CrossSingleton {
            left: Box::new(me.ir_node.into_inner()),
            right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
            // Use the same (optional) kind as the non-top-level branch of `Singleton::zip`.
            metadata: me.location.new_node_metadata(CollectionKind::Optional {
                bound: B::BOUND_KIND,
                element_type: stageleft::quote_type::<
                    <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
                >()
                .into(),
            }),
        },
    )
}
231
232impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
233where
234 L: Location<'a>,
235{
236 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
237 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
238 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
239 Singleton {
240 location,
241 ir_node: RefCell::new(ir_node),
242 _phantom: PhantomData,
243 }
244 }
245
246 pub(crate) fn collection_kind() -> CollectionKind {
247 CollectionKind::Singleton {
248 bound: B::BOUND_KIND,
249 element_type: stageleft::quote_type::<T>().into(),
250 }
251 }
252
253 /// Returns the [`Location`] where this singleton is being materialized.
254 pub fn location(&self) -> &L {
255 &self.location
256 }
257
258 /// Transforms the singleton value by applying a function `f` to it,
259 /// continuously as the input is updated.
260 ///
261 /// # Example
262 /// ```rust
263 /// # #[cfg(feature = "deploy")] {
264 /// # use hydro_lang::prelude::*;
265 /// # use futures::StreamExt;
266 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267 /// let tick = process.tick();
268 /// let singleton = tick.singleton(q!(5));
269 /// singleton.map(q!(|v| v * 2)).all_ticks()
270 /// # }, |mut stream| async move {
271 /// // 10
272 /// # assert_eq!(stream.next().await.unwrap(), 10);
273 /// # }));
274 /// # }
275 /// ```
276 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
277 where
278 F: Fn(T) -> U + 'a,
279 {
280 let f = f.splice_fn1_ctx(&self.location).into();
281 Singleton::new(
282 self.location.clone(),
283 HydroNode::Map {
284 f,
285 input: Box::new(self.ir_node.into_inner()),
286 metadata: self
287 .location
288 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
289 },
290 )
291 }
292
293 /// Transforms the singleton value by applying a function `f` to it and then flattening
294 /// the result into a stream, preserving the order of elements.
295 ///
296 /// The function `f` is applied to the singleton value to produce an iterator, and all items
297 /// from that iterator are emitted in the output stream in deterministic order.
298 ///
299 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
300 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
301 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
302 ///
303 /// # Example
304 /// ```rust
305 /// # #[cfg(feature = "deploy")] {
306 /// # use hydro_lang::prelude::*;
307 /// # use futures::StreamExt;
308 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309 /// let tick = process.tick();
310 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
311 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
312 /// # }, |mut stream| async move {
313 /// // 1, 2, 3
314 /// # for w in vec![1, 2, 3] {
315 /// # assert_eq!(stream.next().await.unwrap(), w);
316 /// # }
317 /// # }));
318 /// # }
319 /// ```
320 pub fn flat_map_ordered<U, I, F>(
321 self,
322 f: impl IntoQuotedMut<'a, F, L>,
323 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
324 where
325 I: IntoIterator<Item = U>,
326 F: Fn(T) -> I + 'a,
327 {
328 let f = f.splice_fn1_ctx(&self.location).into();
329 Stream::new(
330 self.location.clone(),
331 HydroNode::FlatMap {
332 f,
333 input: Box::new(self.ir_node.into_inner()),
334 metadata: self.location.new_node_metadata(
335 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
336 ),
337 },
338 )
339 }
340
341 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
342 /// for the output type `I` to produce items in any order.
343 ///
344 /// The function `f` is applied to the singleton value to produce an iterator, and all items
345 /// from that iterator are emitted in the output stream in non-deterministic order.
346 ///
347 /// # Example
348 /// ```rust
349 /// # #[cfg(feature = "deploy")] {
350 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
351 /// # use futures::StreamExt;
352 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
353 /// let tick = process.tick();
354 /// let singleton = tick.singleton(q!(
355 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
356 /// ));
357 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
358 /// # }, |mut stream| async move {
359 /// // 1, 2, 3, but in no particular order
360 /// # let mut results = Vec::new();
361 /// # for _ in 0..3 {
362 /// # results.push(stream.next().await.unwrap());
363 /// # }
364 /// # results.sort();
365 /// # assert_eq!(results, vec![1, 2, 3]);
366 /// # }));
367 /// # }
368 /// ```
369 pub fn flat_map_unordered<U, I, F>(
370 self,
371 f: impl IntoQuotedMut<'a, F, L>,
372 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
373 where
374 I: IntoIterator<Item = U>,
375 F: Fn(T) -> I + 'a,
376 {
377 let f = f.splice_fn1_ctx(&self.location).into();
378 Stream::new(
379 self.location.clone(),
380 HydroNode::FlatMap {
381 f,
382 input: Box::new(self.ir_node.into_inner()),
383 metadata: self
384 .location
385 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
386 },
387 )
388 }
389
390 /// Flattens the singleton value into a stream, preserving the order of elements.
391 ///
392 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
393 /// are emitted in the output stream in deterministic order.
394 ///
395 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
396 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
397 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
398 ///
399 /// # Example
400 /// ```rust
401 /// # #[cfg(feature = "deploy")] {
402 /// # use hydro_lang::prelude::*;
403 /// # use futures::StreamExt;
404 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405 /// let tick = process.tick();
406 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
407 /// singleton.flatten_ordered().all_ticks()
408 /// # }, |mut stream| async move {
409 /// // 1, 2, 3
410 /// # for w in vec![1, 2, 3] {
411 /// # assert_eq!(stream.next().await.unwrap(), w);
412 /// # }
413 /// # }));
414 /// # }
415 /// ```
416 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
417 where
418 T: IntoIterator<Item = U>,
419 {
420 self.flat_map_ordered(q!(|x| x))
421 }
422
423 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
424 /// for the element type `T` to produce items in any order.
425 ///
426 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
427 /// are emitted in the output stream in non-deterministic order.
428 ///
429 /// # Example
430 /// ```rust
431 /// # #[cfg(feature = "deploy")] {
432 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
433 /// # use futures::StreamExt;
434 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
435 /// let tick = process.tick();
436 /// let singleton = tick.singleton(q!(
437 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
438 /// ));
439 /// singleton.flatten_unordered().all_ticks()
440 /// # }, |mut stream| async move {
441 /// // 1, 2, 3, but in no particular order
442 /// # let mut results = Vec::new();
443 /// # for _ in 0..3 {
444 /// # results.push(stream.next().await.unwrap());
445 /// # }
446 /// # results.sort();
447 /// # assert_eq!(results, vec![1, 2, 3]);
448 /// # }));
449 /// # }
450 /// ```
451 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
452 where
453 T: IntoIterator<Item = U>,
454 {
455 self.flat_map_unordered(q!(|x| x))
456 }
457
458 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
459 ///
460 /// If the predicate returns `true`, the output optional contains the same value.
461 /// If the predicate returns `false`, the output optional is empty.
462 ///
463 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
464 /// not modify or take ownership of the value. If you need to modify the value while filtering
465 /// use [`Singleton::filter_map`] instead.
466 ///
467 /// # Example
468 /// ```rust
469 /// # #[cfg(feature = "deploy")] {
470 /// # use hydro_lang::prelude::*;
471 /// # use futures::StreamExt;
472 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
473 /// let tick = process.tick();
474 /// let singleton = tick.singleton(q!(5));
475 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
476 /// # }, |mut stream| async move {
477 /// // 5
478 /// # assert_eq!(stream.next().await.unwrap(), 5);
479 /// # }));
480 /// # }
481 /// ```
482 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
483 where
484 F: Fn(&T) -> bool + 'a,
485 {
486 let f = f.splice_fn1_borrow_ctx(&self.location).into();
487 Optional::new(
488 self.location.clone(),
489 HydroNode::Filter {
490 f,
491 input: Box::new(self.ir_node.into_inner()),
492 metadata: self
493 .location
494 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
495 },
496 )
497 }
498
499 /// An operator that both filters and maps. It yields the value only if the supplied
500 /// closure `f` returns `Some(value)`.
501 ///
502 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
503 /// If the closure returns `None`, the output optional is empty.
504 ///
505 /// # Example
506 /// ```rust
507 /// # #[cfg(feature = "deploy")] {
508 /// # use hydro_lang::prelude::*;
509 /// # use futures::StreamExt;
510 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
511 /// let tick = process.tick();
512 /// let singleton = tick.singleton(q!("42"));
513 /// singleton
514 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
515 /// .all_ticks()
516 /// # }, |mut stream| async move {
517 /// // 42
518 /// # assert_eq!(stream.next().await.unwrap(), 42);
519 /// # }));
520 /// # }
521 /// ```
522 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
523 where
524 F: Fn(T) -> Option<U> + 'a,
525 {
526 let f = f.splice_fn1_ctx(&self.location).into();
527 Optional::new(
528 self.location.clone(),
529 HydroNode::FilterMap {
530 f,
531 input: Box::new(self.ir_node.into_inner()),
532 metadata: self
533 .location
534 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
535 },
536 )
537 }
538
539 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
540 ///
541 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
542 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
543 /// non-null. This is useful for combining several pieces of state together.
544 ///
545 /// # Example
546 /// ```rust
547 /// # #[cfg(feature = "deploy")] {
548 /// # use hydro_lang::prelude::*;
549 /// # use futures::StreamExt;
550 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
551 /// let tick = process.tick();
552 /// let numbers = process
553 /// .source_iter(q!(vec![123, 456]))
554 /// .batch(&tick, nondet!(/** test */));
555 /// let count = numbers.clone().count(); // Singleton
556 /// let max = numbers.max(); // Optional
557 /// count.zip(max).all_ticks()
558 /// # }, |mut stream| async move {
559 /// // [(2, 456)]
560 /// # for w in vec![(2, 456)] {
561 /// # assert_eq!(stream.next().await.unwrap(), w);
562 /// # }
563 /// # }));
564 /// # }
565 /// ```
566 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
567 where
568 Self: ZipResult<'a, O, Location = L>,
569 {
570 check_matching_location(&self.location, &Self::other_location(&other));
571
572 if L::is_top_level()
573 && let Some(tick) = self.location.try_tick()
574 {
575 let out = zip_inside_tick(
576 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
577 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
578 Self::other_location(&other),
579 Self::other_ir_node(other),
580 )
581 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
582 )
583 .latest();
584
585 Self::make(out.location, out.ir_node.into_inner())
586 } else {
587 Self::make(
588 self.location.clone(),
589 HydroNode::CrossSingleton {
590 left: Box::new(self.ir_node.into_inner()),
591 right: Box::new(Self::other_ir_node(other)),
592 metadata: self.location.new_node_metadata(CollectionKind::Optional {
593 bound: B::BOUND_KIND,
594 element_type: stageleft::quote_type::<
595 <Self as ZipResult<'a, O>>::ElementType,
596 >()
597 .into(),
598 }),
599 },
600 )
601 }
602 }
603
604 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
605 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
606 ///
607 /// Useful for conditionally processing, such as only emitting a singleton's value outside
608 /// a tick if some other condition is satisfied.
609 ///
610 /// # Example
611 /// ```rust
612 /// # #[cfg(feature = "deploy")] {
613 /// # use hydro_lang::prelude::*;
614 /// # use futures::StreamExt;
615 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
616 /// let tick = process.tick();
617 /// // ticks are lazy by default, forces the second tick to run
618 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
619 ///
620 /// let batch_first_tick = process
621 /// .source_iter(q!(vec![1]))
622 /// .batch(&tick, nondet!(/** test */));
623 /// let batch_second_tick = process
624 /// .source_iter(q!(vec![1, 2, 3]))
625 /// .batch(&tick, nondet!(/** test */))
626 /// .defer_tick(); // appears on the second tick
627 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
628 /// batch_first_tick.chain(batch_second_tick).count()
629 /// .filter_if_some(some_on_first_tick)
630 /// .all_ticks()
631 /// # }, |mut stream| async move {
632 /// // [1]
633 /// # for w in vec![1] {
634 /// # assert_eq!(stream.next().await.unwrap(), w);
635 /// # }
636 /// # }));
637 /// # }
638 /// ```
639 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
640 self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
641 .map(q!(|(d, _signal)| d))
642 }
643
644 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
645 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
646 ///
647 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
648 /// the condition.
649 ///
650 /// # Example
651 /// ```rust
652 /// # #[cfg(feature = "deploy")] {
653 /// # use hydro_lang::prelude::*;
654 /// # use futures::StreamExt;
655 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
656 /// let tick = process.tick();
657 /// // ticks are lazy by default, forces the second tick to run
658 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
659 ///
660 /// let batch_first_tick = process
661 /// .source_iter(q!(vec![1]))
662 /// .batch(&tick, nondet!(/** test */));
663 /// let batch_second_tick = process
664 /// .source_iter(q!(vec![1, 2, 3]))
665 /// .batch(&tick, nondet!(/** test */))
666 /// .defer_tick(); // appears on the second tick
667 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
668 /// batch_first_tick.chain(batch_second_tick).count()
669 /// .filter_if_none(some_on_first_tick)
670 /// .all_ticks()
671 /// # }, |mut stream| async move {
672 /// // [3]
673 /// # for w in vec![3] {
674 /// # assert_eq!(stream.next().await.unwrap(), w);
675 /// # }
676 /// # }));
677 /// # }
678 /// ```
679 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
680 self.filter_if_some(
681 other
682 .map(q!(|_| ()))
683 .into_singleton()
684 .filter(q!(|o| o.is_none())),
685 )
686 }
687
688 /// An operator which allows you to "name" a `HydroNode`.
689 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
690 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
691 {
692 let mut node = self.ir_node.borrow_mut();
693 let metadata = node.metadata_mut();
694 metadata.tag = Some(name.to_string());
695 }
696 self
697 }
698}
699
700impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
701where
702 L: Location<'a> + NoTick,
703{
704 /// Returns a singleton value corresponding to the latest snapshot of the singleton
705 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
706 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
707 /// all snapshots of this singleton into the atomic-associated tick will observe the
708 /// same value each tick.
709 ///
710 /// # Non-Determinism
711 /// Because this picks a snapshot of a singleton whose value is continuously changing,
712 /// the output singleton has a non-deterministic value since the snapshot can be at an
713 /// arbitrary point in time.
714 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
715 Singleton::new(
716 self.location.clone().tick,
717 HydroNode::Batch {
718 inner: Box::new(self.ir_node.into_inner()),
719 metadata: self
720 .location
721 .tick
722 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
723 },
724 )
725 }
726
727 /// Returns this singleton back into a top-level, asynchronous execution context where updates
728 /// to the value will be asynchronously propagated.
729 pub fn end_atomic(self) -> Singleton<T, L, B> {
730 Singleton::new(
731 self.location.tick.l.clone(),
732 HydroNode::EndAtomic {
733 inner: Box::new(self.ir_node.into_inner()),
734 metadata: self
735 .location
736 .tick
737 .l
738 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
739 },
740 )
741 }
742}
743
744impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
745where
746 L: Location<'a>,
747{
748 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
749 /// will observe the same version of the value and will be executed synchronously before any
750 /// outputs are yielded (in [`Optional::end_atomic`]).
751 ///
752 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
753 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
754 /// a different version).
755 ///
756 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
757 /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
758 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
759 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
760 let out_location = Atomic { tick: tick.clone() };
761 Singleton::new(
762 out_location.clone(),
763 HydroNode::BeginAtomic {
764 inner: Box::new(self.ir_node.into_inner()),
765 metadata: out_location
766 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
767 },
768 )
769 }
770
771 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
772 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
773 /// relevant data that contributed to the snapshot at tick `t`.
774 ///
775 /// # Non-Determinism
776 /// Because this picks a snapshot of a singleton whose value is continuously changing,
777 /// the output singleton has a non-deterministic value since the snapshot can be at an
778 /// arbitrary point in time.
779 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
780 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
781 Singleton::new(
782 tick.clone(),
783 HydroNode::Batch {
784 inner: Box::new(self.ir_node.into_inner()),
785 metadata: tick
786 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
787 },
788 )
789 }
790
791 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
792 /// with order corresponding to increasing prefixes of data contributing to the singleton.
793 ///
794 /// # Non-Determinism
795 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
796 /// to non-deterministic batching and arrival of inputs, the output stream is
797 /// non-deterministic.
798 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
799 where
800 L: NoTick,
801 {
802 let tick = self.location.tick();
803 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
804 }
805
806 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
807 /// value taken at various points in time. Because the input singleton may be
808 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
809 /// represent the value of the singleton given some prefix of the streams leading up to
810 /// it.
811 ///
812 /// # Non-Determinism
813 /// The output stream is non-deterministic in which elements are sampled, since this
814 /// is controlled by a clock.
815 pub fn sample_every(
816 self,
817 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
818 nondet: NonDet,
819 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
820 where
821 L: NoTick + NoAtomic,
822 {
823 let samples = self.location.source_interval(interval, nondet);
824 let tick = self.location.tick();
825
826 self.snapshot(&tick, nondet)
827 .filter_if_some(samples.batch(&tick, nondet).first())
828 .all_ticks()
829 .weakest_retries()
830 }
831}
832
833impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
834where
835 L: Location<'a>,
836{
837 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
838 /// which will stream the value computed in _each_ tick as a separate stream element.
839 ///
840 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
841 /// producing one element in the output for each tick. This is useful for batched computations,
842 /// where the results from each tick must be combined together.
843 ///
844 /// # Example
845 /// ```rust
846 /// # #[cfg(feature = "deploy")] {
847 /// # use hydro_lang::prelude::*;
848 /// # use futures::StreamExt;
849 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
850 /// let tick = process.tick();
851 /// # // ticks are lazy by default, forces the second tick to run
852 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
853 /// # let batch_first_tick = process
854 /// # .source_iter(q!(vec![1]))
855 /// # .batch(&tick, nondet!(/** test */));
856 /// # let batch_second_tick = process
857 /// # .source_iter(q!(vec![1, 2, 3]))
858 /// # .batch(&tick, nondet!(/** test */))
859 /// # .defer_tick(); // appears on the second tick
860 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
861 /// input_batch // first tick: [1], second tick: [1, 2, 3]
862 /// .count()
863 /// .all_ticks()
864 /// # }, |mut stream| async move {
865 /// // [1, 3]
866 /// # for w in vec![1, 3] {
867 /// # assert_eq!(stream.next().await.unwrap(), w);
868 /// # }
869 /// # }));
870 /// # }
871 /// ```
872 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
873 self.into_stream().all_ticks()
874 }
875
876 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
877 /// which will stream the value computed in _each_ tick as a separate stream element.
878 ///
879 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
880 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
881 /// singleton's [`Tick`] context.
882 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
883 self.into_stream().all_ticks_atomic()
884 }
885
886 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
887 /// be asynchronously updated with the latest value of the singleton inside the tick.
888 ///
889 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
890 /// tick that tracks the inner value. This is useful for getting the value as of the
891 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
892 ///
893 /// # Example
894 /// ```rust
895 /// # #[cfg(feature = "deploy")] {
896 /// # use hydro_lang::prelude::*;
897 /// # use futures::StreamExt;
898 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899 /// let tick = process.tick();
900 /// # // ticks are lazy by default, forces the second tick to run
901 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
902 /// # let batch_first_tick = process
903 /// # .source_iter(q!(vec![1]))
904 /// # .batch(&tick, nondet!(/** test */));
905 /// # let batch_second_tick = process
906 /// # .source_iter(q!(vec![1, 2, 3]))
907 /// # .batch(&tick, nondet!(/** test */))
908 /// # .defer_tick(); // appears on the second tick
909 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
910 /// input_batch // first tick: [1], second tick: [1, 2, 3]
911 /// .count()
912 /// .latest()
913 /// # .sample_eager(nondet!(/** test */))
914 /// # }, |mut stream| async move {
915 /// // asynchronously changes from 1 ~> 3
916 /// # for w in vec![1, 3] {
917 /// # assert_eq!(stream.next().await.unwrap(), w);
918 /// # }
919 /// # }));
920 /// # }
921 /// ```
922 pub fn latest(self) -> Singleton<T, L, Unbounded> {
923 Singleton::new(
924 self.location.outer().clone(),
925 HydroNode::YieldConcat {
926 inner: Box::new(self.ir_node.into_inner()),
927 metadata: self
928 .location
929 .outer()
930 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
931 },
932 )
933 }
934
935 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
936 /// be updated with the latest value of the singleton inside the tick.
937 ///
938 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
939 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
940 /// singleton's [`Tick`] context.
941 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
942 let out_location = Atomic {
943 tick: self.location.clone(),
944 };
945 Singleton::new(
946 out_location.clone(),
947 HydroNode::YieldConcat {
948 inner: Box::new(self.ir_node.into_inner()),
949 metadata: out_location
950 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
951 },
952 )
953 }
954
955 #[deprecated(note = "use .into_stream().persist()")]
956 #[expect(missing_docs, reason = "deprecated")]
957 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
958 where
959 T: Clone,
960 {
961 self.into_stream().persist()
962 }
963
964 /// Converts this singleton into a [`Stream`] containing a single element, the value.
965 ///
966 /// # Example
967 /// ```rust
968 /// # #[cfg(feature = "deploy")] {
969 /// # use hydro_lang::prelude::*;
970 /// # use futures::StreamExt;
971 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
972 /// let tick = process.tick();
973 /// let batch_input = process
974 /// .source_iter(q!(vec![123, 456]))
975 /// .batch(&tick, nondet!(/** test */));
976 /// batch_input.clone().chain(
977 /// batch_input.count().into_stream()
978 /// ).all_ticks()
979 /// # }, |mut stream| async move {
980 /// // [123, 456, 2]
981 /// # for w in vec![123, 456, 2] {
982 /// # assert_eq!(stream.next().await.unwrap(), w);
983 /// # }
984 /// # }));
985 /// # }
986 /// ```
987 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
988 Stream::new(
989 self.location.clone(),
990 HydroNode::Cast {
991 inner: Box::new(self.ir_node.into_inner()),
992 metadata: self.location.new_node_metadata(Stream::<
993 T,
994 Tick<L>,
995 Bounded,
996 TotalOrder,
997 ExactlyOnce,
998 >::collection_kind()),
999 },
1000 )
1001 }
1002}
1003
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
/// [`Singleton`].
///
/// The trait is implemented on the first input of the `zip`; the `Other` type parameter is the
/// collection type of the second input. It is sealed, so it cannot be implemented outside of
/// this crate.
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    ///
    /// Only borrows `other`, so the collection can still be consumed afterwards.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`.
    ///
    /// Consumes `other`.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1028
1029#[sealed::sealed]
1030impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1031where
1032 L: Location<'a>,
1033{
1034 type Out = Singleton<(T, U), L, B>;
1035 type ElementType = (T, U);
1036 type OtherType = U;
1037 type Location = L;
1038
1039 fn other_location(other: &Singleton<U, L, B>) -> L {
1040 other.location.clone()
1041 }
1042
1043 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1044 other.ir_node.into_inner()
1045 }
1046
1047 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1048 Singleton::new(
1049 location.clone(),
1050 HydroNode::Cast {
1051 inner: Box::new(ir_node),
1052 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1053 },
1054 )
1055 }
1056}
1057
1058#[sealed::sealed]
1059impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1060where
1061 L: Location<'a>,
1062{
1063 type Out = Optional<(T, U), L, B>;
1064 type ElementType = (T, U);
1065 type OtherType = U;
1066 type Location = L;
1067
1068 fn other_location(other: &Optional<U, L, B>) -> L {
1069 other.location.clone()
1070 }
1071
1072 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1073 other.ir_node.into_inner()
1074 }
1075
1076 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1077 Optional::new(location, ir_node)
1078 }
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083 #[cfg(feature = "deploy")]
1084 use futures::{SinkExt, StreamExt};
1085 #[cfg(feature = "deploy")]
1086 use hydro_deploy::Deployment;
1087 #[cfg(any(feature = "deploy", feature = "sim"))]
1088 use stageleft::q;
1089
1090 #[cfg(any(feature = "deploy", feature = "sim"))]
1091 use crate::compile::builder::FlowBuilder;
1092 #[cfg(feature = "deploy")]
1093 use crate::live_collections::stream::ExactlyOnce;
1094 #[cfg(any(feature = "deploy", feature = "sim"))]
1095 use crate::location::Location;
1096 #[cfg(any(feature = "deploy", feature = "sim"))]
1097 use crate::nondet::nondet;
1098
#[cfg(feature = "deploy")]
#[tokio::test]
async fn tick_cycle_cardinality() {
    // Feeds a singleton back into itself through a tick cycle and checks that, on every
    // tick triggered by an external message, the cycled collection holds one element.
    let mut deployment = Deployment::new();

    let flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

    let node_tick = node.tick();
    let initial = node_tick.singleton(q!(0));
    let (complete_cycle, singleton) = node_tick.cycle_with_initial(initial);

    // Only report the count on ticks that actually received an input message.
    let cardinality = singleton.clone().into_stream().count();
    let tick_has_input = input.batch(&node_tick, nondet!(/** testing */)).first();
    let counts = cardinality
        .filter_if_some(tick_has_input)
        .all_ticks()
        .send_bincode_external(&external);
    complete_cycle.complete_next_tick(singleton);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut tick_trigger = nodes.connect(input_send).await;
    let mut external_out = nodes.connect(counts).await;

    deployment.start().await.unwrap();

    // Two rounds: the second one exercises the value that came back through the cycle.
    for _round in 0..2 {
        tick_trigger.send(()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
}
1141
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_intermediate_states() {
    // Expected to panic: the assertion demands that the very first snapshot already equals
    // the final sum, which does not hold in every explored execution.
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let folded = node
        .source_iter(q!(vec![1, 2, 3, 4]))
        .fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let snapshots = folded.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        let first_snapshot = out_recv.next().await.unwrap();
        assert_eq!(first_snapshot, 10);
    });
}
1160
#[cfg(feature = "sim")]
#[test]
fn sim_fold_intermediate_state_count() {
    // Counts the executions explored when snapshotting a fold over four elements; in every
    // one of them the last observed snapshot must be the full sum.
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let folded = node
        .source_iter(q!(vec![1, 2, 3, 4]))
        .fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let snapshots = folded.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    let explored = flow.sim().exhaustive(async || {
        let observed = out_recv.collect::<Vec<_>>().await;
        assert_eq!(observed.last(), Some(&10));
    });

    // 2^4 possible subsets of intermediates (including initial state)
    assert_eq!(explored, 16);
}
1184
#[cfg(feature = "sim")]
#[test]
fn sim_fold_no_repeat_initial() {
    // check that we don't repeat the initial state of the fold in autonomous decisions:
    // the first snapshot is the initial value, and the next one already reflects the input

    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_port, input) = node.sim_input();
    let running_sum = input.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        let initial = out_recv.next().await.unwrap();
        assert_eq!(initial, 0);

        in_port.send(123);

        let after_send = out_recv.next().await.unwrap();
        assert_eq!(after_send, 123);
    });
}
1208
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_repeats_snapshots() {
    // when the tick is driven by a snapshot AND something else, the snapshot can
    // "stutter" and repeat the same state multiple times; the test panics as soon as an
    // execution shows the same snapshot value paired with two consecutive batch elements

    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
    let folded = source_iter.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let batched_input = source_iter.batch(&tick, nondet!(/** test */));
    let running_sum = folded.snapshot(&tick, nondet!(/** test */));
    let out_recv = batched_input
        .cross_singleton(running_sum)
        .all_ticks()
        .sim_output();

    flow.sim().exhaustive(async || {
        // The second element is only pulled when the first one matched.
        let first_matches = out_recv.next().await.unwrap() == (1, 3);
        if first_matches && out_recv.next().await.unwrap() == (2, 3) {
            panic!("repeated snapshot");
        }
    });
}
1235
#[cfg(feature = "sim")]
#[test]
fn sim_fold_repeats_snapshots_count() {
    // check the number of instances explored when a batch is crossed with a fold snapshot
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source_iter = node.source_iter(q!(vec![1, 2]));
    let folded = source_iter.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let batched_input = source_iter.batch(&tick, nondet!(/** test */));
    let running_sum = folded.snapshot(&tick, nondet!(/** test */));
    let out_recv = batched_input
        .cross_singleton(running_sum)
        .all_ticks()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    // don't have a combinatorial explanation for this number yet, but checked via logs
    assert_eq!(explored, 52);
}
1259
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_exhaustive() {
    // ensures that top-level singletons have only one snapshot, i.e. exhaustive simulation
    // explores exactly one execution
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let constant = node.singleton(q!(1));
    let tick = node.tick();
    let snapshots = constant.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    let explored = flow.sim().exhaustive(async || {
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    assert_eq!(explored, 1);
}
1278
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_join_count() {
    // if a tick consumes a static snapshot and a stream batch, only the batch requires
    // space exploration

    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = node.tick();
    let batched_input = source_iter.batch(&tick, nondet!(/** test */));
    let constant = node
        .singleton(q!(123))
        .snapshot(&tick, nondet!(/** test */));
    let out_recv = batched_input
        .cross_singleton(constant)
        .all_ticks()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    // 2^4 ways to split up (including a possibly empty first batch)
    assert_eq!(explored, 16);
}
1307}