1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
/// Interface a deployment backend implements to name its node types and to
/// produce the code fragments (`syn::Expr` / `syn::Stmt`) and deferred actions
/// that link those nodes together.
///
/// Method prefixes follow the parameter types visible in each signature:
/// `o` takes a `Self::Process`, `m` takes a `Self::Cluster`, and `e` takes a
/// `Self::External`; for example `o2m_*` takes a process port followed by a
/// cluster port.
/// NOTE(review): the letters presumably abbreviate "one", "many" and
/// "external" — confirm against the implementations.
pub trait Deploy<'a> {
    /// Metadata type shared by every node kind of this backend (see the
    /// `Meta = Self::Meta` constraints below). Must be default-constructible.
    type Meta: Default;
    /// Environment type shared by every node kind of this backend; it is
    /// passed mutably to `Node::instantiate` and to the `register_embedded_*`
    /// hooks.
    type InstantiateEnv;

    /// Node type used for processes. Cloneable, and shares `Meta` and
    /// `InstantiateEnv` with this backend.
    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
    /// Node type used for clusters. Cloneable, and shares `Meta` and
    /// `InstantiateEnv` with this backend.
    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
    /// Node type used for externals. In addition to being a `Node`, it must
    /// implement `RegisterPort` for this backend.
    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
        + RegisterPort<'a, Self>;

    /// Produces a pair of expressions for a link between `p1_port` on process
    /// `p1` and `p2_port` on process `p2`.
    ///
    /// NOTE(review): judging by the name, the tuple is presumably
    /// `(sink, source)` — confirm the order against the implementations.
    fn o2o_sink_source(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
    ) -> (syn::Expr, syn::Expr);
    /// Returns a boxed one-shot closure for the same process/process port pair
    /// as `o2o_sink_source`.
    ///
    /// NOTE(review): presumably invoking the closure establishes the
    /// connection — confirm against the implementations.
    fn o2o_connect(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
    ) -> Box<dyn FnOnce()>;

    /// Produces a pair of expressions for a link between `p1_port` on process
    /// `p1` and `c2_port` on cluster `c2`.
    ///
    /// NOTE(review): tuple order presumably `(sink, source)` — confirm.
    fn o2m_sink_source(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
    ) -> (syn::Expr, syn::Expr);
    /// Returns a boxed one-shot closure for the process/cluster port pair.
    ///
    /// NOTE(review): presumably invoking it establishes the connection —
    /// confirm.
    fn o2m_connect(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
    ) -> Box<dyn FnOnce()>;

    /// Produces a pair of expressions for a link between `c1_port` on cluster
    /// `c1` and `p2_port` on process `p2`.
    ///
    /// NOTE(review): tuple order presumably `(sink, source)` — confirm.
    fn m2o_sink_source(
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
    ) -> (syn::Expr, syn::Expr);
    /// Returns a boxed one-shot closure for the cluster/process port pair.
    ///
    /// NOTE(review): presumably invoking it establishes the connection —
    /// confirm.
    fn m2o_connect(
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
    ) -> Box<dyn FnOnce()>;

    /// Produces a pair of expressions for a link between `c1_port` on cluster
    /// `c1` and `c2_port` on cluster `c2`.
    ///
    /// NOTE(review): tuple order presumably `(sink, source)` — confirm.
    fn m2m_sink_source(
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
    ) -> (syn::Expr, syn::Expr);
    /// Returns a boxed one-shot closure for the cluster/cluster port pair.
    ///
    /// NOTE(review): presumably invoking it establishes the connection —
    /// confirm.
    fn m2m_connect(
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
    ) -> Box<dyn FnOnce()>;

    /// Produces the expression for a "many" source on process `p2` at
    /// `p2_port`.
    ///
    /// `extra_stmts` is borrowed mutably, so implementations are able to add
    /// statements to it. `codec_type` and `shared_handle` are passed through
    /// to the implementation.
    /// NOTE(review): `shared_handle` presumably names state shared with the
    /// expression returned by `e2o_many_sink` for the same handle — confirm.
    fn e2o_many_source(
        extra_stmts: &mut Vec<syn::Stmt>,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        codec_type: &syn::Type,
        shared_handle: String,
    ) -> syn::Expr;
    /// Produces the sink expression identified by `shared_handle`.
    ///
    /// NOTE(review): presumably the counterpart of `e2o_many_source` called
    /// with the same handle — confirm.
    fn e2o_many_sink(shared_handle: String) -> syn::Expr;

    /// Produces the source expression for a link from port `p1_port` of
    /// external `p1` to port `p2_port` of process `p2`.
    ///
    /// `extra_stmts` is borrowed mutably, so implementations are able to add
    /// statements to it. `codec_type` and `shared_handle` are passed through
    /// to the implementation.
    fn e2o_source(
        extra_stmts: &mut Vec<syn::Stmt>,
        p1: &Self::External,
        p1_port: &<Self::External as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        codec_type: &syn::Type,
        shared_handle: String,
    ) -> syn::Expr;
    /// Returns a boxed one-shot closure for the external/process port pair.
    ///
    /// NOTE(review): `many` presumably selects the multi-connection variant
    /// (cf. `e2o_many_source`) and `server_hint` presumably guides how the
    /// listening endpoint is set up — confirm both against the
    /// implementations.
    fn e2o_connect(
        p1: &Self::External,
        p1_port: &<Self::External as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        many: bool,
        server_hint: NetworkHint,
    ) -> Box<dyn FnOnce()>;

    /// Produces the sink expression for a link from port `p1_port` of process
    /// `p1` to port `p2_port` of external `p2`.
    fn o2e_sink(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        p2: &Self::External,
        p2_port: &<Self::External as Node>::Port,
        shared_handle: String,
    ) -> syn::Expr;

    /// Returns a cloneable quoted value of type `&'a [TaglessMemberId]` for
    /// the cluster identified by `of_cluster`.
    ///
    /// NOTE(review): presumably the IDs of that cluster's members — confirm.
    fn cluster_ids(
        of_cluster: LocationKey,
    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;

    /// Returns a cloneable quoted value of type `TaglessMemberId`.
    ///
    /// NOTE(review): presumably the ID of the cluster member the generated
    /// code is running on — confirm.
    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;

    /// Returns a quoted value that is a boxed, `Unpin` stream of
    /// `(TaglessMemberId, MembershipEvent)` pairs for `location_id`.
    fn cluster_membership_stream(
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;

    /// Hook for registering an embedded input; backends that support it
    /// override this method.
    ///
    /// The provided default ignores all of its arguments.
    ///
    /// # Panics
    ///
    /// The default implementation always panics, reporting that the operation
    /// is only supported by `EmbeddedDeploy`.
    fn register_embedded_input(
        _env: &mut Self::InstantiateEnv,
        _location_key: LocationKey,
        _ident: &syn::Ident,
        _element_type: &syn::Type,
    ) {
        panic!("register_embedded_input is only supported by EmbeddedDeploy");
    }

    /// Hook for registering an embedded output; backends that support it
    /// override this method.
    ///
    /// The provided default ignores all of its arguments.
    ///
    /// # Panics
    ///
    /// The default implementation always panics, reporting that the operation
    /// is only supported by `EmbeddedDeploy`.
    fn register_embedded_output(
        _env: &mut Self::InstantiateEnv,
        _location_key: LocationKey,
        _ident: &syn::Ident,
        _element_type: &syn::Type,
    ) {
        panic!("register_embedded_output is only supported by EmbeddedDeploy");
    }
}
148
149pub trait ProcessSpec<'a, D>
150where
151 D: Deploy<'a> + ?Sized,
152{
153 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
154}
155
156pub trait IntoProcessSpec<'a, D>
157where
158 D: Deploy<'a> + ?Sized,
159{
160 type ProcessSpec: ProcessSpec<'a, D>;
161 fn into_process_spec(self) -> Self::ProcessSpec;
162}
163
164impl<'a, D, T> IntoProcessSpec<'a, D> for T
165where
166 D: Deploy<'a> + ?Sized,
167 T: ProcessSpec<'a, D>,
168{
169 type ProcessSpec = T;
170 fn into_process_spec(self) -> Self::ProcessSpec {
171 self
172 }
173}
174
175pub trait ClusterSpec<'a, D>
176where
177 D: Deploy<'a> + ?Sized,
178{
179 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
180}
181
182pub trait ExternalSpec<'a, D>
183where
184 D: Deploy<'a> + ?Sized,
185{
186 fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
187}
188
/// Common interface of the node types a deployment backend exposes (the
/// `Process`, `Cluster` and `External` associated types of `Deploy` are all
/// bounded by this trait).
pub trait Node {
    /// Handle identifying a port on this node; must be cloneable.
    type Port: Clone;
    /// Metadata type accepted by `update_meta` and `instantiate`; must be
    /// default-constructible.
    type Meta: Default;
    /// Environment type passed mutably to `instantiate`.
    type InstantiateEnv;

    /// Returns a port for this node.
    ///
    /// NOTE(review): the name suggests each call hands out a fresh, previously
    /// unused port — confirm against the implementations.
    fn next_port(&self) -> Self::Port;

    /// Passes `meta` to the node through a shared reference.
    ///
    /// NOTE(review): presumably the node records or merges this metadata via
    /// interior mutability, since `self` is borrowed immutably — confirm.
    fn update_meta(&self, meta: &Self::Meta);

    /// Instantiates this node.
    ///
    /// * `env` – backend environment, borrowed mutably.
    /// * `meta` – node metadata, borrowed mutably.
    /// * `graph` – the `DfirGraph` for this node, taken by value.
    /// * `extra_stmts` – additional statements supplied alongside the graph.
    /// * `sidecars` – additional expressions supplied alongside the graph.
    ///
    /// NOTE(review): how `extra_stmts` and `sidecars` are combined with
    /// `graph` is implementation-defined and not visible here — confirm.
    fn instantiate(
        &self,
        env: &mut Self::InstantiateEnv,
        meta: &mut Self::Meta,
        graph: DfirGraph,
        extra_stmts: &[syn::Stmt],
        sidecars: &[syn::Expr],
    );
}
214
/// A pair of type-erased, pinned and boxed endpoints: first a `Stream`
/// yielding `Out` items, then a `Sink` that accepts `In` items and fails with
/// `InErr`.
///
/// Neither trait object carries a `Send` bound.
pub type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
219
/// A cloneable `Node` on which external ports can be registered under an
/// `ExternalPortId` and later obtained as streams and/or sinks.
///
/// NOTE(review): the `as_*` methods presumably look up a port previously
/// passed to `register` under the same `external_port_id` — confirm against
/// the implementations.
pub trait RegisterPort<'a, D>: Node + Clone
where
    D: Deploy<'a> + ?Sized,
{
    /// Associates `port` with `external_port_id` on this node.
    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);

    /// Returns a future resolving to a `(stream, sink)` pair for
    /// `external_port_id` that works on raw bytes: the stream yields
    /// `Result<BytesMut, Error>` and the sink accepts `Bytes`, failing with
    /// `std::io::Error`.
    fn as_bytes_bidi(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;

    /// Returns a future resolving to a typed `(stream, sink)` pair for
    /// `external_port_id`: the stream yields `OutT` values and the sink
    /// accepts `InT` values, failing with `std::io::Error`.
    ///
    /// NOTE(review): per the method name, values are presumably encoded with
    /// bincode on the wire — confirm against the implementations.
    fn as_bincode_bidi<InT, OutT>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
    where
        InT: Serialize + 'static,
        OutT: DeserializeOwned + 'static;

    /// Returns a future resolving to a pinned, boxed sink for
    /// `external_port_id` that accepts serializable `T` values and fails with
    /// `std::io::Error`.
    ///
    /// NOTE(review): presumably bincode-encoded, per the method name —
    /// confirm.
    fn as_bincode_sink<T>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
    where
        T: Serialize + 'static;

    /// Returns a future resolving to a pinned, boxed stream for
    /// `external_port_id` that yields deserialized `T` values.
    ///
    /// The item type is a bare `T`, so decoding failures are not surfaced
    /// through the stream's items.
    /// NOTE(review): presumably bincode-decoded, per the method name —
    /// confirm.
    fn as_bincode_source<T>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
    where
        T: DeserializeOwned + 'static;
}