hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::{NonDet, nondet};
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Singleton<Type, Loc, Bound: Boundedness> {
38 pub(crate) location: Loc,
39 pub(crate) ir_node: RefCell<HydroNode>,
40
41 _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46 L: Location<'a>,
47{
48 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49 Singleton::new(singleton.location, singleton.ir_node.into_inner())
50 }
51}
52
53impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 type Location = Tick<L>;
58
59 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
60 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
61 location.clone(),
62 HydroNode::DeferTick {
63 input: Box::new(HydroNode::CycleSource {
64 ident,
65 metadata: location.new_node_metadata(Self::collection_kind()),
66 }),
67 metadata: location
68 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
69 },
70 );
71
72 from_previous_tick.unwrap_or(initial)
73 }
74}
75
76impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
77where
78 L: Location<'a>,
79{
80 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
81 assert_eq!(
82 Location::id(&self.location),
83 expected_location,
84 "locations do not match"
85 );
86 self.location
87 .flow_state()
88 .borrow_mut()
89 .push_root(HydroRoot::CycleSink {
90 ident,
91 input: Box::new(self.ir_node.into_inner()),
92 op_metadata: HydroIrOpMetadata::new(),
93 });
94 }
95}
96
97impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
98where
99 L: Location<'a>,
100{
101 type Location = Tick<L>;
102
103 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
104 Singleton::new(
105 location.clone(),
106 HydroNode::CycleSource {
107 ident,
108 metadata: location.new_node_metadata(Self::collection_kind()),
109 },
110 )
111 }
112}
113
114impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116 L: Location<'a>,
117{
118 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
119 assert_eq!(
120 Location::id(&self.location),
121 expected_location,
122 "locations do not match"
123 );
124 self.location
125 .flow_state()
126 .borrow_mut()
127 .push_root(HydroRoot::CycleSink {
128 ident,
129 input: Box::new(self.ir_node.into_inner()),
130 op_metadata: HydroIrOpMetadata::new(),
131 });
132 }
133}
134
135impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
136where
137 L: Location<'a> + NoTick,
138{
139 type Location = L;
140
141 fn create_source(ident: syn::Ident, location: L) -> Self {
142 Singleton::new(
143 location.clone(),
144 HydroNode::CycleSource {
145 ident,
146 metadata: location.new_node_metadata(Self::collection_kind()),
147 },
148 )
149 }
150}
151
152impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
153where
154 L: Location<'a> + NoTick,
155{
156 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(self.ir_node.into_inner()),
168 op_metadata: HydroIrOpMetadata::new(),
169 });
170 }
171}
172
173impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
174where
175 T: Clone,
176 L: Location<'a>,
177{
178 fn clone(&self) -> Self {
179 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
180 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
181 *self.ir_node.borrow_mut() = HydroNode::Tee {
182 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
183 metadata: self.location.new_node_metadata(Self::collection_kind()),
184 };
185 }
186
187 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
188 Singleton {
189 location: self.location.clone(),
190 ir_node: HydroNode::Tee {
191 inner: TeeNode(inner.0.clone()),
192 metadata: metadata.clone(),
193 }
194 .into(),
195 _phantom: PhantomData,
196 }
197 } else {
198 unreachable!()
199 }
200 }
201}
202
203#[cfg(stageleft_runtime)]
204fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
205 me: Singleton<T, Tick<L>, B>,
206 other: O,
207) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
208where
209 Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
210{
211 check_matching_location(
212 &me.location,
213 &Singleton::<T, Tick<L>, B>::other_location(&other),
214 );
215
216 Singleton::<T, Tick<L>, B>::make(
217 me.location.clone(),
218 HydroNode::CrossSingleton {
219 left: Box::new(me.ir_node.into_inner()),
220 right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
221 metadata: me.location.new_node_metadata(CollectionKind::Singleton {
222 bound: B::BOUND_KIND,
223 element_type: stageleft::quote_type::<
224 <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
225 >()
226 .into(),
227 }),
228 },
229 )
230}
231
232impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
233where
234 L: Location<'a>,
235{
236 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
237 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
238 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
239 Singleton {
240 location,
241 ir_node: RefCell::new(ir_node),
242 _phantom: PhantomData,
243 }
244 }
245
246 pub(crate) fn collection_kind() -> CollectionKind {
247 CollectionKind::Singleton {
248 bound: B::BOUND_KIND,
249 element_type: stageleft::quote_type::<T>().into(),
250 }
251 }
252
253 /// Transforms the singleton value by applying a function `f` to it,
254 /// continuously as the input is updated.
255 ///
256 /// # Example
257 /// ```rust
258 /// # use hydro_lang::prelude::*;
259 /// # use futures::StreamExt;
260 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
261 /// let tick = process.tick();
262 /// let singleton = tick.singleton(q!(5));
263 /// singleton.map(q!(|v| v * 2)).all_ticks()
264 /// # }, |mut stream| async move {
265 /// // 10
266 /// # assert_eq!(stream.next().await.unwrap(), 10);
267 /// # }));
268 /// ```
269 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
270 where
271 F: Fn(T) -> U + 'a,
272 {
273 let f = f.splice_fn1_ctx(&self.location).into();
274 Singleton::new(
275 self.location.clone(),
276 HydroNode::Map {
277 f,
278 input: Box::new(self.ir_node.into_inner()),
279 metadata: self
280 .location
281 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
282 },
283 )
284 }
285
286 /// Transforms the singleton value by applying a function `f` to it and then flattening
287 /// the result into a stream, preserving the order of elements.
288 ///
289 /// The function `f` is applied to the singleton value to produce an iterator, and all items
290 /// from that iterator are emitted in the output stream in deterministic order.
291 ///
292 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
293 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
294 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
295 ///
296 /// # Example
297 /// ```rust
298 /// # use hydro_lang::prelude::*;
299 /// # use futures::StreamExt;
300 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
301 /// let tick = process.tick();
302 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
303 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
304 /// # }, |mut stream| async move {
305 /// // 1, 2, 3
306 /// # for w in vec![1, 2, 3] {
307 /// # assert_eq!(stream.next().await.unwrap(), w);
308 /// # }
309 /// # }));
310 /// ```
311 pub fn flat_map_ordered<U, I, F>(
312 self,
313 f: impl IntoQuotedMut<'a, F, L>,
314 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
315 where
316 I: IntoIterator<Item = U>,
317 F: Fn(T) -> I + 'a,
318 {
319 let f = f.splice_fn1_ctx(&self.location).into();
320 Stream::new(
321 self.location.clone(),
322 HydroNode::FlatMap {
323 f,
324 input: Box::new(self.ir_node.into_inner()),
325 metadata: self.location.new_node_metadata(
326 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
327 ),
328 },
329 )
330 }
331
332 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
333 /// for the output type `I` to produce items in any order.
334 ///
335 /// The function `f` is applied to the singleton value to produce an iterator, and all items
336 /// from that iterator are emitted in the output stream in non-deterministic order.
337 ///
338 /// # Example
339 /// ```rust
340 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
341 /// # use futures::StreamExt;
342 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
343 /// let tick = process.tick();
344 /// let singleton = tick.singleton(q!(
345 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
346 /// ));
347 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
348 /// # }, |mut stream| async move {
349 /// // 1, 2, 3, but in no particular order
350 /// # let mut results = Vec::new();
351 /// # for _ in 0..3 {
352 /// # results.push(stream.next().await.unwrap());
353 /// # }
354 /// # results.sort();
355 /// # assert_eq!(results, vec![1, 2, 3]);
356 /// # }));
357 /// ```
358 pub fn flat_map_unordered<U, I, F>(
359 self,
360 f: impl IntoQuotedMut<'a, F, L>,
361 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
362 where
363 I: IntoIterator<Item = U>,
364 F: Fn(T) -> I + 'a,
365 {
366 let f = f.splice_fn1_ctx(&self.location).into();
367 Stream::new(
368 self.location.clone(),
369 HydroNode::FlatMap {
370 f,
371 input: Box::new(self.ir_node.into_inner()),
372 metadata: self
373 .location
374 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
375 },
376 )
377 }
378
379 /// Flattens the singleton value into a stream, preserving the order of elements.
380 ///
381 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
382 /// are emitted in the output stream in deterministic order.
383 ///
384 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
385 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
386 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
387 ///
388 /// # Example
389 /// ```rust
390 /// # use hydro_lang::prelude::*;
391 /// # use futures::StreamExt;
392 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
393 /// let tick = process.tick();
394 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
395 /// singleton.flatten_ordered().all_ticks()
396 /// # }, |mut stream| async move {
397 /// // 1, 2, 3
398 /// # for w in vec![1, 2, 3] {
399 /// # assert_eq!(stream.next().await.unwrap(), w);
400 /// # }
401 /// # }));
402 /// ```
403 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
404 where
405 T: IntoIterator<Item = U>,
406 {
407 self.flat_map_ordered(q!(|x| x))
408 }
409
410 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
411 /// for the element type `T` to produce items in any order.
412 ///
413 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
414 /// are emitted in the output stream in non-deterministic order.
415 ///
416 /// # Example
417 /// ```rust
418 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
419 /// # use futures::StreamExt;
420 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
421 /// let tick = process.tick();
422 /// let singleton = tick.singleton(q!(
423 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
424 /// ));
425 /// singleton.flatten_unordered().all_ticks()
426 /// # }, |mut stream| async move {
427 /// // 1, 2, 3, but in no particular order
428 /// # let mut results = Vec::new();
429 /// # for _ in 0..3 {
430 /// # results.push(stream.next().await.unwrap());
431 /// # }
432 /// # results.sort();
433 /// # assert_eq!(results, vec![1, 2, 3]);
434 /// # }));
435 /// ```
436 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
437 where
438 T: IntoIterator<Item = U>,
439 {
440 self.flat_map_unordered(q!(|x| x))
441 }
442
443 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
444 ///
445 /// If the predicate returns `true`, the output optional contains the same value.
446 /// If the predicate returns `false`, the output optional is empty.
447 ///
448 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
449 /// not modify or take ownership of the value. If you need to modify the value while filtering
450 /// use [`Singleton::filter_map`] instead.
451 ///
452 /// # Example
453 /// ```rust
454 /// # use hydro_lang::prelude::*;
455 /// # use futures::StreamExt;
456 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
457 /// let tick = process.tick();
458 /// let singleton = tick.singleton(q!(5));
459 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
460 /// # }, |mut stream| async move {
461 /// // 5
462 /// # assert_eq!(stream.next().await.unwrap(), 5);
463 /// # }));
464 /// ```
465 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
466 where
467 F: Fn(&T) -> bool + 'a,
468 {
469 let f = f.splice_fn1_borrow_ctx(&self.location).into();
470 Optional::new(
471 self.location.clone(),
472 HydroNode::Filter {
473 f,
474 input: Box::new(self.ir_node.into_inner()),
475 metadata: self
476 .location
477 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
478 },
479 )
480 }
481
482 /// An operator that both filters and maps. It yields the value only if the supplied
483 /// closure `f` returns `Some(value)`.
484 ///
485 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
486 /// If the closure returns `None`, the output optional is empty.
487 ///
488 /// # Example
489 /// ```rust
490 /// # use hydro_lang::prelude::*;
491 /// # use futures::StreamExt;
492 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
493 /// let tick = process.tick();
494 /// let singleton = tick.singleton(q!("42"));
495 /// singleton
496 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
497 /// .all_ticks()
498 /// # }, |mut stream| async move {
499 /// // 42
500 /// # assert_eq!(stream.next().await.unwrap(), 42);
501 /// # }));
502 /// ```
503 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
504 where
505 F: Fn(T) -> Option<U> + 'a,
506 {
507 let f = f.splice_fn1_ctx(&self.location).into();
508 Optional::new(
509 self.location.clone(),
510 HydroNode::FilterMap {
511 f,
512 input: Box::new(self.ir_node.into_inner()),
513 metadata: self
514 .location
515 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
516 },
517 )
518 }
519
520 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
521 ///
522 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
523 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
524 /// non-null. This is useful for combining several pieces of state together.
525 ///
526 /// # Example
527 /// ```rust
528 /// # use hydro_lang::prelude::*;
529 /// # use futures::StreamExt;
530 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
531 /// let tick = process.tick();
532 /// let numbers = process
533 /// .source_iter(q!(vec![123, 456]))
534 /// .batch(&tick, nondet!(/** test */));
535 /// let count = numbers.clone().count(); // Singleton
536 /// let max = numbers.max(); // Optional
537 /// count.zip(max).all_ticks()
538 /// # }, |mut stream| async move {
539 /// // [(2, 456)]
540 /// # for w in vec![(2, 456)] {
541 /// # assert_eq!(stream.next().await.unwrap(), w);
542 /// # }
543 /// # }));
544 /// ```
545 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
546 where
547 Self: ZipResult<'a, O, Location = L>,
548 {
549 check_matching_location(&self.location, &Self::other_location(&other));
550
551 if L::is_top_level()
552 && let Some(tick) = self.location.try_tick()
553 {
554 let out = zip_inside_tick(
555 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
556 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
557 Self::other_location(&other),
558 Self::other_ir_node(other),
559 )
560 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
561 )
562 .latest();
563
564 Self::make(out.location, out.ir_node.into_inner())
565 } else {
566 Self::make(
567 self.location.clone(),
568 HydroNode::CrossSingleton {
569 left: Box::new(self.ir_node.into_inner()),
570 right: Box::new(Self::other_ir_node(other)),
571 metadata: self.location.new_node_metadata(CollectionKind::Optional {
572 bound: B::BOUND_KIND,
573 element_type: stageleft::quote_type::<
574 <Self as ZipResult<'a, O>>::ElementType,
575 >()
576 .into(),
577 }),
578 },
579 )
580 }
581 }
582
583 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
584 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
585 ///
586 /// Useful for conditionally processing, such as only emitting a singleton's value outside
587 /// a tick if some other condition is satisfied.
588 ///
589 /// # Example
590 /// ```rust
591 /// # use hydro_lang::prelude::*;
592 /// # use futures::StreamExt;
593 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594 /// let tick = process.tick();
595 /// // ticks are lazy by default, forces the second tick to run
596 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
597 ///
598 /// let batch_first_tick = process
599 /// .source_iter(q!(vec![1]))
600 /// .batch(&tick, nondet!(/** test */));
601 /// let batch_second_tick = process
602 /// .source_iter(q!(vec![1, 2, 3]))
603 /// .batch(&tick, nondet!(/** test */))
604 /// .defer_tick(); // appears on the second tick
605 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
606 /// batch_first_tick.chain(batch_second_tick).count()
607 /// .filter_if_some(some_on_first_tick)
608 /// .all_ticks()
609 /// # }, |mut stream| async move {
610 /// // [1]
611 /// # for w in vec![1] {
612 /// # assert_eq!(stream.next().await.unwrap(), w);
613 /// # }
614 /// # }));
615 /// ```
616 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
617 self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
618 .map(q!(|(d, _signal)| d))
619 }
620
621 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
622 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
623 ///
624 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
625 /// the condition.
626 ///
627 /// # Example
628 /// ```rust
629 /// # use hydro_lang::prelude::*;
630 /// # use futures::StreamExt;
631 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
632 /// let tick = process.tick();
633 /// // ticks are lazy by default, forces the second tick to run
634 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
635 ///
636 /// let batch_first_tick = process
637 /// .source_iter(q!(vec![1]))
638 /// .batch(&tick, nondet!(/** test */));
639 /// let batch_second_tick = process
640 /// .source_iter(q!(vec![1, 2, 3]))
641 /// .batch(&tick, nondet!(/** test */))
642 /// .defer_tick(); // appears on the second tick
643 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
644 /// batch_first_tick.chain(batch_second_tick).count()
645 /// .filter_if_none(some_on_first_tick)
646 /// .all_ticks()
647 /// # }, |mut stream| async move {
648 /// // [3]
649 /// # for w in vec![3] {
650 /// # assert_eq!(stream.next().await.unwrap(), w);
651 /// # }
652 /// # }));
653 /// ```
654 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
655 self.filter_if_some(
656 other
657 .map(q!(|_| ()))
658 .into_singleton()
659 .filter(q!(|o| o.is_none())),
660 )
661 }
662
663 /// An operator which allows you to "name" a `HydroNode`.
664 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
665 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
666 {
667 let mut node = self.ir_node.borrow_mut();
668 let metadata = node.metadata_mut();
669 metadata.tag = Some(name.to_string());
670 }
671 self
672 }
673}
674
675impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
676where
677 L: Location<'a> + NoTick,
678{
679 /// Returns a singleton value corresponding to the latest snapshot of the singleton
680 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
681 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
682 /// all snapshots of this singleton into the atomic-associated tick will observe the
683 /// same value each tick.
684 ///
685 /// # Non-Determinism
686 /// Because this picks a snapshot of a singleton whose value is continuously changing,
687 /// the output singleton has a non-deterministic value since the snapshot can be at an
688 /// arbitrary point in time.
689 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
690 Singleton::new(
691 self.location.clone().tick,
692 HydroNode::Batch {
693 inner: Box::new(self.ir_node.into_inner()),
694 metadata: self
695 .location
696 .tick
697 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
698 },
699 )
700 }
701
702 /// Returns this singleton back into a top-level, asynchronous execution context where updates
703 /// to the value will be asynchronously propagated.
704 pub fn end_atomic(self) -> Singleton<T, L, B> {
705 Singleton::new(
706 self.location.tick.l.clone(),
707 HydroNode::EndAtomic {
708 inner: Box::new(self.ir_node.into_inner()),
709 metadata: self
710 .location
711 .tick
712 .l
713 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
714 },
715 )
716 }
717}
718
719impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
720where
721 L: Location<'a>,
722{
723 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
724 /// will observe the same version of the value and will be executed synchronously before any
725 /// outputs are yielded (in [`Optional::end_atomic`]).
726 ///
727 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
728 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
729 /// a different version).
730 ///
731 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
732 /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
733 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
734 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
735 let out_location = Atomic { tick: tick.clone() };
736 Singleton::new(
737 out_location.clone(),
738 HydroNode::BeginAtomic {
739 inner: Box::new(self.ir_node.into_inner()),
740 metadata: out_location
741 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
742 },
743 )
744 }
745
746 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
747 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
748 /// relevant data that contributed to the snapshot at tick `t`.
749 ///
750 /// # Non-Determinism
751 /// Because this picks a snapshot of a singleton whose value is continuously changing,
752 /// the output singleton has a non-deterministic value since the snapshot can be at an
753 /// arbitrary point in time.
754 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
755 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
756 Singleton::new(
757 tick.clone(),
758 HydroNode::Batch {
759 inner: Box::new(self.ir_node.into_inner()),
760 metadata: tick
761 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
762 },
763 )
764 }
765
766 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
767 /// with order corresponding to increasing prefixes of data contributing to the singleton.
768 ///
769 /// # Non-Determinism
770 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
771 /// to non-deterministic batching and arrival of inputs, the output stream is
772 /// non-deterministic.
773 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
774 where
775 L: NoTick,
776 {
777 let tick = self.location.tick();
778 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
779 }
780
781 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
782 /// value taken at various points in time. Because the input singleton may be
783 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
784 /// represent the value of the singleton given some prefix of the streams leading up to
785 /// it.
786 ///
787 /// # Non-Determinism
788 /// The output stream is non-deterministic in which elements are sampled, since this
789 /// is controlled by a clock.
790 pub fn sample_every(
791 self,
792 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
793 nondet: NonDet,
794 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
795 where
796 L: NoTick + NoAtomic,
797 {
798 let samples = self.location.source_interval(interval, nondet);
799 let tick = self.location.tick();
800
801 self.snapshot(&tick, nondet)
802 .filter_if_some(samples.batch(&tick, nondet).first())
803 .all_ticks()
804 .weakest_retries()
805 }
806}
807
808impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
809where
810 L: Location<'a>,
811{
812 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
813 /// which will stream the value computed in _each_ tick as a separate stream element.
814 ///
815 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
816 /// producing one element in the output for each tick. This is useful for batched computations,
817 /// where the results from each tick must be combined together.
818 ///
819 /// # Example
820 /// ```rust
821 /// # use hydro_lang::prelude::*;
822 /// # use futures::StreamExt;
823 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
824 /// let tick = process.tick();
825 /// # // ticks are lazy by default, forces the second tick to run
826 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
827 /// # let batch_first_tick = process
828 /// # .source_iter(q!(vec![1]))
829 /// # .batch(&tick, nondet!(/** test */));
830 /// # let batch_second_tick = process
831 /// # .source_iter(q!(vec![1, 2, 3]))
832 /// # .batch(&tick, nondet!(/** test */))
833 /// # .defer_tick(); // appears on the second tick
834 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
835 /// input_batch // first tick: [1], second tick: [1, 2, 3]
836 /// .count()
837 /// .all_ticks()
838 /// # }, |mut stream| async move {
839 /// // [1, 3]
840 /// # for w in vec![1, 3] {
841 /// # assert_eq!(stream.next().await.unwrap(), w);
842 /// # }
843 /// # }));
844 /// ```
845 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
846 self.into_stream().all_ticks()
847 }
848
849 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
850 /// which will stream the value computed in _each_ tick as a separate stream element.
851 ///
852 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
853 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
854 /// singleton's [`Tick`] context.
855 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
856 self.into_stream().all_ticks_atomic()
857 }
858
859 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
860 /// be asynchronously updated with the latest value of the singleton inside the tick.
861 ///
862 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
863 /// tick that tracks the inner value. This is useful for getting the value as of the
864 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
865 ///
866 /// # Example
867 /// ```rust
868 /// # use hydro_lang::prelude::*;
869 /// # use futures::StreamExt;
870 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
871 /// let tick = process.tick();
872 /// # // ticks are lazy by default, forces the second tick to run
873 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
874 /// # let batch_first_tick = process
875 /// # .source_iter(q!(vec![1]))
876 /// # .batch(&tick, nondet!(/** test */));
877 /// # let batch_second_tick = process
878 /// # .source_iter(q!(vec![1, 2, 3]))
879 /// # .batch(&tick, nondet!(/** test */))
880 /// # .defer_tick(); // appears on the second tick
881 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
882 /// input_batch // first tick: [1], second tick: [1, 2, 3]
883 /// .count()
884 /// .latest()
885 /// # .sample_eager(nondet!(/** test */))
886 /// # }, |mut stream| async move {
887 /// // asynchronously changes from 1 ~> 3
888 /// # for w in vec![1, 3] {
889 /// # assert_eq!(stream.next().await.unwrap(), w);
890 /// # }
891 /// # }));
892 /// ```
893 pub fn latest(self) -> Singleton<T, L, Unbounded> {
894 Singleton::new(
895 self.location.outer().clone(),
896 HydroNode::YieldConcat {
897 inner: Box::new(self.ir_node.into_inner()),
898 metadata: self
899 .location
900 .outer()
901 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
902 },
903 )
904 }
905
906 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
907 /// be updated with the latest value of the singleton inside the tick.
908 ///
909 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
910 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
911 /// singleton's [`Tick`] context.
912 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
913 let out_location = Atomic {
914 tick: self.location.clone(),
915 };
916 Singleton::new(
917 out_location.clone(),
918 HydroNode::YieldConcat {
919 inner: Box::new(self.ir_node.into_inner()),
920 metadata: out_location
921 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
922 },
923 )
924 }
925
926 #[deprecated(note = "use .into_stream().persist()")]
927 #[expect(missing_docs, reason = "deprecated")]
928 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
929 Stream::new(
930 self.location.clone(),
931 HydroNode::Persist {
932 inner: Box::new(self.ir_node.into_inner()),
933 metadata: self.location.new_node_metadata(Stream::<
934 T,
935 Tick<L>,
936 Bounded,
937 TotalOrder,
938 ExactlyOnce,
939 >::collection_kind()),
940 },
941 )
942 }
943
944 /// Converts this singleton into a [`Stream`] containing a single element, the value.
945 ///
946 /// # Example
947 /// ```rust
948 /// # use hydro_lang::prelude::*;
949 /// # use futures::StreamExt;
950 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
951 /// let tick = process.tick();
952 /// let batch_input = process
953 /// .source_iter(q!(vec![123, 456]))
954 /// .batch(&tick, nondet!(/** test */));
955 /// batch_input.clone().chain(
956 /// batch_input.count().into_stream()
957 /// ).all_ticks()
958 /// # }, |mut stream| async move {
959 /// // [123, 456, 2]
960 /// # for w in vec![123, 456, 2] {
961 /// # assert_eq!(stream.next().await.unwrap(), w);
962 /// # }
963 /// # }));
964 /// ```
965 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
966 Stream::new(
967 self.location.clone(),
968 HydroNode::Cast {
969 inner: Box::new(self.ir_node.into_inner()),
970 metadata: self.location.new_node_metadata(Stream::<
971 T,
972 Tick<L>,
973 Bounded,
974 TotalOrder,
975 ExactlyOnce,
976 >::collection_kind()),
977 },
978 )
979 }
980}
981
982#[doc(hidden)]
983/// Helper trait that determines the output collection type for [`Singleton::zip`].
984///
985/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
986/// [`Singleton`].
987#[sealed::sealed]
988pub trait ZipResult<'a, Other> {
989 /// The output collection type.
990 type Out;
991 /// The type of the tupled output value.
992 type ElementType;
993 /// The type of the other collection's value.
994 type OtherType;
995 /// The location where the tupled result will be materialized.
996 type Location: Location<'a>;
997
998 /// The location of the second input to the `zip`.
999 fn other_location(other: &Other) -> Self::Location;
1000 /// The IR node of the second input to the `zip`.
1001 fn other_ir_node(other: Other) -> HydroNode;
1002
1003 /// Constructs the output live collection given an IR node containing the zip result.
1004 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1005}
1006
1007#[sealed::sealed]
1008impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1009where
1010 L: Location<'a>,
1011{
1012 type Out = Singleton<(T, U), L, B>;
1013 type ElementType = (T, U);
1014 type OtherType = U;
1015 type Location = L;
1016
1017 fn other_location(other: &Singleton<U, L, B>) -> L {
1018 other.location.clone()
1019 }
1020
1021 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1022 other.ir_node.into_inner()
1023 }
1024
1025 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1026 Singleton::new(
1027 location.clone(),
1028 HydroNode::Cast {
1029 inner: Box::new(ir_node),
1030 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1031 },
1032 )
1033 }
1034}
1035
1036#[sealed::sealed]
1037impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1038where
1039 L: Location<'a>,
1040{
1041 type Out = Optional<(T, U), L, B>;
1042 type ElementType = (T, U);
1043 type OtherType = U;
1044 type Location = L;
1045
1046 fn other_location(other: &Optional<U, L, B>) -> L {
1047 other.location.clone()
1048 }
1049
1050 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1051 other.ir_node.into_inner()
1052 }
1053
1054 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1055 Optional::new(location, ir_node)
1056 }
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use futures::{SinkExt, StreamExt};
1062 use hydro_deploy::Deployment;
1063 use stageleft::q;
1064
1065 use crate::compile::builder::FlowBuilder;
1066 use crate::live_collections::stream::ExactlyOnce;
1067 use crate::location::Location;
1068 use crate::nondet::nondet;
1069
1070 #[tokio::test]
1071 async fn tick_cycle_cardinality() {
1072 let mut deployment = Deployment::new();
1073
1074 let flow = FlowBuilder::new();
1075 let node = flow.process::<()>();
1076 let external = flow.external::<()>();
1077
1078 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1079
1080 let node_tick = node.tick();
1081 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1082 let counts = singleton
1083 .clone()
1084 .into_stream()
1085 .count()
1086 .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1087 .all_ticks()
1088 .send_bincode_external(&external);
1089 complete_cycle.complete_next_tick(singleton);
1090
1091 let nodes = flow
1092 .with_process(&node, deployment.Localhost())
1093 .with_external(&external, deployment.Localhost())
1094 .deploy(&mut deployment);
1095
1096 deployment.deploy().await.unwrap();
1097
1098 let mut tick_trigger = nodes.connect(input_send).await;
1099 let mut external_out = nodes.connect(counts).await;
1100
1101 deployment.start().await.unwrap();
1102
1103 tick_trigger.send(()).await.unwrap();
1104
1105 assert_eq!(external_out.next().await.unwrap(), 1);
1106
1107 tick_trigger.send(()).await.unwrap();
1108
1109 assert_eq!(external_out.next().await.unwrap(), 1);
1110 }
1111}