1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17 type Meta: Default;
18 type InstantiateEnv;
19
20 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23 + RegisterPort<'a, Self>;
24
25 fn o2o_sink_source(
31 env: &mut Self::InstantiateEnv,
32 p1: &Self::Process,
33 p1_port: &<Self::Process as Node>::Port,
34 p2: &Self::Process,
35 p2_port: &<Self::Process as Node>::Port,
36 name: Option<&str>,
37 networking_info: &crate::networking::NetworkingInfo,
38 ) -> (syn::Expr, syn::Expr);
39
40 fn o2o_connect(
45 p1: &Self::Process,
46 p1_port: &<Self::Process as Node>::Port,
47 p2: &Self::Process,
48 p2_port: &<Self::Process as Node>::Port,
49 ) -> Box<dyn FnOnce()>;
50
51 fn o2m_sink_source(
59 env: &mut Self::InstantiateEnv,
60 p1: &Self::Process,
61 p1_port: &<Self::Process as Node>::Port,
62 c2: &Self::Cluster,
63 c2_port: &<Self::Cluster as Node>::Port,
64 name: Option<&str>,
65 networking_info: &crate::networking::NetworkingInfo,
66 ) -> (syn::Expr, syn::Expr);
67
68 fn o2m_connect(
73 p1: &Self::Process,
74 p1_port: &<Self::Process as Node>::Port,
75 c2: &Self::Cluster,
76 c2_port: &<Self::Cluster as Node>::Port,
77 ) -> Box<dyn FnOnce()>;
78
79 fn m2o_sink_source(
87 env: &mut Self::InstantiateEnv,
88 c1: &Self::Cluster,
89 c1_port: &<Self::Cluster as Node>::Port,
90 p2: &Self::Process,
91 p2_port: &<Self::Process as Node>::Port,
92 name: Option<&str>,
93 networking_info: &crate::networking::NetworkingInfo,
94 ) -> (syn::Expr, syn::Expr);
95
96 fn m2o_connect(
101 c1: &Self::Cluster,
102 c1_port: &<Self::Cluster as Node>::Port,
103 p2: &Self::Process,
104 p2_port: &<Self::Process as Node>::Port,
105 ) -> Box<dyn FnOnce()>;
106
107 fn m2m_sink_source(
115 env: &mut Self::InstantiateEnv,
116 c1: &Self::Cluster,
117 c1_port: &<Self::Cluster as Node>::Port,
118 c2: &Self::Cluster,
119 c2_port: &<Self::Cluster as Node>::Port,
120 name: Option<&str>,
121 networking_info: &crate::networking::NetworkingInfo,
122 ) -> (syn::Expr, syn::Expr);
123
124 fn m2m_connect(
129 c1: &Self::Cluster,
130 c1_port: &<Self::Cluster as Node>::Port,
131 c2: &Self::Cluster,
132 c2_port: &<Self::Cluster as Node>::Port,
133 ) -> Box<dyn FnOnce()>;
134
135 fn e2o_many_source(
136 extra_stmts: &mut Vec<syn::Stmt>,
137 p2: &Self::Process,
138 p2_port: &<Self::Process as Node>::Port,
139 codec_type: &syn::Type,
140 shared_handle: String,
141 ) -> syn::Expr;
142 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
143
144 fn e2o_source(
145 extra_stmts: &mut Vec<syn::Stmt>,
146 p1: &Self::External,
147 p1_port: &<Self::External as Node>::Port,
148 p2: &Self::Process,
149 p2_port: &<Self::Process as Node>::Port,
150 codec_type: &syn::Type,
151 shared_handle: String,
152 ) -> syn::Expr;
153 fn e2o_connect(
154 p1: &Self::External,
155 p1_port: &<Self::External as Node>::Port,
156 p2: &Self::Process,
157 p2_port: &<Self::Process as Node>::Port,
158 many: bool,
159 server_hint: NetworkHint,
160 ) -> Box<dyn FnOnce()>;
161
162 fn o2e_sink(
163 p1: &Self::Process,
164 p1_port: &<Self::Process as Node>::Port,
165 p2: &Self::External,
166 p2_port: &<Self::External as Node>::Port,
167 shared_handle: String,
168 ) -> syn::Expr;
169
170 fn cluster_ids(
171 of_cluster: LocationKey,
172 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
173
174 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
175
176 fn cluster_membership_stream(
177 env: &mut Self::InstantiateEnv,
178 at_location: &LocationId,
179 location_id: &LocationId,
180 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
181
182 fn register_embedded_stream_input(
187 _env: &mut Self::InstantiateEnv,
188 _location_key: LocationKey,
189 _ident: &syn::Ident,
190 _element_type: &syn::Type,
191 ) {
192 panic!("register_embedded_stream_input is only supported by EmbeddedDeploy");
193 }
194
195 fn register_embedded_singleton_input(
200 _env: &mut Self::InstantiateEnv,
201 _location_key: LocationKey,
202 _ident: &syn::Ident,
203 _element_type: &syn::Type,
204 ) {
205 panic!("register_embedded_singleton_input is only supported by EmbeddedDeploy");
206 }
207
208 fn register_embedded_output(
213 _env: &mut Self::InstantiateEnv,
214 _location_key: LocationKey,
215 _ident: &syn::Ident,
216 _element_type: &syn::Type,
217 ) {
218 panic!("register_embedded_output is only supported by EmbeddedDeploy");
219 }
220}
221
222pub trait ProcessSpec<'a, D>
223where
224 D: Deploy<'a> + ?Sized,
225{
226 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
227}
228
229pub trait IntoProcessSpec<'a, D>
230where
231 D: Deploy<'a> + ?Sized,
232{
233 type ProcessSpec: ProcessSpec<'a, D>;
234 fn into_process_spec(self) -> Self::ProcessSpec;
235}
236
237impl<'a, D, T> IntoProcessSpec<'a, D> for T
238where
239 D: Deploy<'a> + ?Sized,
240 T: ProcessSpec<'a, D>,
241{
242 type ProcessSpec = T;
243 fn into_process_spec(self) -> Self::ProcessSpec {
244 self
245 }
246}
247
248pub trait ClusterSpec<'a, D>
249where
250 D: Deploy<'a> + ?Sized,
251{
252 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
253}
254
255pub trait ExternalSpec<'a, D>
256where
257 D: Deploy<'a> + ?Sized,
258{
259 fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
260}
261
262pub trait Node {
263 type Port: Clone;
270 type Meta: Default;
271 type InstantiateEnv;
272
273 fn next_port(&self) -> Self::Port;
275
276 fn update_meta(&self, meta: &Self::Meta);
277
278 fn instantiate(
279 &self,
280 env: &mut Self::InstantiateEnv,
281 meta: &mut Self::Meta,
282 graph: DfirGraph,
283 extra_stmts: &[syn::Stmt],
284 sidecars: &[syn::Expr],
285 );
286}
287
288pub type DynSourceSink<Out, In, InErr> = (
289 Pin<Box<dyn Stream<Item = Out>>>,
290 Pin<Box<dyn Sink<In, Error = InErr>>>,
291);
292
293pub trait RegisterPort<'a, D>: Node + Clone
294where
295 D: Deploy<'a> + ?Sized,
296{
297 fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
298
299 fn as_bytes_bidi(
300 &self,
301 external_port_id: ExternalPortId,
302 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
303
304 fn as_bincode_bidi<InT, OutT>(
305 &self,
306 external_port_id: ExternalPortId,
307 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
308 where
309 InT: Serialize + 'static,
310 OutT: DeserializeOwned + 'static;
311
312 fn as_bincode_sink<T>(
313 &self,
314 external_port_id: ExternalPortId,
315 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
316 where
317 T: Serialize + 'static;
318
319 fn as_bincode_source<T>(
320 &self,
321 external_port_id: ExternalPortId,
322 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
323 where
324 T: DeserializeOwned + 'static;
325}