Skip to main content

hydro_lang/compile/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17    type Meta: Default;
18    type InstantiateEnv;
19
20    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23        + RegisterPort<'a, Self>;
24
25    fn o2o_sink_source(
26        p1: &Self::Process,
27        p1_port: &<Self::Process as Node>::Port,
28        p2: &Self::Process,
29        p2_port: &<Self::Process as Node>::Port,
30    ) -> (syn::Expr, syn::Expr);
31    fn o2o_connect(
32        p1: &Self::Process,
33        p1_port: &<Self::Process as Node>::Port,
34        p2: &Self::Process,
35        p2_port: &<Self::Process as Node>::Port,
36    ) -> Box<dyn FnOnce()>;
37
38    fn o2m_sink_source(
39        p1: &Self::Process,
40        p1_port: &<Self::Process as Node>::Port,
41        c2: &Self::Cluster,
42        c2_port: &<Self::Cluster as Node>::Port,
43    ) -> (syn::Expr, syn::Expr);
44    fn o2m_connect(
45        p1: &Self::Process,
46        p1_port: &<Self::Process as Node>::Port,
47        c2: &Self::Cluster,
48        c2_port: &<Self::Cluster as Node>::Port,
49    ) -> Box<dyn FnOnce()>;
50
51    fn m2o_sink_source(
52        c1: &Self::Cluster,
53        c1_port: &<Self::Cluster as Node>::Port,
54        p2: &Self::Process,
55        p2_port: &<Self::Process as Node>::Port,
56    ) -> (syn::Expr, syn::Expr);
57    fn m2o_connect(
58        c1: &Self::Cluster,
59        c1_port: &<Self::Cluster as Node>::Port,
60        p2: &Self::Process,
61        p2_port: &<Self::Process as Node>::Port,
62    ) -> Box<dyn FnOnce()>;
63
64    fn m2m_sink_source(
65        c1: &Self::Cluster,
66        c1_port: &<Self::Cluster as Node>::Port,
67        c2: &Self::Cluster,
68        c2_port: &<Self::Cluster as Node>::Port,
69    ) -> (syn::Expr, syn::Expr);
70    fn m2m_connect(
71        c1: &Self::Cluster,
72        c1_port: &<Self::Cluster as Node>::Port,
73        c2: &Self::Cluster,
74        c2_port: &<Self::Cluster as Node>::Port,
75    ) -> Box<dyn FnOnce()>;
76
77    fn e2o_many_source(
78        extra_stmts: &mut Vec<syn::Stmt>,
79        p2: &Self::Process,
80        p2_port: &<Self::Process as Node>::Port,
81        codec_type: &syn::Type,
82        shared_handle: String,
83    ) -> syn::Expr;
84    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
85
86    fn e2o_source(
87        extra_stmts: &mut Vec<syn::Stmt>,
88        p1: &Self::External,
89        p1_port: &<Self::External as Node>::Port,
90        p2: &Self::Process,
91        p2_port: &<Self::Process as Node>::Port,
92        codec_type: &syn::Type,
93        shared_handle: String,
94    ) -> syn::Expr;
95    fn e2o_connect(
96        p1: &Self::External,
97        p1_port: &<Self::External as Node>::Port,
98        p2: &Self::Process,
99        p2_port: &<Self::Process as Node>::Port,
100        many: bool,
101        server_hint: NetworkHint,
102    ) -> Box<dyn FnOnce()>;
103
104    fn o2e_sink(
105        p1: &Self::Process,
106        p1_port: &<Self::Process as Node>::Port,
107        p2: &Self::External,
108        p2_port: &<Self::External as Node>::Port,
109        shared_handle: String,
110    ) -> syn::Expr;
111
112    fn cluster_ids(
113        of_cluster: LocationKey,
114    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
115
116    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
117
118    fn cluster_membership_stream(
119        location_id: &LocationId,
120    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
121
122    /// Registers an embedded input for the given ident and element type.
123    ///
124    /// Only meaningful for the embedded deployment backend. The default
125    /// implementation panics.
126    fn register_embedded_input(
127        _env: &mut Self::InstantiateEnv,
128        _location_key: LocationKey,
129        _ident: &syn::Ident,
130        _element_type: &syn::Type,
131    ) {
132        panic!("register_embedded_input is only supported by EmbeddedDeploy");
133    }
134
135    /// Registers an embedded output for the given ident and element type.
136    ///
137    /// Only meaningful for the embedded deployment backend. The default
138    /// implementation panics.
139    fn register_embedded_output(
140        _env: &mut Self::InstantiateEnv,
141        _location_key: LocationKey,
142        _ident: &syn::Ident,
143        _element_type: &syn::Type,
144    ) {
145        panic!("register_embedded_output is only supported by EmbeddedDeploy");
146    }
147}
148
149pub trait ProcessSpec<'a, D>
150where
151    D: Deploy<'a> + ?Sized,
152{
153    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
154}
155
156pub trait IntoProcessSpec<'a, D>
157where
158    D: Deploy<'a> + ?Sized,
159{
160    type ProcessSpec: ProcessSpec<'a, D>;
161    fn into_process_spec(self) -> Self::ProcessSpec;
162}
163
164impl<'a, D, T> IntoProcessSpec<'a, D> for T
165where
166    D: Deploy<'a> + ?Sized,
167    T: ProcessSpec<'a, D>,
168{
169    type ProcessSpec = T;
170    fn into_process_spec(self) -> Self::ProcessSpec {
171        self
172    }
173}
174
175pub trait ClusterSpec<'a, D>
176where
177    D: Deploy<'a> + ?Sized,
178{
179    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
180}
181
182pub trait ExternalSpec<'a, D>
183where
184    D: Deploy<'a> + ?Sized,
185{
186    fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
187}
188
189pub trait Node {
190    /// A logical communication endpoint for this node.
191    ///
192    /// Implementors are free to choose the concrete representation (for example,
193    /// a handle or identifier), but it must be `Clone` so that a single logical
194    /// port can be duplicated and passed to multiple consumers. New ports are
195    /// allocated via [`Self::next_port`].
196    type Port: Clone;
197    type Meta: Default;
198    type InstantiateEnv;
199
200    /// Allocates and returns a new port.
201    fn next_port(&self) -> Self::Port;
202
203    fn update_meta(&self, meta: &Self::Meta);
204
205    fn instantiate(
206        &self,
207        env: &mut Self::InstantiateEnv,
208        meta: &mut Self::Meta,
209        graph: DfirGraph,
210        extra_stmts: &[syn::Stmt],
211        sidecars: &[syn::Expr],
212    );
213}
214
215pub type DynSourceSink<Out, In, InErr> = (
216    Pin<Box<dyn Stream<Item = Out>>>,
217    Pin<Box<dyn Sink<In, Error = InErr>>>,
218);
219
220pub trait RegisterPort<'a, D>: Node + Clone
221where
222    D: Deploy<'a> + ?Sized,
223{
224    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
225
226    fn as_bytes_bidi(
227        &self,
228        external_port_id: ExternalPortId,
229    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
230
231    fn as_bincode_bidi<InT, OutT>(
232        &self,
233        external_port_id: ExternalPortId,
234    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
235    where
236        InT: Serialize + 'static,
237        OutT: DeserializeOwned + 'static;
238
239    fn as_bincode_sink<T>(
240        &self,
241        external_port_id: ExternalPortId,
242    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
243    where
244        T: Serialize + 'static;
245
246    fn as_bincode_source<T>(
247        &self,
248        external_port_id: ExternalPortId,
249    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
250    where
251        T: DeserializeOwned + 'static;
252}