1use std::future::Future;
2use std::io::Error;
3use std::pin::Pin;
4
5use bytes::Bytes;
6use dfir_lang::graph::DfirGraph;
7use futures::{Sink, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use stageleft::QuotedWithContext;
11
12pub mod macro_runtime;
13pub use macro_runtime::*;
14
15#[cfg(feature = "deploy")]
16#[cfg(stageleft_runtime)]
17pub(crate) mod trybuild;
18
19#[cfg(feature = "deploy")]
20#[cfg(stageleft_runtime)]
21mod trybuild_rewriters;
22
23#[cfg(feature = "deploy")]
24#[cfg(stageleft_runtime)]
25#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
26pub use trybuild::init_test;
27
28#[cfg(feature = "deploy")]
29#[cfg(stageleft_runtime)]
30#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
31pub mod deploy_graph;
32
33#[cfg(feature = "deploy")]
34#[cfg(stageleft_runtime)]
35#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
36pub use deploy_graph::*;
37
38pub mod in_memory_graph;
39pub use in_memory_graph::*;
40
41pub trait LocalDeploy<'a> {
42 type Process: Node<Meta = Self::Meta>;
43 type Cluster: Node<Meta = Self::Meta>;
44 type ExternalProcess: Node<Meta = Self::Meta>;
45 type Meta: Default;
46 type GraphId;
47
48 fn has_trivial_node() -> bool {
49 false
50 }
51
52 fn trivial_process(_id: usize) -> Self::Process {
53 panic!("No trivial process")
54 }
55
56 fn trivial_cluster(_id: usize) -> Self::Cluster {
57 panic!("No trivial cluster")
58 }
59
60 fn trivial_external(_id: usize) -> Self::ExternalProcess {
61 panic!("No trivial external")
62 }
63}
64
65pub trait Deploy<'a> {
66 type InstantiateEnv;
67 type CompileEnv;
68
69 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
70 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
71 type ExternalProcess: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
72 + RegisterPort<'a, Self>;
73 type Port: Clone;
74 type ExternalRawPort;
75 type Meta: Default;
76
77 type GraphId;
79
80 fn has_trivial_node() -> bool {
81 false
82 }
83
84 fn trivial_process(_id: usize) -> Self::Process {
85 panic!("No trivial process")
86 }
87
88 fn trivial_cluster(_id: usize) -> Self::Cluster {
89 panic!("No trivial cluster")
90 }
91
92 fn allocate_process_port(process: &Self::Process) -> Self::Port;
93 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
94 fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port;
95
96 fn o2o_sink_source(
97 compile_env: &Self::CompileEnv,
98 p1: &Self::Process,
99 p1_port: &Self::Port,
100 p2: &Self::Process,
101 p2_port: &Self::Port,
102 ) -> (syn::Expr, syn::Expr);
103 fn o2o_connect(
104 p1: &Self::Process,
105 p1_port: &Self::Port,
106 p2: &Self::Process,
107 p2_port: &Self::Port,
108 ) -> Box<dyn FnOnce()>;
109
110 fn o2m_sink_source(
111 compile_env: &Self::CompileEnv,
112 p1: &Self::Process,
113 p1_port: &Self::Port,
114 c2: &Self::Cluster,
115 c2_port: &Self::Port,
116 ) -> (syn::Expr, syn::Expr);
117 fn o2m_connect(
118 p1: &Self::Process,
119 p1_port: &Self::Port,
120 c2: &Self::Cluster,
121 c2_port: &Self::Port,
122 ) -> Box<dyn FnOnce()>;
123
124 fn m2o_sink_source(
125 compile_env: &Self::CompileEnv,
126 c1: &Self::Cluster,
127 c1_port: &Self::Port,
128 p2: &Self::Process,
129 p2_port: &Self::Port,
130 ) -> (syn::Expr, syn::Expr);
131 fn m2o_connect(
132 c1: &Self::Cluster,
133 c1_port: &Self::Port,
134 p2: &Self::Process,
135 p2_port: &Self::Port,
136 ) -> Box<dyn FnOnce()>;
137
138 fn m2m_sink_source(
139 compile_env: &Self::CompileEnv,
140 c1: &Self::Cluster,
141 c1_port: &Self::Port,
142 c2: &Self::Cluster,
143 c2_port: &Self::Port,
144 ) -> (syn::Expr, syn::Expr);
145 fn m2m_connect(
146 c1: &Self::Cluster,
147 c1_port: &Self::Port,
148 c2: &Self::Cluster,
149 c2_port: &Self::Port,
150 ) -> Box<dyn FnOnce()>;
151
152 fn e2o_source(
153 compile_env: &Self::CompileEnv,
154 p1: &Self::ExternalProcess,
155 p1_port: &Self::Port,
156 p2: &Self::Process,
157 p2_port: &Self::Port,
158 ) -> syn::Expr;
159 fn e2o_connect(
160 p1: &Self::ExternalProcess,
161 p1_port: &Self::Port,
162 p2: &Self::Process,
163 p2_port: &Self::Port,
164 ) -> Box<dyn FnOnce()>;
165
166 fn o2e_sink(
167 compile_env: &Self::CompileEnv,
168 p1: &Self::Process,
169 p1_port: &Self::Port,
170 p2: &Self::ExternalProcess,
171 p2_port: &Self::Port,
172 ) -> syn::Expr;
173 fn o2e_connect(
174 p1: &Self::Process,
175 p1_port: &Self::Port,
176 p2: &Self::ExternalProcess,
177 p2_port: &Self::Port,
178 ) -> Box<dyn FnOnce()>;
179
180 fn cluster_ids(
181 env: &Self::CompileEnv,
182 of_cluster: usize,
183 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
184 fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
185}
186
187impl<'a, T, N, C, E, M, R> LocalDeploy<'a> for T
188where
189 T: Deploy<'a, Process = N, Cluster = C, ExternalProcess = E, Meta = M, GraphId = R>,
190 N: Node<Meta = M>,
191 C: Node<Meta = M>,
192 E: Node<Meta = M>,
193 M: Default,
194{
195 type Process = N;
196 type Cluster = C;
197 type ExternalProcess = E;
198 type Meta = M;
199 type GraphId = R;
200
201 fn has_trivial_node() -> bool {
202 <T as Deploy<'a>>::has_trivial_node()
203 }
204
205 fn trivial_process(id: usize) -> Self::Process {
206 <T as Deploy<'a>>::trivial_process(id)
207 }
208
209 fn trivial_cluster(id: usize) -> Self::Cluster {
210 <T as Deploy<'a>>::trivial_cluster(id)
211 }
212}
213
214pub trait ProcessSpec<'a, D>
215where
216 D: LocalDeploy<'a> + ?Sized,
217{
218 fn build(self, id: usize, name_hint: &str) -> D::Process;
219}
220
221pub trait IntoProcessSpec<'a, D>
222where
223 D: LocalDeploy<'a> + ?Sized,
224{
225 type ProcessSpec: ProcessSpec<'a, D>;
226 fn into_process_spec(self) -> Self::ProcessSpec;
227}
228
229impl<'a, D, T> IntoProcessSpec<'a, D> for T
230where
231 D: LocalDeploy<'a> + ?Sized,
232 T: ProcessSpec<'a, D>,
233{
234 type ProcessSpec = T;
235 fn into_process_spec(self) -> Self::ProcessSpec {
236 self
237 }
238}
239
240pub trait ClusterSpec<'a, D>
241where
242 D: LocalDeploy<'a> + ?Sized,
243{
244 fn build(self, id: usize, name_hint: &str) -> D::Cluster;
245}
246
247pub trait ExternalSpec<'a, D>
248where
249 D: LocalDeploy<'a> + ?Sized,
250{
251 fn build(self, id: usize, name_hint: &str) -> D::ExternalProcess;
252}
253
254pub trait Node {
255 type Port;
256 type Meta;
257 type InstantiateEnv;
258
259 fn next_port(&self) -> Self::Port;
260
261 fn update_meta(&mut self, meta: &Self::Meta);
262
263 fn instantiate(
264 &self,
265 env: &mut Self::InstantiateEnv,
266 meta: &mut Self::Meta,
267 graph: DfirGraph,
268 extra_stmts: Vec<syn::Stmt>,
269 );
270}
271
272pub trait RegisterPort<'a, D>: Clone
273where
274 D: Deploy<'a> + ?Sized,
275{
276 fn register(&self, key: usize, port: D::Port);
277 fn raw_port(&self, key: usize) -> D::ExternalRawPort;
278
279 fn as_bytes_sink(
280 &self,
281 key: usize,
282 ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a;
283
284 fn as_bincode_sink<T>(
285 &self,
286 key: usize,
287 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
288 where
289 T: Serialize + 'static;
290
291 fn as_bytes_source(
292 &self,
293 key: usize,
294 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a;
295
296 fn as_bincode_source<T>(
297 &self,
298 key: usize,
299 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
300 where
301 T: DeserializeOwned + 'static;
302}