1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
11use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
12use crate::location::tick::{Atomic, NoAtomic};
13use crate::location::{LocationId, NoTick, check_matching_location};
14use crate::singleton::ZipResult;
15use crate::stream::NoOrder;
16use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded};
17
18pub struct Optional<T, L, B> {
19 pub(crate) location: L,
20 pub(crate) ir_node: RefCell<HydroNode>,
21
22 _phantom: PhantomData<(T, L, B)>,
23}
24
25impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
26 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
27 Optional {
28 location,
29 ir_node: RefCell::new(ir_node),
30 _phantom: PhantomData,
31 }
32 }
33
34 pub fn some(singleton: Singleton<T, L, B>) -> Self {
35 Optional::new(singleton.location, singleton.ir_node.into_inner())
36 }
37
38 fn location_kind(&self) -> LocationId {
39 self.location.id()
40 }
41}
42
43impl<'a, T, L: Location<'a>> DeferTick for Optional<T, Tick<L>, Bounded> {
44 fn defer_tick(self) -> Self {
45 Optional::defer_tick(self)
46 }
47}
48
49impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycleMarker>
50 for Optional<T, Tick<L>, Bounded>
51{
52 type Location = Tick<L>;
53
54 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
55 let location_id = location.id();
56 Optional::new(
57 location.clone(),
58 HydroNode::CycleSource {
59 ident,
60 location_kind: location_id,
61 metadata: location.new_node_metadata::<T>(),
62 },
63 )
64 }
65}
66
67impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded> {
68 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
69 assert_eq!(
70 self.location.id(),
71 expected_location,
72 "locations do not match"
73 );
74 self.location
75 .flow_state()
76 .borrow_mut()
77 .leaves
78 .as_mut()
79 .expect(FLOW_USED_MESSAGE)
80 .push(HydroLeaf::CycleSink {
81 ident,
82 location_kind: self.location_kind(),
83 input: Box::new(self.ir_node.into_inner()),
84 metadata: self.location.new_node_metadata::<T>(),
85 });
86 }
87}
88
89impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRefMarker>
90 for Optional<T, Tick<L>, Bounded>
91{
92 type Location = Tick<L>;
93
94 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95 let location_id = location.id();
96 Optional::new(
97 location.clone(),
98 HydroNode::CycleSource {
99 ident,
100 location_kind: location_id,
101 metadata: location.new_node_metadata::<T>(),
102 },
103 )
104 }
105}
106
107impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded> {
108 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
109 assert_eq!(
110 self.location.id(),
111 expected_location,
112 "locations do not match"
113 );
114 self.location
115 .flow_state()
116 .borrow_mut()
117 .leaves
118 .as_mut()
119 .expect(FLOW_USED_MESSAGE)
120 .push(HydroLeaf::CycleSink {
121 ident,
122 location_kind: self.location_kind(),
123 input: Box::new(self.ir_node.into_inner()),
124 metadata: self.location.new_node_metadata::<T>(),
125 });
126 }
127}
128
129impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker>
130 for Optional<T, L, B>
131{
132 type Location = L;
133
134 fn create_source(ident: syn::Ident, location: L) -> Self {
135 let location_id = location.id();
136 Optional::new(
137 location.clone(),
138 HydroNode::Persist {
139 inner: Box::new(HydroNode::CycleSource {
140 ident,
141 location_kind: location_id,
142 metadata: location.new_node_metadata::<T>(),
143 }),
144 metadata: location.new_node_metadata::<T>(),
145 },
146 )
147 }
148}
149
150impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B> {
151 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
152 assert_eq!(
153 self.location.id(),
154 expected_location,
155 "locations do not match"
156 );
157 let metadata = self.location.new_node_metadata::<T>();
158 self.location
159 .flow_state()
160 .borrow_mut()
161 .leaves
162 .as_mut()
163 .expect(FLOW_USED_MESSAGE)
164 .push(HydroLeaf::CycleSink {
165 ident,
166 location_kind: self.location_kind(),
167 input: Box::new(HydroNode::Unpersist {
168 inner: Box::new(self.ir_node.into_inner()),
169 metadata: metadata.clone(),
170 }),
171 metadata,
172 });
173 }
174}
175
176impl<'a, T, L: Location<'a>> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded> {
177 fn from(singleton: Optional<T, L, Bounded>) -> Self {
178 Optional::new(singleton.location, singleton.ir_node.into_inner())
179 }
180}
181
182impl<'a, T, L: Location<'a>, B> From<Singleton<T, L, B>> for Optional<T, L, B> {
183 fn from(singleton: Singleton<T, L, B>) -> Self {
184 Optional::some(singleton)
185 }
186}
187
188impl<'a, T: Clone, L: Location<'a>, B> Clone for Optional<T, L, B> {
189 fn clone(&self) -> Self {
190 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
191 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
192 *self.ir_node.borrow_mut() = HydroNode::Tee {
193 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
194 metadata: self.location.new_node_metadata::<T>(),
195 };
196 }
197
198 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
199 Optional {
200 location: self.location.clone(),
201 ir_node: HydroNode::Tee {
202 inner: TeeNode(inner.0.clone()),
203 metadata: metadata.clone(),
204 }
205 .into(),
206 _phantom: PhantomData,
207 }
208 } else {
209 unreachable!()
210 }
211 }
212}
213
214impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
215 pub fn map<U, F: Fn(T) -> U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B> {
234 let f = f.splice_fn1_ctx(&self.location).into();
235 Optional::new(
236 self.location.clone(),
237 HydroNode::Map {
238 f,
239 input: Box::new(self.ir_node.into_inner()),
240 metadata: self.location.new_node_metadata::<U>(),
241 },
242 )
243 }
244
245 pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
246 self,
247 f: impl IntoQuotedMut<'a, F, L>,
248 ) -> Stream<U, L, B> {
249 let f = f.splice_fn1_ctx(&self.location).into();
250 Stream::new(
251 self.location.clone(),
252 HydroNode::FlatMap {
253 f,
254 input: Box::new(self.ir_node.into_inner()),
255 metadata: self.location.new_node_metadata::<U>(),
256 },
257 )
258 }
259
260 pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
261 self,
262 f: impl IntoQuotedMut<'a, F, L>,
263 ) -> Stream<U, L, B, NoOrder> {
264 let f = f.splice_fn1_ctx(&self.location).into();
265 Stream::new(
266 self.location.clone(),
267 HydroNode::FlatMap {
268 f,
269 input: Box::new(self.ir_node.into_inner()),
270 metadata: self.location.new_node_metadata::<U>(),
271 },
272 )
273 }
274
275 pub fn flatten_ordered<U>(self) -> Stream<U, L, B>
276 where
277 T: IntoIterator<Item = U>,
278 {
279 self.flat_map_ordered(q!(|v| v))
280 }
281
282 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
283 where
284 T: IntoIterator<Item = U>,
285 {
286 self.flat_map_unordered(q!(|v| v))
287 }
288
289 pub fn filter<F: Fn(&T) -> bool + 'a>(
290 self,
291 f: impl IntoQuotedMut<'a, F, L>,
292 ) -> Optional<T, L, B> {
293 let f = f.splice_fn1_borrow_ctx(&self.location).into();
294 Optional::new(
295 self.location.clone(),
296 HydroNode::Filter {
297 f,
298 input: Box::new(self.ir_node.into_inner()),
299 metadata: self.location.new_node_metadata::<T>(),
300 },
301 )
302 }
303
304 pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
305 self,
306 f: impl IntoQuotedMut<'a, F, L>,
307 ) -> Optional<U, L, B> {
308 let f = f.splice_fn1_ctx(&self.location).into();
309 Optional::new(
310 self.location.clone(),
311 HydroNode::FilterMap {
312 f,
313 input: Box::new(self.ir_node.into_inner()),
314 metadata: self.location.new_node_metadata::<U>(),
315 },
316 )
317 }
318
319 pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
320 check_matching_location(&self.location, &other.location);
321
322 if L::is_top_level() {
323 Optional::new(
324 self.location.clone(),
325 HydroNode::Persist {
326 inner: Box::new(HydroNode::Chain {
327 first: Box::new(HydroNode::Unpersist {
328 inner: Box::new(self.ir_node.into_inner()),
329 metadata: self.location.new_node_metadata::<T>(),
330 }),
331 second: Box::new(HydroNode::Unpersist {
332 inner: Box::new(other.ir_node.into_inner()),
333 metadata: self.location.new_node_metadata::<T>(),
334 }),
335 metadata: self.location.new_node_metadata::<T>(),
336 }),
337 metadata: self.location.new_node_metadata::<T>(),
338 },
339 )
340 } else {
341 Optional::new(
342 self.location.clone(),
343 HydroNode::Chain {
344 first: Box::new(self.ir_node.into_inner()),
345 second: Box::new(other.ir_node.into_inner()),
346 metadata: self.location.new_node_metadata::<T>(),
347 },
348 )
349 }
350 }
351
352 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
353 where
354 O: Clone,
355 {
356 let other: Optional<O, L, B> = other.into();
357 check_matching_location(&self.location, &other.location);
358
359 if L::is_top_level() {
360 Optional::new(
361 self.location.clone(),
362 HydroNode::Persist {
363 inner: Box::new(HydroNode::CrossSingleton {
364 left: Box::new(HydroNode::Unpersist {
365 inner: Box::new(self.ir_node.into_inner()),
366 metadata: self.location.new_node_metadata::<T>(),
367 }),
368 right: Box::new(HydroNode::Unpersist {
369 inner: Box::new(other.ir_node.into_inner()),
370 metadata: self.location.new_node_metadata::<O>(),
371 }),
372 metadata: self.location.new_node_metadata::<(T, O)>(),
373 }),
374 metadata: self.location.new_node_metadata::<(T, O)>(),
375 },
376 )
377 } else {
378 Optional::new(
379 self.location.clone(),
380 HydroNode::CrossSingleton {
381 left: Box::new(self.ir_node.into_inner()),
382 right: Box::new(other.ir_node.into_inner()),
383 metadata: self.location.new_node_metadata::<(T, O)>(),
384 },
385 )
386 }
387 }
388
389 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
390 check_matching_location(&self.location, &other.location);
391
392 if L::is_top_level() {
393 Singleton::new(
394 self.location.clone(),
395 HydroNode::Persist {
396 inner: Box::new(HydroNode::Chain {
397 first: Box::new(HydroNode::Unpersist {
398 inner: Box::new(self.ir_node.into_inner()),
399 metadata: self.location.new_node_metadata::<T>(),
400 }),
401 second: Box::new(HydroNode::Unpersist {
402 inner: Box::new(other.ir_node.into_inner()),
403 metadata: self.location.new_node_metadata::<T>(),
404 }),
405 metadata: self.location.new_node_metadata::<T>(),
406 }),
407 metadata: self.location.new_node_metadata::<T>(),
408 },
409 )
410 } else {
411 Singleton::new(
412 self.location.clone(),
413 HydroNode::Chain {
414 first: Box::new(self.ir_node.into_inner()),
415 second: Box::new(other.ir_node.into_inner()),
416 metadata: self.location.new_node_metadata::<T>(),
417 },
418 )
419 }
420 }
421
422 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
423 where
424 T: Clone,
425 {
426 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
427 let core_ir = HydroNode::Persist {
428 inner: Box::new(HydroNode::Source {
429 source: HydroSource::Iter(none.into()),
430 location_kind: self.location.id().root().clone(),
431 metadata: self.location.new_node_metadata::<Option<T>>(),
432 }),
433 metadata: self.location.new_node_metadata::<Option<T>>(),
434 };
435
436 let none_singleton = if L::is_top_level() {
437 Singleton::new(
438 self.location.clone(),
439 HydroNode::Persist {
440 inner: Box::new(core_ir),
441 metadata: self.location.new_node_metadata::<Option<T>>(),
442 },
443 )
444 } else {
445 Singleton::new(self.location.clone(), core_ir)
446 };
447
448 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
449 }
450}
451
452impl<'a, T, L: Location<'a>> Optional<T, L, Bounded> {
453 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
454 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
455 }
456
457 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
458 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
459 }
460
461 pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
462 where
463 Singleton<U, L, Bounded>: ZipResult<
464 'a,
465 Optional<(), L, Bounded>,
466 Location = L,
467 Out = Optional<(U, ()), L, Bounded>,
468 >,
469 {
470 value.continue_if(self)
471 }
472
473 pub fn into_stream(self) -> Stream<T, L, Bounded> {
474 if L::is_top_level() {
475 panic!("Converting an optional to a stream is not yet supported at the top level");
476 }
477
478 Stream::new(self.location, self.ir_node.into_inner())
479 }
480}
481
482impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, Atomic<L>, B> {
483 pub unsafe fn latest_tick(self) -> Optional<T, Tick<L>, Bounded> {
492 Optional::new(
493 self.location.clone().tick,
494 HydroNode::Unpersist {
495 inner: Box::new(self.ir_node.into_inner()),
496 metadata: self.location.new_node_metadata::<T>(),
497 },
498 )
499 }
500
501 pub fn end_atomic(self) -> Optional<T, L, B> {
502 Optional::new(self.location.tick.l, self.ir_node.into_inner())
503 }
504}
505
506impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B> Optional<T, L, B> {
507 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
508 Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
509 }
510
511 pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
520 unsafe { self.atomic(tick).latest_tick() }
521 }
522
523 pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
531 let tick = self.location.tick();
532
533 unsafe {
534 self.latest_tick(&tick).all_ticks()
536 }
537 }
538
539 pub unsafe fn sample_every(
549 self,
550 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
551 ) -> Stream<T, L, Unbounded>
552 where
553 L: NoAtomic,
554 {
555 let samples = unsafe {
556 self.location.source_interval(interval)
558 };
559 let tick = self.location.tick();
560
561 unsafe {
562 self.latest_tick(&tick)
564 .continue_if(samples.tick_batch(&tick).first())
565 .all_ticks()
566 }
567 }
568}
569
570impl<'a, T, L: Location<'a>> Optional<T, Tick<L>, Bounded> {
571 pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
572 Stream::new(
573 self.location.outer().clone(),
574 HydroNode::Persist {
575 inner: Box::new(self.ir_node.into_inner()),
576 metadata: self.location.new_node_metadata::<T>(),
577 },
578 )
579 }
580
581 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
582 Stream::new(
583 Atomic {
584 tick: self.location.clone(),
585 },
586 HydroNode::Persist {
587 inner: Box::new(self.ir_node.into_inner()),
588 metadata: self.location.new_node_metadata::<T>(),
589 },
590 )
591 }
592
593 pub fn latest(self) -> Optional<T, L, Unbounded> {
594 Optional::new(
595 self.location.outer().clone(),
596 HydroNode::Persist {
597 inner: Box::new(self.ir_node.into_inner()),
598 metadata: self.location.new_node_metadata::<T>(),
599 },
600 )
601 }
602
603 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
604 Optional::new(
605 Atomic {
606 tick: self.location.clone(),
607 },
608 HydroNode::Persist {
609 inner: Box::new(self.ir_node.into_inner()),
610 metadata: self.location.new_node_metadata::<T>(),
611 },
612 )
613 }
614
615 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
616 Optional::new(
617 self.location.clone(),
618 HydroNode::DeferTick {
619 input: Box::new(self.ir_node.into_inner()),
620 metadata: self.location.new_node_metadata::<T>(),
621 },
622 )
623 }
624
625 pub fn persist(self) -> Stream<T, Tick<L>, Bounded> {
626 Stream::new(
627 self.location.clone(),
628 HydroNode::Persist {
629 inner: Box::new(self.ir_node.into_inner()),
630 metadata: self.location.new_node_metadata::<T>(),
631 },
632 )
633 }
634
635 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
636 Optional::new(
637 self.location.clone(),
638 HydroNode::Delta {
639 inner: Box::new(self.ir_node.into_inner()),
640 metadata: self.location.new_node_metadata::<T>(),
641 },
642 )
643 }
644}