1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::location::NetworkHint;
12
13pub trait Deploy<'a> {
14 type InstantiateEnv;
15 type CompileEnv;
16
17 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
18 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
19 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
20 + RegisterPort<'a, Self>;
21 type Port: Clone;
22 type ExternalRawPort;
23 type Meta: Default;
24
25 type GraphId;
27
28 fn has_trivial_node() -> bool {
29 false
30 }
31
32 fn trivial_process(_id: usize) -> Self::Process {
33 panic!("No trivial process")
34 }
35
36 fn trivial_cluster(_id: usize) -> Self::Cluster {
37 panic!("No trivial cluster")
38 }
39
40 fn trivial_external(_id: usize) -> Self::External {
41 panic!("No trivial external process")
42 }
43
44 fn allocate_process_port(process: &Self::Process) -> Self::Port;
45 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
46 fn allocate_external_port(external: &Self::External) -> Self::Port;
47
48 fn o2o_sink_source(
49 compile_env: &Self::CompileEnv,
50 p1: &Self::Process,
51 p1_port: &Self::Port,
52 p2: &Self::Process,
53 p2_port: &Self::Port,
54 ) -> (syn::Expr, syn::Expr);
55 fn o2o_connect(
56 p1: &Self::Process,
57 p1_port: &Self::Port,
58 p2: &Self::Process,
59 p2_port: &Self::Port,
60 ) -> Box<dyn FnOnce()>;
61
62 fn o2m_sink_source(
63 compile_env: &Self::CompileEnv,
64 p1: &Self::Process,
65 p1_port: &Self::Port,
66 c2: &Self::Cluster,
67 c2_port: &Self::Port,
68 ) -> (syn::Expr, syn::Expr);
69 fn o2m_connect(
70 p1: &Self::Process,
71 p1_port: &Self::Port,
72 c2: &Self::Cluster,
73 c2_port: &Self::Port,
74 ) -> Box<dyn FnOnce()>;
75
76 fn m2o_sink_source(
77 compile_env: &Self::CompileEnv,
78 c1: &Self::Cluster,
79 c1_port: &Self::Port,
80 p2: &Self::Process,
81 p2_port: &Self::Port,
82 ) -> (syn::Expr, syn::Expr);
83 fn m2o_connect(
84 c1: &Self::Cluster,
85 c1_port: &Self::Port,
86 p2: &Self::Process,
87 p2_port: &Self::Port,
88 ) -> Box<dyn FnOnce()>;
89
90 fn m2m_sink_source(
91 compile_env: &Self::CompileEnv,
92 c1: &Self::Cluster,
93 c1_port: &Self::Port,
94 c2: &Self::Cluster,
95 c2_port: &Self::Port,
96 ) -> (syn::Expr, syn::Expr);
97 fn m2m_connect(
98 c1: &Self::Cluster,
99 c1_port: &Self::Port,
100 c2: &Self::Cluster,
101 c2_port: &Self::Port,
102 ) -> Box<dyn FnOnce()>;
103
104 fn e2o_many_source(
105 compile_env: &Self::CompileEnv,
106 extra_stmts: &mut Vec<syn::Stmt>,
107 p2: &Self::Process,
108 p2_port: &Self::Port,
109 codec_type: &syn::Type,
110 shared_handle: String,
111 ) -> syn::Expr;
112 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
113
114 #[expect(clippy::too_many_arguments, reason = "necessary for code generation")]
115 fn e2o_source(
116 compile_env: &Self::CompileEnv,
117 extra_stmts: &mut Vec<syn::Stmt>,
118 p1: &Self::External,
119 p1_port: &Self::Port,
120 p2: &Self::Process,
121 p2_port: &Self::Port,
122 codec_type: &syn::Type,
123 shared_handle: String,
124 ) -> syn::Expr;
125 fn e2o_connect(
126 p1: &Self::External,
127 p1_port: &Self::Port,
128 p2: &Self::Process,
129 p2_port: &Self::Port,
130 many: bool,
131 server_hint: NetworkHint,
132 ) -> Box<dyn FnOnce()>;
133
134 fn o2e_sink(
135 compile_env: &Self::CompileEnv,
136 p1: &Self::Process,
137 p1_port: &Self::Port,
138 p2: &Self::External,
139 p2_port: &Self::Port,
140 shared_handle: String,
141 ) -> syn::Expr;
142
143 fn cluster_ids(
144 env: &Self::CompileEnv,
145 of_cluster: usize,
146 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
147 fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
148}
149
150pub trait ProcessSpec<'a, D>
151where
152 D: Deploy<'a> + ?Sized,
153{
154 fn build(self, id: usize, name_hint: &str) -> D::Process;
155}
156
157pub trait IntoProcessSpec<'a, D>
158where
159 D: Deploy<'a> + ?Sized,
160{
161 type ProcessSpec: ProcessSpec<'a, D>;
162 fn into_process_spec(self) -> Self::ProcessSpec;
163}
164
165impl<'a, D, T> IntoProcessSpec<'a, D> for T
166where
167 D: Deploy<'a> + ?Sized,
168 T: ProcessSpec<'a, D>,
169{
170 type ProcessSpec = T;
171 fn into_process_spec(self) -> Self::ProcessSpec {
172 self
173 }
174}
175
176pub trait ClusterSpec<'a, D>
177where
178 D: Deploy<'a> + ?Sized,
179{
180 fn build(self, id: usize, name_hint: &str) -> D::Cluster;
181}
182
183pub trait ExternalSpec<'a, D>
184where
185 D: Deploy<'a> + ?Sized,
186{
187 fn build(self, id: usize, name_hint: &str) -> D::External;
188}
189
190pub trait Node {
191 type Port;
192 type Meta;
193 type InstantiateEnv;
194
195 fn next_port(&self) -> Self::Port;
196
197 fn update_meta(&mut self, meta: &Self::Meta);
198
199 fn instantiate(
200 &self,
201 env: &mut Self::InstantiateEnv,
202 meta: &mut Self::Meta,
203 graph: DfirGraph,
204 extra_stmts: Vec<syn::Stmt>,
205 );
206}
207
208pub type DynSourceSink<Out, In, InErr> = (
209 Pin<Box<dyn Stream<Item = Out>>>,
210 Pin<Box<dyn Sink<In, Error = InErr>>>,
211);
212
213pub trait RegisterPort<'a, D>: Clone
214where
215 D: Deploy<'a> + ?Sized,
216{
217 fn register(&self, key: usize, port: D::Port);
218 fn raw_port(&self, key: usize) -> D::ExternalRawPort;
219
220 fn as_bytes_bidi(
221 &self,
222 key: usize,
223 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
224
225 fn as_bincode_bidi<InT, OutT>(
226 &self,
227 key: usize,
228 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
229 where
230 InT: Serialize + 'static,
231 OutT: DeserializeOwned + 'static;
232
233 fn as_bincode_sink<T>(
234 &self,
235 key: usize,
236 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
237 where
238 T: Serialize + 'static;
239
240 fn as_bincode_source<T>(
241 &self,
242 key: usize,
243 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
244 where
245 T: DeserializeOwned + 'static;
246}