hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::CycleId;
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A single Rust value that can asynchronously change over time.
26///
27/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
28/// [`Unbounded`], the value will asynchronously change over time.
29///
30/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
31/// a single number that will asynchronously change as events are processed. Singletons also appear
32/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
33/// such as getting the length of a batch of requests.
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this singleton
37/// - `Loc`: the [`Location`] where the singleton is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location value this singleton was constructed with; exposed through
    /// [`Singleton::location`].
    pub(crate) location: Loc,
    /// The IR node backing this singleton. It is kept in a [`RefCell`] so that it can be
    /// replaced through a shared reference (cloning swaps it for a `Tee` node in place).
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the `Type`, `Loc` and `Bound` parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
45
46impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
47where
48 T: Clone,
49 L: Location<'a> + NoTick,
50{
51 fn from(value: Singleton<T, L, Bounded>) -> Self {
52 let tick = value.location().tick();
53 value.clone_into_tick(&tick).latest()
54 }
55}
56
57impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
58where
59 L: Location<'a>,
60{
61 type Location = Tick<L>;
62
63 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
64 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
65 location.clone(),
66 HydroNode::DeferTick {
67 input: Box::new(HydroNode::CycleSource {
68 cycle_id,
69 metadata: location.new_node_metadata(Self::collection_kind()),
70 }),
71 metadata: location
72 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
73 },
74 );
75
76 from_previous_tick.unwrap_or(initial)
77 }
78}
79
80impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
81where
82 L: Location<'a>,
83{
84 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
85 assert_eq!(
86 Location::id(&self.location),
87 expected_location,
88 "locations do not match"
89 );
90 self.location
91 .flow_state()
92 .borrow_mut()
93 .push_root(HydroRoot::CycleSink {
94 cycle_id,
95 input: Box::new(self.ir_node.into_inner()),
96 op_metadata: HydroIrOpMetadata::new(),
97 });
98 }
99}
100
101impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
102where
103 L: Location<'a>,
104{
105 type Location = Tick<L>;
106
107 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
108 Singleton::new(
109 location.clone(),
110 HydroNode::CycleSource {
111 cycle_id,
112 metadata: location.new_node_metadata(Self::collection_kind()),
113 },
114 )
115 }
116}
117
118impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
119where
120 L: Location<'a>,
121{
122 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
123 assert_eq!(
124 Location::id(&self.location),
125 expected_location,
126 "locations do not match"
127 );
128 self.location
129 .flow_state()
130 .borrow_mut()
131 .push_root(HydroRoot::CycleSink {
132 cycle_id,
133 input: Box::new(self.ir_node.into_inner()),
134 op_metadata: HydroIrOpMetadata::new(),
135 });
136 }
137}
138
139impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
140where
141 L: Location<'a> + NoTick,
142{
143 type Location = L;
144
145 fn create_source(cycle_id: CycleId, location: L) -> Self {
146 Singleton::new(
147 location.clone(),
148 HydroNode::CycleSource {
149 cycle_id,
150 metadata: location.new_node_metadata(Self::collection_kind()),
151 },
152 )
153 }
154}
155
156impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
157where
158 L: Location<'a> + NoTick,
159{
160 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
161 assert_eq!(
162 Location::id(&self.location),
163 expected_location,
164 "locations do not match"
165 );
166 self.location
167 .flow_state()
168 .borrow_mut()
169 .push_root(HydroRoot::CycleSink {
170 cycle_id,
171 input: Box::new(self.ir_node.into_inner()),
172 op_metadata: HydroIrOpMetadata::new(),
173 });
174 }
175}
176
177impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
178where
179 T: Clone,
180 L: Location<'a>,
181{
182 fn clone(&self) -> Self {
183 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
184 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
185 *self.ir_node.borrow_mut() = HydroNode::Tee {
186 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
187 metadata: self.location.new_node_metadata(Self::collection_kind()),
188 };
189 }
190
191 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
192 Singleton {
193 location: self.location.clone(),
194 ir_node: HydroNode::Tee {
195 inner: TeeNode(inner.0.clone()),
196 metadata: metadata.clone(),
197 }
198 .into(),
199 _phantom: PhantomData,
200 }
201 } else {
202 unreachable!()
203 }
204 }
205}
206
#[cfg(stageleft_runtime)]
/// Zips a singleton with an optional inside a tick by first converting the singleton into
/// an [`Optional`] and then delegating to `super::optional::zip_inside_tick`.
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B>,
) -> Optional<(T, O), Tick<L>, B> {
    let lhs: Optional<T, Tick<L>, B> = me.into();
    super::optional::zip_inside_tick(lhs, other)
}
215
216impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
217where
218 L: Location<'a>,
219{
220 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
221 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
222 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
223 Singleton {
224 location,
225 ir_node: RefCell::new(ir_node),
226 _phantom: PhantomData,
227 }
228 }
229
230 pub(crate) fn collection_kind() -> CollectionKind {
231 CollectionKind::Singleton {
232 bound: B::BOUND_KIND,
233 element_type: stageleft::quote_type::<T>().into(),
234 }
235 }
236
237 /// Returns the [`Location`] where this singleton is being materialized.
238 pub fn location(&self) -> &L {
239 &self.location
240 }
241
242 /// Transforms the singleton value by applying a function `f` to it,
243 /// continuously as the input is updated.
244 ///
245 /// # Example
246 /// ```rust
247 /// # #[cfg(feature = "deploy")] {
248 /// # use hydro_lang::prelude::*;
249 /// # use futures::StreamExt;
250 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
251 /// let tick = process.tick();
252 /// let singleton = tick.singleton(q!(5));
253 /// singleton.map(q!(|v| v * 2)).all_ticks()
254 /// # }, |mut stream| async move {
255 /// // 10
256 /// # assert_eq!(stream.next().await.unwrap(), 10);
257 /// # }));
258 /// # }
259 /// ```
260 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
261 where
262 F: Fn(T) -> U + 'a,
263 {
264 let f = f.splice_fn1_ctx(&self.location).into();
265 Singleton::new(
266 self.location.clone(),
267 HydroNode::Map {
268 f,
269 input: Box::new(self.ir_node.into_inner()),
270 metadata: self
271 .location
272 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
273 },
274 )
275 }
276
277 /// Transforms the singleton value by applying a function `f` to it and then flattening
278 /// the result into a stream, preserving the order of elements.
279 ///
280 /// The function `f` is applied to the singleton value to produce an iterator, and all items
281 /// from that iterator are emitted in the output stream in deterministic order.
282 ///
283 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
284 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
285 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
286 ///
287 /// # Example
288 /// ```rust
289 /// # #[cfg(feature = "deploy")] {
290 /// # use hydro_lang::prelude::*;
291 /// # use futures::StreamExt;
292 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
293 /// let tick = process.tick();
294 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
295 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
296 /// # }, |mut stream| async move {
297 /// // 1, 2, 3
298 /// # for w in vec![1, 2, 3] {
299 /// # assert_eq!(stream.next().await.unwrap(), w);
300 /// # }
301 /// # }));
302 /// # }
303 /// ```
304 pub fn flat_map_ordered<U, I, F>(
305 self,
306 f: impl IntoQuotedMut<'a, F, L>,
307 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
308 where
309 I: IntoIterator<Item = U>,
310 F: Fn(T) -> I + 'a,
311 {
312 let f = f.splice_fn1_ctx(&self.location).into();
313 Stream::new(
314 self.location.clone(),
315 HydroNode::FlatMap {
316 f,
317 input: Box::new(self.ir_node.into_inner()),
318 metadata: self.location.new_node_metadata(
319 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
320 ),
321 },
322 )
323 }
324
325 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
326 /// for the output type `I` to produce items in any order.
327 ///
328 /// The function `f` is applied to the singleton value to produce an iterator, and all items
329 /// from that iterator are emitted in the output stream in non-deterministic order.
330 ///
331 /// # Example
332 /// ```rust
333 /// # #[cfg(feature = "deploy")] {
334 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
335 /// # use futures::StreamExt;
336 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
337 /// let tick = process.tick();
338 /// let singleton = tick.singleton(q!(
339 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
340 /// ));
341 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
342 /// # }, |mut stream| async move {
343 /// // 1, 2, 3, but in no particular order
344 /// # let mut results = Vec::new();
345 /// # for _ in 0..3 {
346 /// # results.push(stream.next().await.unwrap());
347 /// # }
348 /// # results.sort();
349 /// # assert_eq!(results, vec![1, 2, 3]);
350 /// # }));
351 /// # }
352 /// ```
353 pub fn flat_map_unordered<U, I, F>(
354 self,
355 f: impl IntoQuotedMut<'a, F, L>,
356 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
357 where
358 I: IntoIterator<Item = U>,
359 F: Fn(T) -> I + 'a,
360 {
361 let f = f.splice_fn1_ctx(&self.location).into();
362 Stream::new(
363 self.location.clone(),
364 HydroNode::FlatMap {
365 f,
366 input: Box::new(self.ir_node.into_inner()),
367 metadata: self
368 .location
369 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
370 },
371 )
372 }
373
374 /// Flattens the singleton value into a stream, preserving the order of elements.
375 ///
376 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
377 /// are emitted in the output stream in deterministic order.
378 ///
379 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
380 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
381 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
382 ///
383 /// # Example
384 /// ```rust
385 /// # #[cfg(feature = "deploy")] {
386 /// # use hydro_lang::prelude::*;
387 /// # use futures::StreamExt;
388 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
389 /// let tick = process.tick();
390 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
391 /// singleton.flatten_ordered().all_ticks()
392 /// # }, |mut stream| async move {
393 /// // 1, 2, 3
394 /// # for w in vec![1, 2, 3] {
395 /// # assert_eq!(stream.next().await.unwrap(), w);
396 /// # }
397 /// # }));
398 /// # }
399 /// ```
400 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
401 where
402 T: IntoIterator<Item = U>,
403 {
404 self.flat_map_ordered(q!(|x| x))
405 }
406
407 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
408 /// for the element type `T` to produce items in any order.
409 ///
410 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
411 /// are emitted in the output stream in non-deterministic order.
412 ///
413 /// # Example
414 /// ```rust
415 /// # #[cfg(feature = "deploy")] {
416 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
417 /// # use futures::StreamExt;
418 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
419 /// let tick = process.tick();
420 /// let singleton = tick.singleton(q!(
421 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
422 /// ));
423 /// singleton.flatten_unordered().all_ticks()
424 /// # }, |mut stream| async move {
425 /// // 1, 2, 3, but in no particular order
426 /// # let mut results = Vec::new();
427 /// # for _ in 0..3 {
428 /// # results.push(stream.next().await.unwrap());
429 /// # }
430 /// # results.sort();
431 /// # assert_eq!(results, vec![1, 2, 3]);
432 /// # }));
433 /// # }
434 /// ```
435 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
436 where
437 T: IntoIterator<Item = U>,
438 {
439 self.flat_map_unordered(q!(|x| x))
440 }
441
442 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
443 ///
444 /// If the predicate returns `true`, the output optional contains the same value.
445 /// If the predicate returns `false`, the output optional is empty.
446 ///
447 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
448 /// not modify or take ownership of the value. If you need to modify the value while filtering
449 /// use [`Singleton::filter_map`] instead.
450 ///
451 /// # Example
452 /// ```rust
453 /// # #[cfg(feature = "deploy")] {
454 /// # use hydro_lang::prelude::*;
455 /// # use futures::StreamExt;
456 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
457 /// let tick = process.tick();
458 /// let singleton = tick.singleton(q!(5));
459 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
460 /// # }, |mut stream| async move {
461 /// // 5
462 /// # assert_eq!(stream.next().await.unwrap(), 5);
463 /// # }));
464 /// # }
465 /// ```
466 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
467 where
468 F: Fn(&T) -> bool + 'a,
469 {
470 let f = f.splice_fn1_borrow_ctx(&self.location).into();
471 Optional::new(
472 self.location.clone(),
473 HydroNode::Filter {
474 f,
475 input: Box::new(self.ir_node.into_inner()),
476 metadata: self
477 .location
478 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
479 },
480 )
481 }
482
483 /// An operator that both filters and maps. It yields the value only if the supplied
484 /// closure `f` returns `Some(value)`.
485 ///
486 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
487 /// If the closure returns `None`, the output optional is empty.
488 ///
489 /// # Example
490 /// ```rust
491 /// # #[cfg(feature = "deploy")] {
492 /// # use hydro_lang::prelude::*;
493 /// # use futures::StreamExt;
494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
495 /// let tick = process.tick();
496 /// let singleton = tick.singleton(q!("42"));
497 /// singleton
498 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
499 /// .all_ticks()
500 /// # }, |mut stream| async move {
501 /// // 42
502 /// # assert_eq!(stream.next().await.unwrap(), 42);
503 /// # }));
504 /// # }
505 /// ```
506 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
507 where
508 F: Fn(T) -> Option<U> + 'a,
509 {
510 let f = f.splice_fn1_ctx(&self.location).into();
511 Optional::new(
512 self.location.clone(),
513 HydroNode::FilterMap {
514 f,
515 input: Box::new(self.ir_node.into_inner()),
516 metadata: self
517 .location
518 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
519 },
520 )
521 }
522
523 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
524 ///
525 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
526 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
527 /// non-null. This is useful for combining several pieces of state together.
528 ///
529 /// # Example
530 /// ```rust
531 /// # #[cfg(feature = "deploy")] {
532 /// # use hydro_lang::prelude::*;
533 /// # use futures::StreamExt;
534 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535 /// let tick = process.tick();
536 /// let numbers = process
537 /// .source_iter(q!(vec![123, 456]))
538 /// .batch(&tick, nondet!(/** test */));
539 /// let count = numbers.clone().count(); // Singleton
540 /// let max = numbers.max(); // Optional
541 /// count.zip(max).all_ticks()
542 /// # }, |mut stream| async move {
543 /// // [(2, 456)]
544 /// # for w in vec![(2, 456)] {
545 /// # assert_eq!(stream.next().await.unwrap(), w);
546 /// # }
547 /// # }));
548 /// # }
549 /// ```
550 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
551 where
552 Self: ZipResult<'a, O, Location = L>,
553 B: IsBounded,
554 {
555 check_matching_location(&self.location, &Self::other_location(&other));
556
557 if L::is_top_level()
558 && let Some(tick) = self.location.try_tick()
559 {
560 let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
561 let out = zip_inside_tick(
562 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
563 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
564 other_location.clone(),
565 HydroNode::Cast {
566 inner: Box::new(Self::other_ir_node(other)),
567 metadata: other_location.new_node_metadata(Optional::<
568 <Self as ZipResult<'a, O>>::OtherType,
569 Tick<L>,
570 Bounded,
571 >::collection_kind(
572 )),
573 },
574 )
575 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
576 )
577 .latest();
578
579 Self::make(out.location, out.ir_node.into_inner())
580 } else {
581 Self::make(
582 self.location.clone(),
583 HydroNode::CrossSingleton {
584 left: Box::new(self.ir_node.into_inner()),
585 right: Box::new(Self::other_ir_node(other)),
586 metadata: self.location.new_node_metadata(CollectionKind::Optional {
587 bound: B::BOUND_KIND,
588 element_type: stageleft::quote_type::<
589 <Self as ZipResult<'a, O>>::ElementType,
590 >()
591 .into(),
592 }),
593 },
594 )
595 }
596 }
597
598 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
600 ///
601 /// Useful for conditionally processing, such as only emitting a singleton's value outside
602 /// a tick if some other condition is satisfied.
603 ///
604 /// # Example
605 /// ```rust
606 /// # #[cfg(feature = "deploy")] {
607 /// # use hydro_lang::prelude::*;
608 /// # use futures::StreamExt;
609 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
610 /// let tick = process.tick();
611 /// // ticks are lazy by default, forces the second tick to run
612 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
613 ///
614 /// let batch_first_tick = process
615 /// .source_iter(q!(vec![1]))
616 /// .batch(&tick, nondet!(/** test */));
617 /// let batch_second_tick = process
618 /// .source_iter(q!(vec![1, 2, 3]))
619 /// .batch(&tick, nondet!(/** test */))
620 /// .defer_tick(); // appears on the second tick
621 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
622 /// batch_first_tick.chain(batch_second_tick).count()
623 /// .filter_if_some(some_on_first_tick)
624 /// .all_ticks()
625 /// # }, |mut stream| async move {
626 /// // [1]
627 /// # for w in vec![1] {
628 /// # assert_eq!(stream.next().await.unwrap(), w);
629 /// # }
630 /// # }));
631 /// # }
632 /// ```
633 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
634 where
635 B: IsBounded,
636 {
637 self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
638 .map(q!(|(d, _signal)| d))
639 }
640
641 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
643 ///
644 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
645 /// the condition.
646 ///
647 /// # Example
648 /// ```rust
649 /// # #[cfg(feature = "deploy")] {
650 /// # use hydro_lang::prelude::*;
651 /// # use futures::StreamExt;
652 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
653 /// let tick = process.tick();
654 /// // ticks are lazy by default, forces the second tick to run
655 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
656 ///
657 /// let batch_first_tick = process
658 /// .source_iter(q!(vec![1]))
659 /// .batch(&tick, nondet!(/** test */));
660 /// let batch_second_tick = process
661 /// .source_iter(q!(vec![1, 2, 3]))
662 /// .batch(&tick, nondet!(/** test */))
663 /// .defer_tick(); // appears on the second tick
664 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
665 /// batch_first_tick.chain(batch_second_tick).count()
666 /// .filter_if_none(some_on_first_tick)
667 /// .all_ticks()
668 /// # }, |mut stream| async move {
669 /// // [3]
670 /// # for w in vec![3] {
671 /// # assert_eq!(stream.next().await.unwrap(), w);
672 /// # }
673 /// # }));
674 /// # }
675 /// ```
676 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
677 where
678 B: IsBounded,
679 {
680 self.filter_if_some(
681 other
682 .map(q!(|_| ()))
683 .into_singleton()
684 .filter(q!(|o| o.is_none())),
685 )
686 }
687
688 /// An operator which allows you to "name" a `HydroNode`.
689 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
690 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
691 {
692 let mut node = self.ir_node.borrow_mut();
693 let metadata = node.metadata_mut();
694 metadata.tag = Some(name.to_owned());
695 }
696 self
697 }
698}
699
700impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
701where
702 L: Location<'a>,
703{
704 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
705 /// the inner `Option`.
706 ///
707 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
708 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
709 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
710 ///
711 /// # Example
712 /// ```rust
713 /// # #[cfg(feature = "deploy")] {
714 /// # use hydro_lang::prelude::*;
715 /// # use futures::StreamExt;
716 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
717 /// let tick = process.tick();
718 /// let singleton = tick.singleton(q!(Some(42)));
719 /// singleton.into_optional().all_ticks()
720 /// # }, |mut stream| async move {
721 /// // 42
722 /// # assert_eq!(stream.next().await.unwrap(), 42);
723 /// # }));
724 /// # }
725 /// ```
726 pub fn into_optional(self) -> Optional<T, L, B> {
727 self.filter_map(q!(|v| v))
728 }
729}
730
731impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
732where
733 L: Location<'a> + NoTick,
734{
735 /// Returns a singleton value corresponding to the latest snapshot of the singleton
736 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
737 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
738 /// all snapshots of this singleton into the atomic-associated tick will observe the
739 /// same value each tick.
740 ///
741 /// # Non-Determinism
742 /// Because this picks a snapshot of a singleton whose value is continuously changing,
743 /// the output singleton has a non-deterministic value since the snapshot can be at an
744 /// arbitrary point in time.
745 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
746 Singleton::new(
747 self.location.clone().tick,
748 HydroNode::Batch {
749 inner: Box::new(self.ir_node.into_inner()),
750 metadata: self
751 .location
752 .tick
753 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
754 },
755 )
756 }
757
758 /// Returns this singleton back into a top-level, asynchronous execution context where updates
759 /// to the value will be asynchronously propagated.
760 pub fn end_atomic(self) -> Singleton<T, L, B> {
761 Singleton::new(
762 self.location.tick.l.clone(),
763 HydroNode::EndAtomic {
764 inner: Box::new(self.ir_node.into_inner()),
765 metadata: self
766 .location
767 .tick
768 .l
769 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
770 },
771 )
772 }
773}
774
775impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
776where
777 L: Location<'a>,
778{
779 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
780 /// will observe the same version of the value and will be executed synchronously before any
    /// outputs are yielded (in [`Singleton::end_atomic`]).
782 ///
783 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
784 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
785 /// a different version).
786 ///
787 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
    /// be atomically processed. Snapshotting a singleton into the _same_ [`Tick`] will preserve the
789 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
790 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
791 let out_location = Atomic { tick: tick.clone() };
792 Singleton::new(
793 out_location.clone(),
794 HydroNode::BeginAtomic {
795 inner: Box::new(self.ir_node.into_inner()),
796 metadata: out_location
797 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
798 },
799 )
800 }
801
802 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
803 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
804 /// relevant data that contributed to the snapshot at tick `t`.
805 ///
806 /// # Non-Determinism
807 /// Because this picks a snapshot of a singleton whose value is continuously changing,
808 /// the output singleton has a non-deterministic value since the snapshot can be at an
809 /// arbitrary point in time.
810 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
811 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
812 Singleton::new(
813 tick.clone(),
814 HydroNode::Batch {
815 inner: Box::new(self.ir_node.into_inner()),
816 metadata: tick
817 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
818 },
819 )
820 }
821
822 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
823 /// with order corresponding to increasing prefixes of data contributing to the singleton.
824 ///
825 /// # Non-Determinism
826 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
827 /// to non-deterministic batching and arrival of inputs, the output stream is
828 /// non-deterministic.
829 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
830 where
831 L: NoTick,
832 {
833 sliced! {
834 let snapshot = use(self, nondet);
835 snapshot.into_stream()
836 }
837 .weaken_retries()
838 }
839
840 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
841 /// value taken at various points in time. Because the input singleton may be
842 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
843 /// represent the value of the singleton given some prefix of the streams leading up to
844 /// it.
845 ///
846 /// # Non-Determinism
847 /// The output stream is non-deterministic in which elements are sampled, since this
848 /// is controlled by a clock.
849 pub fn sample_every(
850 self,
851 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
852 nondet: NonDet,
853 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
854 where
855 L: NoTick + NoAtomic,
856 {
857 let samples = self.location.source_interval(interval, nondet);
858 sliced! {
859 let snapshot = use(self, nondet);
860 let sample_batch = use(samples, nondet);
861
862 snapshot.filter_if_some(sample_batch.first()).into_stream()
863 }
864 .weaken_retries()
865 }
866
867 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
868 /// implies that `B == Bounded`.
869 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
870 where
871 B: IsBounded,
872 {
873 Singleton::new(self.location, self.ir_node.into_inner())
874 }
875
876 /// Clones this bounded singleton into a tick, returning a singleton that has the
877 /// same value as the outer singleton. Because the outer singleton is bounded, this
878 /// is deterministic because there is only a single immutable version.
879 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
880 where
881 B: IsBounded,
882 T: Clone,
883 {
884 // TODO(shadaj): avoid printing simulator logs for this snapshot
885 self.snapshot(
886 tick,
887 nondet!(/** bounded top-level singleton so deterministic */),
888 )
889 }
890
891 /// Converts this singleton into a [`Stream`] containing a single element, the value.
892 ///
893 /// # Example
894 /// ```rust
895 /// # #[cfg(feature = "deploy")] {
896 /// # use hydro_lang::prelude::*;
897 /// # use futures::StreamExt;
898 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899 /// let tick = process.tick();
900 /// let batch_input = process
901 /// .source_iter(q!(vec![123, 456]))
902 /// .batch(&tick, nondet!(/** test */));
903 /// batch_input.clone().chain(
904 /// batch_input.count().into_stream()
905 /// ).all_ticks()
906 /// # }, |mut stream| async move {
907 /// // [123, 456, 2]
908 /// # for w in vec![123, 456, 2] {
909 /// # assert_eq!(stream.next().await.unwrap(), w);
910 /// # }
911 /// # }));
912 /// # }
913 /// ```
914 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
915 where
916 B: IsBounded,
917 {
918 Stream::new(
919 self.location.clone(),
920 HydroNode::Cast {
921 inner: Box::new(self.ir_node.into_inner()),
922 metadata: self.location.new_node_metadata(Stream::<
923 T,
924 Tick<L>,
925 Bounded,
926 TotalOrder,
927 ExactlyOnce,
928 >::collection_kind()),
929 },
930 )
931 }
932}
933
934impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
935where
936 L: Location<'a>,
937{
938 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
939 /// which will stream the value computed in _each_ tick as a separate stream element.
940 ///
941 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
942 /// producing one element in the output for each tick. This is useful for batched computations,
943 /// where the results from each tick must be combined together.
944 ///
945 /// # Example
946 /// ```rust
947 /// # #[cfg(feature = "deploy")] {
948 /// # use hydro_lang::prelude::*;
949 /// # use futures::StreamExt;
950 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
951 /// let tick = process.tick();
952 /// # // ticks are lazy by default, forces the second tick to run
953 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
954 /// # let batch_first_tick = process
955 /// # .source_iter(q!(vec![1]))
956 /// # .batch(&tick, nondet!(/** test */));
957 /// # let batch_second_tick = process
958 /// # .source_iter(q!(vec![1, 2, 3]))
959 /// # .batch(&tick, nondet!(/** test */))
960 /// # .defer_tick(); // appears on the second tick
961 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
962 /// input_batch // first tick: [1], second tick: [1, 2, 3]
963 /// .count()
964 /// .all_ticks()
965 /// # }, |mut stream| async move {
966 /// // [1, 3]
967 /// # for w in vec![1, 3] {
968 /// # assert_eq!(stream.next().await.unwrap(), w);
969 /// # }
970 /// # }));
971 /// # }
972 /// ```
973 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
974 self.into_stream().all_ticks()
975 }
976
977 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
978 /// which will stream the value computed in _each_ tick as a separate stream element.
979 ///
980 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
981 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
982 /// singleton's [`Tick`] context.
983 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
984 self.into_stream().all_ticks_atomic()
985 }
986
987 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
988 /// be asynchronously updated with the latest value of the singleton inside the tick.
989 ///
990 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
991 /// tick that tracks the inner value. This is useful for getting the value as of the
992 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
993 ///
994 /// # Example
995 /// ```rust
996 /// # #[cfg(feature = "deploy")] {
997 /// # use hydro_lang::prelude::*;
998 /// # use futures::StreamExt;
999 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1000 /// let tick = process.tick();
1001 /// # // ticks are lazy by default, forces the second tick to run
1002 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1003 /// # let batch_first_tick = process
1004 /// # .source_iter(q!(vec![1]))
1005 /// # .batch(&tick, nondet!(/** test */));
1006 /// # let batch_second_tick = process
1007 /// # .source_iter(q!(vec![1, 2, 3]))
1008 /// # .batch(&tick, nondet!(/** test */))
1009 /// # .defer_tick(); // appears on the second tick
1010 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1011 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1012 /// .count()
1013 /// .latest()
1014 /// # .sample_eager(nondet!(/** test */))
1015 /// # }, |mut stream| async move {
1016 /// // asynchronously changes from 1 ~> 3
1017 /// # for w in vec![1, 3] {
1018 /// # assert_eq!(stream.next().await.unwrap(), w);
1019 /// # }
1020 /// # }));
1021 /// # }
1022 /// ```
1023 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1024 Singleton::new(
1025 self.location.outer().clone(),
1026 HydroNode::YieldConcat {
1027 inner: Box::new(self.ir_node.into_inner()),
1028 metadata: self
1029 .location
1030 .outer()
1031 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1032 },
1033 )
1034 }
1035
1036 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1037 /// be updated with the latest value of the singleton inside the tick.
1038 ///
1039 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1040 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1041 /// singleton's [`Tick`] context.
1042 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1043 let out_location = Atomic {
1044 tick: self.location.clone(),
1045 };
1046 Singleton::new(
1047 out_location.clone(),
1048 HydroNode::YieldConcat {
1049 inner: Box::new(self.ir_node.into_inner()),
1050 metadata: out_location
1051 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1052 },
1053 )
1054 }
1055}
1056
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// The trait is implemented on the first input of the zip and is parameterized by `Other`,
/// the collection type of the second input. The output will be an [`Optional`] if the second
/// input is an [`Optional`], otherwise it is a [`Singleton`].
///
/// The trait is marked `#[sealed::sealed]`, so it is not meant to be implemented by
/// downstream code.
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value, pairing this collection's value with the other
    /// collection's value.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`, consuming that input.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1081
