1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use futures::stream::Stream as FuturesStream;
6use proc_macro2::Span;
7use stageleft::{QuotedWithContext, q};
8
9use super::builder::FlowState;
10use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
11use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
12use crate::stream::ExactlyOnce;
13use crate::{Singleton, Stream, TotalOrder, Unbounded};
14
15pub mod external_process;
16pub use external_process::ExternalProcess;
17
18pub mod process;
19pub use process::Process;
20
21pub mod cluster;
22pub use cluster::{Cluster, ClusterId};
23
24pub mod can_send;
25pub use can_send::CanSend;
26
27pub mod tick;
28pub use tick::{Atomic, NoTick, Tick};
29
/// Identifies a location in a dataflow program.
///
/// Every variant carries a numeric identifier. `Tick` additionally carries the
/// boxed location it wraps, which is what [`LocationId::root`] descends into.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum LocationId {
    /// A process, tagged with its numeric id.
    Process(usize),
    /// A cluster, tagged with its numeric id.
    Cluster(usize),
    /// A tick: its own numeric id followed by the location it wraps.
    Tick(usize, Box<LocationId>),
    /// An external process, tagged with its numeric id.
    ExternalProcess(usize),
}
37
38impl LocationId {
39 pub fn root(&self) -> &LocationId {
40 match self {
41 LocationId::Process(_) => self,
42 LocationId::Cluster(_) => self,
43 LocationId::Tick(_, id) => id.root(),
44 LocationId::ExternalProcess(_) => self,
45 }
46 }
47
48 pub fn raw_id(&self) -> usize {
49 match self {
50 LocationId::Process(id) => *id,
51 LocationId::Cluster(id) => *id,
52 LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
53 LocationId::ExternalProcess(id) => *id,
54 }
55 }
56}
57
58pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
59 assert_eq!(l1.id(), l2.id(), "locations do not match");
60}
61
62pub trait Location<'a>: Clone {
63 type Root: Location<'a>;
64
65 fn root(&self) -> Self::Root;
66
67 fn id(&self) -> LocationId;
68
69 fn flow_state(&self) -> &FlowState;
70
71 fn is_top_level() -> bool;
72
73 fn tick(&self) -> Tick<Self>
74 where
75 Self: NoTick,
76 {
77 let next_id = self.flow_state().borrow_mut().next_clock_id;
78 self.flow_state().borrow_mut().next_clock_id += 1;
79 Tick {
80 id: next_id,
81 l: self.clone(),
82 }
83 }
84
85 fn next_node_id(&self) -> usize {
86 let next_id = self.flow_state().borrow_mut().next_node_id;
87 self.flow_state().borrow_mut().next_node_id += 1;
88 next_id
89 }
90
91 fn new_node_metadata<T>(&self) -> HydroIrMetadata {
92 HydroIrMetadata {
93 location_kind: self.id(),
94 output_type: Some(stageleft::quote_type::<T>().into()),
95 cardinality: None,
96 cpu_usage: None,
97 }
98 }
99
100 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
101 where
102 Self: Sized + NoTick,
103 {
104 Stream::new(
105 self.clone(),
106 HydroNode::Persist {
107 inner: Box::new(HydroNode::Source {
108 source: HydroSource::Spin(),
109 location_kind: self.id(),
110 metadata: self.new_node_metadata::<()>(),
111 }),
112 metadata: self.new_node_metadata::<()>(),
113 },
114 )
115 }
116
117 fn source_stream<T, E>(
118 &self,
119 e: impl QuotedWithContext<'a, E, Self>,
120 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
121 where
122 E: FuturesStream<Item = T> + Unpin,
123 Self: Sized + NoTick,
124 {
125 let e = e.splice_untyped_ctx(self);
126
127 Stream::new(
128 self.clone(),
129 HydroNode::Persist {
130 inner: Box::new(HydroNode::Source {
131 source: HydroSource::Stream(e.into()),
132 location_kind: self.id(),
133 metadata: self.new_node_metadata::<T>(),
134 }),
135 metadata: self.new_node_metadata::<T>(),
136 },
137 )
138 }
139
140 fn source_iter<T, E>(
141 &self,
142 e: impl QuotedWithContext<'a, E, Self>,
143 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
144 where
145 E: IntoIterator<Item = T>,
146 Self: Sized + NoTick,
147 {
148 let e = e.splice_untyped_ctx(self);
151
152 Stream::new(
153 self.clone(),
154 HydroNode::Persist {
155 inner: Box::new(HydroNode::Source {
156 source: HydroSource::Iter(e.into()),
157 location_kind: self.id(),
158 metadata: self.new_node_metadata::<T>(),
159 }),
160 metadata: self.new_node_metadata::<T>(),
161 },
162 )
163 }
164
165 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
166 where
167 T: Clone,
168 Self: Sized + NoTick,
169 {
170 let e_arr = q!([e]);
174 let e = e_arr.splice_untyped_ctx(self);
175
176 Singleton::new(
180 self.clone(),
181 HydroNode::Persist {
182 inner: Box::new(HydroNode::Persist {
183 inner: Box::new(HydroNode::Source {
184 source: HydroSource::Iter(e.into()),
185 location_kind: self.id(),
186 metadata: self.new_node_metadata::<T>(),
187 }),
188 metadata: self.new_node_metadata::<T>(),
189 }),
190 metadata: self.new_node_metadata::<T>(),
191 },
192 )
193 }
194
195 unsafe fn source_interval(
205 &self,
206 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
207 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
208 where
209 Self: Sized + NoTick,
210 {
211 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
212 tokio::time::interval(interval)
213 )))
214 }
215
216 unsafe fn source_interval_delayed(
227 &self,
228 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
229 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
230 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
231 where
232 Self: Sized + NoTick,
233 {
234 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
235 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
236 )))
237 }
238
239 fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
240 where
241 S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
242 Self: NoTick,
243 {
244 let next_id = self.flow_state().borrow_mut().next_cycle_id();
245 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
246
247 (
248 ForwardRef {
249 completed: false,
250 ident: ident.clone(),
251 expected_location: self.id(),
252 _phantom: PhantomData,
253 },
254 S::create_source(ident, self.clone()),
255 )
256 }
257}