1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17 type Meta: Default;
18 type InstantiateEnv;
19
20 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23 + RegisterPort<'a, Self>;
24
25 fn o2o_sink_source(
31 env: &mut Self::InstantiateEnv,
32 p1: &Self::Process,
33 p1_port: &<Self::Process as Node>::Port,
34 p2: &Self::Process,
35 p2_port: &<Self::Process as Node>::Port,
36 name: Option<&str>,
37 networking_info: &crate::networking::NetworkingInfo,
38 ) -> (syn::Expr, syn::Expr);
39
40 fn o2o_connect(
45 p1: &Self::Process,
46 p1_port: &<Self::Process as Node>::Port,
47 p2: &Self::Process,
48 p2_port: &<Self::Process as Node>::Port,
49 ) -> Box<dyn FnOnce()>;
50
51 fn o2m_sink_source(
59 env: &mut Self::InstantiateEnv,
60 p1: &Self::Process,
61 p1_port: &<Self::Process as Node>::Port,
62 c2: &Self::Cluster,
63 c2_port: &<Self::Cluster as Node>::Port,
64 name: Option<&str>,
65 networking_info: &crate::networking::NetworkingInfo,
66 ) -> (syn::Expr, syn::Expr);
67
68 fn o2m_connect(
73 p1: &Self::Process,
74 p1_port: &<Self::Process as Node>::Port,
75 c2: &Self::Cluster,
76 c2_port: &<Self::Cluster as Node>::Port,
77 ) -> Box<dyn FnOnce()>;
78
79 fn m2o_sink_source(
87 env: &mut Self::InstantiateEnv,
88 c1: &Self::Cluster,
89 c1_port: &<Self::Cluster as Node>::Port,
90 p2: &Self::Process,
91 p2_port: &<Self::Process as Node>::Port,
92 name: Option<&str>,
93 networking_info: &crate::networking::NetworkingInfo,
94 ) -> (syn::Expr, syn::Expr);
95
96 fn m2o_connect(
101 c1: &Self::Cluster,
102 c1_port: &<Self::Cluster as Node>::Port,
103 p2: &Self::Process,
104 p2_port: &<Self::Process as Node>::Port,
105 ) -> Box<dyn FnOnce()>;
106
107 fn m2m_sink_source(
115 env: &mut Self::InstantiateEnv,
116 c1: &Self::Cluster,
117 c1_port: &<Self::Cluster as Node>::Port,
118 c2: &Self::Cluster,
119 c2_port: &<Self::Cluster as Node>::Port,
120 name: Option<&str>,
121 networking_info: &crate::networking::NetworkingInfo,
122 ) -> (syn::Expr, syn::Expr);
123
124 fn m2m_connect(
129 c1: &Self::Cluster,
130 c1_port: &<Self::Cluster as Node>::Port,
131 c2: &Self::Cluster,
132 c2_port: &<Self::Cluster as Node>::Port,
133 ) -> Box<dyn FnOnce()>;
134
135 fn e2o_many_source(
136 extra_stmts: &mut Vec<syn::Stmt>,
137 p2: &Self::Process,
138 p2_port: &<Self::Process as Node>::Port,
139 codec_type: &syn::Type,
140 shared_handle: String,
141 ) -> syn::Expr;
142 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
143
144 fn e2o_source(
145 extra_stmts: &mut Vec<syn::Stmt>,
146 p1: &Self::External,
147 p1_port: &<Self::External as Node>::Port,
148 p2: &Self::Process,
149 p2_port: &<Self::Process as Node>::Port,
150 codec_type: &syn::Type,
151 shared_handle: String,
152 ) -> syn::Expr;
153 fn e2o_connect(
154 p1: &Self::External,
155 p1_port: &<Self::External as Node>::Port,
156 p2: &Self::Process,
157 p2_port: &<Self::Process as Node>::Port,
158 many: bool,
159 server_hint: NetworkHint,
160 ) -> Box<dyn FnOnce()>;
161
162 fn o2e_sink(
163 p1: &Self::Process,
164 p1_port: &<Self::Process as Node>::Port,
165 p2: &Self::External,
166 p2_port: &<Self::External as Node>::Port,
167 shared_handle: String,
168 ) -> syn::Expr;
169
170 fn cluster_ids(
171 of_cluster: LocationKey,
172 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
173
174 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
175
176 fn cluster_membership_stream(
177 env: &mut Self::InstantiateEnv,
178 at_location: &LocationId,
179 location_id: &LocationId,
180 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
181
182 fn register_embedded_input(
187 _env: &mut Self::InstantiateEnv,
188 _location_key: LocationKey,
189 _ident: &syn::Ident,
190 _element_type: &syn::Type,
191 ) {
192 panic!("register_embedded_input is only supported by EmbeddedDeploy");
193 }
194
195 fn register_embedded_output(
200 _env: &mut Self::InstantiateEnv,
201 _location_key: LocationKey,
202 _ident: &syn::Ident,
203 _element_type: &syn::Type,
204 ) {
205 panic!("register_embedded_output is only supported by EmbeddedDeploy");
206 }
207}
208
209pub trait ProcessSpec<'a, D>
210where
211 D: Deploy<'a> + ?Sized,
212{
213 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
214}
215
216pub trait IntoProcessSpec<'a, D>
217where
218 D: Deploy<'a> + ?Sized,
219{
220 type ProcessSpec: ProcessSpec<'a, D>;
221 fn into_process_spec(self) -> Self::ProcessSpec;
222}
223
224impl<'a, D, T> IntoProcessSpec<'a, D> for T
225where
226 D: Deploy<'a> + ?Sized,
227 T: ProcessSpec<'a, D>,
228{
229 type ProcessSpec = T;
230 fn into_process_spec(self) -> Self::ProcessSpec {
231 self
232 }
233}
234
235pub trait ClusterSpec<'a, D>
236where
237 D: Deploy<'a> + ?Sized,
238{
239 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
240}
241
242pub trait ExternalSpec<'a, D>
243where
244 D: Deploy<'a> + ?Sized,
245{
246 fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
247}
248
249pub trait Node {
250 type Port: Clone;
257 type Meta: Default;
258 type InstantiateEnv;
259
260 fn next_port(&self) -> Self::Port;
262
263 fn update_meta(&self, meta: &Self::Meta);
264
265 fn instantiate(
266 &self,
267 env: &mut Self::InstantiateEnv,
268 meta: &mut Self::Meta,
269 graph: DfirGraph,
270 extra_stmts: &[syn::Stmt],
271 sidecars: &[syn::Expr],
272 );
273}
274
275pub type DynSourceSink<Out, In, InErr> = (
276 Pin<Box<dyn Stream<Item = Out>>>,
277 Pin<Box<dyn Sink<In, Error = InErr>>>,
278);
279
280pub trait RegisterPort<'a, D>: Node + Clone
281where
282 D: Deploy<'a> + ?Sized,
283{
284 fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
285
286 fn as_bytes_bidi(
287 &self,
288 external_port_id: ExternalPortId,
289 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
290
291 fn as_bincode_bidi<InT, OutT>(
292 &self,
293 external_port_id: ExternalPortId,
294 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
295 where
296 InT: Serialize + 'static,
297 OutT: DeserializeOwned + 'static;
298
299 fn as_bincode_sink<T>(
300 &self,
301 external_port_id: ExternalPortId,
302 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
303 where
304 T: Serialize + 'static;
305
306 fn as_bincode_source<T>(
307 &self,
308 external_port_id: ExternalPortId,
309 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
310 where
311 T: DeserializeOwned + 'static;
312}