1082#[sealed::sealed]
1083impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1084where
1085 L: Location<'a>,
1086{
1087 type Out = Singleton<(T, U), L, B>;
1088 type ElementType = (T, U);
1089 type OtherType = U;
1090 type Location = L;
1091
1092 fn other_location(other: &Singleton<U, L, B>) -> L {
1093 other.location.clone()
1094 }
1095
1096 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1097 other.ir_node.into_inner()
1098 }
1099
1100 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1101 Singleton::new(
1102 location.clone(),
1103 HydroNode::Cast {
1104 inner: Box::new(ir_node),
1105 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1106 },
1107 )
1108 }
1109}
1110
1111#[sealed::sealed]
1112impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1113where
1114 L: Location<'a>,
1115{
1116 type Out = Optional<(T, U), L, B>;
1117 type ElementType = (T, U);
1118 type OtherType = U;
1119 type Location = L;
1120
1121 fn other_location(other: &Optional<U, L, B>) -> L {
1122 other.location.clone()
1123 }
1124
1125 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1126 other.ir_node.into_inner()
1127 }
1128
1129 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1130 Optional::new(location, ir_node)
1131 }
1132}
1133
1134#[cfg(test)]
1135mod tests {
1136 #[cfg(feature = "deploy")]
1137 use futures::{SinkExt, StreamExt};
1138 #[cfg(feature = "deploy")]
1139 use hydro_deploy::Deployment;
1140 #[cfg(any(feature = "deploy", feature = "sim"))]
1141 use stageleft::q;
1142
1143 #[cfg(any(feature = "deploy", feature = "sim"))]
1144 use crate::compile::builder::FlowBuilder;
1145 #[cfg(feature = "deploy")]
1146 use crate::live_collections::stream::ExactlyOnce;
1147 #[cfg(any(feature = "deploy", feature = "sim"))]
1148 use crate::location::Location;
1149 #[cfg(any(feature = "deploy", feature = "sim"))]
1150 use crate::nondet::nondet;
1151
1152 #[cfg(feature = "deploy")]
1153 #[tokio::test]
1154 async fn tick_cycle_cardinality() {
1155 let mut deployment = Deployment::new();
1156
1157 let mut flow = FlowBuilder::new();
1158 let node = flow.process::<()>();
1159 let external = flow.external::<()>();
1160
1161 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1162
1163 let node_tick = node.tick();
1164 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1165 let counts = singleton
1166 .clone()
1167 .into_stream()
1168 .count()
1169 .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1170 .all_ticks()
1171 .send_bincode_external(&external);
1172 complete_cycle.complete_next_tick(singleton);
1173
1174 let nodes = flow
1175 .with_process(&node, deployment.Localhost())
1176 .with_external(&external, deployment.Localhost())
1177 .deploy(&mut deployment);
1178
1179 deployment.deploy().await.unwrap();
1180
1181 let mut tick_trigger = nodes.connect(input_send).await;
1182 let mut external_out = nodes.connect(counts).await;
1183
1184 deployment.start().await.unwrap();
1185
1186 tick_trigger.send(()).await.unwrap();
1187
1188 assert_eq!(external_out.next().await.unwrap(), 1);
1189
1190 tick_trigger.send(()).await.unwrap();
1191
1192 assert_eq!(external_out.next().await.unwrap(), 1);
1193 }
1194
1195 #[cfg(feature = "sim")]
1196 #[test]
1197 #[should_panic]
1198 fn sim_fold_intermediate_states() {
1199 let mut flow = FlowBuilder::new();
1200 let node = flow.process::<()>();
1201
1202 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1203 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1204
1205 let tick = node.tick();
1206 let batch = folded.snapshot(&tick, nondet!(/** test */));
1207 let out_recv = batch.all_ticks().sim_output();
1208
1209 flow.sim().exhaustive(async || {
1210 assert_eq!(out_recv.next().await.unwrap(), 10);
1211 });
1212 }
1213
1214 #[cfg(feature = "sim")]
1215 #[test]
1216 fn sim_fold_intermediate_state_count() {
1217 let mut flow = FlowBuilder::new();
1218 let node = flow.process::<()>();
1219
1220 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1221 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1222
1223 let tick = node.tick();
1224 let batch = folded.snapshot(&tick, nondet!(/** test */));
1225 let out_recv = batch.all_ticks().sim_output();
1226
1227 let instance_count = flow.sim().exhaustive(async || {
1228 let out = out_recv.collect::<Vec<_>>().await;
1229 assert_eq!(out.last(), Some(&10));
1230 });
1231
1232 assert_eq!(
1233 instance_count,
1234 16 // 2^4 possible subsets of intermediates (including initial state)
1235 )
1236 }
1237
1238 #[cfg(feature = "sim")]
1239 #[test]
1240 fn sim_fold_no_repeat_initial() {
1241 // check that we don't repeat the initial state of the fold in autonomous decisions
1242
1243 let mut flow = FlowBuilder::new();
1244 let node = flow.process::<()>();
1245
1246 let (in_port, input) = node.sim_input();
1247 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1248
1249 let tick = node.tick();
1250 let batch = folded.snapshot(&tick, nondet!(/** test */));
1251 let out_recv = batch.all_ticks().sim_output();
1252
1253 flow.sim().exhaustive(async || {
1254 assert_eq!(out_recv.next().await.unwrap(), 0);
1255
1256 in_port.send(123);
1257
1258 assert_eq!(out_recv.next().await.unwrap(), 123);
1259 });
1260 }
1261
1262 #[cfg(feature = "sim")]
1263 #[test]
1264 #[should_panic]
1265 fn sim_fold_repeats_snapshots() {
1266 // when the tick is driven by a snapshot AND something else, the snapshot can
1267 // "stutter" and repeat the same state multiple times
1268
1269 let mut flow = FlowBuilder::new();
1270 let node = flow.process::<()>();
1271
1272 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1273 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1274
1275 let tick = node.tick();
1276 let batch = source
1277 .batch(&tick, nondet!(/** test */))
1278 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1279 let out_recv = batch.all_ticks().sim_output();
1280
1281 flow.sim().exhaustive(async || {
1282 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1283 {
1284 panic!("repeated snapshot");
1285 }
1286 });
1287 }
1288
1289 #[cfg(feature = "sim")]
1290 #[test]
1291 fn sim_fold_repeats_snapshots_count() {
1292 // check the number of instances
1293 let mut flow = FlowBuilder::new();
1294 let node = flow.process::<()>();
1295
1296 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1297 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1298
1299 let tick = node.tick();
1300 let batch = source
1301 .batch(&tick, nondet!(/** test */))
1302 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1303 let out_recv = batch.all_ticks().sim_output();
1304
1305 let count = flow.sim().exhaustive(async || {
1306 let _ = out_recv.collect::<Vec<_>>().await;
1307 });
1308
1309 assert_eq!(count, 52);
1310 // don't have a combinatorial explanation for this number yet, but checked via logs
1311 }
1312
1313 #[cfg(feature = "sim")]
1314 #[test]
1315 fn sim_top_level_singleton_exhaustive() {
1316 // ensures that top-level singletons have only one snapshot
1317 let mut flow = FlowBuilder::new();
1318 let node = flow.process::<()>();
1319
1320 let singleton = node.singleton(q!(1));
1321 let tick = node.tick();
1322 let batch = singleton.snapshot(&tick, nondet!(/** test */));
1323 let out_recv = batch.all_ticks().sim_output();
1324
1325 let count = flow.sim().exhaustive(async || {
1326 let _ = out_recv.collect::<Vec<_>>().await;
1327 });
1328
1329 assert_eq!(count, 1);
1330 }
1331
1332 #[cfg(feature = "sim")]
1333 #[test]
1334 fn sim_top_level_singleton_join_count() {
1335 // if a tick consumes a static snapshot and a stream batch, only the batch require space
1336 // exploration
1337
1338 let mut flow = FlowBuilder::new();
1339 let node = flow.process::<()>();
1340
1341 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1342 let tick = node.tick();
1343 let batch = source_iter
1344 .batch(&tick, nondet!(/** test */))
1345 .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1346 let out_recv = batch.all_ticks().sim_output();
1347
1348 let instance_count = flow.sim().exhaustive(async || {
1349 let _ = out_recv.collect::<Vec<_>>().await;
1350 });
1351
1352 assert_eq!(
1353 instance_count,
1354 16 // 2^4 ways to split up (including a possibly empty first batch)
1355 )
1356 }
1357
1358 #[cfg(feature = "sim")]
1359 #[test]
1360 fn top_level_singleton_into_stream_no_replay() {
1361 let mut flow = FlowBuilder::new();
1362 let node = flow.process::<()>();
1363
1364 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1365 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1366
1367 let out_recv = folded.into_stream().sim_output();
1368
1369 flow.sim().exhaustive(async || {
1370 out_recv.assert_yields_only([10]).await;
1371 });
1372 }
1373
1374 #[cfg(feature = "sim")]
1375 #[test]
1376 fn inside_tick_singleton_zip() {
1377 use crate::live_collections::Stream;
1378 use crate::live_collections::sliced::sliced;
1379
1380 let mut flow = FlowBuilder::new();
1381 let node = flow.process::<()>();
1382
1383 let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1384 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1385
1386 let out_recv = sliced! {
1387 let v = use(folded, nondet!(/** test */));
1388 v.clone().zip(v).into_stream()
1389 }
1390 .sim_output();
1391
1392 let count = flow.sim().exhaustive(async || {
1393 let out = out_recv.collect::<Vec<_>>().await;
1394 assert_eq!(out.last(), Some(&(3, 3)));
1395 });
1396
1397 assert_eq!(count, 4);
1398 }
1399}