1use std::future::Future;
2use std::io::Error;
3use std::pin::Pin;
4
5use bytes::{Bytes, BytesMut};
6use dfir_lang::graph::DfirGraph;
7use futures::{Sink, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use stageleft::QuotedWithContext;
11
// Every item below is compiled only when the `stageleft_runtime` cfg is set
// AND the `deploy` Cargo feature is enabled (both attributes must hold).
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
pub(crate) mod trybuild;

#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
mod trybuild_rewriters;

// Re-exported at this module's root; on docs.rs the item is annotated as
// requiring the `deploy` feature.
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
pub use trybuild::init_test;

#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
pub mod deploy_graph;

// Glob re-export of everything public in `deploy_graph`.
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
pub use deploy_graph::*;
34
35use crate::NetworkHint;
36
37pub trait Deploy<'a> {
38 type InstantiateEnv;
39 type CompileEnv;
40
41 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
42 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
43 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
44 + RegisterPort<'a, Self>;
45 type Port: Clone;
46 type ExternalRawPort;
47 type Meta: Default;
48
49 type GraphId;
51
52 fn has_trivial_node() -> bool {
53 false
54 }
55
56 fn trivial_process(_id: usize) -> Self::Process {
57 panic!("No trivial process")
58 }
59
60 fn trivial_cluster(_id: usize) -> Self::Cluster {
61 panic!("No trivial cluster")
62 }
63
64 fn trivial_external(_id: usize) -> Self::External {
65 panic!("No trivial external process")
66 }
67
68 fn allocate_process_port(process: &Self::Process) -> Self::Port;
69 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
70 fn allocate_external_port(external: &Self::External) -> Self::Port;
71
72 fn o2o_sink_source(
73 compile_env: &Self::CompileEnv,
74 p1: &Self::Process,
75 p1_port: &Self::Port,
76 p2: &Self::Process,
77 p2_port: &Self::Port,
78 ) -> (syn::Expr, syn::Expr);
79 fn o2o_connect(
80 p1: &Self::Process,
81 p1_port: &Self::Port,
82 p2: &Self::Process,
83 p2_port: &Self::Port,
84 ) -> Box<dyn FnOnce()>;
85
86 fn o2m_sink_source(
87 compile_env: &Self::CompileEnv,
88 p1: &Self::Process,
89 p1_port: &Self::Port,
90 c2: &Self::Cluster,
91 c2_port: &Self::Port,
92 ) -> (syn::Expr, syn::Expr);
93 fn o2m_connect(
94 p1: &Self::Process,
95 p1_port: &Self::Port,
96 c2: &Self::Cluster,
97 c2_port: &Self::Port,
98 ) -> Box<dyn FnOnce()>;
99
100 fn m2o_sink_source(
101 compile_env: &Self::CompileEnv,
102 c1: &Self::Cluster,
103 c1_port: &Self::Port,
104 p2: &Self::Process,
105 p2_port: &Self::Port,
106 ) -> (syn::Expr, syn::Expr);
107 fn m2o_connect(
108 c1: &Self::Cluster,
109 c1_port: &Self::Port,
110 p2: &Self::Process,
111 p2_port: &Self::Port,
112 ) -> Box<dyn FnOnce()>;
113
114 fn m2m_sink_source(
115 compile_env: &Self::CompileEnv,
116 c1: &Self::Cluster,
117 c1_port: &Self::Port,
118 c2: &Self::Cluster,
119 c2_port: &Self::Port,
120 ) -> (syn::Expr, syn::Expr);
121 fn m2m_connect(
122 c1: &Self::Cluster,
123 c1_port: &Self::Port,
124 c2: &Self::Cluster,
125 c2_port: &Self::Port,
126 ) -> Box<dyn FnOnce()>;
127
128 fn e2o_many_source(
129 compile_env: &Self::CompileEnv,
130 extra_stmts: &mut Vec<syn::Stmt>,
131 p2: &Self::Process,
132 p2_port: &Self::Port,
133 codec_type: &syn::Type,
134 shared_handle: String,
135 ) -> syn::Expr;
136 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
137
138 fn e2o_source(
139 compile_env: &Self::CompileEnv,
140 p1: &Self::External,
141 p1_port: &Self::Port,
142 p2: &Self::Process,
143 p2_port: &Self::Port,
144 ) -> syn::Expr;
145 fn e2o_connect(
146 p1: &Self::External,
147 p1_port: &Self::Port,
148 p2: &Self::Process,
149 p2_port: &Self::Port,
150 many: bool,
151 server_hint: NetworkHint,
152 ) -> Box<dyn FnOnce()>;
153
154 fn o2e_sink(
155 compile_env: &Self::CompileEnv,
156 p1: &Self::Process,
157 p1_port: &Self::Port,
158 p2: &Self::External,
159 p2_port: &Self::Port,
160 ) -> syn::Expr;
161 fn o2e_connect(
162 p1: &Self::Process,
163 p1_port: &Self::Port,
164 p2: &Self::External,
165 p2_port: &Self::Port,
166 ) -> Box<dyn FnOnce()>;
167
168 fn cluster_ids(
169 env: &Self::CompileEnv,
170 of_cluster: usize,
171 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
172 fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
173}
174
175pub trait ProcessSpec<'a, D>
176where
177 D: Deploy<'a> + ?Sized,
178{
179 fn build(self, id: usize, name_hint: &str) -> D::Process;
180}
181
182pub trait IntoProcessSpec<'a, D>
183where
184 D: Deploy<'a> + ?Sized,
185{
186 type ProcessSpec: ProcessSpec<'a, D>;
187 fn into_process_spec(self) -> Self::ProcessSpec;
188}
189
190impl<'a, D, T> IntoProcessSpec<'a, D> for T
191where
192 D: Deploy<'a> + ?Sized,
193 T: ProcessSpec<'a, D>,
194{
195 type ProcessSpec = T;
196 fn into_process_spec(self) -> Self::ProcessSpec {
197 self
198 }
199}
200
201pub trait ClusterSpec<'a, D>
202where
203 D: Deploy<'a> + ?Sized,
204{
205 fn build(self, id: usize, name_hint: &str) -> D::Cluster;
206}
207
208pub trait ExternalSpec<'a, D>
209where
210 D: Deploy<'a> + ?Sized,
211{
212 fn build(self, id: usize, name_hint: &str) -> D::External;
213}
214
215pub trait Node {
216 type Port;
217 type Meta;
218 type InstantiateEnv;
219
220 fn next_port(&self) -> Self::Port;
221
222 fn update_meta(&mut self, meta: &Self::Meta);
223
224 fn instantiate(
225 &self,
226 env: &mut Self::InstantiateEnv,
227 meta: &mut Self::Meta,
228 graph: DfirGraph,
229 extra_stmts: Vec<syn::Stmt>,
230 );
231}
232
233type DynSourceSink<Out, In, InErr> = (
234 Pin<Box<dyn Stream<Item = Out>>>,
235 Pin<Box<dyn Sink<In, Error = InErr>>>,
236);
237
238pub trait RegisterPort<'a, D>: Clone
239where
240 D: Deploy<'a> + ?Sized,
241{
242 fn register(&self, key: usize, port: D::Port);
243 fn raw_port(&self, key: usize) -> D::ExternalRawPort;
244
245 fn as_bytes_bidi(
246 &self,
247 key: usize,
248 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
249
250 fn as_bincode_bidi<InT, OutT>(
251 &self,
252 key: usize,
253 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
254 where
255 InT: Serialize + 'static,
256 OutT: DeserializeOwned + 'static;
257
258 fn as_bincode_sink<T>(
259 &self,
260 key: usize,
261 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
262 where
263 T: Serialize + 'static;
264
265 fn as_bincode_source<T>(
266 &self,
267 key: usize,
268 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
269 where
270 T: DeserializeOwned + 'static;
271}