1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::boundedness::Boundedness;
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{
11 CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
12 TickCycleMarker,
13};
14use crate::ir::{HydroLeaf, HydroNode, TeeNode};
15use crate::location::tick::{Atomic, NoAtomic};
16use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
17use crate::stream::{AtLeastOnce, ExactlyOnce};
18use crate::unsafety::NonDet;
19use crate::{Bounded, NoOrder, Optional, Stream, TotalOrder, Unbounded};
20
21pub struct Singleton<Type, Loc, Bound: Boundedness> {
22 pub(crate) location: Loc,
23 pub(crate) ir_node: RefCell<HydroNode>,
24
25 _phantom: PhantomData<(Type, Loc, Bound)>,
26}
27
28impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
29where
30 L: Location<'a>,
31{
32 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
33 Singleton::new(singleton.location, singleton.ir_node.into_inner())
34 }
35}
36
37impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
38where
39 L: Location<'a>,
40{
41 fn defer_tick(self) -> Self {
42 Singleton::defer_tick(self)
43 }
44}
45
46impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
47where
48 L: Location<'a>,
49{
50 type Location = Tick<L>;
51
52 fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
53 Singleton::new(
54 location.clone(),
55 HydroNode::Chain {
56 first: Box::new(HydroNode::CycleSource {
57 ident,
58 metadata: location.new_node_metadata::<T>(),
59 }),
60 second: initial
61 .continue_if(location.optional_first_tick(q!(())))
62 .ir_node
63 .into_inner()
64 .into(),
65 metadata: location.new_node_metadata::<T>(),
66 },
67 )
68 }
69}
70
71impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
72where
73 L: Location<'a>,
74{
75 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
76 assert_eq!(
77 self.location.id(),
78 expected_location,
79 "locations do not match"
80 );
81 self.location
82 .flow_state()
83 .borrow_mut()
84 .leaves
85 .as_mut()
86 .expect(FLOW_USED_MESSAGE)
87 .push(HydroLeaf::CycleSink {
88 ident,
89 input: Box::new(self.ir_node.into_inner()),
90 metadata: self.location.new_node_metadata::<T>(),
91 });
92 }
93}
94
95impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
96where
97 L: Location<'a>,
98{
99 type Location = Tick<L>;
100
101 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
102 Singleton::new(
103 location.clone(),
104 HydroNode::CycleSource {
105 ident,
106 metadata: location.new_node_metadata::<T>(),
107 },
108 )
109 }
110}
111
112impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
113where
114 L: Location<'a>,
115{
116 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
117 assert_eq!(
118 self.location.id(),
119 expected_location,
120 "locations do not match"
121 );
122 self.location
123 .flow_state()
124 .borrow_mut()
125 .leaves
126 .as_mut()
127 .expect(FLOW_USED_MESSAGE)
128 .push(HydroLeaf::CycleSink {
129 ident,
130 input: Box::new(self.ir_node.into_inner()),
131 metadata: self.location.new_node_metadata::<T>(),
132 });
133 }
134}
135
136impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRefMarker> for Singleton<T, L, B>
137where
138 L: Location<'a> + NoTick,
139{
140 type Location = L;
141
142 fn create_source(ident: syn::Ident, location: L) -> Self {
143 Singleton::new(
144 location.clone(),
145 HydroNode::Persist {
146 inner: Box::new(HydroNode::CycleSource {
147 ident,
148 metadata: location.new_node_metadata::<T>(),
149 }),
150 metadata: location.new_node_metadata::<T>(),
151 },
152 )
153 }
154}
155
156impl<'a, T, L, B: Boundedness> CycleComplete<'a, ForwardRefMarker> for Singleton<T, L, B>
157where
158 L: Location<'a> + NoTick,
159{
160 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
161 assert_eq!(
162 self.location.id(),
163 expected_location,
164 "locations do not match"
165 );
166 let metadata = self.location.new_node_metadata::<T>();
167 self.location
168 .flow_state()
169 .borrow_mut()
170 .leaves
171 .as_mut()
172 .expect(FLOW_USED_MESSAGE)
173 .push(HydroLeaf::CycleSink {
174 ident,
175 input: Box::new(HydroNode::Unpersist {
176 inner: Box::new(self.ir_node.into_inner()),
177 metadata: metadata.clone(),
178 }),
179 metadata,
180 });
181 }
182}
183
184impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
185where
186 T: Clone,
187 L: Location<'a>,
188{
189 fn clone(&self) -> Self {
190 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
191 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
192 *self.ir_node.borrow_mut() = HydroNode::Tee {
193 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
194 metadata: self.location.new_node_metadata::<T>(),
195 };
196 }
197
198 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
199 Singleton {
200 location: self.location.clone(),
201 ir_node: HydroNode::Tee {
202 inner: TeeNode(inner.0.clone()),
203 metadata: metadata.clone(),
204 }
205 .into(),
206 _phantom: PhantomData,
207 }
208 } else {
209 unreachable!()
210 }
211 }
212}
213
214impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
215where
216 L: Location<'a>,
217{
218 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
219 Singleton {
220 location,
221 ir_node: RefCell::new(ir_node),
222 _phantom: PhantomData,
223 }
224 }
225
226 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
227 where
228 F: Fn(T) -> U + 'a,
229 {
230 let f = f.splice_fn1_ctx(&self.location).into();
231 Singleton::new(
232 self.location.clone(),
233 HydroNode::Map {
234 f,
235 input: Box::new(self.ir_node.into_inner()),
236 metadata: self.location.new_node_metadata::<U>(),
237 },
238 )
239 }
240
241 pub fn flat_map_ordered<U, I, F>(
242 self,
243 f: impl IntoQuotedMut<'a, F, L>,
244 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
245 where
246 I: IntoIterator<Item = U>,
247 F: Fn(T) -> I + 'a,
248 {
249 let f = f.splice_fn1_ctx(&self.location).into();
250 Stream::new(
251 self.location.clone(),
252 HydroNode::FlatMap {
253 f,
254 input: Box::new(self.ir_node.into_inner()),
255 metadata: self.location.new_node_metadata::<U>(),
256 },
257 )
258 }
259
260 pub fn flat_map_unordered<U, I, F>(
261 self,
262 f: impl IntoQuotedMut<'a, F, L>,
263 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
264 where
265 I: IntoIterator<Item = U>,
266 F: Fn(T) -> I + 'a,
267 {
268 let f = f.splice_fn1_ctx(&self.location).into();
269 Stream::new(
270 self.location.clone(),
271 HydroNode::FlatMap {
272 f,
273 input: Box::new(self.ir_node.into_inner()),
274 metadata: self.location.new_node_metadata::<U>(),
275 },
276 )
277 }
278
279 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
280 where
281 T: IntoIterator<Item = U>,
282 {
283 self.flat_map_ordered(q!(|x| x))
284 }
285
286 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
287 where
288 T: IntoIterator<Item = U>,
289 {
290 self.flat_map_unordered(q!(|x| x))
291 }
292
293 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
294 where
295 F: Fn(&T) -> bool + 'a,
296 {
297 let f = f.splice_fn1_borrow_ctx(&self.location).into();
298 Optional::new(
299 self.location.clone(),
300 HydroNode::Filter {
301 f,
302 input: Box::new(self.ir_node.into_inner()),
303 metadata: self.location.new_node_metadata::<T>(),
304 },
305 )
306 }
307
308 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
309 where
310 F: Fn(T) -> Option<U> + 'a,
311 {
312 let f = f.splice_fn1_ctx(&self.location).into();
313 Optional::new(
314 self.location.clone(),
315 HydroNode::FilterMap {
316 f,
317 input: Box::new(self.ir_node.into_inner()),
318 metadata: self.location.new_node_metadata::<U>(),
319 },
320 )
321 }
322
323 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
324 where
325 Self: ZipResult<'a, O, Location = L>,
326 {
327 check_matching_location(&self.location, &Self::other_location(&other));
328
329 if L::is_top_level() {
330 let left_ir_node = self.ir_node.into_inner();
331 let left_ir_node_metadata = left_ir_node.metadata().clone();
332 let right_ir_node = Self::other_ir_node(other);
333 let right_ir_node_metadata = right_ir_node.metadata().clone();
334
335 Self::make(
336 self.location.clone(),
337 HydroNode::Persist {
338 inner: Box::new(HydroNode::CrossSingleton {
339 left: Box::new(HydroNode::Unpersist {
340 inner: Box::new(left_ir_node),
341 metadata: left_ir_node_metadata,
342 }),
343 right: Box::new(HydroNode::Unpersist {
344 inner: Box::new(right_ir_node),
345 metadata: right_ir_node_metadata,
346 }),
347 metadata: self
348 .location
349 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
350 }),
351 metadata: self
352 .location
353 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
354 },
355 )
356 } else {
357 Self::make(
358 self.location.clone(),
359 HydroNode::CrossSingleton {
360 left: Box::new(self.ir_node.into_inner()),
361 right: Box::new(Self::other_ir_node(other)),
362 metadata: self
363 .location
364 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
365 },
366 )
367 }
368 }
369
370 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
371 where
372 Self: ZipResult<
373 'a,
374 Optional<(), L, Bounded>,
375 Location = L,
376 Out = Optional<(T, ()), L, Bounded>,
377 >,
378 {
379 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
380 }
381
382 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
383 where
384 Singleton<T, L, B>: ZipResult<
385 'a,
386 Optional<(), L, Bounded>,
387 Location = L,
388 Out = Optional<(T, ()), L, Bounded>,
389 >,
390 {
391 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
392 }
393
394 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
397 {
398 let mut node = self.ir_node.borrow_mut();
399 let metadata = node.metadata_mut();
400 metadata.tag = Some(name.to_string());
401 }
402 self
403 }
404}
405
406impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
407where
408 L: Location<'a> + NoTick,
409{
410 pub fn snapshot(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
419 Singleton::new(
420 self.location.clone().tick,
421 HydroNode::Unpersist {
422 inner: Box::new(self.ir_node.into_inner()),
423 metadata: self.location.new_node_metadata::<T>(),
424 },
425 )
426 }
427
428 pub fn end_atomic(self) -> Optional<T, L, B> {
429 Optional::new(self.location.tick.l, self.ir_node.into_inner())
430 }
431}
432
433impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
434where
435 L: Location<'a> + NoTick + NoAtomic,
436{
437 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
438 Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
439 }
440
441 pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Singleton<T, Tick<L>, Bounded>
450 where
451 L: NoTick,
452 {
453 self.atomic(tick).snapshot(nondet)
454 }
455
456 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
464 let tick = self.location.tick();
465 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
466 }
467
468 pub fn sample_every(
478 self,
479 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
480 nondet: NonDet,
481 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
482 let samples = self.location.source_interval(interval, nondet);
483 let tick = self.location.tick();
484
485 self.snapshot(&tick, nondet)
486 .continue_if(samples.batch(&tick, nondet).first())
487 .all_ticks()
488 .weakest_retries()
489 }
490}
491
492impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
493where
494 L: Location<'a>,
495{
496 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
497 Stream::new(
498 self.location.outer().clone(),
499 HydroNode::Persist {
500 inner: Box::new(self.ir_node.into_inner()),
501 metadata: self.location.new_node_metadata::<T>(),
502 },
503 )
504 }
505
506 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
507 Stream::new(
508 Atomic {
509 tick: self.location.clone(),
510 },
511 HydroNode::Persist {
512 inner: Box::new(self.ir_node.into_inner()),
513 metadata: self.location.new_node_metadata::<T>(),
514 },
515 )
516 }
517
518 pub fn latest(self) -> Singleton<T, L, Unbounded> {
519 Singleton::new(
520 self.location.outer().clone(),
521 HydroNode::Persist {
522 inner: Box::new(self.ir_node.into_inner()),
523 metadata: self.location.new_node_metadata::<T>(),
524 },
525 )
526 }
527
528 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
529 Singleton::new(
530 Atomic {
531 tick: self.location.clone(),
532 },
533 HydroNode::Persist {
534 inner: Box::new(self.ir_node.into_inner()),
535 metadata: self.location.new_node_metadata::<T>(),
536 },
537 )
538 }
539
540 pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
541 Singleton::new(
542 self.location.clone(),
543 HydroNode::DeferTick {
544 input: Box::new(self.ir_node.into_inner()),
545 metadata: self.location.new_node_metadata::<T>(),
546 },
547 )
548 }
549
550 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
551 Stream::new(
552 self.location.clone(),
553 HydroNode::Persist {
554 inner: Box::new(self.ir_node.into_inner()),
555 metadata: self.location.new_node_metadata::<T>(),
556 },
557 )
558 }
559
560 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
561 Optional::new(
562 self.location.clone(),
563 HydroNode::Delta {
564 inner: Box::new(self.ir_node.into_inner()),
565 metadata: self.location.new_node_metadata::<T>(),
566 },
567 )
568 }
569
570 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
571 Stream::new(self.location, self.ir_node.into_inner())
572 }
573}
574
575pub trait ZipResult<'a, Other> {
576 type Out;
577 type ElementType;
578 type Location;
579
580 fn other_location(other: &Other) -> Self::Location;
581 fn other_ir_node(other: Other) -> HydroNode;
582
583 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
584}
585
586impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, Tick<L>, B>>
587 for Singleton<T, Tick<L>, B>
588where
589 U: Clone,
590 L: Location<'a>,
591{
592 type Out = Singleton<(T, U), Tick<L>, B>;
593 type ElementType = (T, U);
594 type Location = Tick<L>;
595
596 fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
597 other.location.clone()
598 }
599
600 fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
601 other.ir_node.into_inner()
602 }
603
604 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
605 Singleton::new(location, ir_node)
606 }
607}
608
609impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, Tick<L>, B>>
610 for Singleton<T, Tick<L>, B>
611where
612 U: Clone,
613 L: Location<'a>,
614{
615 type Out = Optional<(T, U), Tick<L>, B>;
616 type ElementType = (T, U);
617 type Location = Tick<L>;
618
619 fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
620 other.location.clone()
621 }
622
623 fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
624 other.ir_node.into_inner()
625 }
626
627 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
628 Optional::new(location, ir_node)
629 }
630}
631
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::*;

    /// Cycles a singleton across ticks and checks that, on each externally triggered
    /// tick, counting its elements reports exactly one.
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_send, input) = node.source_external_bincode(&external);

        let node_tick = node.tick();
        let initial = node_tick.singleton(q!(0));
        let (complete_cycle, singleton) = node_tick.cycle_with_initial(initial);

        // Count the cycled singleton each tick, but only emit on ticks that received
        // an external trigger.
        let per_tick_count = singleton.clone().into_stream().count();
        let trigger = input.batch(&node_tick, nondet!()).first();
        let counts = per_tick_count
            .continue_if(trigger)
            .all_ticks()
            .send_bincode_external(&external);
        complete_cycle.complete_next_tick(singleton);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_trigger = nodes.connect_sink_bincode(input_send).await;
        let mut external_out = nodes.connect_source_bincode(counts).await;

        deployment.start().await.unwrap();

        // Two consecutive triggers; the count must be 1 both times.
        for _ in 0..2 {
            tick_trigger.send(()).await.unwrap();

            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }
}