hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{ApplyMonotoneStream, Proved};
28
29/// A marker trait indicating which components of a [`Singleton`] may change.
30///
31/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
32/// includes an additional variant [`Monotonic`], which means that the value will only grow.
33pub trait SingletonBound {
34 /// The [`Boundedness`] that this [`Singleton`] would be erased to.
35 type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
36
37 /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
38 type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
39
40 /// Returns the [`SingletonBoundKind`] corresponding to this type.
41 fn bound_kind() -> SingletonBoundKind;
42}
43
44impl SingletonBound for Unbounded {
45 type UnderlyingBound = Unbounded;
46
47 type StreamToMonotone = Monotonic;
48
49 fn bound_kind() -> SingletonBoundKind {
50 SingletonBoundKind::Unbounded
51 }
52}
53
54impl SingletonBound for Bounded {
55 type UnderlyingBound = Bounded;
56
57 type StreamToMonotone = Bounded;
58
59 fn bound_kind() -> SingletonBoundKind {
60 SingletonBoundKind::Bounded
61 }
62}
63
64/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
65pub struct Monotonic;
66
67impl SingletonBound for Monotonic {
68 type UnderlyingBound = Unbounded;
69
70 type StreamToMonotone = Monotonic;
71
72 fn bound_kind() -> SingletonBoundKind {
73 SingletonBoundKind::Monotonic
74 }
75}
76
77#[sealed]
78#[diagnostic::on_unimplemented(
79 message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
80 label = "required here",
81 note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
82)]
83/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
84pub trait IsMonotonic: SingletonBound {}
85
86#[sealed]
87#[diagnostic::do_not_recommend]
88impl IsMonotonic for Monotonic {}
89
90#[sealed]
91#[diagnostic::do_not_recommend]
92impl<B: IsBounded> IsMonotonic for B {}
93
94/// A single Rust value that can asynchronously change over time.
95///
96/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
97/// [`Unbounded`], the value will asynchronously change over time.
98///
99/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
100/// a single number that will asynchronously change as events are processed. Singletons also appear
101/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
102/// such as getting the length of a batch of requests.
103///
104/// Type Parameters:
105/// - `Type`: the type of the value in this singleton
106/// - `Loc`: the [`Location`] where the singleton is materialized
107/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
108pub struct Singleton<Type, Loc, Bound: SingletonBound> {
109 pub(crate) location: Loc,
110 pub(crate) ir_node: RefCell<HydroNode>,
111 pub(crate) flow_state: FlowState,
112
113 _phantom: PhantomData<(Type, Loc, Bound)>,
114}
115
116impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
117 fn drop(&mut self) {
118 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
119 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
120 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
121 input: Box::new(ir_node),
122 op_metadata: HydroIrOpMetadata::new(),
123 });
124 }
125 }
126}
127
128impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
129where
130 T: Clone,
131 L: Location<'a> + NoTick,
132{
133 fn from(value: Singleton<T, L, Bounded>) -> Self {
134 let tick = value.location().tick();
135 value.clone_into_tick(&tick).latest()
136 }
137}
138
139impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
140where
141 L: Location<'a>,
142{
143 type Location = Tick<L>;
144
145 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
146 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
147 location.clone(),
148 HydroNode::DeferTick {
149 input: Box::new(HydroNode::CycleSource {
150 cycle_id,
151 metadata: location.new_node_metadata(Self::collection_kind()),
152 }),
153 metadata: location
154 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
155 },
156 );
157
158 from_previous_tick.unwrap_or(initial)
159 }
160}
161
162impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
163where
164 L: Location<'a>,
165{
166 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
167 assert_eq!(
168 Location::id(&self.location),
169 expected_location,
170 "locations do not match"
171 );
172 self.location
173 .flow_state()
174 .borrow_mut()
175 .push_root(HydroRoot::CycleSink {
176 cycle_id,
177 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
178 op_metadata: HydroIrOpMetadata::new(),
179 });
180 }
181}
182
183impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
184where
185 L: Location<'a>,
186{
187 type Location = Tick<L>;
188
189 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
190 Singleton::new(
191 location.clone(),
192 HydroNode::CycleSource {
193 cycle_id,
194 metadata: location.new_node_metadata(Self::collection_kind()),
195 },
196 )
197 }
198}
199
200impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
201where
202 L: Location<'a>,
203{
204 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
205 assert_eq!(
206 Location::id(&self.location),
207 expected_location,
208 "locations do not match"
209 );
210 self.location
211 .flow_state()
212 .borrow_mut()
213 .push_root(HydroRoot::CycleSink {
214 cycle_id,
215 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
216 op_metadata: HydroIrOpMetadata::new(),
217 });
218 }
219}
220
221impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
222where
223 L: Location<'a> + NoTick,
224{
225 type Location = L;
226
227 fn create_source(cycle_id: CycleId, location: L) -> Self {
228 Singleton::new(
229 location.clone(),
230 HydroNode::CycleSource {
231 cycle_id,
232 metadata: location.new_node_metadata(Self::collection_kind()),
233 },
234 )
235 }
236}
237
238impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
239where
240 L: Location<'a> + NoTick,
241{
242 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
243 assert_eq!(
244 Location::id(&self.location),
245 expected_location,
246 "locations do not match"
247 );
248 self.location
249 .flow_state()
250 .borrow_mut()
251 .push_root(HydroRoot::CycleSink {
252 cycle_id,
253 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
254 op_metadata: HydroIrOpMetadata::new(),
255 });
256 }
257}
258
259impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
260where
261 T: Clone,
262 L: Location<'a>,
263{
264 fn clone(&self) -> Self {
265 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
266 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
267 *self.ir_node.borrow_mut() = HydroNode::Tee {
268 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
269 metadata: self.location.new_node_metadata(Self::collection_kind()),
270 };
271 }
272
273 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
274 Singleton {
275 location: self.location.clone(),
276 flow_state: self.flow_state.clone(),
277 ir_node: HydroNode::Tee {
278 inner: SharedNode(inner.0.clone()),
279 metadata: metadata.clone(),
280 }
281 .into(),
282 _phantom: PhantomData,
283 }
284 } else {
285 unreachable!()
286 }
287 }
288}
289
290#[cfg(stageleft_runtime)]
291fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
292 me: Singleton<T, Tick<L>, B>,
293 other: Optional<O, Tick<L>, B::UnderlyingBound>,
294) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
295 let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
296 super::optional::zip_inside_tick(me_as_optional, other)
297}
298
299impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
300where
301 L: Location<'a>,
302{
303 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
304 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
305 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
306 let flow_state = location.flow_state().clone();
307 Singleton {
308 location,
309 flow_state,
310 ir_node: RefCell::new(ir_node),
311 _phantom: PhantomData,
312 }
313 }
314
315 pub(crate) fn collection_kind() -> CollectionKind {
316 CollectionKind::Singleton {
317 bound: B::bound_kind(),
318 element_type: stageleft::quote_type::<T>().into(),
319 }
320 }
321
322 /// Returns the [`Location`] where this singleton is being materialized.
323 pub fn location(&self) -> &L {
324 &self.location
325 }
326
327 /// Drops the monotonicity property of the [`Singleton`].
328 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
329 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
330 Singleton::new(
331 self.location.clone(),
332 self.ir_node.replace(HydroNode::Placeholder),
333 )
334 } else {
335 Singleton::new(
336 self.location.clone(),
337 HydroNode::Cast {
338 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
339 metadata:
340 self.location.new_node_metadata(
341 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
342 ),
343 },
344 )
345 }
346 }
347
348 /// Transforms the singleton value by applying a function `f` to it,
349 /// continuously as the input is updated.
350 ///
351 /// # Example
352 /// ```rust
353 /// # #[cfg(feature = "deploy")] {
354 /// # use hydro_lang::prelude::*;
355 /// # use futures::StreamExt;
356 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
357 /// let tick = process.tick();
358 /// let singleton = tick.singleton(q!(5));
359 /// singleton.map(q!(|v| v * 2)).all_ticks()
360 /// # }, |mut stream| async move {
361 /// // 10
362 /// # assert_eq!(stream.next().await.unwrap(), 10);
363 /// # }));
364 /// # }
365 /// ```
366 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B::UnderlyingBound>
367 where
368 F: Fn(T) -> U + 'a,
369 {
370 let f = f.splice_fn1_ctx(&self.location).into();
371 Singleton::new(
372 self.location.clone(),
373 HydroNode::Map {
374 f,
375 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376 metadata: self
377 .location
378 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
379 },
380 )
381 }
382
383 /// Transforms the singleton value by applying a function `f` to it and then flattening
384 /// the result into a stream, preserving the order of elements.
385 ///
386 /// The function `f` is applied to the singleton value to produce an iterator, and all items
387 /// from that iterator are emitted in the output stream in deterministic order.
388 ///
389 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
390 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
391 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
392 ///
393 /// # Example
394 /// ```rust
395 /// # #[cfg(feature = "deploy")] {
396 /// # use hydro_lang::prelude::*;
397 /// # use futures::StreamExt;
398 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
399 /// let tick = process.tick();
400 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
401 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
402 /// # }, |mut stream| async move {
403 /// // 1, 2, 3
404 /// # for w in vec![1, 2, 3] {
405 /// # assert_eq!(stream.next().await.unwrap(), w);
406 /// # }
407 /// # }));
408 /// # }
409 /// ```
410 pub fn flat_map_ordered<U, I, F>(
411 self,
412 f: impl IntoQuotedMut<'a, F, L>,
413 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
414 where
415 B: IsBounded,
416 I: IntoIterator<Item = U>,
417 F: Fn(T) -> I + 'a,
418 {
419 self.into_stream().flat_map_ordered(f)
420 }
421
422 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
423 /// for the output type `I` to produce items in any order.
424 ///
425 /// The function `f` is applied to the singleton value to produce an iterator, and all items
426 /// from that iterator are emitted in the output stream in non-deterministic order.
427 ///
428 /// # Example
429 /// ```rust
430 /// # #[cfg(feature = "deploy")] {
431 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
432 /// # use futures::StreamExt;
433 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
434 /// let tick = process.tick();
435 /// let singleton = tick.singleton(q!(
436 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
437 /// ));
438 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
439 /// # }, |mut stream| async move {
440 /// // 1, 2, 3, but in no particular order
441 /// # let mut results = Vec::new();
442 /// # for _ in 0..3 {
443 /// # results.push(stream.next().await.unwrap());
444 /// # }
445 /// # results.sort();
446 /// # assert_eq!(results, vec![1, 2, 3]);
447 /// # }));
448 /// # }
449 /// ```
450 pub fn flat_map_unordered<U, I, F>(
451 self,
452 f: impl IntoQuotedMut<'a, F, L>,
453 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
454 where
455 B: IsBounded,
456 I: IntoIterator<Item = U>,
457 F: Fn(T) -> I + 'a,
458 {
459 self.into_stream().flat_map_unordered(f)
460 }
461
462 /// Flattens the singleton value into a stream, preserving the order of elements.
463 ///
464 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
465 /// are emitted in the output stream in deterministic order.
466 ///
467 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
468 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
469 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
470 ///
471 /// # Example
472 /// ```rust
473 /// # #[cfg(feature = "deploy")] {
474 /// # use hydro_lang::prelude::*;
475 /// # use futures::StreamExt;
476 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
477 /// let tick = process.tick();
478 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
479 /// singleton.flatten_ordered().all_ticks()
480 /// # }, |mut stream| async move {
481 /// // 1, 2, 3
482 /// # for w in vec![1, 2, 3] {
483 /// # assert_eq!(stream.next().await.unwrap(), w);
484 /// # }
485 /// # }));
486 /// # }
487 /// ```
488 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
489 where
490 B: IsBounded,
491 T: IntoIterator<Item = U>,
492 {
493 self.flat_map_ordered(q!(|x| x))
494 }
495
496 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
497 /// for the element type `T` to produce items in any order.
498 ///
499 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
500 /// are emitted in the output stream in non-deterministic order.
501 ///
502 /// # Example
503 /// ```rust
504 /// # #[cfg(feature = "deploy")] {
505 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
506 /// # use futures::StreamExt;
507 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
508 /// let tick = process.tick();
509 /// let singleton = tick.singleton(q!(
510 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
511 /// ));
512 /// singleton.flatten_unordered().all_ticks()
513 /// # }, |mut stream| async move {
514 /// // 1, 2, 3, but in no particular order
515 /// # let mut results = Vec::new();
516 /// # for _ in 0..3 {
517 /// # results.push(stream.next().await.unwrap());
518 /// # }
519 /// # results.sort();
520 /// # assert_eq!(results, vec![1, 2, 3]);
521 /// # }));
522 /// # }
523 /// ```
524 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
525 where
526 B: IsBounded,
527 T: IntoIterator<Item = U>,
528 {
529 self.flat_map_unordered(q!(|x| x))
530 }
531
532 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
533 ///
534 /// If the predicate returns `true`, the output optional contains the same value.
535 /// If the predicate returns `false`, the output optional is empty.
536 ///
537 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
538 /// not modify or take ownership of the value. If you need to modify the value while filtering
539 /// use [`Singleton::filter_map`] instead.
540 ///
541 /// # Example
542 /// ```rust
543 /// # #[cfg(feature = "deploy")] {
544 /// # use hydro_lang::prelude::*;
545 /// # use futures::StreamExt;
546 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
547 /// let tick = process.tick();
548 /// let singleton = tick.singleton(q!(5));
549 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
550 /// # }, |mut stream| async move {
551 /// // 5
552 /// # assert_eq!(stream.next().await.unwrap(), 5);
553 /// # }));
554 /// # }
555 /// ```
556 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
557 where
558 F: Fn(&T) -> bool + 'a,
559 {
560 let f = f.splice_fn1_borrow_ctx(&self.location).into();
561 Optional::new(
562 self.location.clone(),
563 HydroNode::Filter {
564 f,
565 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
566 metadata: self
567 .location
568 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
569 },
570 )
571 }
572
573 /// An operator that both filters and maps. It yields the value only if the supplied
574 /// closure `f` returns `Some(value)`.
575 ///
576 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
577 /// If the closure returns `None`, the output optional is empty.
578 ///
579 /// # Example
580 /// ```rust
581 /// # #[cfg(feature = "deploy")] {
582 /// # use hydro_lang::prelude::*;
583 /// # use futures::StreamExt;
584 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
585 /// let tick = process.tick();
586 /// let singleton = tick.singleton(q!("42"));
587 /// singleton
588 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
589 /// .all_ticks()
590 /// # }, |mut stream| async move {
591 /// // 42
592 /// # assert_eq!(stream.next().await.unwrap(), 42);
593 /// # }));
594 /// # }
595 /// ```
596 pub fn filter_map<U, F>(
597 self,
598 f: impl IntoQuotedMut<'a, F, L>,
599 ) -> Optional<U, L, B::UnderlyingBound>
600 where
601 F: Fn(T) -> Option<U> + 'a,
602 {
603 let f = f.splice_fn1_ctx(&self.location).into();
604 Optional::new(
605 self.location.clone(),
606 HydroNode::FilterMap {
607 f,
608 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
609 metadata: self
610 .location
611 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
612 },
613 )
614 }
615
616 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
617 ///
618 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
619 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
620 /// non-null. This is useful for combining several pieces of state together.
621 ///
622 /// # Example
623 /// ```rust
624 /// # #[cfg(feature = "deploy")] {
625 /// # use hydro_lang::prelude::*;
626 /// # use futures::StreamExt;
627 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
628 /// let tick = process.tick();
629 /// let numbers = process
630 /// .source_iter(q!(vec![123, 456]))
631 /// .batch(&tick, nondet!(/** test */));
632 /// let count = numbers.clone().count(); // Singleton
633 /// let max = numbers.max(); // Optional
634 /// count.zip(max).all_ticks()
635 /// # }, |mut stream| async move {
636 /// // [(2, 456)]
637 /// # for w in vec![(2, 456)] {
638 /// # assert_eq!(stream.next().await.unwrap(), w);
639 /// # }
640 /// # }));
641 /// # }
642 /// ```
643 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
644 where
645 Self: ZipResult<'a, O, Location = L>,
646 B: IsBounded,
647 {
648 check_matching_location(&self.location, &Self::other_location(&other));
649
650 if L::is_top_level()
651 && let Some(tick) = self.location.try_tick()
652 {
653 let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
654 let out = zip_inside_tick(
655 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
656 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
657 other_location.clone(),
658 HydroNode::Cast {
659 inner: Box::new(Self::other_ir_node(other)),
660 metadata: other_location.new_node_metadata(Optional::<
661 <Self as ZipResult<'a, O>>::OtherType,
662 Tick<L>,
663 Bounded,
664 >::collection_kind(
665 )),
666 },
667 )
668 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
669 )
670 .latest();
671
672 Self::make(
673 out.location.clone(),
674 out.ir_node.replace(HydroNode::Placeholder),
675 )
676 } else {
677 Self::make(
678 self.location.clone(),
679 HydroNode::CrossSingleton {
680 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
681 right: Box::new(Self::other_ir_node(other)),
682 metadata: self.location.new_node_metadata(CollectionKind::Optional {
683 bound: B::BOUND_KIND,
684 element_type: stageleft::quote_type::<
685 <Self as ZipResult<'a, O>>::ElementType,
686 >()
687 .into(),
688 }),
689 },
690 )
691 }
692 }
693
694 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
695 /// boolean signal is `true`, otherwise the output is null.
696 ///
697 /// # Example
698 /// ```rust
699 /// # #[cfg(feature = "deploy")] {
700 /// # use hydro_lang::prelude::*;
701 /// # use futures::StreamExt;
702 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
703 /// let tick = process.tick();
704 /// // ticks are lazy by default, forces the second tick to run
705 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
706 ///
707 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
708 /// let batch_first_tick = process
709 /// .source_iter(q!(vec![1]))
710 /// .batch(&tick, nondet!(/** test */));
711 /// let batch_second_tick = process
712 /// .source_iter(q!(vec![1, 2, 3]))
713 /// .batch(&tick, nondet!(/** test */))
714 /// .defer_tick();
715 /// batch_first_tick.chain(batch_second_tick).count()
716 /// .filter_if(signal)
717 /// .all_ticks()
718 /// # }, |mut stream| async move {
719 /// // [1]
720 /// # for w in vec![1] {
721 /// # assert_eq!(stream.next().await.unwrap(), w);
722 /// # }
723 /// # }));
724 /// # }
725 /// ```
726 pub fn filter_if(
727 self,
728 signal: Singleton<bool, L, B>,
729 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
730 where
731 B: IsBounded,
732 {
733 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
734 }
735
736 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
737 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
738 ///
739 /// Useful for conditionally processing, such as only emitting a singleton's value outside
740 /// a tick if some other condition is satisfied.
741 ///
742 /// # Example
743 /// ```rust
744 /// # #[cfg(feature = "deploy")] {
745 /// # use hydro_lang::prelude::*;
746 /// # use futures::StreamExt;
747 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
748 /// let tick = process.tick();
749 /// // ticks are lazy by default, forces the second tick to run
750 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
751 ///
752 /// let batch_first_tick = process
753 /// .source_iter(q!(vec![1]))
754 /// .batch(&tick, nondet!(/** test */));
755 /// let batch_second_tick = process
756 /// .source_iter(q!(vec![1, 2, 3]))
757 /// .batch(&tick, nondet!(/** test */))
758 /// .defer_tick(); // appears on the second tick
759 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
760 /// batch_first_tick.chain(batch_second_tick).count()
761 /// .filter_if_some(some_on_first_tick)
762 /// .all_ticks()
763 /// # }, |mut stream| async move {
764 /// // [1]
765 /// # for w in vec![1] {
766 /// # assert_eq!(stream.next().await.unwrap(), w);
767 /// # }
768 /// # }));
769 /// # }
770 /// ```
771 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
772 pub fn filter_if_some<U>(
773 self,
774 signal: Optional<U, L, B>,
775 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
776 where
777 B: IsBounded,
778 {
779 self.filter_if(signal.is_some())
780 }
781
782 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
783 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
784 ///
785 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
786 /// the condition.
787 ///
788 /// # Example
789 /// ```rust
790 /// # #[cfg(feature = "deploy")] {
791 /// # use hydro_lang::prelude::*;
792 /// # use futures::StreamExt;
793 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
794 /// let tick = process.tick();
795 /// // ticks are lazy by default, forces the second tick to run
796 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
797 ///
798 /// let batch_first_tick = process
799 /// .source_iter(q!(vec![1]))
800 /// .batch(&tick, nondet!(/** test */));
801 /// let batch_second_tick = process
802 /// .source_iter(q!(vec![1, 2, 3]))
803 /// .batch(&tick, nondet!(/** test */))
804 /// .defer_tick(); // appears on the second tick
805 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
806 /// batch_first_tick.chain(batch_second_tick).count()
807 /// .filter_if_none(some_on_first_tick)
808 /// .all_ticks()
809 /// # }, |mut stream| async move {
810 /// // [3]
811 /// # for w in vec![3] {
812 /// # assert_eq!(stream.next().await.unwrap(), w);
813 /// # }
814 /// # }));
815 /// # }
816 /// ```
817 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
818 pub fn filter_if_none<U>(
819 self,
820 other: Optional<U, L, B>,
821 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
822 where
823 B: IsBounded,
824 {
825 self.filter_if(other.is_none())
826 }
827
828 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
829 ///
830 /// # Example
831 /// ```rust
832 /// # #[cfg(feature = "deploy")] {
833 /// # use hydro_lang::prelude::*;
834 /// # use futures::StreamExt;
835 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
836 /// let tick = process.tick();
837 /// let a = tick.singleton(q!(5));
838 /// let b = tick.singleton(q!(5));
839 /// a.equals(b).all_ticks()
840 /// # }, |mut stream| async move {
841 /// // [true]
842 /// # assert_eq!(stream.next().await.unwrap(), true);
843 /// # }));
844 /// # }
845 /// ```
846 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
847 where
848 T: PartialEq,
849 B: IsBounded,
850 {
851 self.zip(other).map(q!(|(a, b)| a == b))
852 }
853
854 /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
855 /// greater than or equal to the provided threshold. The event will have the value of the
856 /// given threshold.
857 ///
858 /// This requires the incoming singleton to be monotonic, because otherwise the detection of
859 /// the threshold would be non-deterministic.
860 ///
861 /// # Example
862 /// ```rust
863 /// # #[cfg(feature = "deploy")] {
864 /// # use hydro_lang::prelude::*;
865 /// # use futures::StreamExt;
866 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
867 /// let a = // singleton 1 ~> 5 ~> 10
868 /// # process.singleton(q!(5));
869 /// let b = process.singleton(q!(4));
870 /// a.threshold_greater_or_equal(b)
871 /// # }, |mut stream| async move {
872 /// // [4]
873 /// # assert_eq!(stream.next().await.unwrap(), 4);
874 /// # }));
875 /// # }
876 /// ```
877 pub fn threshold_greater_or_equal<B2: IsBounded>(
878 self,
879 threshold: Singleton<T, L, B2>,
880 ) -> Stream<T, L, B::UnderlyingBound>
881 where
882 T: Clone + PartialOrd,
883 B: IsMonotonic,
884 {
885 let threshold = threshold.make_bounded();
886 match self.try_make_bounded() {
887 Ok(bounded) => {
888 let uncasted = threshold
889 .zip(bounded)
890 .into_stream()
891 .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
892
893 Stream::new(
894 uncasted.location.clone(),
895 uncasted.ir_node.replace(HydroNode::Placeholder),
896 )
897 }
898 Err(me) => {
899 let uncasted = sliced! {
900 let me = use(me, nondet!(/** thresholds are deterministic */));
901 let mut remaining_threshold = use::state(|l| {
902 let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
903 as_option
904 });
905
906 let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
907 remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
908 passed.map(q!(|(t, _)| t))
909 };
910
911 Stream::new(
912 uncasted.location.clone(),
913 uncasted.ir_node.replace(HydroNode::Placeholder),
914 )
915 }
916 }
917 }
918
919 /// An operator which allows you to "name" a `HydroNode`.
920 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
921 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
922 {
923 let mut node = self.ir_node.borrow_mut();
924 let metadata = node.metadata_mut();
925 metadata.tag = Some(name.to_owned());
926 }
927 self
928 }
929}
930
931impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
932 type Output = Singleton<bool, L, B::UnderlyingBound>;
933
934 fn not(self) -> Self::Output {
935 self.map(q!(|b| !b))
936 }
937}
938
939impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
940where
941 L: Location<'a>,
942{
943 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
944 /// the inner `Option`.
945 ///
946 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
947 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
948 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
949 ///
950 /// # Example
951 /// ```rust
952 /// # #[cfg(feature = "deploy")] {
953 /// # use hydro_lang::prelude::*;
954 /// # use futures::StreamExt;
955 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
956 /// let tick = process.tick();
957 /// let singleton = tick.singleton(q!(Some(42)));
958 /// singleton.into_optional().all_ticks()
959 /// # }, |mut stream| async move {
960 /// // 42
961 /// # assert_eq!(stream.next().await.unwrap(), 42);
962 /// # }));
963 /// # }
964 /// ```
965 pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
966 self.filter_map(q!(|v| v))
967 }
968}
969
970impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
971where
972 L: Location<'a>,
973{
974 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
975 ///
976 /// # Example
977 /// ```rust
978 /// # #[cfg(feature = "deploy")] {
979 /// # use hydro_lang::prelude::*;
980 /// # use futures::StreamExt;
981 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
982 /// let tick = process.tick();
983 /// // ticks are lazy by default, forces the second tick to run
984 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
985 ///
986 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
987 /// let b = tick.singleton(q!(true)); // true, true
988 /// a.and(b).all_ticks()
989 /// # }, |mut stream| async move {
990 /// // [true, false]
991 /// # for w in vec![true, false] {
992 /// # assert_eq!(stream.next().await.unwrap(), w);
993 /// # }
994 /// # }));
995 /// # }
996 /// ```
997 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
998 where
999 B: IsBounded,
1000 {
1001 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1002 }
1003
1004 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1005 ///
1006 /// # Example
1007 /// ```rust
1008 /// # #[cfg(feature = "deploy")] {
1009 /// # use hydro_lang::prelude::*;
1010 /// # use futures::StreamExt;
1011 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1012 /// let tick = process.tick();
1013 /// // ticks are lazy by default, forces the second tick to run
1014 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1015 ///
1016 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1017 /// let b = tick.singleton(q!(false)); // false, false
1018 /// a.or(b).all_ticks()
1019 /// # }, |mut stream| async move {
1020 /// // [true, false]
1021 /// # for w in vec![true, false] {
1022 /// # assert_eq!(stream.next().await.unwrap(), w);
1023 /// # }
1024 /// # }));
1025 /// # }
1026 /// ```
1027 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1028 where
1029 B: IsBounded,
1030 {
1031 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1032 }
1033}
1034
1035impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1036where
1037 L: Location<'a> + NoTick,
1038{
1039 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1040 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1041 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1042 /// all snapshots of this singleton into the atomic-associated tick will observe the
1043 /// same value each tick.
1044 ///
1045 /// # Non-Determinism
1046 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1047 /// the output singleton has a non-deterministic value since the snapshot can be at an
1048 /// arbitrary point in time.
1049 pub fn snapshot_atomic(
1050 self,
1051 tick: &Tick<L>,
1052 _nondet: NonDet,
1053 ) -> Singleton<T, Tick<L>, Bounded> {
1054 Singleton::new(
1055 tick.clone(),
1056 HydroNode::Batch {
1057 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1058 metadata: tick
1059 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1060 },
1061 )
1062 }
1063
1064 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1065 /// to the value will be asynchronously propagated.
1066 pub fn end_atomic(self) -> Singleton<T, L, B> {
1067 Singleton::new(
1068 self.location.tick.l.clone(),
1069 HydroNode::EndAtomic {
1070 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1071 metadata: self
1072 .location
1073 .tick
1074 .l
1075 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1076 },
1077 )
1078 }
1079}
1080
1081impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1082where
1083 L: Location<'a>,
1084{
1085 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1086 /// will observe the same version of the value and will be executed synchronously before any
1087 /// outputs are yielded (in [`Optional::end_atomic`]).
1088 ///
1089 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1090 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1091 /// a different version).
1092 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1093 let id = self.location.flow_state().borrow_mut().next_clock_id();
1094 let out_location = Atomic {
1095 tick: Tick {
1096 id,
1097 l: self.location.clone(),
1098 },
1099 };
1100 Singleton::new(
1101 out_location.clone(),
1102 HydroNode::BeginAtomic {
1103 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1104 metadata: out_location
1105 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1106 },
1107 )
1108 }
1109
1110 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1111 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1112 /// relevant data that contributed to the snapshot at tick `t`.
1113 ///
1114 /// # Non-Determinism
1115 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1116 /// the output singleton has a non-deterministic value since the snapshot can be at an
1117 /// arbitrary point in time.
1118 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
1119 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1120 Singleton::new(
1121 tick.clone(),
1122 HydroNode::Batch {
1123 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1124 metadata: tick
1125 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1126 },
1127 )
1128 }
1129
1130 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1131 /// with order corresponding to increasing prefixes of data contributing to the singleton.
1132 ///
1133 /// # Non-Determinism
1134 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1135 /// to non-deterministic batching and arrival of inputs, the output stream is
1136 /// non-deterministic.
1137 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1138 where
1139 L: NoTick,
1140 {
1141 sliced! {
1142 let snapshot = use(self, nondet);
1143 snapshot.into_stream()
1144 }
1145 .weaken_retries()
1146 }
1147
1148 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1149 /// value taken at various points in time. Because the input singleton may be
1150 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1151 /// represent the value of the singleton given some prefix of the streams leading up to
1152 /// it.
1153 ///
1154 /// # Non-Determinism
1155 /// The output stream is non-deterministic in which elements are sampled, since this
1156 /// is controlled by a clock.
1157 pub fn sample_every(
1158 self,
1159 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1160 nondet: NonDet,
1161 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1162 where
1163 L: NoTick + NoAtomic,
1164 {
1165 let samples = self.location.source_interval(interval, nondet);
1166 sliced! {
1167 let snapshot = use(self, nondet);
1168 let sample_batch = use(samples, nondet);
1169
1170 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1171 }
1172 .weaken_retries()
1173 }
1174
1175 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1176 /// implies that `B == Bounded`.
1177 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1178 where
1179 B: IsBounded,
1180 {
1181 Singleton::new(
1182 self.location.clone(),
1183 self.ir_node.replace(HydroNode::Placeholder),
1184 )
1185 }
1186
1187 #[expect(clippy::result_large_err, reason = "internal use only")]
1188 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1189 if B::UnderlyingBound::BOUNDED {
1190 Ok(Singleton::new(
1191 self.location.clone(),
1192 self.ir_node.replace(HydroNode::Placeholder),
1193 ))
1194 } else {
1195 Err(self)
1196 }
1197 }
1198
1199 /// Clones this bounded singleton into a tick, returning a singleton that has the
1200 /// same value as the outer singleton. Because the outer singleton is bounded, this
1201 /// is deterministic because there is only a single immutable version.
1202 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1203 where
1204 B: IsBounded,
1205 T: Clone,
1206 {
1207 // TODO(shadaj): avoid printing simulator logs for this snapshot
1208 self.snapshot(
1209 tick,
1210 nondet!(/** bounded top-level singleton so deterministic */),
1211 )
1212 }
1213
1214 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1215 ///
1216 /// # Example
1217 /// ```rust
1218 /// # #[cfg(feature = "deploy")] {
1219 /// # use hydro_lang::prelude::*;
1220 /// # use futures::StreamExt;
1221 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1222 /// let tick = process.tick();
1223 /// let batch_input = process
1224 /// .source_iter(q!(vec![123, 456]))
1225 /// .batch(&tick, nondet!(/** test */));
1226 /// batch_input.clone().chain(
1227 /// batch_input.count().into_stream()
1228 /// ).all_ticks()
1229 /// # }, |mut stream| async move {
1230 /// // [123, 456, 2]
1231 /// # for w in vec![123, 456, 2] {
1232 /// # assert_eq!(stream.next().await.unwrap(), w);
1233 /// # }
1234 /// # }));
1235 /// # }
1236 /// ```
1237 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1238 where
1239 B: IsBounded,
1240 {
1241 Stream::new(
1242 self.location.clone(),
1243 HydroNode::Cast {
1244 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1245 metadata: self.location.new_node_metadata(Stream::<
1246 T,
1247 Tick<L>,
1248 Bounded,
1249 TotalOrder,
1250 ExactlyOnce,
1251 >::collection_kind()),
1252 },
1253 )
1254 }
1255
1256 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1257 /// producing a singleton of the resolved output.
1258 ///
1259 /// This is useful when the singleton contains an async computation that must
1260 /// be awaited before further processing. The future is polled to completion
1261 /// before the output value is emitted.
1262 ///
1263 /// # Example
1264 /// ```rust
1265 /// # #[cfg(feature = "deploy")] {
1266 /// # use hydro_lang::prelude::*;
1267 /// # use futures::StreamExt;
1268 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1269 /// let tick = process.tick();
1270 /// let singleton = tick.singleton(q!(5));
1271 /// singleton
1272 /// .map(q!(|v| async move { v * 2 }))
1273 /// .resolve_future_blocking()
1274 /// .all_ticks()
1275 /// # }, |mut stream| async move {
1276 /// // 10
1277 /// # assert_eq!(stream.next().await.unwrap(), 10);
1278 /// # }));
1279 /// # }
1280 /// ```
1281 pub fn resolve_future_blocking(
1282 self,
1283 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1284 where
1285 T: Future,
1286 B: IsBounded,
1287 {
1288 Singleton::new(
1289 self.location.clone(),
1290 HydroNode::ResolveFuturesBlocking {
1291 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1292 metadata: self
1293 .location
1294 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1295 },
1296 )
1297 }
1298}
1299
1300impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1301where
1302 L: Location<'a>,
1303{
1304 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1305 /// which will stream the value computed in _each_ tick as a separate stream element.
1306 ///
1307 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1308 /// producing one element in the output for each tick. This is useful for batched computations,
1309 /// where the results from each tick must be combined together.
1310 ///
1311 /// # Example
1312 /// ```rust
1313 /// # #[cfg(feature = "deploy")] {
1314 /// # use hydro_lang::prelude::*;
1315 /// # use futures::StreamExt;
1316 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1317 /// let tick = process.tick();
1318 /// # // ticks are lazy by default, forces the second tick to run
1319 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1320 /// # let batch_first_tick = process
1321 /// # .source_iter(q!(vec![1]))
1322 /// # .batch(&tick, nondet!(/** test */));
1323 /// # let batch_second_tick = process
1324 /// # .source_iter(q!(vec![1, 2, 3]))
1325 /// # .batch(&tick, nondet!(/** test */))
1326 /// # .defer_tick(); // appears on the second tick
1327 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1328 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1329 /// .count()
1330 /// .all_ticks()
1331 /// # }, |mut stream| async move {
1332 /// // [1, 3]
1333 /// # for w in vec![1, 3] {
1334 /// # assert_eq!(stream.next().await.unwrap(), w);
1335 /// # }
1336 /// # }));
1337 /// # }
1338 /// ```
1339 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1340 self.into_stream().all_ticks()
1341 }
1342
1343 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1344 /// which will stream the value computed in _each_ tick as a separate stream element.
1345 ///
1346 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1347 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1348 /// singleton's [`Tick`] context.
1349 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1350 self.into_stream().all_ticks_atomic()
1351 }
1352
1353 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1354 /// be asynchronously updated with the latest value of the singleton inside the tick.
1355 ///
1356 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1357 /// tick that tracks the inner value. This is useful for getting the value as of the
1358 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1359 ///
1360 /// # Example
1361 /// ```rust
1362 /// # #[cfg(feature = "deploy")] {
1363 /// # use hydro_lang::prelude::*;
1364 /// # use futures::StreamExt;
1365 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1366 /// let tick = process.tick();
1367 /// # // ticks are lazy by default, forces the second tick to run
1368 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1369 /// # let batch_first_tick = process
1370 /// # .source_iter(q!(vec![1]))
1371 /// # .batch(&tick, nondet!(/** test */));
1372 /// # let batch_second_tick = process
1373 /// # .source_iter(q!(vec![1, 2, 3]))
1374 /// # .batch(&tick, nondet!(/** test */))
1375 /// # .defer_tick(); // appears on the second tick
1376 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1377 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1378 /// .count()
1379 /// .latest()
1380 /// # .sample_eager(nondet!(/** test */))
1381 /// # }, |mut stream| async move {
1382 /// // asynchronously changes from 1 ~> 3
1383 /// # for w in vec![1, 3] {
1384 /// # assert_eq!(stream.next().await.unwrap(), w);
1385 /// # }
1386 /// # }));
1387 /// # }
1388 /// ```
1389 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1390 Singleton::new(
1391 self.location.outer().clone(),
1392 HydroNode::YieldConcat {
1393 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1394 metadata: self
1395 .location
1396 .outer()
1397 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1398 },
1399 )
1400 }
1401
1402 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1403 /// be updated with the latest value of the singleton inside the tick.
1404 ///
1405 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1406 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1407 /// singleton's [`Tick`] context.
1408 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1409 let out_location = Atomic {
1410 tick: self.location.clone(),
1411 };
1412 Singleton::new(
1413 out_location.clone(),
1414 HydroNode::YieldConcat {
1415 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1416 metadata: out_location
1417 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1418 },
1419 )
1420 }
1421}
1422
1423#[doc(hidden)]
1424/// Helper trait that determines the output collection type for [`Singleton::zip`].
1425///
1426/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1427/// [`Singleton`].
1428#[sealed::sealed]
1429pub trait ZipResult<'a, Other> {
1430 /// The output collection type.
1431 type Out;
1432 /// The type of the tupled output value.
1433 type ElementType;
1434 /// The type of the other collection's value.
1435 type OtherType;
1436 /// The location where the tupled result will be materialized.
1437 type Location: Location<'a>;
1438
1439 /// The location of the second input to the `zip`.
1440 fn other_location(other: &Other) -> Self::Location;
1441 /// The IR node of the second input to the `zip`.
1442 fn other_ir_node(other: Other) -> HydroNode;
1443
1444 /// Constructs the output live collection given an IR node containing the zip result.
1445 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1446}
1447
1448#[sealed::sealed]
1449impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1450where
1451 L: Location<'a>,
1452{
1453 type Out = Singleton<(T, U), L, B>;
1454 type ElementType = (T, U);
1455 type OtherType = U;
1456 type Location = L;
1457
1458 fn other_location(other: &Singleton<U, L, B>) -> L {
1459 other.location.clone()
1460 }
1461
1462 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1463 other.ir_node.replace(HydroNode::Placeholder)
1464 }
1465
1466 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1467 Singleton::new(
1468 location.clone(),
1469 HydroNode::Cast {
1470 inner: Box::new(ir_node),
1471 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1472 },
1473 )
1474 }
1475}
1476
1477#[sealed::sealed]
1478impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1479 for Singleton<T, L, B>
1480where
1481 L: Location<'a>,
1482{
1483 type Out = Optional<(T, U), L, B::UnderlyingBound>;
1484 type ElementType = (T, U);
1485 type OtherType = U;
1486 type Location = L;
1487
1488 fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1489 other.location.clone()
1490 }
1491
1492 fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1493 other.ir_node.replace(HydroNode::Placeholder)
1494 }
1495
1496 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1497 Optional::new(location, ir_node)
1498 }
1499}
1500
1501#[cfg(test)]
1502mod tests {
1503 #[cfg(feature = "deploy")]
1504 use futures::{SinkExt, StreamExt};
1505 #[cfg(feature = "deploy")]
1506 use hydro_deploy::Deployment;
1507 #[cfg(any(feature = "deploy", feature = "sim"))]
1508 use stageleft::q;
1509
1510 #[cfg(any(feature = "deploy", feature = "sim"))]
1511 use crate::compile::builder::FlowBuilder;
1512 #[cfg(feature = "deploy")]
1513 use crate::live_collections::stream::ExactlyOnce;
1514 #[cfg(any(feature = "deploy", feature = "sim"))]
1515 use crate::location::Location;
1516 #[cfg(any(feature = "deploy", feature = "sim"))]
1517 use crate::nondet::nondet;
1518
1519 #[cfg(feature = "deploy")]
1520 #[tokio::test]
1521 async fn tick_cycle_cardinality() {
1522 let mut deployment = Deployment::new();
1523
1524 let mut flow = FlowBuilder::new();
1525 let node = flow.process::<()>();
1526 let external = flow.external::<()>();
1527
1528 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1529
1530 let node_tick = node.tick();
1531 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1532 let counts = singleton
1533 .clone()
1534 .into_stream()
1535 .count()
1536 .filter_if(
1537 input
1538 .batch(&node_tick, nondet!(/** testing */))
1539 .first()
1540 .is_some(),
1541 )
1542 .all_ticks()
1543 .send_bincode_external(&external);
1544 complete_cycle.complete_next_tick(singleton);
1545
1546 let nodes = flow
1547 .with_process(&node, deployment.Localhost())
1548 .with_external(&external, deployment.Localhost())
1549 .deploy(&mut deployment);
1550
1551 deployment.deploy().await.unwrap();
1552
1553 let mut tick_trigger = nodes.connect(input_send).await;
1554 let mut external_out = nodes.connect(counts).await;
1555
1556 deployment.start().await.unwrap();
1557
1558 tick_trigger.send(()).await.unwrap();
1559
1560 assert_eq!(external_out.next().await.unwrap(), 1);
1561
1562 tick_trigger.send(()).await.unwrap();
1563
1564 assert_eq!(external_out.next().await.unwrap(), 1);
1565 }
1566
1567 #[cfg(feature = "sim")]
1568 #[test]
1569 #[should_panic]
1570 fn sim_fold_intermediate_states() {
1571 let mut flow = FlowBuilder::new();
1572 let node = flow.process::<()>();
1573
1574 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1575 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1576
1577 let tick = node.tick();
1578 let batch = folded.snapshot(&tick, nondet!(/** test */));
1579 let out_recv = batch.all_ticks().sim_output();
1580
1581 flow.sim().exhaustive(async || {
1582 assert_eq!(out_recv.next().await.unwrap(), 10);
1583 });
1584 }
1585
1586 #[cfg(feature = "sim")]
1587 #[test]
1588 fn sim_fold_intermediate_state_count() {
1589 let mut flow = FlowBuilder::new();
1590 let node = flow.process::<()>();
1591
1592 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1593 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1594
1595 let tick = node.tick();
1596 let batch = folded.snapshot(&tick, nondet!(/** test */));
1597 let out_recv = batch.all_ticks().sim_output();
1598
1599 let instance_count = flow.sim().exhaustive(async || {
1600 let out = out_recv.collect::<Vec<_>>().await;
1601 assert_eq!(out.last(), Some(&10));
1602 });
1603
1604 assert_eq!(
1605 instance_count,
1606 16 // 2^4 possible subsets of intermediates (including initial state)
1607 )
1608 }
1609
1610 #[cfg(feature = "sim")]
1611 #[test]
1612 fn sim_fold_no_repeat_initial() {
1613 // check that we don't repeat the initial state of the fold in autonomous decisions
1614
1615 let mut flow = FlowBuilder::new();
1616 let node = flow.process::<()>();
1617
1618 let (in_port, input) = node.sim_input();
1619 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1620
1621 let tick = node.tick();
1622 let batch = folded.snapshot(&tick, nondet!(/** test */));
1623 let out_recv = batch.all_ticks().sim_output();
1624
1625 flow.sim().exhaustive(async || {
1626 assert_eq!(out_recv.next().await.unwrap(), 0);
1627
1628 in_port.send(123);
1629
1630 assert_eq!(out_recv.next().await.unwrap(), 123);
1631 });
1632 }
1633
1634 #[cfg(feature = "sim")]
1635 #[test]
1636 #[should_panic]
1637 fn sim_fold_repeats_snapshots() {
1638 // when the tick is driven by a snapshot AND something else, the snapshot can
1639 // "stutter" and repeat the same state multiple times
1640
1641 let mut flow = FlowBuilder::new();
1642 let node = flow.process::<()>();
1643
1644 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1645 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1646
1647 let tick = node.tick();
1648 let batch = source
1649 .batch(&tick, nondet!(/** test */))
1650 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1651 let out_recv = batch.all_ticks().sim_output();
1652
1653 flow.sim().exhaustive(async || {
1654 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1655 {
1656 panic!("repeated snapshot");
1657 }
1658 });
1659 }
1660
1661 #[cfg(feature = "sim")]
1662 #[test]
1663 fn sim_fold_repeats_snapshots_count() {
1664 // check the number of instances
1665 let mut flow = FlowBuilder::new();
1666 let node = flow.process::<()>();
1667
1668 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1669 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1670
1671 let tick = node.tick();
1672 let batch = source
1673 .batch(&tick, nondet!(/** test */))
1674 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1675 let out_recv = batch.all_ticks().sim_output();
1676
1677 let count = flow.sim().exhaustive(async || {
1678 let _ = out_recv.collect::<Vec<_>>().await;
1679 });
1680
1681 assert_eq!(count, 52);
1682 // don't have a combinatorial explanation for this number yet, but checked via logs
1683 }
1684
1685 #[cfg(feature = "sim")]
1686 #[test]
1687 fn sim_top_level_singleton_exhaustive() {
1688 // ensures that top-level singletons have only one snapshot
1689 let mut flow = FlowBuilder::new();
1690 let node = flow.process::<()>();
1691
1692 let singleton = node.singleton(q!(1));
1693 let tick = node.tick();
1694 let batch = singleton.snapshot(&tick, nondet!(/** test */));
1695 let out_recv = batch.all_ticks().sim_output();
1696
1697 let count = flow.sim().exhaustive(async || {
1698 let _ = out_recv.collect::<Vec<_>>().await;
1699 });
1700
1701 assert_eq!(count, 1);
1702 }
1703
1704 #[cfg(feature = "sim")]
1705 #[test]
1706 fn sim_top_level_singleton_join_count() {
1707 // if a tick consumes a static snapshot and a stream batch, only the batch require space
1708 // exploration
1709
1710 let mut flow = FlowBuilder::new();
1711 let node = flow.process::<()>();
1712
1713 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1714 let tick = node.tick();
1715 let batch = source_iter
1716 .batch(&tick, nondet!(/** test */))
1717 .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1718 let out_recv = batch.all_ticks().sim_output();
1719
1720 let instance_count = flow.sim().exhaustive(async || {
1721 let _ = out_recv.collect::<Vec<_>>().await;
1722 });
1723
1724 assert_eq!(
1725 instance_count,
1726 16 // 2^4 ways to split up (including a possibly empty first batch)
1727 )
1728 }
1729
1730 #[cfg(feature = "sim")]
1731 #[test]
1732 fn top_level_singleton_into_stream_no_replay() {
1733 let mut flow = FlowBuilder::new();
1734 let node = flow.process::<()>();
1735
1736 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1737 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1738
1739 let out_recv = folded.into_stream().sim_output();
1740
1741 flow.sim().exhaustive(async || {
1742 out_recv.assert_yields_only([10]).await;
1743 });
1744 }
1745
1746 #[cfg(feature = "sim")]
1747 #[test]
1748 fn inside_tick_singleton_zip() {
1749 use crate::live_collections::Stream;
1750 use crate::live_collections::sliced::sliced;
1751
1752 let mut flow = FlowBuilder::new();
1753 let node = flow.process::<()>();
1754
1755 let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1756 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1757
1758 let out_recv = sliced! {
1759 let v = use(folded, nondet!(/** test */));
1760 v.clone().zip(v).into_stream()
1761 }
1762 .sim_output();
1763
1764 let count = flow.sim().exhaustive(async || {
1765 let out = out_recv.collect::<Vec<_>>().await;
1766 assert_eq!(out.last(), Some(&(3, 3)));
1767 });
1768
1769 assert_eq!(count, 4);
1770 }
1771}