1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::builder::FLOW_USED_MESSAGE;
9use crate::cycle::{
10 CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
11 TickCycleMarker,
12};
13use crate::ir::{HydroLeaf, HydroNode, TeeNode};
14use crate::location::tick::{Atomic, NoAtomic};
15use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
16use crate::{Bounded, Optional, Stream, Unbounded};
17
18pub struct Singleton<Type, Loc, Bound> {
19 pub(crate) location: Loc,
20 pub(crate) ir_node: RefCell<HydroNode>,
21
22 _phantom: PhantomData<(Type, Loc, Bound)>,
23}
24
25impl<'a, T, L, B> Singleton<T, L, B>
26where
27 L: Location<'a>,
28{
29 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
30 Singleton {
31 location,
32 ir_node: RefCell::new(ir_node),
33 _phantom: PhantomData,
34 }
35 }
36
37 fn location_kind(&self) -> LocationId {
38 self.location.id()
39 }
40}
41
42impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
43where
44 L: Location<'a>,
45{
46 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
47 Singleton::new(singleton.location, singleton.ir_node.into_inner())
48 }
49}
50
51impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
52where
53 L: Location<'a>,
54{
55 fn defer_tick(self) -> Self {
56 Singleton::defer_tick(self)
57 }
58}
59
60impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
61where
62 L: Location<'a>,
63{
64 type Location = Tick<L>;
65
66 fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
67 let location_id = location.id();
68 Singleton::new(
69 location.clone(),
70 HydroNode::Chain {
71 first: Box::new(HydroNode::CycleSource {
72 ident,
73 location_kind: location_id,
74 metadata: location.new_node_metadata::<T>(),
75 }),
76 second: initial.ir_node.into_inner().into(),
77 metadata: location.new_node_metadata::<T>(),
78 },
79 )
80 }
81}
82
83impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
84where
85 L: Location<'a>,
86{
87 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
88 assert_eq!(
89 self.location.id(),
90 expected_location,
91 "locations do not match"
92 );
93 self.location
94 .flow_state()
95 .borrow_mut()
96 .leaves
97 .as_mut()
98 .expect(FLOW_USED_MESSAGE)
99 .push(HydroLeaf::CycleSink {
100 ident,
101 location_kind: self.location_kind(),
102 input: Box::new(self.ir_node.into_inner()),
103 metadata: self.location.new_node_metadata::<T>(),
104 });
105 }
106}
107
108impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
109where
110 L: Location<'a>,
111{
112 type Location = Tick<L>;
113
114 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
115 let location_id = location.id();
116 Singleton::new(
117 location.clone(),
118 HydroNode::CycleSource {
119 ident,
120 location_kind: location_id,
121 metadata: location.new_node_metadata::<T>(),
122 },
123 )
124 }
125}
126
127impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
128where
129 L: Location<'a>,
130{
131 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
132 assert_eq!(
133 self.location.id(),
134 expected_location,
135 "locations do not match"
136 );
137 self.location
138 .flow_state()
139 .borrow_mut()
140 .leaves
141 .as_mut()
142 .expect(FLOW_USED_MESSAGE)
143 .push(HydroLeaf::CycleSink {
144 ident,
145 location_kind: self.location_kind(),
146 input: Box::new(self.ir_node.into_inner()),
147 metadata: self.location.new_node_metadata::<T>(),
148 });
149 }
150}
151
152impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Singleton<T, L, B>
153where
154 L: Location<'a> + NoTick,
155{
156 type Location = L;
157
158 fn create_source(ident: syn::Ident, location: L) -> Self {
159 let location_id = location.id();
160 Singleton::new(
161 location.clone(),
162 HydroNode::Persist {
163 inner: Box::new(HydroNode::CycleSource {
164 ident,
165 location_kind: location_id,
166 metadata: location.new_node_metadata::<T>(),
167 }),
168 metadata: location.new_node_metadata::<T>(),
169 },
170 )
171 }
172}
173
174impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Singleton<T, L, B>
175where
176 L: Location<'a> + NoTick,
177{
178 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
179 assert_eq!(
180 self.location.id(),
181 expected_location,
182 "locations do not match"
183 );
184 let metadata = self.location.new_node_metadata::<T>();
185 self.location
186 .flow_state()
187 .borrow_mut()
188 .leaves
189 .as_mut()
190 .expect(FLOW_USED_MESSAGE)
191 .push(HydroLeaf::CycleSink {
192 ident,
193 location_kind: self.location_kind(),
194 input: Box::new(HydroNode::Unpersist {
195 inner: Box::new(self.ir_node.into_inner()),
196 metadata: metadata.clone(),
197 }),
198 metadata,
199 });
200 }
201}
202
203impl<'a, T, L, B> Clone for Singleton<T, L, B>
204where
205 T: Clone,
206 L: Location<'a>,
207{
208 fn clone(&self) -> Self {
209 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
210 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
211 *self.ir_node.borrow_mut() = HydroNode::Tee {
212 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
213 metadata: self.location.new_node_metadata::<T>(),
214 };
215 }
216
217 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
218 Singleton {
219 location: self.location.clone(),
220 ir_node: HydroNode::Tee {
221 inner: TeeNode(inner.0.clone()),
222 metadata: metadata.clone(),
223 }
224 .into(),
225 _phantom: PhantomData,
226 }
227 } else {
228 unreachable!()
229 }
230 }
231}
232
233impl<'a, T, L, B> Singleton<T, L, B>
234where
235 L: Location<'a>,
236{
237 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
238 where
239 F: Fn(T) -> U + 'a,
240 {
241 let f = f.splice_fn1_ctx(&self.location).into();
242 Singleton::new(
243 self.location.clone(),
244 HydroNode::Map {
245 f,
246 input: Box::new(self.ir_node.into_inner()),
247 metadata: self.location.new_node_metadata::<U>(),
248 },
249 )
250 }
251
252 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B>
253 where
254 I: IntoIterator<Item = U>,
255 F: Fn(T) -> I + 'a,
256 {
257 let f = f.splice_fn1_ctx(&self.location).into();
258 Stream::new(
259 self.location.clone(),
260 HydroNode::FlatMap {
261 f,
262 input: Box::new(self.ir_node.into_inner()),
263 metadata: self.location.new_node_metadata::<U>(),
264 },
265 )
266 }
267
268 pub fn flat_map_unordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B>
269 where
270 I: IntoIterator<Item = U>,
271 F: Fn(T) -> I + 'a,
272 {
273 let f = f.splice_fn1_ctx(&self.location).into();
274 Stream::new(
275 self.location.clone(),
276 HydroNode::FlatMap {
277 f,
278 input: Box::new(self.ir_node.into_inner()),
279 metadata: self.location.new_node_metadata::<U>(),
280 },
281 )
282 }
283
284 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
285 where
286 F: Fn(&T) -> bool + 'a,
287 {
288 let f = f.splice_fn1_borrow_ctx(&self.location).into();
289 Optional::new(
290 self.location.clone(),
291 HydroNode::Filter {
292 f,
293 input: Box::new(self.ir_node.into_inner()),
294 metadata: self.location.new_node_metadata::<T>(),
295 },
296 )
297 }
298
299 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
300 where
301 F: Fn(T) -> Option<U> + 'a,
302 {
303 let f = f.splice_fn1_ctx(&self.location).into();
304 Optional::new(
305 self.location.clone(),
306 HydroNode::FilterMap {
307 f,
308 input: Box::new(self.ir_node.into_inner()),
309 metadata: self.location.new_node_metadata::<U>(),
310 },
311 )
312 }
313
314 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
315 where
316 Self: ZipResult<'a, O, Location = L>,
317 {
318 check_matching_location(&self.location, &Self::other_location(&other));
319
320 if L::is_top_level() {
321 let left_ir_node = self.ir_node.into_inner();
322 let left_ir_node_metadata = left_ir_node.metadata().clone();
323 let right_ir_node = Self::other_ir_node(other);
324 let right_ir_node_metadata = right_ir_node.metadata().clone();
325
326 Self::make(
327 self.location.clone(),
328 HydroNode::Persist {
329 inner: Box::new(HydroNode::CrossSingleton {
330 left: Box::new(HydroNode::Unpersist {
331 inner: Box::new(left_ir_node),
332 metadata: left_ir_node_metadata,
333 }),
334 right: Box::new(HydroNode::Unpersist {
335 inner: Box::new(right_ir_node),
336 metadata: right_ir_node_metadata,
337 }),
338 metadata: self
339 .location
340 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
341 }),
342 metadata: self
343 .location
344 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
345 },
346 )
347 } else {
348 Self::make(
349 self.location.clone(),
350 HydroNode::CrossSingleton {
351 left: Box::new(self.ir_node.into_inner()),
352 right: Box::new(Self::other_ir_node(other)),
353 metadata: self
354 .location
355 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
356 },
357 )
358 }
359 }
360
361 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
362 where
363 Self: ZipResult<
364 'a,
365 Optional<(), L, Bounded>,
366 Location = L,
367 Out = Optional<(T, ()), L, Bounded>,
368 >,
369 {
370 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
371 }
372
373 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
374 where
375 Singleton<T, L, B>: ZipResult<
376 'a,
377 Optional<(), L, Bounded>,
378 Location = L,
379 Out = Optional<(T, ()), L, Bounded>,
380 >,
381 {
382 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
383 }
384}
385
386impl<'a, T, L, B> Singleton<T, Atomic<L>, B>
387where
388 L: Location<'a> + NoTick,
389{
390 pub unsafe fn latest_tick(self) -> Singleton<T, Tick<L>, Bounded> {
399 Singleton::new(
400 self.location.clone().tick,
401 HydroNode::Unpersist {
402 inner: Box::new(self.ir_node.into_inner()),
403 metadata: self.location.new_node_metadata::<T>(),
404 },
405 )
406 }
407
408 pub fn end_atomic(self) -> Optional<T, L, B> {
409 Optional::new(self.location.tick.l, self.ir_node.into_inner())
410 }
411}
412
413impl<'a, T, L, B> Singleton<T, L, B>
414where
415 L: Location<'a> + NoTick + NoAtomic,
416{
417 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
418 Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
419 }
420
421 pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
430 where
431 L: NoTick,
432 {
433 unsafe { self.atomic(tick).latest_tick() }
434 }
435
436 pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
444 let tick = self.location.tick();
445
446 unsafe {
447 self.latest_tick(&tick).all_ticks()
449 }
450 }
451
452 pub unsafe fn sample_every(
462 self,
463 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
464 ) -> Stream<T, L, Unbounded> {
465 let samples = unsafe {
466 self.location.source_interval(interval)
468 };
469 let tick = self.location.tick();
470
471 unsafe {
472 self.latest_tick(&tick)
474 .continue_if(samples.tick_batch(&tick).first())
475 .all_ticks()
476 }
477 }
478}
479
480impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
481where
482 L: Location<'a>,
483{
484 pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
485 Stream::new(
486 self.location.outer().clone(),
487 HydroNode::Persist {
488 inner: Box::new(self.ir_node.into_inner()),
489 metadata: self.location.new_node_metadata::<T>(),
490 },
491 )
492 }
493
494 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
495 Stream::new(
496 Atomic {
497 tick: self.location.clone(),
498 },
499 HydroNode::Persist {
500 inner: Box::new(self.ir_node.into_inner()),
501 metadata: self.location.new_node_metadata::<T>(),
502 },
503 )
504 }
505
506 pub fn latest(self) -> Singleton<T, L, Unbounded> {
507 Singleton::new(
508 self.location.outer().clone(),
509 HydroNode::Persist {
510 inner: Box::new(self.ir_node.into_inner()),
511 metadata: self.location.new_node_metadata::<T>(),
512 },
513 )
514 }
515
516 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
517 Singleton::new(
518 Atomic {
519 tick: self.location.clone(),
520 },
521 HydroNode::Persist {
522 inner: Box::new(self.ir_node.into_inner()),
523 metadata: self.location.new_node_metadata::<T>(),
524 },
525 )
526 }
527
528 pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
529 Singleton::new(
530 self.location.clone(),
531 HydroNode::DeferTick {
532 input: Box::new(self.ir_node.into_inner()),
533 metadata: self.location.new_node_metadata::<T>(),
534 },
535 )
536 }
537
538 pub fn persist(self) -> Stream<T, Tick<L>, Bounded> {
539 Stream::new(
540 self.location.clone(),
541 HydroNode::Persist {
542 inner: Box::new(self.ir_node.into_inner()),
543 metadata: self.location.new_node_metadata::<T>(),
544 },
545 )
546 }
547
548 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
549 Optional::new(
550 self.location.clone(),
551 HydroNode::Delta {
552 inner: Box::new(self.ir_node.into_inner()),
553 metadata: self.location.new_node_metadata::<T>(),
554 },
555 )
556 }
557
558 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded> {
559 Stream::new(self.location, self.ir_node.into_inner())
560 }
561}
562
563pub trait ZipResult<'a, Other> {
564 type Out;
565 type ElementType;
566 type Location;
567
568 fn other_location(other: &Other) -> Self::Location;
569 fn other_ir_node(other: Other) -> HydroNode;
570
571 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
572}
573
574impl<'a, T, U, L, B> ZipResult<'a, Singleton<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
575where
576 U: Clone,
577 L: Location<'a>,
578{
579 type Out = Singleton<(T, U), Tick<L>, B>;
580 type ElementType = (T, U);
581 type Location = Tick<L>;
582
583 fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
584 other.location.clone()
585 }
586
587 fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
588 other.ir_node.into_inner()
589 }
590
591 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
592 Singleton::new(location, ir_node)
593 }
594}
595
596impl<'a, T, U, L, B> ZipResult<'a, Optional<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
597where
598 U: Clone,
599 L: Location<'a>,
600{
601 type Out = Optional<(T, U), Tick<L>, B>;
602 type ElementType = (T, U);
603 type Location = Tick<L>;
604
605 fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
606 other.location.clone()
607 }
608
609 fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
610 other.ir_node.into_inner()
611 }
612
613 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
614 Optional::new(location, ir_node)
615 }
616}