hydro_lang/
singleton.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::builder::FLOW_USED_MESSAGE;
9use crate::cycle::{
10    CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
11    TickCycleMarker,
12};
13use crate::ir::{HydroLeaf, HydroNode, TeeNode};
14use crate::location::tick::{Atomic, NoAtomic};
15use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
16use crate::{Bounded, Optional, Stream, Unbounded};
17
18pub struct Singleton<T, L, B> {
19    pub(crate) location: L,
20    pub(crate) ir_node: RefCell<HydroNode>,
21
22    _phantom: PhantomData<(T, L, B)>,
23}
24
25impl<'a, T, L: Location<'a>, B> Singleton<T, L, B> {
26    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
27        Singleton {
28            location,
29            ir_node: RefCell::new(ir_node),
30            _phantom: PhantomData,
31        }
32    }
33
34    fn location_kind(&self) -> LocationId {
35        self.location.id()
36    }
37}
38
39impl<'a, T, L: Location<'a>> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded> {
40    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
41        Singleton::new(singleton.location, singleton.ir_node.into_inner())
42    }
43}
44
45impl<'a, T, L: Location<'a>> DeferTick for Singleton<T, Tick<L>, Bounded> {
46    fn defer_tick(self) -> Self {
47        Singleton::defer_tick(self)
48    }
49}
50
51impl<'a, T, L: Location<'a>> CycleCollectionWithInitial<'a, TickCycleMarker>
52    for Singleton<T, Tick<L>, Bounded>
53{
54    type Location = Tick<L>;
55
56    fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
57        let location_id = location.id();
58        Singleton::new(
59            location.clone(),
60            HydroNode::Chain {
61                first: Box::new(HydroNode::CycleSource {
62                    ident,
63                    location_kind: location_id,
64                    metadata: location.new_node_metadata::<T>(),
65                }),
66                second: initial.ir_node.into_inner().into(),
67                metadata: location.new_node_metadata::<T>(),
68            },
69        )
70    }
71}
72
73impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded> {
74    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75        assert_eq!(
76            self.location.id(),
77            expected_location,
78            "locations do not match"
79        );
80        self.location
81            .flow_state()
82            .borrow_mut()
83            .leaves
84            .as_mut()
85            .expect(FLOW_USED_MESSAGE)
86            .push(HydroLeaf::CycleSink {
87                ident,
88                location_kind: self.location_kind(),
89                input: Box::new(self.ir_node.into_inner()),
90                metadata: self.location.new_node_metadata::<T>(),
91            });
92    }
93}
94
95impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRefMarker>
96    for Singleton<T, Tick<L>, Bounded>
97{
98    type Location = Tick<L>;
99
100    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
101        let location_id = location.id();
102        Singleton::new(
103            location.clone(),
104            HydroNode::CycleSource {
105                ident,
106                location_kind: location_id,
107                metadata: location.new_node_metadata::<T>(),
108            },
109        )
110    }
111}
112
113impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker>
114    for Singleton<T, Tick<L>, Bounded>
115{
116    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
117        assert_eq!(
118            self.location.id(),
119            expected_location,
120            "locations do not match"
121        );
122        self.location
123            .flow_state()
124            .borrow_mut()
125            .leaves
126            .as_mut()
127            .expect(FLOW_USED_MESSAGE)
128            .push(HydroLeaf::CycleSink {
129                ident,
130                location_kind: self.location_kind(),
131                input: Box::new(self.ir_node.into_inner()),
132                metadata: self.location.new_node_metadata::<T>(),
133            });
134    }
135}
136
137impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker>
138    for Singleton<T, L, B>
139{
140    type Location = L;
141
142    fn create_source(ident: syn::Ident, location: L) -> Self {
143        let location_id = location.id();
144        Singleton::new(
145            location.clone(),
146            HydroNode::Persist {
147                inner: Box::new(HydroNode::CycleSource {
148                    ident,
149                    location_kind: location_id,
150                    metadata: location.new_node_metadata::<T>(),
151                }),
152                metadata: location.new_node_metadata::<T>(),
153            },
154        )
155    }
156}
157
158impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker>
159    for Singleton<T, L, B>
160{
161    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
162        assert_eq!(
163            self.location.id(),
164            expected_location,
165            "locations do not match"
166        );
167        let metadata = self.location.new_node_metadata::<T>();
168        self.location
169            .flow_state()
170            .borrow_mut()
171            .leaves
172            .as_mut()
173            .expect(FLOW_USED_MESSAGE)
174            .push(HydroLeaf::CycleSink {
175                ident,
176                location_kind: self.location_kind(),
177                input: Box::new(HydroNode::Unpersist {
178                    inner: Box::new(self.ir_node.into_inner()),
179                    metadata: metadata.clone(),
180                }),
181                metadata,
182            });
183    }
184}
185
186impl<'a, T: Clone, L: Location<'a>, B> Clone for Singleton<T, L, B> {
187    fn clone(&self) -> Self {
188        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
189            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
190            *self.ir_node.borrow_mut() = HydroNode::Tee {
191                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
192                metadata: self.location.new_node_metadata::<T>(),
193            };
194        }
195
196        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
197            Singleton {
198                location: self.location.clone(),
199                ir_node: HydroNode::Tee {
200                    inner: TeeNode(inner.0.clone()),
201                    metadata: metadata.clone(),
202                }
203                .into(),
204                _phantom: PhantomData,
205            }
206        } else {
207            unreachable!()
208        }
209    }
210}
211
212impl<'a, T, L: Location<'a>, B> Singleton<T, L, B> {
213    pub fn map<U, F: Fn(T) -> U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B> {
214        let f = f.splice_fn1_ctx(&self.location).into();
215        Singleton::new(
216            self.location.clone(),
217            HydroNode::Map {
218                f,
219                input: Box::new(self.ir_node.into_inner()),
220                metadata: self.location.new_node_metadata::<U>(),
221            },
222        )
223    }
224
225    pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
226        self,
227        f: impl IntoQuotedMut<'a, F, L>,
228    ) -> Stream<U, L, B> {
229        let f = f.splice_fn1_ctx(&self.location).into();
230        Stream::new(
231            self.location.clone(),
232            HydroNode::FlatMap {
233                f,
234                input: Box::new(self.ir_node.into_inner()),
235                metadata: self.location.new_node_metadata::<U>(),
236            },
237        )
238    }
239
240    pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
241        self,
242        f: impl IntoQuotedMut<'a, F, L>,
243    ) -> Stream<U, L, B> {
244        let f = f.splice_fn1_ctx(&self.location).into();
245        Stream::new(
246            self.location.clone(),
247            HydroNode::FlatMap {
248                f,
249                input: Box::new(self.ir_node.into_inner()),
250                metadata: self.location.new_node_metadata::<U>(),
251            },
252        )
253    }
254
255    pub fn filter<F: Fn(&T) -> bool + 'a>(
256        self,
257        f: impl IntoQuotedMut<'a, F, L>,
258    ) -> Optional<T, L, B> {
259        let f = f.splice_fn1_borrow_ctx(&self.location).into();
260        Optional::new(
261            self.location.clone(),
262            HydroNode::Filter {
263                f,
264                input: Box::new(self.ir_node.into_inner()),
265                metadata: self.location.new_node_metadata::<T>(),
266            },
267        )
268    }
269
270    pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
271        self,
272        f: impl IntoQuotedMut<'a, F, L>,
273    ) -> Optional<U, L, B> {
274        let f = f.splice_fn1_ctx(&self.location).into();
275        Optional::new(
276            self.location.clone(),
277            HydroNode::FilterMap {
278                f,
279                input: Box::new(self.ir_node.into_inner()),
280                metadata: self.location.new_node_metadata::<U>(),
281            },
282        )
283    }
284
285    pub fn zip<Other>(self, other: Other) -> <Self as ZipResult<'a, Other>>::Out
286    where
287        Self: ZipResult<'a, Other, Location = L>,
288    {
289        check_matching_location(&self.location, &Self::other_location(&other));
290
291        if L::is_top_level() {
292            let left_ir_node = self.ir_node.into_inner();
293            let left_ir_node_metadata = left_ir_node.metadata().clone();
294            let right_ir_node = Self::other_ir_node(other);
295            let right_ir_node_metadata = right_ir_node.metadata().clone();
296
297            Self::make(
298                self.location.clone(),
299                HydroNode::Persist {
300                    inner: Box::new(HydroNode::CrossSingleton {
301                        left: Box::new(HydroNode::Unpersist {
302                            inner: Box::new(left_ir_node),
303                            metadata: left_ir_node_metadata,
304                        }),
305                        right: Box::new(HydroNode::Unpersist {
306                            inner: Box::new(right_ir_node),
307                            metadata: right_ir_node_metadata,
308                        }),
309                        metadata: self
310                            .location
311                            .new_node_metadata::<<Self as ZipResult<'a, Other>>::ElementType>(),
312                    }),
313                    metadata: self
314                        .location
315                        .new_node_metadata::<<Self as ZipResult<'a, Other>>::ElementType>(),
316                },
317            )
318        } else {
319            Self::make(
320                self.location.clone(),
321                HydroNode::CrossSingleton {
322                    left: Box::new(self.ir_node.into_inner()),
323                    right: Box::new(Self::other_ir_node(other)),
324                    metadata: self
325                        .location
326                        .new_node_metadata::<<Self as ZipResult<'a, Other>>::ElementType>(),
327                },
328            )
329        }
330    }
331
332    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
333    where
334        Self: ZipResult<
335                'a,
336                Optional<(), L, Bounded>,
337                Location = L,
338                Out = Optional<(T, ()), L, Bounded>,
339            >,
340    {
341        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
342    }
343
344    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
345    where
346        Singleton<T, L, B>: ZipResult<
347                'a,
348                Optional<(), L, Bounded>,
349                Location = L,
350                Out = Optional<(T, ()), L, Bounded>,
351            >,
352    {
353        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
354    }
355}
356
357impl<'a, T, L: Location<'a> + NoTick, B> Singleton<T, Atomic<L>, B> {
358    /// Returns a singleton value corresponding to the latest snapshot of the singleton
359    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
360    /// at least all relevant data that contributed to the snapshot at tick `t`.
361    ///
362    /// # Safety
363    /// Because this picks a snapshot of a singleton whose value is continuously changing,
364    /// the output singleton has a non-deterministic value since the snapshot can be at an
365    /// arbitrary point in time.
366    pub unsafe fn latest_tick(self) -> Singleton<T, Tick<L>, Bounded> {
367        Singleton::new(
368            self.location.clone().tick,
369            HydroNode::Unpersist {
370                inner: Box::new(self.ir_node.into_inner()),
371                metadata: self.location.new_node_metadata::<T>(),
372            },
373        )
374    }
375
376    pub fn end_atomic(self) -> Optional<T, L, B> {
377        Optional::new(self.location.tick.l, self.ir_node.into_inner())
378    }
379}
380
381impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B> Singleton<T, L, B> {
382    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
383        Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
384    }
385
386    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
387    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
388    /// relevant data that contributed to the snapshot at tick `t`.
389    ///
390    /// # Safety
391    /// Because this picks a snapshot of a singleton whose value is continuously changing,
392    /// the output singleton has a non-deterministic value since the snapshot can be at an
393    /// arbitrary point in time.
394    pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
395    where
396        L: NoTick,
397    {
398        unsafe { self.atomic(tick).latest_tick() }
399    }
400
401    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
402    /// with order corresponding to increasing prefixes of data contributing to the singleton.
403    ///
404    /// # Safety
405    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
406    /// to non-deterministic batching and arrival of inputs, the output stream is
407    /// non-deterministic.
408    pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
409        let tick = self.location.tick();
410
411        unsafe {
412            // SAFETY: source of intentional non-determinism
413            self.latest_tick(&tick).all_ticks()
414        }
415    }
416
417    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
418    /// value taken at various points in time. Because the input singleton may be
419    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
420    /// represent the value of the singleton given some prefix of the streams leading up to
421    /// it.
422    ///
423    /// # Safety
424    /// The output stream is non-deterministic in which elements are sampled, since this
425    /// is controlled by a clock.
426    pub unsafe fn sample_every(
427        self,
428        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
429    ) -> Stream<T, L, Unbounded>
430    where
431        L: NoAtomic,
432    {
433        let samples = unsafe {
434            // SAFETY: source of intentional non-determinism
435            self.location.source_interval(interval)
436        };
437        let tick = self.location.tick();
438
439        unsafe {
440            // SAFETY: source of intentional non-determinism
441            self.latest_tick(&tick)
442                .continue_if(samples.tick_batch(&tick).first())
443                .all_ticks()
444        }
445    }
446}
447
448impl<'a, T, L: Location<'a>> Singleton<T, Tick<L>, Bounded> {
449    pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
450        Stream::new(
451            self.location.outer().clone(),
452            HydroNode::Persist {
453                inner: Box::new(self.ir_node.into_inner()),
454                metadata: self.location.new_node_metadata::<T>(),
455            },
456        )
457    }
458
459    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
460        Stream::new(
461            Atomic {
462                tick: self.location.clone(),
463            },
464            HydroNode::Persist {
465                inner: Box::new(self.ir_node.into_inner()),
466                metadata: self.location.new_node_metadata::<T>(),
467            },
468        )
469    }
470
471    pub fn latest(self) -> Singleton<T, L, Unbounded> {
472        Singleton::new(
473            self.location.outer().clone(),
474            HydroNode::Persist {
475                inner: Box::new(self.ir_node.into_inner()),
476                metadata: self.location.new_node_metadata::<T>(),
477            },
478        )
479    }
480
481    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
482        Singleton::new(
483            Atomic {
484                tick: self.location.clone(),
485            },
486            HydroNode::Persist {
487                inner: Box::new(self.ir_node.into_inner()),
488                metadata: self.location.new_node_metadata::<T>(),
489            },
490        )
491    }
492
493    pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
494        Singleton::new(
495            self.location.clone(),
496            HydroNode::DeferTick {
497                input: Box::new(self.ir_node.into_inner()),
498                metadata: self.location.new_node_metadata::<T>(),
499            },
500        )
501    }
502
503    pub fn persist(self) -> Stream<T, Tick<L>, Bounded> {
504        Stream::new(
505            self.location.clone(),
506            HydroNode::Persist {
507                inner: Box::new(self.ir_node.into_inner()),
508                metadata: self.location.new_node_metadata::<T>(),
509            },
510        )
511    }
512
513    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
514        Optional::new(
515            self.location.clone(),
516            HydroNode::Delta {
517                inner: Box::new(self.ir_node.into_inner()),
518                metadata: self.location.new_node_metadata::<T>(),
519            },
520        )
521    }
522
523    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded> {
524        Stream::new(self.location, self.ir_node.into_inner())
525    }
526}
527
528pub trait ZipResult<'a, Other> {
529    type Out;
530    type ElementType;
531    type Location;
532
533    fn other_location(other: &Other) -> Self::Location;
534    fn other_ir_node(other: Other) -> HydroNode;
535
536    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
537}
538
539impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton<U, Tick<L>, B>>
540    for Singleton<T, Tick<L>, B>
541{
542    type Out = Singleton<(T, U), Tick<L>, B>;
543    type ElementType = (T, U);
544    type Location = Tick<L>;
545
546    fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
547        other.location.clone()
548    }
549
550    fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
551        other.ir_node.into_inner()
552    }
553
554    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
555        Singleton::new(location, ir_node)
556    }
557}
558
559impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional<U, Tick<L>, B>>
560    for Singleton<T, Tick<L>, B>
561{
562    type Out = Optional<(T, U), Tick<L>, B>;
563    type ElementType = (T, U);
564    type Location = Tick<L>;
565
566    fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
567        other.location.clone()
568    }
569
570    fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
571        other.ir_node.into_inner()
572    }
573
574    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
575        Optional::new(location, ir_node)
576    }
577}