1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use futures::stream::Stream as FuturesStream;
6use proc_macro2::Span;
7use stageleft::{QuotedWithContext, q};
8
9use super::builder::FlowState;
10use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
11use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
12use crate::{Singleton, Stream, Unbounded};
13
14pub mod external_process;
15pub use external_process::ExternalProcess;
16
17pub mod process;
18pub use process::Process;
19
20pub mod cluster;
21pub use cluster::{Cluster, ClusterId};
22
23pub mod can_send;
24pub use can_send::CanSend;
25
26pub mod tick;
27pub use tick::{Atomic, NoTick, Tick};
28
/// Identifier for a location in the dataflow graph.
///
/// The `usize` payload of `Process`, `Cluster` and `ExternalProcess` is the
/// value returned by `raw_id`. A `Tick` wraps another `LocationId`, which
/// `root` follows until it reaches a non-tick variant.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum LocationId {
    /// A process, carrying its raw numeric ID.
    Process(usize),
    /// A cluster, carrying its raw numeric ID.
    Cluster(usize),
    /// A tick: a numeric ID plus the location the tick is nested in.
    // NOTE(review): the `usize` is presumably the clock ID handed out by
    // `Location::tick` -- confirm against the `Tick` implementation.
    Tick(usize, Box<LocationId>),
    /// An external process, carrying its raw numeric ID.
    ExternalProcess(usize),
}
36
37impl LocationId {
38 pub fn root(&self) -> &LocationId {
39 match self {
40 LocationId::Process(_) => self,
41 LocationId::Cluster(_) => self,
42 LocationId::Tick(_, id) => id.root(),
43 LocationId::ExternalProcess(_) => self,
44 }
45 }
46
47 pub fn raw_id(&self) -> usize {
48 match self {
49 LocationId::Process(id) => *id,
50 LocationId::Cluster(id) => *id,
51 LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
52 LocationId::ExternalProcess(id) => *id,
53 }
54 }
55}
56
57pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
58 assert_eq!(l1.id(), l2.id(), "locations do not match");
59}
60
61pub trait Location<'a>: Clone {
62 type Root: Location<'a>;
63
64 fn root(&self) -> Self::Root;
65
66 fn id(&self) -> LocationId;
67
68 fn flow_state(&self) -> &FlowState;
69
70 fn is_top_level() -> bool;
71
72 fn tick(&self) -> Tick<Self>
73 where
74 Self: NoTick,
75 {
76 let next_id = self.flow_state().borrow_mut().next_clock_id;
77 self.flow_state().borrow_mut().next_clock_id += 1;
78 Tick {
79 id: next_id,
80 l: self.clone(),
81 }
82 }
83
84 fn next_node_id(&self) -> usize {
85 let next_id = self.flow_state().borrow_mut().next_node_id;
86 self.flow_state().borrow_mut().next_node_id += 1;
87 next_id
88 }
89
90 fn new_node_metadata<T>(&self) -> HydroIrMetadata {
91 HydroIrMetadata {
92 location_kind: self.id(),
93 output_type: Some(stageleft::quote_type::<T>().into()),
94 cardinality: None,
95 cpu_usage: None,
96 }
97 }
98
99 fn spin(&self) -> Stream<(), Self, Unbounded>
100 where
101 Self: Sized + NoTick,
102 {
103 Stream::new(
104 self.clone(),
105 HydroNode::Persist {
106 inner: Box::new(HydroNode::Source {
107 source: HydroSource::Spin(),
108 location_kind: self.id(),
109 metadata: self.new_node_metadata::<()>(),
110 }),
111 metadata: self.new_node_metadata::<()>(),
112 },
113 )
114 }
115
116 fn source_stream<T, E>(
117 &self,
118 e: impl QuotedWithContext<'a, E, Self>,
119 ) -> Stream<T, Self, Unbounded>
120 where
121 E: FuturesStream<Item = T> + Unpin,
122 Self: Sized + NoTick,
123 {
124 let e = e.splice_untyped_ctx(self);
125
126 Stream::new(
127 self.clone(),
128 HydroNode::Persist {
129 inner: Box::new(HydroNode::Source {
130 source: HydroSource::Stream(e.into()),
131 location_kind: self.id(),
132 metadata: self.new_node_metadata::<T>(),
133 }),
134 metadata: self.new_node_metadata::<T>(),
135 },
136 )
137 }
138
139 fn source_iter<T, E>(
140 &self,
141 e: impl QuotedWithContext<'a, E, Self>,
142 ) -> Stream<T, Self, Unbounded>
143 where
144 E: IntoIterator<Item = T>,
145 Self: Sized + NoTick,
146 {
147 let e = e.splice_untyped_ctx(self);
150
151 Stream::new(
152 self.clone(),
153 HydroNode::Persist {
154 inner: Box::new(HydroNode::Source {
155 source: HydroSource::Iter(e.into()),
156 location_kind: self.id(),
157 metadata: self.new_node_metadata::<T>(),
158 }),
159 metadata: self.new_node_metadata::<T>(),
160 },
161 )
162 }
163
164 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
165 where
166 T: Clone,
167 Self: Sized + NoTick,
168 {
169 let e_arr = q!([e]);
173 let e = e_arr.splice_untyped_ctx(self);
174
175 Singleton::new(
179 self.clone(),
180 HydroNode::Persist {
181 inner: Box::new(HydroNode::Persist {
182 inner: Box::new(HydroNode::Source {
183 source: HydroSource::Iter(e.into()),
184 location_kind: self.id(),
185 metadata: self.new_node_metadata::<T>(),
186 }),
187 metadata: self.new_node_metadata::<T>(),
188 }),
189 metadata: self.new_node_metadata::<T>(),
190 },
191 )
192 }
193
194 unsafe fn source_interval(
204 &self,
205 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
206 ) -> Stream<tokio::time::Instant, Self, Unbounded>
207 where
208 Self: Sized + NoTick,
209 {
210 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
211 tokio::time::interval(interval)
212 )))
213 }
214
215 unsafe fn source_interval_delayed(
226 &self,
227 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
228 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
229 ) -> Stream<tokio::time::Instant, Self, Unbounded>
230 where
231 Self: Sized + NoTick,
232 {
233 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
234 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
235 )))
236 }
237
238 fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
239 where
240 S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
241 Self: NoTick,
242 {
243 let next_id = {
244 let on_id = match self.id() {
245 LocationId::Process(id) => id,
246 LocationId::Cluster(id) => id,
247 LocationId::Tick(_, _) => panic!(),
248 LocationId::ExternalProcess(_) => panic!(),
249 };
250
251 let mut flow_state = self.flow_state().borrow_mut();
252 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
253
254 let id = *next_id_entry;
255 *next_id_entry += 1;
256 id
257 };
258
259 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
260
261 (
262 ForwardRef {
263 completed: false,
264 ident: ident.clone(),
265 expected_location: self.id(),
266 _phantom: PhantomData,
267 },
268 S::create_source(ident, self.clone()),
269 )
270 }
271}