1use std::future::Future;
2use std::io::Error;
3use std::pin::Pin;
4
5use dfir_lang::graph::DfirGraph;
6use dfir_rs::bytes::Bytes;
7use dfir_rs::futures::{Sink, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use stageleft::QuotedWithContext;
11
12pub mod macro_runtime;
13pub use macro_runtime::*;
14
15#[cfg(feature = "deploy")]
16#[cfg(stageleft_runtime)]
17pub(crate) mod trybuild;
18
19#[cfg(feature = "deploy")]
20#[cfg(stageleft_runtime)]
21mod trybuild_rewriters;
22
23#[cfg(feature = "deploy")]
24#[cfg(stageleft_runtime)]
25#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
26pub use trybuild::init_test;
27
28#[cfg(feature = "deploy")]
29#[cfg(stageleft_runtime)]
30#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
31pub mod deploy_graph;
32
33#[cfg(feature = "deploy")]
34#[cfg(stageleft_runtime)]
35#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
36pub use deploy_graph::*;
37
38pub mod in_memory_graph;
39pub use in_memory_graph::*;
40
41pub trait LocalDeploy<'a> {
42 type Process: Node<Meta = Self::Meta>;
43 type Cluster: Node<Meta = Self::Meta>;
44 type ExternalProcess: Node<Meta = Self::Meta>;
45 type Meta: Default;
46 type GraphId;
47
48 fn has_trivial_node() -> bool {
49 false
50 }
51
52 fn trivial_process(_id: usize) -> Self::Process {
53 panic!("No trivial process")
54 }
55
56 fn trivial_cluster(_id: usize) -> Self::Cluster {
57 panic!("No trivial cluster")
58 }
59
60 fn trivial_external(_id: usize) -> Self::ExternalProcess {
61 panic!("No trivial external")
62 }
63}
64
65pub trait Deploy<'a> {
66 type InstantiateEnv;
67 type CompileEnv;
68
69 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
70 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
71 type ExternalProcess: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
72 + RegisterPort<'a, Self>;
73 type Port: Clone;
74 type ExternalRawPort;
75 type Meta: Default;
76
77 type GraphId;
79
80 fn has_trivial_node() -> bool {
81 false
82 }
83
84 fn trivial_process(_id: usize) -> Self::Process {
85 panic!("No trivial process")
86 }
87
88 fn trivial_cluster(_id: usize) -> Self::Cluster {
89 panic!("No trivial cluster")
90 }
91
92 fn allocate_process_port(process: &Self::Process) -> Self::Port;
93 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
94 fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port;
95
96 fn o2o_sink_source(
97 compile_env: &Self::CompileEnv,
98 p1: &Self::Process,
99 p1_port: &Self::Port,
100 p2: &Self::Process,
101 p2_port: &Self::Port,
102 ) -> (syn::Expr, syn::Expr);
103 fn o2o_connect(
104 p1: &Self::Process,
105 p1_port: &Self::Port,
106 p2: &Self::Process,
107 p2_port: &Self::Port,
108 ) -> Box<dyn FnOnce()>;
109
110 fn o2m_sink_source(
111 compile_env: &Self::CompileEnv,
112 p1: &Self::Process,
113 p1_port: &Self::Port,
114 c2: &Self::Cluster,
115 c2_port: &Self::Port,
116 ) -> (syn::Expr, syn::Expr);
117 fn o2m_connect(
118 p1: &Self::Process,
119 p1_port: &Self::Port,
120 c2: &Self::Cluster,
121 c2_port: &Self::Port,
122 ) -> Box<dyn FnOnce()>;
123
124 fn m2o_sink_source(
125 compile_env: &Self::CompileEnv,
126 c1: &Self::Cluster,
127 c1_port: &Self::Port,
128 p2: &Self::Process,
129 p2_port: &Self::Port,
130 ) -> (syn::Expr, syn::Expr);
131 fn m2o_connect(
132 c1: &Self::Cluster,
133 c1_port: &Self::Port,
134 p2: &Self::Process,
135 p2_port: &Self::Port,
136 ) -> Box<dyn FnOnce()>;
137
138 fn m2m_sink_source(
139 compile_env: &Self::CompileEnv,
140 c1: &Self::Cluster,
141 c1_port: &Self::Port,
142 c2: &Self::Cluster,
143 c2_port: &Self::Port,
144 ) -> (syn::Expr, syn::Expr);
145 fn m2m_connect(
146 c1: &Self::Cluster,
147 c1_port: &Self::Port,
148 c2: &Self::Cluster,
149 c2_port: &Self::Port,
150 ) -> Box<dyn FnOnce()>;
151
152 fn e2o_source(
153 compile_env: &Self::CompileEnv,
154 p1: &Self::ExternalProcess,
155 p1_port: &Self::Port,
156 p2: &Self::Process,
157 p2_port: &Self::Port,
158 ) -> syn::Expr;
159 fn e2o_connect(
160 p1: &Self::ExternalProcess,
161 p1_port: &Self::Port,
162 p2: &Self::Process,
163 p2_port: &Self::Port,
164 ) -> Box<dyn FnOnce()>;
165
166 fn o2e_sink(
167 compile_env: &Self::CompileEnv,
168 p1: &Self::Process,
169 p1_port: &Self::Port,
170 p2: &Self::ExternalProcess,
171 p2_port: &Self::Port,
172 ) -> syn::Expr;
173 fn o2e_connect(
174 p1: &Self::Process,
175 p1_port: &Self::Port,
176 p2: &Self::ExternalProcess,
177 p2_port: &Self::Port,
178 ) -> Box<dyn FnOnce()>;
179
180 fn cluster_ids(
181 env: &Self::CompileEnv,
182 of_cluster: usize,
183 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
184 fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
185}
186
187impl<
188 'a,
189 T: Deploy<'a, Process = N, Cluster = C, ExternalProcess = E, Meta = M, GraphId = R>,
190 N: Node<Meta = M>,
191 C: Node<Meta = M>,
192 E: Node<Meta = M>,
193 M: Default,
194 R,
195> LocalDeploy<'a> for T
196{
197 type Process = N;
198 type Cluster = C;
199 type ExternalProcess = E;
200 type Meta = M;
201 type GraphId = R;
202
203 fn has_trivial_node() -> bool {
204 <T as Deploy<'a>>::has_trivial_node()
205 }
206
207 fn trivial_process(id: usize) -> Self::Process {
208 <T as Deploy<'a>>::trivial_process(id)
209 }
210
211 fn trivial_cluster(id: usize) -> Self::Cluster {
212 <T as Deploy<'a>>::trivial_cluster(id)
213 }
214}
215
216pub trait ProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
217 fn build(self, id: usize, name_hint: &str) -> D::Process;
218}
219
220pub trait IntoProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
221 type ProcessSpec: ProcessSpec<'a, D>;
222 fn into_process_spec(self) -> Self::ProcessSpec;
223}
224
225impl<'a, D: LocalDeploy<'a> + ?Sized, T: ProcessSpec<'a, D>> IntoProcessSpec<'a, D> for T {
226 type ProcessSpec = T;
227 fn into_process_spec(self) -> Self::ProcessSpec {
228 self
229 }
230}
231
232pub trait ClusterSpec<'a, D: LocalDeploy<'a> + ?Sized> {
233 fn build(self, id: usize, name_hint: &str) -> D::Cluster;
234}
235
236pub trait ExternalSpec<'a, D: LocalDeploy<'a> + ?Sized> {
237 fn build(self, id: usize, name_hint: &str) -> D::ExternalProcess;
238}
239
240pub trait Node {
241 type Port;
242 type Meta;
243 type InstantiateEnv;
244
245 fn next_port(&self) -> Self::Port;
246
247 fn update_meta(&mut self, meta: &Self::Meta);
248
249 fn instantiate(
250 &self,
251 env: &mut Self::InstantiateEnv,
252 meta: &mut Self::Meta,
253 graph: DfirGraph,
254 extra_stmts: Vec<syn::Stmt>,
255 );
256}
257
258pub trait RegisterPort<'a, D: Deploy<'a> + ?Sized>: Clone {
259 fn register(&self, key: usize, port: D::Port);
260 fn raw_port(&self, key: usize) -> D::ExternalRawPort;
261
262 fn as_bytes_sink(
263 &self,
264 key: usize,
265 ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a;
266
267 fn as_bincode_sink<T: Serialize + 'static>(
268 &self,
269 key: usize,
270 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a;
271
272 fn as_bytes_source(
273 &self,
274 key: usize,
275 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a;
276
277 fn as_bincode_source<T: DeserializeOwned + 'static>(
278 &self,
279 key: usize,
280 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a;
281}