hydro_lang/compile/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::location::dynamic::LocationId;
12use crate::location::member_id::TaglessMemberId;
13use crate::location::{MembershipEvent, NetworkHint};
14
15pub trait Deploy<'a> {
16    type InstantiateEnv;
17
18    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
19    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
20    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
21        + RegisterPort<'a, Self>;
22    type Port: Clone;
23    type ExternalRawPort;
24    type Meta: Default;
25
26    /// Type of ID used to switch between different subgraphs at runtime.
27    type GraphId;
28
29    fn has_trivial_node() -> bool {
30        false
31    }
32
33    fn trivial_process(_id: usize) -> Self::Process {
34        panic!("No trivial process")
35    }
36
37    fn trivial_cluster(_id: usize) -> Self::Cluster {
38        panic!("No trivial cluster")
39    }
40
41    fn trivial_external(_id: usize) -> Self::External {
42        panic!("No trivial external process")
43    }
44
45    fn allocate_process_port(process: &Self::Process) -> Self::Port;
46    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
47    fn allocate_external_port(external: &Self::External) -> Self::Port;
48
49    fn o2o_sink_source(
50        p1: &Self::Process,
51        p1_port: &Self::Port,
52        p2: &Self::Process,
53        p2_port: &Self::Port,
54    ) -> (syn::Expr, syn::Expr);
55    fn o2o_connect(
56        p1: &Self::Process,
57        p1_port: &Self::Port,
58        p2: &Self::Process,
59        p2_port: &Self::Port,
60    ) -> Box<dyn FnOnce()>;
61
62    fn o2m_sink_source(
63        p1: &Self::Process,
64        p1_port: &Self::Port,
65        c2: &Self::Cluster,
66        c2_port: &Self::Port,
67    ) -> (syn::Expr, syn::Expr);
68    fn o2m_connect(
69        p1: &Self::Process,
70        p1_port: &Self::Port,
71        c2: &Self::Cluster,
72        c2_port: &Self::Port,
73    ) -> Box<dyn FnOnce()>;
74
75    fn m2o_sink_source(
76        c1: &Self::Cluster,
77        c1_port: &Self::Port,
78        p2: &Self::Process,
79        p2_port: &Self::Port,
80    ) -> (syn::Expr, syn::Expr);
81    fn m2o_connect(
82        c1: &Self::Cluster,
83        c1_port: &Self::Port,
84        p2: &Self::Process,
85        p2_port: &Self::Port,
86    ) -> Box<dyn FnOnce()>;
87
88    fn m2m_sink_source(
89        c1: &Self::Cluster,
90        c1_port: &Self::Port,
91        c2: &Self::Cluster,
92        c2_port: &Self::Port,
93    ) -> (syn::Expr, syn::Expr);
94    fn m2m_connect(
95        c1: &Self::Cluster,
96        c1_port: &Self::Port,
97        c2: &Self::Cluster,
98        c2_port: &Self::Port,
99    ) -> Box<dyn FnOnce()>;
100
101    fn e2o_many_source(
102        extra_stmts: &mut Vec<syn::Stmt>,
103        p2: &Self::Process,
104        p2_port: &Self::Port,
105        codec_type: &syn::Type,
106        shared_handle: String,
107    ) -> syn::Expr;
108    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
109
110    fn e2o_source(
111        extra_stmts: &mut Vec<syn::Stmt>,
112        p1: &Self::External,
113        p1_port: &Self::Port,
114        p2: &Self::Process,
115        p2_port: &Self::Port,
116        codec_type: &syn::Type,
117        shared_handle: String,
118    ) -> syn::Expr;
119    fn e2o_connect(
120        p1: &Self::External,
121        p1_port: &Self::Port,
122        p2: &Self::Process,
123        p2_port: &Self::Port,
124        many: bool,
125        server_hint: NetworkHint,
126    ) -> Box<dyn FnOnce()>;
127
128    fn o2e_sink(
129        p1: &Self::Process,
130        p1_port: &Self::Port,
131        p2: &Self::External,
132        p2_port: &Self::Port,
133        shared_handle: String,
134    ) -> syn::Expr;
135
136    fn cluster_ids(
137        of_cluster: usize,
138    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
139
140    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
141
142    fn cluster_membership_stream(
143        location_id: &LocationId,
144    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
145}
146
147pub trait ProcessSpec<'a, D>
148where
149    D: Deploy<'a> + ?Sized,
150{
151    fn build(self, id: usize, name_hint: &str) -> D::Process;
152}
153
154pub trait IntoProcessSpec<'a, D>
155where
156    D: Deploy<'a> + ?Sized,
157{
158    type ProcessSpec: ProcessSpec<'a, D>;
159    fn into_process_spec(self) -> Self::ProcessSpec;
160}
161
162impl<'a, D, T> IntoProcessSpec<'a, D> for T
163where
164    D: Deploy<'a> + ?Sized,
165    T: ProcessSpec<'a, D>,
166{
167    type ProcessSpec = T;
168    fn into_process_spec(self) -> Self::ProcessSpec {
169        self
170    }
171}
172
173pub trait ClusterSpec<'a, D>
174where
175    D: Deploy<'a> + ?Sized,
176{
177    fn build(self, id: usize, name_hint: &str) -> D::Cluster;
178}
179
180pub trait ExternalSpec<'a, D>
181where
182    D: Deploy<'a> + ?Sized,
183{
184    fn build(self, id: usize, name_hint: &str) -> D::External;
185}
186
187pub trait Node {
188    type Port;
189    type Meta;
190    type InstantiateEnv;
191
192    fn next_port(&self) -> Self::Port;
193
194    fn update_meta(&mut self, meta: &Self::Meta);
195
196    fn instantiate(
197        &self,
198        env: &mut Self::InstantiateEnv,
199        meta: &mut Self::Meta,
200        graph: DfirGraph,
201        extra_stmts: Vec<syn::Stmt>,
202    );
203}
204
205pub type DynSourceSink<Out, In, InErr> = (
206    Pin<Box<dyn Stream<Item = Out>>>,
207    Pin<Box<dyn Sink<In, Error = InErr>>>,
208);
209
210pub trait RegisterPort<'a, D>: Clone
211where
212    D: Deploy<'a> + ?Sized,
213{
214    fn register(&self, key: usize, port: D::Port);
215    fn raw_port(&self, key: usize) -> D::ExternalRawPort;
216
217    fn as_bytes_bidi(
218        &self,
219        key: usize,
220    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
221
222    fn as_bincode_bidi<InT, OutT>(
223        &self,
224        key: usize,
225    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
226    where
227        InT: Serialize + 'static,
228        OutT: DeserializeOwned + 'static;
229
230    fn as_bincode_sink<T>(
231        &self,
232        key: usize,
233    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
234    where
235        T: Serialize + 'static;
236
237    fn as_bincode_source<T>(
238        &self,
239        key: usize,
240    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
241    where
242        T: DeserializeOwned + 'static;
243}