hydro_lang/
optional.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
11use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
12use crate::location::tick::{Atomic, NoAtomic};
13use crate::location::{LocationId, NoTick, check_matching_location};
14use crate::singleton::ZipResult;
15use crate::stream::NoOrder;
16use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded};
17
/// A handle to a value of type `Type` that may be absent, associated with the
/// location `Loc` and carrying the boundedness marker `Bound`.
///
/// The handle does not store a `Type` itself; it stores the [`HydroNode`] that
/// describes how the value is produced.
pub struct Optional<Type, Loc, Bound> {
    /// Location this optional is associated with.
    pub(crate) location: Loc,
    /// IR node backing this optional. It sits in a `RefCell` because
    /// `Clone::clone` rewrites it into a `HydroNode::Tee` through `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker so that type parameters not stored in any field
    /// (`Type`, `Bound`) still count as used.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
24
25impl<'a, T, L, B> Optional<T, L, B>
26where
27    L: Location<'a>,
28{
29    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
30        Optional {
31            location,
32            ir_node: RefCell::new(ir_node),
33            _phantom: PhantomData,
34        }
35    }
36
37    pub fn some(singleton: Singleton<T, L, B>) -> Self {
38        Optional::new(singleton.location, singleton.ir_node.into_inner())
39    }
40
41    fn location_kind(&self) -> LocationId {
42        self.location.id()
43    }
44}
45
/// Implements [`DeferTick`] for bounded optionals that live inside a tick.
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Forwards to the inherent `defer_tick` method defined on
    /// `Optional<T, Tick<L>, Bounded>`.
    fn defer_tick(self) -> Self {
        // The path form resolves to the inherent method, which Rust prefers over
        // this trait method of the same name, so the call does not recurse.
        Optional::defer_tick(self)
    }
}
54
55impl<'a, T, L> CycleCollection<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
56where
57    L: Location<'a>,
58{
59    type Location = Tick<L>;
60
61    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
62        let location_id = location.id();
63        Optional::new(
64            location.clone(),
65            HydroNode::CycleSource {
66                ident,
67                location_kind: location_id,
68                metadata: location.new_node_metadata::<T>(),
69            },
70        )
71    }
72}
73
74impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
75where
76    L: Location<'a>,
77{
78    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
79        assert_eq!(
80            self.location.id(),
81            expected_location,
82            "locations do not match"
83        );
84        self.location
85            .flow_state()
86            .borrow_mut()
87            .leaves
88            .as_mut()
89            .expect(FLOW_USED_MESSAGE)
90            .push(HydroLeaf::CycleSink {
91                ident,
92                location_kind: self.location_kind(),
93                input: Box::new(self.ir_node.into_inner()),
94                metadata: self.location.new_node_metadata::<T>(),
95            });
96    }
97}
98
99impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
100where
101    L: Location<'a>,
102{
103    type Location = Tick<L>;
104
105    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
106        let location_id = location.id();
107        Optional::new(
108            location.clone(),
109            HydroNode::CycleSource {
110                ident,
111                location_kind: location_id,
112                metadata: location.new_node_metadata::<T>(),
113            },
114        )
115    }
116}
117
118impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
119where
120    L: Location<'a>,
121{
122    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
123        assert_eq!(
124            self.location.id(),
125            expected_location,
126            "locations do not match"
127        );
128        self.location
129            .flow_state()
130            .borrow_mut()
131            .leaves
132            .as_mut()
133            .expect(FLOW_USED_MESSAGE)
134            .push(HydroLeaf::CycleSink {
135                ident,
136                location_kind: self.location_kind(),
137                input: Box::new(self.ir_node.into_inner()),
138                metadata: self.location.new_node_metadata::<T>(),
139            });
140    }
141}
142
143impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Optional<T, L, B>
144where
145    L: Location<'a> + NoTick,
146{
147    type Location = L;
148
149    fn create_source(ident: syn::Ident, location: L) -> Self {
150        let location_id = location.id();
151        Optional::new(
152            location.clone(),
153            HydroNode::Persist {
154                inner: Box::new(HydroNode::CycleSource {
155                    ident,
156                    location_kind: location_id,
157                    metadata: location.new_node_metadata::<T>(),
158                }),
159                metadata: location.new_node_metadata::<T>(),
160            },
161        )
162    }
163}
164
165impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B>
166where
167    L: Location<'a> + NoTick,
168{
169    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
170        assert_eq!(
171            self.location.id(),
172            expected_location,
173            "locations do not match"
174        );
175        let metadata = self.location.new_node_metadata::<T>();
176        self.location
177            .flow_state()
178            .borrow_mut()
179            .leaves
180            .as_mut()
181            .expect(FLOW_USED_MESSAGE)
182            .push(HydroLeaf::CycleSink {
183                ident,
184                location_kind: self.location_kind(),
185                input: Box::new(HydroNode::Unpersist {
186                    inner: Box::new(self.ir_node.into_inner()),
187                    metadata: metadata.clone(),
188                }),
189                metadata,
190            });
191    }
192}
193
194impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
195where
196    L: Location<'a>,
197{
198    fn from(singleton: Optional<T, L, Bounded>) -> Self {
199        Optional::new(singleton.location, singleton.ir_node.into_inner())
200    }
201}
202
203impl<'a, T, L, B> From<Singleton<T, L, B>> for Optional<T, L, B>
204where
205    L: Location<'a>,
206{
207    fn from(singleton: Singleton<T, L, B>) -> Self {
208        Optional::some(singleton)
209    }
210}
211
212impl<'a, T, L, B> Clone for Optional<T, L, B>
213where
214    T: Clone,
215    L: Location<'a>,
216{
217    fn clone(&self) -> Self {
218        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
219            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
220            *self.ir_node.borrow_mut() = HydroNode::Tee {
221                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
222                metadata: self.location.new_node_metadata::<T>(),
223            };
224        }
225
226        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
227            Optional {
228                location: self.location.clone(),
229                ir_node: HydroNode::Tee {
230                    inner: TeeNode(inner.0.clone()),
231                    metadata: metadata.clone(),
232                }
233                .into(),
234                _phantom: PhantomData,
235            }
236        } else {
237            unreachable!()
238        }
239    }
240}
241
242impl<'a, T, L, B> Optional<T, L, B>
243where
244    L: Location<'a>,
245{
246    /// Transforms the optional value by applying a function `f` to it,
247    /// continuously as the input is updated.
248    ///
249    /// Whenever the optional is empty, the output optional is also empty.
250    ///
251    /// # Example
252    /// ```rust
253    /// # use hydro_lang::*;
254    /// # use futures::StreamExt;
255    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
256    /// let tick = process.tick();
257    /// let optional = tick.optional_first_tick(q!(1));
258    /// optional.map(q!(|v| v + 1)).all_ticks()
259    /// # }, |mut stream| async move {
260    /// // 2
261    /// # assert_eq!(stream.next().await.unwrap(), 2);
262    /// # }));
263    /// ```
264    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
265    where
266        F: Fn(T) -> U + 'a,
267    {
268        let f = f.splice_fn1_ctx(&self.location).into();
269        Optional::new(
270            self.location.clone(),
271            HydroNode::Map {
272                f,
273                input: Box::new(self.ir_node.into_inner()),
274                metadata: self.location.new_node_metadata::<U>(),
275            },
276        )
277    }
278
279    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B>
280    where
281        I: IntoIterator<Item = U>,
282        F: Fn(T) -> I + 'a,
283    {
284        let f = f.splice_fn1_ctx(&self.location).into();
285        Stream::new(
286            self.location.clone(),
287            HydroNode::FlatMap {
288                f,
289                input: Box::new(self.ir_node.into_inner()),
290                metadata: self.location.new_node_metadata::<U>(),
291            },
292        )
293    }
294
295    pub fn flat_map_unordered<U, I, F>(
296        self,
297        f: impl IntoQuotedMut<'a, F, L>,
298    ) -> Stream<U, L, B, NoOrder>
299    where
300        I: IntoIterator<Item = U>,
301        F: Fn(T) -> I + 'a,
302    {
303        let f = f.splice_fn1_ctx(&self.location).into();
304        Stream::new(
305            self.location.clone(),
306            HydroNode::FlatMap {
307                f,
308                input: Box::new(self.ir_node.into_inner()),
309                metadata: self.location.new_node_metadata::<U>(),
310            },
311        )
312    }
313
314    pub fn flatten_ordered<U>(self) -> Stream<U, L, B>
315    where
316        T: IntoIterator<Item = U>,
317    {
318        self.flat_map_ordered(q!(|v| v))
319    }
320
321    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
322    where
323        T: IntoIterator<Item = U>,
324    {
325        self.flat_map_unordered(q!(|v| v))
326    }
327
328    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
329    where
330        F: Fn(&T) -> bool + 'a,
331    {
332        let f = f.splice_fn1_borrow_ctx(&self.location).into();
333        Optional::new(
334            self.location.clone(),
335            HydroNode::Filter {
336                f,
337                input: Box::new(self.ir_node.into_inner()),
338                metadata: self.location.new_node_metadata::<T>(),
339            },
340        )
341    }
342
343    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
344    where
345        F: Fn(T) -> Option<U> + 'a,
346    {
347        let f = f.splice_fn1_ctx(&self.location).into();
348        Optional::new(
349            self.location.clone(),
350            HydroNode::FilterMap {
351                f,
352                input: Box::new(self.ir_node.into_inner()),
353                metadata: self.location.new_node_metadata::<U>(),
354            },
355        )
356    }
357
358    pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
359        check_matching_location(&self.location, &other.location);
360
361        if L::is_top_level() {
362            Optional::new(
363                self.location.clone(),
364                HydroNode::Persist {
365                    inner: Box::new(HydroNode::Chain {
366                        first: Box::new(HydroNode::Unpersist {
367                            inner: Box::new(self.ir_node.into_inner()),
368                            metadata: self.location.new_node_metadata::<T>(),
369                        }),
370                        second: Box::new(HydroNode::Unpersist {
371                            inner: Box::new(other.ir_node.into_inner()),
372                            metadata: self.location.new_node_metadata::<T>(),
373                        }),
374                        metadata: self.location.new_node_metadata::<T>(),
375                    }),
376                    metadata: self.location.new_node_metadata::<T>(),
377                },
378            )
379        } else {
380            Optional::new(
381                self.location.clone(),
382                HydroNode::Chain {
383                    first: Box::new(self.ir_node.into_inner()),
384                    second: Box::new(other.ir_node.into_inner()),
385                    metadata: self.location.new_node_metadata::<T>(),
386                },
387            )
388        }
389    }
390
391    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
392    where
393        O: Clone,
394    {
395        let other: Optional<O, L, B> = other.into();
396        check_matching_location(&self.location, &other.location);
397
398        if L::is_top_level() {
399            Optional::new(
400                self.location.clone(),
401                HydroNode::Persist {
402                    inner: Box::new(HydroNode::CrossSingleton {
403                        left: Box::new(HydroNode::Unpersist {
404                            inner: Box::new(self.ir_node.into_inner()),
405                            metadata: self.location.new_node_metadata::<T>(),
406                        }),
407                        right: Box::new(HydroNode::Unpersist {
408                            inner: Box::new(other.ir_node.into_inner()),
409                            metadata: self.location.new_node_metadata::<O>(),
410                        }),
411                        metadata: self.location.new_node_metadata::<(T, O)>(),
412                    }),
413                    metadata: self.location.new_node_metadata::<(T, O)>(),
414                },
415            )
416        } else {
417            Optional::new(
418                self.location.clone(),
419                HydroNode::CrossSingleton {
420                    left: Box::new(self.ir_node.into_inner()),
421                    right: Box::new(other.ir_node.into_inner()),
422                    metadata: self.location.new_node_metadata::<(T, O)>(),
423                },
424            )
425        }
426    }
427
428    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
429        check_matching_location(&self.location, &other.location);
430
431        if L::is_top_level() {
432            Singleton::new(
433                self.location.clone(),
434                HydroNode::Persist {
435                    inner: Box::new(HydroNode::Chain {
436                        first: Box::new(HydroNode::Unpersist {
437                            inner: Box::new(self.ir_node.into_inner()),
438                            metadata: self.location.new_node_metadata::<T>(),
439                        }),
440                        second: Box::new(HydroNode::Unpersist {
441                            inner: Box::new(other.ir_node.into_inner()),
442                            metadata: self.location.new_node_metadata::<T>(),
443                        }),
444                        metadata: self.location.new_node_metadata::<T>(),
445                    }),
446                    metadata: self.location.new_node_metadata::<T>(),
447                },
448            )
449        } else {
450            Singleton::new(
451                self.location.clone(),
452                HydroNode::Chain {
453                    first: Box::new(self.ir_node.into_inner()),
454                    second: Box::new(other.ir_node.into_inner()),
455                    metadata: self.location.new_node_metadata::<T>(),
456                },
457            )
458        }
459    }
460
461    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
462    where
463        T: Clone,
464    {
465        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
466        let core_ir = HydroNode::Persist {
467            inner: Box::new(HydroNode::Source {
468                source: HydroSource::Iter(none.into()),
469                location_kind: self.location.id().root().clone(),
470                metadata: self.location.new_node_metadata::<Option<T>>(),
471            }),
472            metadata: self.location.new_node_metadata::<Option<T>>(),
473        };
474
475        let none_singleton = if L::is_top_level() {
476            Singleton::new(
477                self.location.clone(),
478                HydroNode::Persist {
479                    inner: Box::new(core_ir),
480                    metadata: self.location.new_node_metadata::<Option<T>>(),
481                },
482            )
483        } else {
484            Singleton::new(self.location.clone(), core_ir)
485        };
486
487        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
488    }
489}
490
491impl<'a, T, L> Optional<T, L, Bounded>
492where
493    L: Location<'a>,
494{
495    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
496        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
497    }
498
499    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
500        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
501    }
502
503    pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
504    where
505        Singleton<U, L, Bounded>: ZipResult<
506                'a,
507                Optional<(), L, Bounded>,
508                Location = L,
509                Out = Optional<(U, ()), L, Bounded>,
510            >,
511    {
512        value.continue_if(self)
513    }
514
515    pub fn into_stream(self) -> Stream<T, L, Bounded> {
516        if L::is_top_level() {
517            panic!("Converting an optional to a stream is not yet supported at the top level");
518        }
519
520        Stream::new(self.location, self.ir_node.into_inner())
521    }
522}
523
524impl<'a, T, L, B> Optional<T, Atomic<L>, B>
525where
526    L: Location<'a> + NoTick,
527{
528    /// Returns an optional value corresponding to the latest snapshot of the optional
529    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
530    /// at least all relevant data that contributed to the snapshot at tick `t`.
531    ///
532    /// # Safety
533    /// Because this picks a snapshot of a optional whose value is continuously changing,
534    /// the output optional has a non-deterministic value since the snapshot can be at an
535    /// arbitrary point in time.
536    pub unsafe fn latest_tick(self) -> Optional<T, Tick<L>, Bounded> {
537        Optional::new(
538            self.location.clone().tick,
539            HydroNode::Unpersist {
540                inner: Box::new(self.ir_node.into_inner()),
541                metadata: self.location.new_node_metadata::<T>(),
542            },
543        )
544    }
545
546    pub fn end_atomic(self) -> Optional<T, L, B> {
547        Optional::new(self.location.tick.l, self.ir_node.into_inner())
548    }
549}
550
551impl<'a, T, L, B> Optional<T, L, B>
552where
553    L: Location<'a> + NoTick + NoAtomic,
554{
555    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
556        Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
557    }
558
559    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
560    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
561    /// relevant data that contributed to the snapshot at tick `t`.
562    ///
563    /// # Safety
564    /// Because this picks a snapshot of a optional whose value is continuously changing,
565    /// the output optional has a non-deterministic value since the snapshot can be at an
566    /// arbitrary point in time.
567    pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
568        unsafe { self.atomic(tick).latest_tick() }
569    }
570
571    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
572    /// with order corresponding to increasing prefixes of data contributing to the optional.
573    ///
574    /// # Safety
575    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
576    /// to non-deterministic batching and arrival of inputs, the output stream is
577    /// non-deterministic.
578    pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
579        let tick = self.location.tick();
580
581        unsafe {
582            // SAFETY: source of intentional non-determinism
583            self.latest_tick(&tick).all_ticks()
584        }
585    }
586
587    /// Given a time interval, returns a stream corresponding to snapshots of the optional
588    /// value taken at various points in time. Because the input optional may be
589    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
590    /// represent the value of the optional given some prefix of the streams leading up to
591    /// it.
592    ///
593    /// # Safety
594    /// The output stream is non-deterministic in which elements are sampled, since this
595    /// is controlled by a clock.
596    pub unsafe fn sample_every(
597        self,
598        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
599    ) -> Stream<T, L, Unbounded> {
600        let samples = unsafe {
601            // SAFETY: source of intentional non-determinism
602            self.location.source_interval(interval)
603        };
604        let tick = self.location.tick();
605
606        unsafe {
607            // SAFETY: source of intentional non-determinism
608            self.latest_tick(&tick)
609                .continue_if(samples.tick_batch(&tick).first())
610                .all_ticks()
611        }
612    }
613}
614
615impl<'a, T, L> Optional<T, Tick<L>, Bounded>
616where
617    L: Location<'a>,
618{
619    pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
620        Stream::new(
621            self.location.outer().clone(),
622            HydroNode::Persist {
623                inner: Box::new(self.ir_node.into_inner()),
624                metadata: self.location.new_node_metadata::<T>(),
625            },
626        )
627    }
628
629    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
630        Stream::new(
631            Atomic {
632                tick: self.location.clone(),
633            },
634            HydroNode::Persist {
635                inner: Box::new(self.ir_node.into_inner()),
636                metadata: self.location.new_node_metadata::<T>(),
637            },
638        )
639    }
640
641    pub fn latest(self) -> Optional<T, L, Unbounded> {
642        Optional::new(
643            self.location.outer().clone(),
644            HydroNode::Persist {
645                inner: Box::new(self.ir_node.into_inner()),
646                metadata: self.location.new_node_metadata::<T>(),
647            },
648        )
649    }
650
651    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
652        Optional::new(
653            Atomic {
654                tick: self.location.clone(),
655            },
656            HydroNode::Persist {
657                inner: Box::new(self.ir_node.into_inner()),
658                metadata: self.location.new_node_metadata::<T>(),
659            },
660        )
661    }
662
663    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
664        Optional::new(
665            self.location.clone(),
666            HydroNode::DeferTick {
667                input: Box::new(self.ir_node.into_inner()),
668                metadata: self.location.new_node_metadata::<T>(),
669            },
670        )
671    }
672
673    pub fn persist(self) -> Stream<T, Tick<L>, Bounded> {
674        Stream::new(
675            self.location.clone(),
676            HydroNode::Persist {
677                inner: Box::new(self.ir_node.into_inner()),
678                metadata: self.location.new_node_metadata::<T>(),
679            },
680        )
681    }
682
683    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
684        Optional::new(
685            self.location.clone(),
686            HydroNode::Delta {
687                inner: Box::new(self.ir_node.into_inner()),
688                metadata: self.location.new_node_metadata::<T>(),
689            },
690        )
691    }
692}