1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::location::dynamic::LocationId;
12use crate::location::member_id::TaglessMemberId;
13use crate::location::{MembershipEvent, NetworkHint};
14
15pub trait Deploy<'a> {
16 type InstantiateEnv;
17
18 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
19 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
20 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
21 + RegisterPort<'a, Self>;
22 type Port: Clone;
23 type ExternalRawPort;
24 type Meta: Default;
25
26 type GraphId;
28
29 fn has_trivial_node() -> bool {
30 false
31 }
32
33 fn trivial_process(_id: usize) -> Self::Process {
34 panic!("No trivial process")
35 }
36
37 fn trivial_cluster(_id: usize) -> Self::Cluster {
38 panic!("No trivial cluster")
39 }
40
41 fn trivial_external(_id: usize) -> Self::External {
42 panic!("No trivial external process")
43 }
44
45 fn allocate_process_port(process: &Self::Process) -> Self::Port;
46 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
47 fn allocate_external_port(external: &Self::External) -> Self::Port;
48
49 fn o2o_sink_source(
50 p1: &Self::Process,
51 p1_port: &Self::Port,
52 p2: &Self::Process,
53 p2_port: &Self::Port,
54 ) -> (syn::Expr, syn::Expr);
55 fn o2o_connect(
56 p1: &Self::Process,
57 p1_port: &Self::Port,
58 p2: &Self::Process,
59 p2_port: &Self::Port,
60 ) -> Box<dyn FnOnce()>;
61
62 fn o2m_sink_source(
63 p1: &Self::Process,
64 p1_port: &Self::Port,
65 c2: &Self::Cluster,
66 c2_port: &Self::Port,
67 ) -> (syn::Expr, syn::Expr);
68 fn o2m_connect(
69 p1: &Self::Process,
70 p1_port: &Self::Port,
71 c2: &Self::Cluster,
72 c2_port: &Self::Port,
73 ) -> Box<dyn FnOnce()>;
74
75 fn m2o_sink_source(
76 c1: &Self::Cluster,
77 c1_port: &Self::Port,
78 p2: &Self::Process,
79 p2_port: &Self::Port,
80 ) -> (syn::Expr, syn::Expr);
81 fn m2o_connect(
82 c1: &Self::Cluster,
83 c1_port: &Self::Port,
84 p2: &Self::Process,
85 p2_port: &Self::Port,
86 ) -> Box<dyn FnOnce()>;
87
88 fn m2m_sink_source(
89 c1: &Self::Cluster,
90 c1_port: &Self::Port,
91 c2: &Self::Cluster,
92 c2_port: &Self::Port,
93 ) -> (syn::Expr, syn::Expr);
94 fn m2m_connect(
95 c1: &Self::Cluster,
96 c1_port: &Self::Port,
97 c2: &Self::Cluster,
98 c2_port: &Self::Port,
99 ) -> Box<dyn FnOnce()>;
100
101 fn e2o_many_source(
102 extra_stmts: &mut Vec<syn::Stmt>,
103 p2: &Self::Process,
104 p2_port: &Self::Port,
105 codec_type: &syn::Type,
106 shared_handle: String,
107 ) -> syn::Expr;
108 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
109
110 fn e2o_source(
111 extra_stmts: &mut Vec<syn::Stmt>,
112 p1: &Self::External,
113 p1_port: &Self::Port,
114 p2: &Self::Process,
115 p2_port: &Self::Port,
116 codec_type: &syn::Type,
117 shared_handle: String,
118 ) -> syn::Expr;
119 fn e2o_connect(
120 p1: &Self::External,
121 p1_port: &Self::Port,
122 p2: &Self::Process,
123 p2_port: &Self::Port,
124 many: bool,
125 server_hint: NetworkHint,
126 ) -> Box<dyn FnOnce()>;
127
128 fn o2e_sink(
129 p1: &Self::Process,
130 p1_port: &Self::Port,
131 p2: &Self::External,
132 p2_port: &Self::Port,
133 shared_handle: String,
134 ) -> syn::Expr;
135
136 fn cluster_ids(
137 of_cluster: usize,
138 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
139
140 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
141
142 fn cluster_membership_stream(
143 location_id: &LocationId,
144 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
145}
146
147pub trait ProcessSpec<'a, D>
148where
149 D: Deploy<'a> + ?Sized,
150{
151 fn build(self, id: usize, name_hint: &str) -> D::Process;
152}
153
154pub trait IntoProcessSpec<'a, D>
155where
156 D: Deploy<'a> + ?Sized,
157{
158 type ProcessSpec: ProcessSpec<'a, D>;
159 fn into_process_spec(self) -> Self::ProcessSpec;
160}
161
162impl<'a, D, T> IntoProcessSpec<'a, D> for T
163where
164 D: Deploy<'a> + ?Sized,
165 T: ProcessSpec<'a, D>,
166{
167 type ProcessSpec = T;
168 fn into_process_spec(self) -> Self::ProcessSpec {
169 self
170 }
171}
172
173pub trait ClusterSpec<'a, D>
174where
175 D: Deploy<'a> + ?Sized,
176{
177 fn build(self, id: usize, name_hint: &str) -> D::Cluster;
178}
179
180pub trait ExternalSpec<'a, D>
181where
182 D: Deploy<'a> + ?Sized,
183{
184 fn build(self, id: usize, name_hint: &str) -> D::External;
185}
186
187pub trait Node {
188 type Port;
189 type Meta;
190 type InstantiateEnv;
191
192 fn next_port(&self) -> Self::Port;
193
194 fn update_meta(&mut self, meta: &Self::Meta);
195
196 fn instantiate(
197 &self,
198 env: &mut Self::InstantiateEnv,
199 meta: &mut Self::Meta,
200 graph: DfirGraph,
201 extra_stmts: Vec<syn::Stmt>,
202 );
203}
204
205pub type DynSourceSink<Out, In, InErr> = (
206 Pin<Box<dyn Stream<Item = Out>>>,
207 Pin<Box<dyn Sink<In, Error = InErr>>>,
208);
209
210pub trait RegisterPort<'a, D>: Clone
211where
212 D: Deploy<'a> + ?Sized,
213{
214 fn register(&self, key: usize, port: D::Port);
215 fn raw_port(&self, key: usize) -> D::ExternalRawPort;
216
217 fn as_bytes_bidi(
218 &self,
219 key: usize,
220 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
221
222 fn as_bincode_bidi<InT, OutT>(
223 &self,
224 key: usize,
225 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
226 where
227 InT: Serialize + 'static,
228 OutT: DeserializeOwned + 'static;
229
230 fn as_bincode_sink<T>(
231 &self,
232 key: usize,
233 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
234 where
235 T: Serialize + 'static;
236
237 fn as_bincode_source<T>(
238 &self,
239 key: usize,
240 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
241 where
242 T: DeserializeOwned + 'static;
243}