1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
11use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
12use crate::location::tick::{Atomic, NoAtomic};
13use crate::location::{LocationId, NoTick, check_matching_location};
14use crate::singleton::ZipResult;
15use crate::stream::NoOrder;
16use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded};
17
/// Handle to a dataflow IR node carrying values of type `Type` at location `Loc`,
/// tagged with the boundedness marker `Bound`.
///
/// NOTE(review): the name and the `some` / `unwrap_or` / `into_singleton` methods suggest
/// this models a value that may be absent — confirm against the crate-level docs.
pub struct Optional<Type, Loc, Bound> {
    /// Location this value is associated with.
    pub(crate) location: Loc,
    /// IR node describing how the value is produced. Held in a `RefCell` because the
    /// `Clone` impl rewrites the node in place through `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
24
25impl<'a, T, L, B> Optional<T, L, B>
26where
27 L: Location<'a>,
28{
29 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
30 Optional {
31 location,
32 ir_node: RefCell::new(ir_node),
33 _phantom: PhantomData,
34 }
35 }
36
37 pub fn some(singleton: Singleton<T, L, B>) -> Self {
38 Optional::new(singleton.location, singleton.ir_node.into_inner())
39 }
40
41 fn location_kind(&self) -> LocationId {
42 self.location.id()
43 }
44}
45
46impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
47where
48 L: Location<'a>,
49{
50 fn defer_tick(self) -> Self {
51 Optional::defer_tick(self)
52 }
53}
54
55impl<'a, T, L> CycleCollection<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
56where
57 L: Location<'a>,
58{
59 type Location = Tick<L>;
60
61 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
62 let location_id = location.id();
63 Optional::new(
64 location.clone(),
65 HydroNode::CycleSource {
66 ident,
67 location_kind: location_id,
68 metadata: location.new_node_metadata::<T>(),
69 },
70 )
71 }
72}
73
74impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
75where
76 L: Location<'a>,
77{
78 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
79 assert_eq!(
80 self.location.id(),
81 expected_location,
82 "locations do not match"
83 );
84 self.location
85 .flow_state()
86 .borrow_mut()
87 .leaves
88 .as_mut()
89 .expect(FLOW_USED_MESSAGE)
90 .push(HydroLeaf::CycleSink {
91 ident,
92 location_kind: self.location_kind(),
93 input: Box::new(self.ir_node.into_inner()),
94 metadata: self.location.new_node_metadata::<T>(),
95 });
96 }
97}
98
99impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
100where
101 L: Location<'a>,
102{
103 type Location = Tick<L>;
104
105 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
106 let location_id = location.id();
107 Optional::new(
108 location.clone(),
109 HydroNode::CycleSource {
110 ident,
111 location_kind: location_id,
112 metadata: location.new_node_metadata::<T>(),
113 },
114 )
115 }
116}
117
118impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
119where
120 L: Location<'a>,
121{
122 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
123 assert_eq!(
124 self.location.id(),
125 expected_location,
126 "locations do not match"
127 );
128 self.location
129 .flow_state()
130 .borrow_mut()
131 .leaves
132 .as_mut()
133 .expect(FLOW_USED_MESSAGE)
134 .push(HydroLeaf::CycleSink {
135 ident,
136 location_kind: self.location_kind(),
137 input: Box::new(self.ir_node.into_inner()),
138 metadata: self.location.new_node_metadata::<T>(),
139 });
140 }
141}
142
143impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Optional<T, L, B>
144where
145 L: Location<'a> + NoTick,
146{
147 type Location = L;
148
149 fn create_source(ident: syn::Ident, location: L) -> Self {
150 let location_id = location.id();
151 Optional::new(
152 location.clone(),
153 HydroNode::Persist {
154 inner: Box::new(HydroNode::CycleSource {
155 ident,
156 location_kind: location_id,
157 metadata: location.new_node_metadata::<T>(),
158 }),
159 metadata: location.new_node_metadata::<T>(),
160 },
161 )
162 }
163}
164
165impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B>
166where
167 L: Location<'a> + NoTick,
168{
169 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
170 assert_eq!(
171 self.location.id(),
172 expected_location,
173 "locations do not match"
174 );
175 let metadata = self.location.new_node_metadata::<T>();
176 self.location
177 .flow_state()
178 .borrow_mut()
179 .leaves
180 .as_mut()
181 .expect(FLOW_USED_MESSAGE)
182 .push(HydroLeaf::CycleSink {
183 ident,
184 location_kind: self.location_kind(),
185 input: Box::new(HydroNode::Unpersist {
186 inner: Box::new(self.ir_node.into_inner()),
187 metadata: metadata.clone(),
188 }),
189 metadata,
190 });
191 }
192}
193
194impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
195where
196 L: Location<'a>,
197{
198 fn from(singleton: Optional<T, L, Bounded>) -> Self {
199 Optional::new(singleton.location, singleton.ir_node.into_inner())
200 }
201}
202
203impl<'a, T, L, B> From<Singleton<T, L, B>> for Optional<T, L, B>
204where
205 L: Location<'a>,
206{
207 fn from(singleton: Singleton<T, L, B>) -> Self {
208 Optional::some(singleton)
209 }
210}
211
212impl<'a, T, L, B> Clone for Optional<T, L, B>
213where
214 T: Clone,
215 L: Location<'a>,
216{
217 fn clone(&self) -> Self {
218 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
219 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
220 *self.ir_node.borrow_mut() = HydroNode::Tee {
221 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
222 metadata: self.location.new_node_metadata::<T>(),
223 };
224 }
225
226 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
227 Optional {
228 location: self.location.clone(),
229 ir_node: HydroNode::Tee {
230 inner: TeeNode(inner.0.clone()),
231 metadata: metadata.clone(),
232 }
233 .into(),
234 _phantom: PhantomData,
235 }
236 } else {
237 unreachable!()
238 }
239 }
240}
241
242impl<'a, T, L, B> Optional<T, L, B>
243where
244 L: Location<'a>,
245{
246 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
265 where
266 F: Fn(T) -> U + 'a,
267 {
268 let f = f.splice_fn1_ctx(&self.location).into();
269 Optional::new(
270 self.location.clone(),
271 HydroNode::Map {
272 f,
273 input: Box::new(self.ir_node.into_inner()),
274 metadata: self.location.new_node_metadata::<U>(),
275 },
276 )
277 }
278
279 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B>
280 where
281 I: IntoIterator<Item = U>,
282 F: Fn(T) -> I + 'a,
283 {
284 let f = f.splice_fn1_ctx(&self.location).into();
285 Stream::new(
286 self.location.clone(),
287 HydroNode::FlatMap {
288 f,
289 input: Box::new(self.ir_node.into_inner()),
290 metadata: self.location.new_node_metadata::<U>(),
291 },
292 )
293 }
294
295 pub fn flat_map_unordered<U, I, F>(
296 self,
297 f: impl IntoQuotedMut<'a, F, L>,
298 ) -> Stream<U, L, B, NoOrder>
299 where
300 I: IntoIterator<Item = U>,
301 F: Fn(T) -> I + 'a,
302 {
303 let f = f.splice_fn1_ctx(&self.location).into();
304 Stream::new(
305 self.location.clone(),
306 HydroNode::FlatMap {
307 f,
308 input: Box::new(self.ir_node.into_inner()),
309 metadata: self.location.new_node_metadata::<U>(),
310 },
311 )
312 }
313
314 pub fn flatten_ordered<U>(self) -> Stream<U, L, B>
315 where
316 T: IntoIterator<Item = U>,
317 {
318 self.flat_map_ordered(q!(|v| v))
319 }
320
321 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
322 where
323 T: IntoIterator<Item = U>,
324 {
325 self.flat_map_unordered(q!(|v| v))
326 }
327
328 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
329 where
330 F: Fn(&T) -> bool + 'a,
331 {
332 let f = f.splice_fn1_borrow_ctx(&self.location).into();
333 Optional::new(
334 self.location.clone(),
335 HydroNode::Filter {
336 f,
337 input: Box::new(self.ir_node.into_inner()),
338 metadata: self.location.new_node_metadata::<T>(),
339 },
340 )
341 }
342
343 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
344 where
345 F: Fn(T) -> Option<U> + 'a,
346 {
347 let f = f.splice_fn1_ctx(&self.location).into();
348 Optional::new(
349 self.location.clone(),
350 HydroNode::FilterMap {
351 f,
352 input: Box::new(self.ir_node.into_inner()),
353 metadata: self.location.new_node_metadata::<U>(),
354 },
355 )
356 }
357
358 pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
359 check_matching_location(&self.location, &other.location);
360
361 if L::is_top_level() {
362 Optional::new(
363 self.location.clone(),
364 HydroNode::Persist {
365 inner: Box::new(HydroNode::Chain {
366 first: Box::new(HydroNode::Unpersist {
367 inner: Box::new(self.ir_node.into_inner()),
368 metadata: self.location.new_node_metadata::<T>(),
369 }),
370 second: Box::new(HydroNode::Unpersist {
371 inner: Box::new(other.ir_node.into_inner()),
372 metadata: self.location.new_node_metadata::<T>(),
373 }),
374 metadata: self.location.new_node_metadata::<T>(),
375 }),
376 metadata: self.location.new_node_metadata::<T>(),
377 },
378 )
379 } else {
380 Optional::new(
381 self.location.clone(),
382 HydroNode::Chain {
383 first: Box::new(self.ir_node.into_inner()),
384 second: Box::new(other.ir_node.into_inner()),
385 metadata: self.location.new_node_metadata::<T>(),
386 },
387 )
388 }
389 }
390
391 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
392 where
393 O: Clone,
394 {
395 let other: Optional<O, L, B> = other.into();
396 check_matching_location(&self.location, &other.location);
397
398 if L::is_top_level() {
399 Optional::new(
400 self.location.clone(),
401 HydroNode::Persist {
402 inner: Box::new(HydroNode::CrossSingleton {
403 left: Box::new(HydroNode::Unpersist {
404 inner: Box::new(self.ir_node.into_inner()),
405 metadata: self.location.new_node_metadata::<T>(),
406 }),
407 right: Box::new(HydroNode::Unpersist {
408 inner: Box::new(other.ir_node.into_inner()),
409 metadata: self.location.new_node_metadata::<O>(),
410 }),
411 metadata: self.location.new_node_metadata::<(T, O)>(),
412 }),
413 metadata: self.location.new_node_metadata::<(T, O)>(),
414 },
415 )
416 } else {
417 Optional::new(
418 self.location.clone(),
419 HydroNode::CrossSingleton {
420 left: Box::new(self.ir_node.into_inner()),
421 right: Box::new(other.ir_node.into_inner()),
422 metadata: self.location.new_node_metadata::<(T, O)>(),
423 },
424 )
425 }
426 }
427
428 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
429 check_matching_location(&self.location, &other.location);
430
431 if L::is_top_level() {
432 Singleton::new(
433 self.location.clone(),
434 HydroNode::Persist {
435 inner: Box::new(HydroNode::Chain {
436 first: Box::new(HydroNode::Unpersist {
437 inner: Box::new(self.ir_node.into_inner()),
438 metadata: self.location.new_node_metadata::<T>(),
439 }),
440 second: Box::new(HydroNode::Unpersist {
441 inner: Box::new(other.ir_node.into_inner()),
442 metadata: self.location.new_node_metadata::<T>(),
443 }),
444 metadata: self.location.new_node_metadata::<T>(),
445 }),
446 metadata: self.location.new_node_metadata::<T>(),
447 },
448 )
449 } else {
450 Singleton::new(
451 self.location.clone(),
452 HydroNode::Chain {
453 first: Box::new(self.ir_node.into_inner()),
454 second: Box::new(other.ir_node.into_inner()),
455 metadata: self.location.new_node_metadata::<T>(),
456 },
457 )
458 }
459 }
460
461 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
462 where
463 T: Clone,
464 {
465 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
466 let core_ir = HydroNode::Persist {
467 inner: Box::new(HydroNode::Source {
468 source: HydroSource::Iter(none.into()),
469 location_kind: self.location.id().root().clone(),
470 metadata: self.location.new_node_metadata::<Option<T>>(),
471 }),
472 metadata: self.location.new_node_metadata::<Option<T>>(),
473 };
474
475 let none_singleton = if L::is_top_level() {
476 Singleton::new(
477 self.location.clone(),
478 HydroNode::Persist {
479 inner: Box::new(core_ir),
480 metadata: self.location.new_node_metadata::<Option<T>>(),
481 },
482 )
483 } else {
484 Singleton::new(self.location.clone(), core_ir)
485 };
486
487 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
488 }
489}
490
491impl<'a, T, L> Optional<T, L, Bounded>
492where
493 L: Location<'a>,
494{
495 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
496 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
497 }
498
499 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
500 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
501 }
502
503 pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
504 where
505 Singleton<U, L, Bounded>: ZipResult<
506 'a,
507 Optional<(), L, Bounded>,
508 Location = L,
509 Out = Optional<(U, ()), L, Bounded>,
510 >,
511 {
512 value.continue_if(self)
513 }
514
515 pub fn into_stream(self) -> Stream<T, L, Bounded> {
516 if L::is_top_level() {
517 panic!("Converting an optional to a stream is not yet supported at the top level");
518 }
519
520 Stream::new(self.location, self.ir_node.into_inner())
521 }
522}
523
524impl<'a, T, L, B> Optional<T, Atomic<L>, B>
525where
526 L: Location<'a> + NoTick,
527{
528 pub unsafe fn latest_tick(self) -> Optional<T, Tick<L>, Bounded> {
537 Optional::new(
538 self.location.clone().tick,
539 HydroNode::Unpersist {
540 inner: Box::new(self.ir_node.into_inner()),
541 metadata: self.location.new_node_metadata::<T>(),
542 },
543 )
544 }
545
546 pub fn end_atomic(self) -> Optional<T, L, B> {
547 Optional::new(self.location.tick.l, self.ir_node.into_inner())
548 }
549}
550
551impl<'a, T, L, B> Optional<T, L, B>
552where
553 L: Location<'a> + NoTick + NoAtomic,
554{
555 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
556 Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
557 }
558
559 pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
568 unsafe { self.atomic(tick).latest_tick() }
569 }
570
571 pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
579 let tick = self.location.tick();
580
581 unsafe {
582 self.latest_tick(&tick).all_ticks()
584 }
585 }
586
587 pub unsafe fn sample_every(
597 self,
598 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
599 ) -> Stream<T, L, Unbounded> {
600 let samples = unsafe {
601 self.location.source_interval(interval)
603 };
604 let tick = self.location.tick();
605
606 unsafe {
607 self.latest_tick(&tick)
609 .continue_if(samples.tick_batch(&tick).first())
610 .all_ticks()
611 }
612 }
613}
614
615impl<'a, T, L> Optional<T, Tick<L>, Bounded>
616where
617 L: Location<'a>,
618{
619 pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
620 Stream::new(
621 self.location.outer().clone(),
622 HydroNode::Persist {
623 inner: Box::new(self.ir_node.into_inner()),
624 metadata: self.location.new_node_metadata::<T>(),
625 },
626 )
627 }
628
629 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
630 Stream::new(
631 Atomic {
632 tick: self.location.clone(),
633 },
634 HydroNode::Persist {
635 inner: Box::new(self.ir_node.into_inner()),
636 metadata: self.location.new_node_metadata::<T>(),
637 },
638 )
639 }
640
641 pub fn latest(self) -> Optional<T, L, Unbounded> {
642 Optional::new(
643 self.location.outer().clone(),
644 HydroNode::Persist {
645 inner: Box::new(self.ir_node.into_inner()),
646 metadata: self.location.new_node_metadata::<T>(),
647 },
648 )
649 }
650
651 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
652 Optional::new(
653 Atomic {
654 tick: self.location.clone(),
655 },
656 HydroNode::Persist {
657 inner: Box::new(self.ir_node.into_inner()),
658 metadata: self.location.new_node_metadata::<T>(),
659 },
660 )
661 }
662
663 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
664 Optional::new(
665 self.location.clone(),
666 HydroNode::DeferTick {
667 input: Box::new(self.ir_node.into_inner()),
668 metadata: self.location.new_node_metadata::<T>(),
669 },
670 )
671 }
672
673 pub fn persist(self) -> Stream<T, Tick<L>, Bounded> {
674 Stream::new(
675 self.location.clone(),
676 HydroNode::Persist {
677 inner: Box::new(self.ir_node.into_inner()),
678 metadata: self.location.new_node_metadata::<T>(),
679 },
680 )
681 }
682
683 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
684 Optional::new(
685 self.location.clone(),
686 HydroNode::Delta {
687 inner: Box::new(self.ir_node.into_inner()),
688 metadata: self.location.new_node_metadata::<T>(),
689 },
690 )
691 }
692}