hydro_lang/
optional.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::boundedness::Boundedness;
10use crate::builder::FLOW_USED_MESSAGE;
11use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
12use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
13use crate::location::tick::{Atomic, NoAtomic};
14use crate::location::{LocationId, NoTick, check_matching_location};
15use crate::singleton::ZipResult;
16use crate::stream::{AtLeastOnce, ExactlyOnce, NoOrder};
17use crate::unsafety::NonDet;
18use crate::{Bounded, Location, Singleton, Stream, Tick, TotalOrder, Unbounded};
19
20pub struct Optional<Type, Loc, Bound: Boundedness> {
21    pub(crate) location: Loc,
22    pub(crate) ir_node: RefCell<HydroNode>,
23
24    _phantom: PhantomData<(Type, Loc, Bound)>,
25}
26
27impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
28where
29    L: Location<'a>,
30{
31    fn defer_tick(self) -> Self {
32        Optional::defer_tick(self)
33    }
34}
35
36impl<'a, T, L> CycleCollection<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
37where
38    L: Location<'a>,
39{
40    type Location = Tick<L>;
41
42    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
43        Optional::new(
44            location.clone(),
45            HydroNode::CycleSource {
46                ident,
47                metadata: location.new_node_metadata::<T>(),
48            },
49        )
50    }
51}
52
53impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
58        assert_eq!(
59            self.location.id(),
60            expected_location,
61            "locations do not match"
62        );
63        self.location
64            .flow_state()
65            .borrow_mut()
66            .leaves
67            .as_mut()
68            .expect(FLOW_USED_MESSAGE)
69            .push(HydroLeaf::CycleSink {
70                ident,
71                input: Box::new(self.ir_node.into_inner()),
72                metadata: self.location.new_node_metadata::<T>(),
73            });
74    }
75}
76
77impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
78where
79    L: Location<'a>,
80{
81    type Location = Tick<L>;
82
83    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
84        Optional::new(
85            location.clone(),
86            HydroNode::CycleSource {
87                ident,
88                metadata: location.new_node_metadata::<T>(),
89            },
90        )
91    }
92}
93
94impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
95where
96    L: Location<'a>,
97{
98    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
99        assert_eq!(
100            self.location.id(),
101            expected_location,
102            "locations do not match"
103        );
104        self.location
105            .flow_state()
106            .borrow_mut()
107            .leaves
108            .as_mut()
109            .expect(FLOW_USED_MESSAGE)
110            .push(HydroLeaf::CycleSink {
111                ident,
112                input: Box::new(self.ir_node.into_inner()),
113                metadata: self.location.new_node_metadata::<T>(),
114            });
115    }
116}
117
118impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRefMarker> for Optional<T, L, B>
119where
120    L: Location<'a> + NoTick,
121{
122    type Location = L;
123
124    fn create_source(ident: syn::Ident, location: L) -> Self {
125        Optional::new(
126            location.clone(),
127            HydroNode::Persist {
128                inner: Box::new(HydroNode::CycleSource {
129                    ident,
130                    metadata: location.new_node_metadata::<T>(),
131                }),
132                metadata: location.new_node_metadata::<T>(),
133            },
134        )
135    }
136}
137
138impl<'a, T, L, B: Boundedness> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B>
139where
140    L: Location<'a> + NoTick,
141{
142    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
143        assert_eq!(
144            self.location.id(),
145            expected_location,
146            "locations do not match"
147        );
148        let metadata = self.location.new_node_metadata::<T>();
149        self.location
150            .flow_state()
151            .borrow_mut()
152            .leaves
153            .as_mut()
154            .expect(FLOW_USED_MESSAGE)
155            .push(HydroLeaf::CycleSink {
156                ident,
157                input: Box::new(HydroNode::Unpersist {
158                    inner: Box::new(self.ir_node.into_inner()),
159                    metadata: metadata.clone(),
160                }),
161                metadata,
162            });
163    }
164}
165
166impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
167where
168    L: Location<'a>,
169{
170    fn from(singleton: Optional<T, L, Bounded>) -> Self {
171        Optional::new(singleton.location, singleton.ir_node.into_inner())
172    }
173}
174
175impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
176where
177    L: Location<'a>,
178{
179    fn from(singleton: Singleton<T, L, B>) -> Self {
180        Optional::some(singleton)
181    }
182}
183
184impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
185where
186    T: Clone,
187    L: Location<'a>,
188{
189    fn clone(&self) -> Self {
190        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
191            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
192            *self.ir_node.borrow_mut() = HydroNode::Tee {
193                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
194                metadata: self.location.new_node_metadata::<T>(),
195            };
196        }
197
198        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
199            Optional {
200                location: self.location.clone(),
201                ir_node: HydroNode::Tee {
202                    inner: TeeNode(inner.0.clone()),
203                    metadata: metadata.clone(),
204                }
205                .into(),
206                _phantom: PhantomData,
207            }
208        } else {
209            unreachable!()
210        }
211    }
212}
213
214impl<'a, T, L, B: Boundedness> Optional<T, L, B>
215where
216    L: Location<'a>,
217{
218    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
219        Optional {
220            location,
221            ir_node: RefCell::new(ir_node),
222            _phantom: PhantomData,
223        }
224    }
225
226    pub fn some(singleton: Singleton<T, L, B>) -> Self {
227        Optional::new(singleton.location, singleton.ir_node.into_inner())
228    }
229
230    /// Transforms the optional value by applying a function `f` to it,
231    /// continuously as the input is updated.
232    ///
233    /// Whenever the optional is empty, the output optional is also empty.
234    ///
235    /// # Example
236    /// ```rust
237    /// # use hydro_lang::*;
238    /// # use futures::StreamExt;
239    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
240    /// let tick = process.tick();
241    /// let optional = tick.optional_first_tick(q!(1));
242    /// optional.map(q!(|v| v + 1)).all_ticks()
243    /// # }, |mut stream| async move {
244    /// // 2
245    /// # assert_eq!(stream.next().await.unwrap(), 2);
246    /// # }));
247    /// ```
248    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
249    where
250        F: Fn(T) -> U + 'a,
251    {
252        let f = f.splice_fn1_ctx(&self.location).into();
253        Optional::new(
254            self.location.clone(),
255            HydroNode::Map {
256                f,
257                input: Box::new(self.ir_node.into_inner()),
258                metadata: self.location.new_node_metadata::<U>(),
259            },
260        )
261    }
262
263    pub fn flat_map_ordered<U, I, F>(
264        self,
265        f: impl IntoQuotedMut<'a, F, L>,
266    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
267    where
268        I: IntoIterator<Item = U>,
269        F: Fn(T) -> I + 'a,
270    {
271        let f = f.splice_fn1_ctx(&self.location).into();
272        Stream::new(
273            self.location.clone(),
274            HydroNode::FlatMap {
275                f,
276                input: Box::new(self.ir_node.into_inner()),
277                metadata: self.location.new_node_metadata::<U>(),
278            },
279        )
280    }
281
282    pub fn flat_map_unordered<U, I, F>(
283        self,
284        f: impl IntoQuotedMut<'a, F, L>,
285    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
286    where
287        I: IntoIterator<Item = U>,
288        F: Fn(T) -> I + 'a,
289    {
290        let f = f.splice_fn1_ctx(&self.location).into();
291        Stream::new(
292            self.location.clone(),
293            HydroNode::FlatMap {
294                f,
295                input: Box::new(self.ir_node.into_inner()),
296                metadata: self.location.new_node_metadata::<U>(),
297            },
298        )
299    }
300
301    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
302    where
303        T: IntoIterator<Item = U>,
304    {
305        self.flat_map_ordered(q!(|v| v))
306    }
307
308    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
309    where
310        T: IntoIterator<Item = U>,
311    {
312        self.flat_map_unordered(q!(|v| v))
313    }
314
315    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
316    where
317        F: Fn(&T) -> bool + 'a,
318    {
319        let f = f.splice_fn1_borrow_ctx(&self.location).into();
320        Optional::new(
321            self.location.clone(),
322            HydroNode::Filter {
323                f,
324                input: Box::new(self.ir_node.into_inner()),
325                metadata: self.location.new_node_metadata::<T>(),
326            },
327        )
328    }
329
330    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
331    where
332        F: Fn(T) -> Option<U> + 'a,
333    {
334        let f = f.splice_fn1_ctx(&self.location).into();
335        Optional::new(
336            self.location.clone(),
337            HydroNode::FilterMap {
338                f,
339                input: Box::new(self.ir_node.into_inner()),
340                metadata: self.location.new_node_metadata::<U>(),
341            },
342        )
343    }
344
345    pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
346        check_matching_location(&self.location, &other.location);
347
348        if L::is_top_level() {
349            Optional::new(
350                self.location.clone(),
351                HydroNode::Persist {
352                    inner: Box::new(HydroNode::Chain {
353                        first: Box::new(HydroNode::Unpersist {
354                            inner: Box::new(self.ir_node.into_inner()),
355                            metadata: self.location.new_node_metadata::<T>(),
356                        }),
357                        second: Box::new(HydroNode::Unpersist {
358                            inner: Box::new(other.ir_node.into_inner()),
359                            metadata: self.location.new_node_metadata::<T>(),
360                        }),
361                        metadata: self.location.new_node_metadata::<T>(),
362                    }),
363                    metadata: self.location.new_node_metadata::<T>(),
364                },
365            )
366        } else {
367            Optional::new(
368                self.location.clone(),
369                HydroNode::Chain {
370                    first: Box::new(self.ir_node.into_inner()),
371                    second: Box::new(other.ir_node.into_inner()),
372                    metadata: self.location.new_node_metadata::<T>(),
373                },
374            )
375        }
376    }
377
378    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
379    where
380        O: Clone,
381    {
382        let other: Optional<O, L, B> = other.into();
383        check_matching_location(&self.location, &other.location);
384
385        if L::is_top_level() {
386            Optional::new(
387                self.location.clone(),
388                HydroNode::Persist {
389                    inner: Box::new(HydroNode::CrossSingleton {
390                        left: Box::new(HydroNode::Unpersist {
391                            inner: Box::new(self.ir_node.into_inner()),
392                            metadata: self.location.new_node_metadata::<T>(),
393                        }),
394                        right: Box::new(HydroNode::Unpersist {
395                            inner: Box::new(other.ir_node.into_inner()),
396                            metadata: self.location.new_node_metadata::<O>(),
397                        }),
398                        metadata: self.location.new_node_metadata::<(T, O)>(),
399                    }),
400                    metadata: self.location.new_node_metadata::<(T, O)>(),
401                },
402            )
403        } else {
404            Optional::new(
405                self.location.clone(),
406                HydroNode::CrossSingleton {
407                    left: Box::new(self.ir_node.into_inner()),
408                    right: Box::new(other.ir_node.into_inner()),
409                    metadata: self.location.new_node_metadata::<(T, O)>(),
410                },
411            )
412        }
413    }
414
415    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
416        check_matching_location(&self.location, &other.location);
417
418        if L::is_top_level() {
419            Singleton::new(
420                self.location.clone(),
421                HydroNode::Persist {
422                    inner: Box::new(HydroNode::Chain {
423                        first: Box::new(HydroNode::Unpersist {
424                            inner: Box::new(self.ir_node.into_inner()),
425                            metadata: self.location.new_node_metadata::<T>(),
426                        }),
427                        second: Box::new(HydroNode::Unpersist {
428                            inner: Box::new(other.ir_node.into_inner()),
429                            metadata: self.location.new_node_metadata::<T>(),
430                        }),
431                        metadata: self.location.new_node_metadata::<T>(),
432                    }),
433                    metadata: self.location.new_node_metadata::<T>(),
434                },
435            )
436        } else {
437            Singleton::new(
438                self.location.clone(),
439                HydroNode::Chain {
440                    first: Box::new(self.ir_node.into_inner()),
441                    second: Box::new(other.ir_node.into_inner()),
442                    metadata: self.location.new_node_metadata::<T>(),
443                },
444            )
445        }
446    }
447
448    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
449    where
450        T: Clone,
451    {
452        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
453        let core_ir = HydroNode::Persist {
454            inner: Box::new(HydroNode::Source {
455                source: HydroSource::Iter(none.into()),
456                metadata: self.location.root().new_node_metadata::<Option<T>>(),
457            }),
458            metadata: self.location.new_node_metadata::<Option<T>>(),
459        };
460
461        let none_singleton = if L::is_top_level() {
462            Singleton::new(
463                self.location.clone(),
464                HydroNode::Persist {
465                    inner: Box::new(core_ir),
466                    metadata: self.location.new_node_metadata::<Option<T>>(),
467                },
468            )
469        } else {
470            Singleton::new(self.location.clone(), core_ir)
471        };
472
473        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
474    }
475
476    /// An operator which allows you to "name" a `HydroNode`.
477    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
478    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
479        {
480            let mut node = self.ir_node.borrow_mut();
481            let metadata = node.metadata_mut();
482            metadata.tag = Some(name.to_string());
483        }
484        self
485    }
486}
487
488impl<'a, T, L> Optional<T, L, Bounded>
489where
490    L: Location<'a>,
491{
492    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
493        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
494    }
495
496    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
497        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
498    }
499
500    pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
501    where
502        Singleton<U, L, Bounded>: ZipResult<
503                'a,
504                Optional<(), L, Bounded>,
505                Location = L,
506                Out = Optional<(U, ()), L, Bounded>,
507            >,
508    {
509        value.continue_if(self)
510    }
511
512    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
513        if L::is_top_level() {
514            panic!("Converting an optional to a stream is not yet supported at the top level");
515        }
516
517        Stream::new(self.location, self.ir_node.into_inner())
518    }
519}
520
521impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
522where
523    L: Location<'a> + NoTick,
524{
525    /// Returns an optional value corresponding to the latest snapshot of the optional
526    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
527    /// at least all relevant data that contributed to the snapshot at tick `t`.
528    ///
529    /// # Non-Determinism
530    /// Because this picks a snapshot of a optional whose value is continuously changing,
531    /// the output optional has a non-deterministic value since the snapshot can be at an
532    /// arbitrary point in time.
533    pub fn snapshot(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
534        Optional::new(
535            self.location.clone().tick,
536            HydroNode::Unpersist {
537                inner: Box::new(self.ir_node.into_inner()),
538                metadata: self.location.new_node_metadata::<T>(),
539            },
540        )
541    }
542
543    pub fn end_atomic(self) -> Optional<T, L, B> {
544        Optional::new(self.location.tick.l, self.ir_node.into_inner())
545    }
546}
547
548impl<'a, T, L, B: Boundedness> Optional<T, L, B>
549where
550    L: Location<'a> + NoTick + NoAtomic,
551{
552    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
553        Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
554    }
555
556    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
557    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
558    /// relevant data that contributed to the snapshot at tick `t`.
559    ///
560    /// # Non-Determinism
561    /// Because this picks a snapshot of a optional whose value is continuously changing,
562    /// the output optional has a non-deterministic value since the snapshot can be at an
563    /// arbitrary point in time.
564    pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
565        self.atomic(tick).snapshot(nondet)
566    }
567
568    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
569    /// with order corresponding to increasing prefixes of data contributing to the optional.
570    ///
571    /// # Non-Determinism
572    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
573    /// to non-deterministic batching and arrival of inputs, the output stream is
574    /// non-deterministic.
575    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
576        let tick = self.location.tick();
577        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
578    }
579
580    /// Given a time interval, returns a stream corresponding to snapshots of the optional
581    /// value taken at various points in time. Because the input optional may be
582    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
583    /// represent the value of the optional given some prefix of the streams leading up to
584    /// it.
585    ///
586    /// # Non-Determinism
587    /// The output stream is non-deterministic in which elements are sampled, since this
588    /// is controlled by a clock.
589    pub fn sample_every(
590        self,
591        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
592        nondet: NonDet,
593    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
594        let samples = self.location.source_interval(interval, nondet);
595        let tick = self.location.tick();
596
597        self.snapshot(&tick, nondet)
598            .continue_if(samples.batch(&tick, nondet).first())
599            .all_ticks()
600            .weakest_retries()
601    }
602}
603
604impl<'a, T, L> Optional<T, Tick<L>, Bounded>
605where
606    L: Location<'a>,
607{
608    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
609        Stream::new(
610            self.location.outer().clone(),
611            HydroNode::Persist {
612                inner: Box::new(self.ir_node.into_inner()),
613                metadata: self.location.new_node_metadata::<T>(),
614            },
615        )
616    }
617
618    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
619        Stream::new(
620            Atomic {
621                tick: self.location.clone(),
622            },
623            HydroNode::Persist {
624                inner: Box::new(self.ir_node.into_inner()),
625                metadata: self.location.new_node_metadata::<T>(),
626            },
627        )
628    }
629
630    pub fn latest(self) -> Optional<T, L, Unbounded> {
631        Optional::new(
632            self.location.outer().clone(),
633            HydroNode::Persist {
634                inner: Box::new(self.ir_node.into_inner()),
635                metadata: self.location.new_node_metadata::<T>(),
636            },
637        )
638    }
639
640    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
641        Optional::new(
642            Atomic {
643                tick: self.location.clone(),
644            },
645            HydroNode::Persist {
646                inner: Box::new(self.ir_node.into_inner()),
647                metadata: self.location.new_node_metadata::<T>(),
648            },
649        )
650    }
651
652    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
653        Optional::new(
654            self.location.clone(),
655            HydroNode::DeferTick {
656                input: Box::new(self.ir_node.into_inner()),
657                metadata: self.location.new_node_metadata::<T>(),
658            },
659        )
660    }
661
662    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
663        Stream::new(
664            self.location.clone(),
665            HydroNode::Persist {
666                inner: Box::new(self.ir_node.into_inner()),
667                metadata: self.location.new_node_metadata::<T>(),
668            },
669        )
670    }
671
672    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
673        Optional::new(
674            self.location.clone(),
675            HydroNode::Delta {
676                inner: Box::new(self.ir_node.into_inner()),
677                metadata: self.location.new_node_metadata::<T>(),
678            },
679        )
680    }
681}