1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17 type Meta: Default;
18 type InstantiateEnv;
19
20 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23 + RegisterPort<'a, Self>;
24
25 fn o2o_sink_source(
26 p1: &Self::Process,
27 p1_port: &<Self::Process as Node>::Port,
28 p2: &Self::Process,
29 p2_port: &<Self::Process as Node>::Port,
30 ) -> (syn::Expr, syn::Expr);
31 fn o2o_connect(
32 p1: &Self::Process,
33 p1_port: &<Self::Process as Node>::Port,
34 p2: &Self::Process,
35 p2_port: &<Self::Process as Node>::Port,
36 ) -> Box<dyn FnOnce()>;
37
38 fn o2m_sink_source(
39 p1: &Self::Process,
40 p1_port: &<Self::Process as Node>::Port,
41 c2: &Self::Cluster,
42 c2_port: &<Self::Cluster as Node>::Port,
43 ) -> (syn::Expr, syn::Expr);
44 fn o2m_connect(
45 p1: &Self::Process,
46 p1_port: &<Self::Process as Node>::Port,
47 c2: &Self::Cluster,
48 c2_port: &<Self::Cluster as Node>::Port,
49 ) -> Box<dyn FnOnce()>;
50
51 fn m2o_sink_source(
52 c1: &Self::Cluster,
53 c1_port: &<Self::Cluster as Node>::Port,
54 p2: &Self::Process,
55 p2_port: &<Self::Process as Node>::Port,
56 ) -> (syn::Expr, syn::Expr);
57 fn m2o_connect(
58 c1: &Self::Cluster,
59 c1_port: &<Self::Cluster as Node>::Port,
60 p2: &Self::Process,
61 p2_port: &<Self::Process as Node>::Port,
62 ) -> Box<dyn FnOnce()>;
63
64 fn m2m_sink_source(
65 c1: &Self::Cluster,
66 c1_port: &<Self::Cluster as Node>::Port,
67 c2: &Self::Cluster,
68 c2_port: &<Self::Cluster as Node>::Port,
69 ) -> (syn::Expr, syn::Expr);
70 fn m2m_connect(
71 c1: &Self::Cluster,
72 c1_port: &<Self::Cluster as Node>::Port,
73 c2: &Self::Cluster,
74 c2_port: &<Self::Cluster as Node>::Port,
75 ) -> Box<dyn FnOnce()>;
76
77 fn e2o_many_source(
78 extra_stmts: &mut Vec<syn::Stmt>,
79 p2: &Self::Process,
80 p2_port: &<Self::Process as Node>::Port,
81 codec_type: &syn::Type,
82 shared_handle: String,
83 ) -> syn::Expr;
84 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
85
86 fn e2o_source(
87 extra_stmts: &mut Vec<syn::Stmt>,
88 p1: &Self::External,
89 p1_port: &<Self::External as Node>::Port,
90 p2: &Self::Process,
91 p2_port: &<Self::Process as Node>::Port,
92 codec_type: &syn::Type,
93 shared_handle: String,
94 ) -> syn::Expr;
95 fn e2o_connect(
96 p1: &Self::External,
97 p1_port: &<Self::External as Node>::Port,
98 p2: &Self::Process,
99 p2_port: &<Self::Process as Node>::Port,
100 many: bool,
101 server_hint: NetworkHint,
102 ) -> Box<dyn FnOnce()>;
103
104 fn o2e_sink(
105 p1: &Self::Process,
106 p1_port: &<Self::Process as Node>::Port,
107 p2: &Self::External,
108 p2_port: &<Self::External as Node>::Port,
109 shared_handle: String,
110 ) -> syn::Expr;
111
112 fn cluster_ids(
113 of_cluster: usize,
114 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
115
116 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
117
118 fn cluster_membership_stream(
119 location_id: &LocationId,
120 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
121}
122
123pub trait ProcessSpec<'a, D>
124where
125 D: Deploy<'a> + ?Sized,
126{
127 fn build(self, id: usize, name_hint: &str) -> D::Process;
128}
129
130pub trait IntoProcessSpec<'a, D>
131where
132 D: Deploy<'a> + ?Sized,
133{
134 type ProcessSpec: ProcessSpec<'a, D>;
135 fn into_process_spec(self) -> Self::ProcessSpec;
136}
137
138impl<'a, D, T> IntoProcessSpec<'a, D> for T
139where
140 D: Deploy<'a> + ?Sized,
141 T: ProcessSpec<'a, D>,
142{
143 type ProcessSpec = T;
144 fn into_process_spec(self) -> Self::ProcessSpec {
145 self
146 }
147}
148
149pub trait ClusterSpec<'a, D>
150where
151 D: Deploy<'a> + ?Sized,
152{
153 fn build(self, id: usize, name_hint: &str) -> D::Cluster;
154}
155
156pub trait ExternalSpec<'a, D>
157where
158 D: Deploy<'a> + ?Sized,
159{
160 fn build(self, id: usize, name_hint: &str) -> D::External;
161}
162
163pub trait Node {
164 type Port: Clone;
171 type Meta: Default;
172 type InstantiateEnv;
173
174 fn next_port(&self) -> Self::Port;
176
177 fn update_meta(&self, meta: &Self::Meta);
178
179 fn instantiate(
180 &self,
181 env: &mut Self::InstantiateEnv,
182 meta: &mut Self::Meta,
183 graph: DfirGraph,
184 extra_stmts: Vec<syn::Stmt>,
185 );
186}
187
188pub type DynSourceSink<Out, In, InErr> = (
189 Pin<Box<dyn Stream<Item = Out>>>,
190 Pin<Box<dyn Sink<In, Error = InErr>>>,
191);
192
193pub trait RegisterPort<'a, D>: Node + Clone
194where
195 D: Deploy<'a> + ?Sized,
196{
197 fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
198
199 fn as_bytes_bidi(
200 &self,
201 external_port_id: ExternalPortId,
202 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
203
204 fn as_bincode_bidi<InT, OutT>(
205 &self,
206 external_port_id: ExternalPortId,
207 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
208 where
209 InT: Serialize + 'static,
210 OutT: DeserializeOwned + 'static;
211
212 fn as_bincode_sink<T>(
213 &self,
214 external_port_id: ExternalPortId,
215 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
216 where
217 T: Serialize + 'static;
218
219 fn as_bincode_source<T>(
220 &self,
221 external_port_id: ExternalPortId,
222 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
223 where
224 T: DeserializeOwned + 'static;
225}