hydro_lang/
singleton.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::boundedness::Boundedness;
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{
11    CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
12    TickCycleMarker,
13};
14use crate::ir::{HydroLeaf, HydroNode, TeeNode};
15use crate::location::tick::{Atomic, NoAtomic};
16use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
17use crate::stream::{AtLeastOnce, ExactlyOnce};
18use crate::unsafety::NonDet;
19use crate::{Bounded, NoOrder, Optional, Stream, TotalOrder, Unbounded};
20
21pub struct Singleton<Type, Loc, Bound: Boundedness> {
22    pub(crate) location: Loc,
23    pub(crate) ir_node: RefCell<HydroNode>,
24
25    _phantom: PhantomData<(Type, Loc, Bound)>,
26}
27
28impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
29where
30    L: Location<'a>,
31{
32    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
33        Singleton::new(singleton.location, singleton.ir_node.into_inner())
34    }
35}
36
37impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
38where
39    L: Location<'a>,
40{
41    fn defer_tick(self) -> Self {
42        Singleton::defer_tick(self)
43    }
44}
45
46impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
47where
48    L: Location<'a>,
49{
50    type Location = Tick<L>;
51
52    fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
53        Singleton::new(
54            location.clone(),
55            HydroNode::Chain {
56                first: Box::new(HydroNode::CycleSource {
57                    ident,
58                    metadata: location.new_node_metadata::<T>(),
59                }),
60                second: initial
61                    .continue_if(location.optional_first_tick(q!(())))
62                    .ir_node
63                    .into_inner()
64                    .into(),
65                metadata: location.new_node_metadata::<T>(),
66            },
67        )
68    }
69}
70
71impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
72where
73    L: Location<'a>,
74{
75    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
76        assert_eq!(
77            self.location.id(),
78            expected_location,
79            "locations do not match"
80        );
81        self.location
82            .flow_state()
83            .borrow_mut()
84            .leaves
85            .as_mut()
86            .expect(FLOW_USED_MESSAGE)
87            .push(HydroLeaf::CycleSink {
88                ident,
89                input: Box::new(self.ir_node.into_inner()),
90                metadata: self.location.new_node_metadata::<T>(),
91            });
92    }
93}
94
95impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
96where
97    L: Location<'a>,
98{
99    type Location = Tick<L>;
100
101    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
102        Singleton::new(
103            location.clone(),
104            HydroNode::CycleSource {
105                ident,
106                metadata: location.new_node_metadata::<T>(),
107            },
108        )
109    }
110}
111
112impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
113where
114    L: Location<'a>,
115{
116    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
117        assert_eq!(
118            self.location.id(),
119            expected_location,
120            "locations do not match"
121        );
122        self.location
123            .flow_state()
124            .borrow_mut()
125            .leaves
126            .as_mut()
127            .expect(FLOW_USED_MESSAGE)
128            .push(HydroLeaf::CycleSink {
129                ident,
130                input: Box::new(self.ir_node.into_inner()),
131                metadata: self.location.new_node_metadata::<T>(),
132            });
133    }
134}
135
136impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRefMarker> for Singleton<T, L, B>
137where
138    L: Location<'a> + NoTick,
139{
140    type Location = L;
141
142    fn create_source(ident: syn::Ident, location: L) -> Self {
143        Singleton::new(
144            location.clone(),
145            HydroNode::Persist {
146                inner: Box::new(HydroNode::CycleSource {
147                    ident,
148                    metadata: location.new_node_metadata::<T>(),
149                }),
150                metadata: location.new_node_metadata::<T>(),
151            },
152        )
153    }
154}
155
156impl<'a, T, L, B: Boundedness> CycleComplete<'a, ForwardRefMarker> for Singleton<T, L, B>
157where
158    L: Location<'a> + NoTick,
159{
160    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
161        assert_eq!(
162            self.location.id(),
163            expected_location,
164            "locations do not match"
165        );
166        let metadata = self.location.new_node_metadata::<T>();
167        self.location
168            .flow_state()
169            .borrow_mut()
170            .leaves
171            .as_mut()
172            .expect(FLOW_USED_MESSAGE)
173            .push(HydroLeaf::CycleSink {
174                ident,
175                input: Box::new(HydroNode::Unpersist {
176                    inner: Box::new(self.ir_node.into_inner()),
177                    metadata: metadata.clone(),
178                }),
179                metadata,
180            });
181    }
182}
183
184impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
185where
186    T: Clone,
187    L: Location<'a>,
188{
189    fn clone(&self) -> Self {
190        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
191            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
192            *self.ir_node.borrow_mut() = HydroNode::Tee {
193                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
194                metadata: self.location.new_node_metadata::<T>(),
195            };
196        }
197
198        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
199            Singleton {
200                location: self.location.clone(),
201                ir_node: HydroNode::Tee {
202                    inner: TeeNode(inner.0.clone()),
203                    metadata: metadata.clone(),
204                }
205                .into(),
206                _phantom: PhantomData,
207            }
208        } else {
209            unreachable!()
210        }
211    }
212}
213
214impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
215where
216    L: Location<'a>,
217{
218    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
219        Singleton {
220            location,
221            ir_node: RefCell::new(ir_node),
222            _phantom: PhantomData,
223        }
224    }
225
226    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
227    where
228        F: Fn(T) -> U + 'a,
229    {
230        let f = f.splice_fn1_ctx(&self.location).into();
231        Singleton::new(
232            self.location.clone(),
233            HydroNode::Map {
234                f,
235                input: Box::new(self.ir_node.into_inner()),
236                metadata: self.location.new_node_metadata::<U>(),
237            },
238        )
239    }
240
241    pub fn flat_map_ordered<U, I, F>(
242        self,
243        f: impl IntoQuotedMut<'a, F, L>,
244    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
245    where
246        I: IntoIterator<Item = U>,
247        F: Fn(T) -> I + 'a,
248    {
249        let f = f.splice_fn1_ctx(&self.location).into();
250        Stream::new(
251            self.location.clone(),
252            HydroNode::FlatMap {
253                f,
254                input: Box::new(self.ir_node.into_inner()),
255                metadata: self.location.new_node_metadata::<U>(),
256            },
257        )
258    }
259
260    pub fn flat_map_unordered<U, I, F>(
261        self,
262        f: impl IntoQuotedMut<'a, F, L>,
263    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
264    where
265        I: IntoIterator<Item = U>,
266        F: Fn(T) -> I + 'a,
267    {
268        let f = f.splice_fn1_ctx(&self.location).into();
269        Stream::new(
270            self.location.clone(),
271            HydroNode::FlatMap {
272                f,
273                input: Box::new(self.ir_node.into_inner()),
274                metadata: self.location.new_node_metadata::<U>(),
275            },
276        )
277    }
278
279    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
280    where
281        T: IntoIterator<Item = U>,
282    {
283        self.flat_map_ordered(q!(|x| x))
284    }
285
286    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
287    where
288        T: IntoIterator<Item = U>,
289    {
290        self.flat_map_unordered(q!(|x| x))
291    }
292
293    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
294    where
295        F: Fn(&T) -> bool + 'a,
296    {
297        let f = f.splice_fn1_borrow_ctx(&self.location).into();
298        Optional::new(
299            self.location.clone(),
300            HydroNode::Filter {
301                f,
302                input: Box::new(self.ir_node.into_inner()),
303                metadata: self.location.new_node_metadata::<T>(),
304            },
305        )
306    }
307
308    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
309    where
310        F: Fn(T) -> Option<U> + 'a,
311    {
312        let f = f.splice_fn1_ctx(&self.location).into();
313        Optional::new(
314            self.location.clone(),
315            HydroNode::FilterMap {
316                f,
317                input: Box::new(self.ir_node.into_inner()),
318                metadata: self.location.new_node_metadata::<U>(),
319            },
320        )
321    }
322
323    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
324    where
325        Self: ZipResult<'a, O, Location = L>,
326    {
327        check_matching_location(&self.location, &Self::other_location(&other));
328
329        if L::is_top_level() {
330            let left_ir_node = self.ir_node.into_inner();
331            let left_ir_node_metadata = left_ir_node.metadata().clone();
332            let right_ir_node = Self::other_ir_node(other);
333            let right_ir_node_metadata = right_ir_node.metadata().clone();
334
335            Self::make(
336                self.location.clone(),
337                HydroNode::Persist {
338                    inner: Box::new(HydroNode::CrossSingleton {
339                        left: Box::new(HydroNode::Unpersist {
340                            inner: Box::new(left_ir_node),
341                            metadata: left_ir_node_metadata,
342                        }),
343                        right: Box::new(HydroNode::Unpersist {
344                            inner: Box::new(right_ir_node),
345                            metadata: right_ir_node_metadata,
346                        }),
347                        metadata: self
348                            .location
349                            .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
350                    }),
351                    metadata: self
352                        .location
353                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
354                },
355            )
356        } else {
357            Self::make(
358                self.location.clone(),
359                HydroNode::CrossSingleton {
360                    left: Box::new(self.ir_node.into_inner()),
361                    right: Box::new(Self::other_ir_node(other)),
362                    metadata: self
363                        .location
364                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
365                },
366            )
367        }
368    }
369
370    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
371    where
372        Self: ZipResult<
373                'a,
374                Optional<(), L, Bounded>,
375                Location = L,
376                Out = Optional<(T, ()), L, Bounded>,
377            >,
378    {
379        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
380    }
381
382    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
383    where
384        Singleton<T, L, B>: ZipResult<
385                'a,
386                Optional<(), L, Bounded>,
387                Location = L,
388                Out = Optional<(T, ()), L, Bounded>,
389            >,
390    {
391        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
392    }
393
394    /// An operator which allows you to "name" a `HydroNode`.
395    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
396    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
397        {
398            let mut node = self.ir_node.borrow_mut();
399            let metadata = node.metadata_mut();
400            metadata.tag = Some(name.to_string());
401        }
402        self
403    }
404}
405
406impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
407where
408    L: Location<'a> + NoTick,
409{
410    /// Returns a singleton value corresponding to the latest snapshot of the singleton
411    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
412    /// at least all relevant data that contributed to the snapshot at tick `t`.
413    ///
414    /// # Non-Determinism
415    /// Because this picks a snapshot of a singleton whose value is continuously changing,
416    /// the output singleton has a non-deterministic value since the snapshot can be at an
417    /// arbitrary point in time.
418    pub fn snapshot(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
419        Singleton::new(
420            self.location.clone().tick,
421            HydroNode::Unpersist {
422                inner: Box::new(self.ir_node.into_inner()),
423                metadata: self.location.new_node_metadata::<T>(),
424            },
425        )
426    }
427
428    pub fn end_atomic(self) -> Optional<T, L, B> {
429        Optional::new(self.location.tick.l, self.ir_node.into_inner())
430    }
431}
432
433impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
434where
435    L: Location<'a> + NoTick + NoAtomic,
436{
437    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
438        Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
439    }
440
441    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
442    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
443    /// relevant data that contributed to the snapshot at tick `t`.
444    ///
445    /// # Non-Determinism
446    /// Because this picks a snapshot of a singleton whose value is continuously changing,
447    /// the output singleton has a non-deterministic value since the snapshot can be at an
448    /// arbitrary point in time.
449    pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Singleton<T, Tick<L>, Bounded>
450    where
451        L: NoTick,
452    {
453        self.atomic(tick).snapshot(nondet)
454    }
455
456    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
457    /// with order corresponding to increasing prefixes of data contributing to the singleton.
458    ///
459    /// # Non-Determinism
460    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
461    /// to non-deterministic batching and arrival of inputs, the output stream is
462    /// non-deterministic.
463    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
464        let tick = self.location.tick();
465        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
466    }
467
468    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
469    /// value taken at various points in time. Because the input singleton may be
470    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
471    /// represent the value of the singleton given some prefix of the streams leading up to
472    /// it.
473    ///
474    /// # Non-Determinism
475    /// The output stream is non-deterministic in which elements are sampled, since this
476    /// is controlled by a clock.
477    pub fn sample_every(
478        self,
479        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
480        nondet: NonDet,
481    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
482        let samples = self.location.source_interval(interval, nondet);
483        let tick = self.location.tick();
484
485        self.snapshot(&tick, nondet)
486            .continue_if(samples.batch(&tick, nondet).first())
487            .all_ticks()
488            .weakest_retries()
489    }
490}
491
492impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
493where
494    L: Location<'a>,
495{
496    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
497        Stream::new(
498            self.location.outer().clone(),
499            HydroNode::Persist {
500                inner: Box::new(self.ir_node.into_inner()),
501                metadata: self.location.new_node_metadata::<T>(),
502            },
503        )
504    }
505
506    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
507        Stream::new(
508            Atomic {
509                tick: self.location.clone(),
510            },
511            HydroNode::Persist {
512                inner: Box::new(self.ir_node.into_inner()),
513                metadata: self.location.new_node_metadata::<T>(),
514            },
515        )
516    }
517
518    pub fn latest(self) -> Singleton<T, L, Unbounded> {
519        Singleton::new(
520            self.location.outer().clone(),
521            HydroNode::Persist {
522                inner: Box::new(self.ir_node.into_inner()),
523                metadata: self.location.new_node_metadata::<T>(),
524            },
525        )
526    }
527
528    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
529        Singleton::new(
530            Atomic {
531                tick: self.location.clone(),
532            },
533            HydroNode::Persist {
534                inner: Box::new(self.ir_node.into_inner()),
535                metadata: self.location.new_node_metadata::<T>(),
536            },
537        )
538    }
539
540    pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
541        Singleton::new(
542            self.location.clone(),
543            HydroNode::DeferTick {
544                input: Box::new(self.ir_node.into_inner()),
545                metadata: self.location.new_node_metadata::<T>(),
546            },
547        )
548    }
549
550    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
551        Stream::new(
552            self.location.clone(),
553            HydroNode::Persist {
554                inner: Box::new(self.ir_node.into_inner()),
555                metadata: self.location.new_node_metadata::<T>(),
556            },
557        )
558    }
559
560    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
561        Optional::new(
562            self.location.clone(),
563            HydroNode::Delta {
564                inner: Box::new(self.ir_node.into_inner()),
565                metadata: self.location.new_node_metadata::<T>(),
566            },
567        )
568    }
569
570    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
571        Stream::new(self.location, self.ir_node.into_inner())
572    }
573}
574
575pub trait ZipResult<'a, Other> {
576    type Out;
577    type ElementType;
578    type Location;
579
580    fn other_location(other: &Other) -> Self::Location;
581    fn other_ir_node(other: Other) -> HydroNode;
582
583    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
584}
585
586impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, Tick<L>, B>>
587    for Singleton<T, Tick<L>, B>
588where
589    U: Clone,
590    L: Location<'a>,
591{
592    type Out = Singleton<(T, U), Tick<L>, B>;
593    type ElementType = (T, U);
594    type Location = Tick<L>;
595
596    fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
597        other.location.clone()
598    }
599
600    fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
601        other.ir_node.into_inner()
602    }
603
604    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
605        Singleton::new(location, ir_node)
606    }
607}
608
609impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, Tick<L>, B>>
610    for Singleton<T, Tick<L>, B>
611where
612    U: Clone,
613    L: Location<'a>,
614{
615    type Out = Optional<(T, U), Tick<L>, B>;
616    type ElementType = (T, U);
617    type Location = Tick<L>;
618
619    fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
620        other.location.clone()
621    }
622
623    fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
624        other.ir_node.into_inner()
625    }
626
627    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
628        Optional::new(location, ir_node)
629    }
630}
631
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::*;

    /// Feeds a singleton back through a tick cycle with an initial value and
    /// checks that, on each triggered tick, the cycle holds exactly one
    /// element (its count is 1).
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (trigger_port, triggers) = process.source_external_bincode(&external);

        let tick = process.tick();
        let (cycle, fed_back) = tick.cycle_with_initial(tick.singleton(q!(0)));

        // Count the elements in the cycle, but only report the count on
        // ticks in which an external trigger arrived.
        let cardinality = fed_back.clone().into_stream().count();
        let trigger_arrived = triggers.batch(&tick, nondet!(/** testing */)).first();
        let counts_port = cardinality
            .continue_if(trigger_arrived)
            .all_ticks()
            .send_bincode_external(&external);
        cycle.complete_next_tick(fed_back);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_trigger = nodes.connect_sink_bincode(trigger_port).await;
        let mut external_out = nodes.connect_source_bincode(counts_port).await;

        deployment.start().await.unwrap();

        // Two consecutive triggers must each observe a count of 1.
        for _ in 0..2 {
            tick_trigger.send(()).await.unwrap();

            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }
}