Skip to main content

hydro_lang/compile/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17    type Meta: Default;
18    type InstantiateEnv;
19
20    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23        + RegisterPort<'a, Self>;
24
25    /// Generates the source and sink expressions when connecting a [`Self::Process`] to another
26    /// [`Self::Process`].
27    ///
28    /// The [`Self::InstantiateEnv`] can be used to record metadata about the created channel. The
29    /// provided `name` is the user-configured channel name from the network IR node.
30    fn o2o_sink_source(
31        env: &mut Self::InstantiateEnv,
32        p1: &Self::Process,
33        p1_port: &<Self::Process as Node>::Port,
34        p2: &Self::Process,
35        p2_port: &<Self::Process as Node>::Port,
36        name: Option<&str>,
37        networking_info: &crate::networking::NetworkingInfo,
38    ) -> (syn::Expr, syn::Expr);
39
40    /// Performs any runtime wiring needed after code generation for a
41    /// [`Self::Process`]-to-[`Self::Process`] channel.
42    ///
43    /// The returned closure is executed once all locations have been instantiated.
44    fn o2o_connect(
45        p1: &Self::Process,
46        p1_port: &<Self::Process as Node>::Port,
47        p2: &Self::Process,
48        p2_port: &<Self::Process as Node>::Port,
49    ) -> Box<dyn FnOnce()>;
50
51    /// Generates the source and sink expressions when connecting a [`Self::Process`] to a
52    /// [`Self::Cluster`] (one-to-many).
53    ///
54    /// The sink expression is used on the sending process and the source expression on each
55    /// receiving cluster member. The [`Self::InstantiateEnv`] can be used to record metadata
56    /// about the created channel. The provided `name` is the user-configured channel name
57    /// from the network IR node.
58    fn o2m_sink_source(
59        env: &mut Self::InstantiateEnv,
60        p1: &Self::Process,
61        p1_port: &<Self::Process as Node>::Port,
62        c2: &Self::Cluster,
63        c2_port: &<Self::Cluster as Node>::Port,
64        name: Option<&str>,
65        networking_info: &crate::networking::NetworkingInfo,
66    ) -> (syn::Expr, syn::Expr);
67
68    /// Performs any runtime wiring needed after code generation for a
69    /// [`Self::Process`]-to-[`Self::Cluster`] channel.
70    ///
71    /// The returned closure is executed once all locations have been instantiated.
72    fn o2m_connect(
73        p1: &Self::Process,
74        p1_port: &<Self::Process as Node>::Port,
75        c2: &Self::Cluster,
76        c2_port: &<Self::Cluster as Node>::Port,
77    ) -> Box<dyn FnOnce()>;
78
79    /// Generates the source and sink expressions when connecting a [`Self::Cluster`] to a
80    /// [`Self::Process`] (many-to-one).
81    ///
82    /// The sink expression is used on each sending cluster member and the source expression
83    /// on the receiving process. The [`Self::InstantiateEnv`] can be used to record metadata
84    /// about the created channel. The provided `name` is the user-configured channel name
85    /// from the network IR node.
86    fn m2o_sink_source(
87        env: &mut Self::InstantiateEnv,
88        c1: &Self::Cluster,
89        c1_port: &<Self::Cluster as Node>::Port,
90        p2: &Self::Process,
91        p2_port: &<Self::Process as Node>::Port,
92        name: Option<&str>,
93        networking_info: &crate::networking::NetworkingInfo,
94    ) -> (syn::Expr, syn::Expr);
95
96    /// Performs any runtime wiring needed after code generation for a
97    /// [`Self::Cluster`]-to-[`Self::Process`] channel.
98    ///
99    /// The returned closure is executed once all locations have been instantiated.
100    fn m2o_connect(
101        c1: &Self::Cluster,
102        c1_port: &<Self::Cluster as Node>::Port,
103        p2: &Self::Process,
104        p2_port: &<Self::Process as Node>::Port,
105    ) -> Box<dyn FnOnce()>;
106
107    /// Generates the source and sink expressions when connecting a [`Self::Cluster`] to another
108    /// [`Self::Cluster`] (many-to-many).
109    ///
110    /// The sink expression is used on each sending cluster member and the source expression
111    /// on each receiving cluster member. The [`Self::InstantiateEnv`] can be used to record
112    /// metadata about the created channel. The provided `name` is the user-configured channel
113    /// name from the network IR node.
114    fn m2m_sink_source(
115        env: &mut Self::InstantiateEnv,
116        c1: &Self::Cluster,
117        c1_port: &<Self::Cluster as Node>::Port,
118        c2: &Self::Cluster,
119        c2_port: &<Self::Cluster as Node>::Port,
120        name: Option<&str>,
121        networking_info: &crate::networking::NetworkingInfo,
122    ) -> (syn::Expr, syn::Expr);
123
124    /// Performs any runtime wiring needed after code generation for a
125    /// [`Self::Cluster`]-to-[`Self::Cluster`] channel.
126    ///
127    /// The returned closure is executed once all locations have been instantiated.
128    fn m2m_connect(
129        c1: &Self::Cluster,
130        c1_port: &<Self::Cluster as Node>::Port,
131        c2: &Self::Cluster,
132        c2_port: &<Self::Cluster as Node>::Port,
133    ) -> Box<dyn FnOnce()>;
134
135    fn e2o_many_source(
136        extra_stmts: &mut Vec<syn::Stmt>,
137        p2: &Self::Process,
138        p2_port: &<Self::Process as Node>::Port,
139        codec_type: &syn::Type,
140        shared_handle: String,
141    ) -> syn::Expr;
142    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
143
144    fn e2o_source(
145        extra_stmts: &mut Vec<syn::Stmt>,
146        p1: &Self::External,
147        p1_port: &<Self::External as Node>::Port,
148        p2: &Self::Process,
149        p2_port: &<Self::Process as Node>::Port,
150        codec_type: &syn::Type,
151        shared_handle: String,
152    ) -> syn::Expr;
153    fn e2o_connect(
154        p1: &Self::External,
155        p1_port: &<Self::External as Node>::Port,
156        p2: &Self::Process,
157        p2_port: &<Self::Process as Node>::Port,
158        many: bool,
159        server_hint: NetworkHint,
160    ) -> Box<dyn FnOnce()>;
161
162    fn o2e_sink(
163        p1: &Self::Process,
164        p1_port: &<Self::Process as Node>::Port,
165        p2: &Self::External,
166        p2_port: &<Self::External as Node>::Port,
167        shared_handle: String,
168    ) -> syn::Expr;
169
170    fn cluster_ids(
171        of_cluster: LocationKey,
172    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
173
174    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
175
176    fn cluster_membership_stream(
177        env: &mut Self::InstantiateEnv,
178        at_location: &LocationId,
179        location_id: &LocationId,
180    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
181
182    /// Registers an embedded input for the given ident and element type.
183    ///
184    /// Only meaningful for the embedded deployment backend. The default
185    /// implementation panics.
186    fn register_embedded_input(
187        _env: &mut Self::InstantiateEnv,
188        _location_key: LocationKey,
189        _ident: &syn::Ident,
190        _element_type: &syn::Type,
191    ) {
192        panic!("register_embedded_input is only supported by EmbeddedDeploy");
193    }
194
195    /// Registers an embedded output for the given ident and element type.
196    ///
197    /// Only meaningful for the embedded deployment backend. The default
198    /// implementation panics.
199    fn register_embedded_output(
200        _env: &mut Self::InstantiateEnv,
201        _location_key: LocationKey,
202        _ident: &syn::Ident,
203        _element_type: &syn::Type,
204    ) {
205        panic!("register_embedded_output is only supported by EmbeddedDeploy");
206    }
207}
208
209pub trait ProcessSpec<'a, D>
210where
211    D: Deploy<'a> + ?Sized,
212{
213    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
214}
215
216pub trait IntoProcessSpec<'a, D>
217where
218    D: Deploy<'a> + ?Sized,
219{
220    type ProcessSpec: ProcessSpec<'a, D>;
221    fn into_process_spec(self) -> Self::ProcessSpec;
222}
223
224impl<'a, D, T> IntoProcessSpec<'a, D> for T
225where
226    D: Deploy<'a> + ?Sized,
227    T: ProcessSpec<'a, D>,
228{
229    type ProcessSpec = T;
230    fn into_process_spec(self) -> Self::ProcessSpec {
231        self
232    }
233}
234
235pub trait ClusterSpec<'a, D>
236where
237    D: Deploy<'a> + ?Sized,
238{
239    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
240}
241
242pub trait ExternalSpec<'a, D>
243where
244    D: Deploy<'a> + ?Sized,
245{
246    fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
247}
248
249pub trait Node {
250    /// A logical communication endpoint for this node.
251    ///
252    /// Implementors are free to choose the concrete representation (for example,
253    /// a handle or identifier), but it must be `Clone` so that a single logical
254    /// port can be duplicated and passed to multiple consumers. New ports are
255    /// allocated via [`Self::next_port`].
256    type Port: Clone;
257    type Meta: Default;
258    type InstantiateEnv;
259
260    /// Allocates and returns a new port.
261    fn next_port(&self) -> Self::Port;
262
263    fn update_meta(&self, meta: &Self::Meta);
264
265    fn instantiate(
266        &self,
267        env: &mut Self::InstantiateEnv,
268        meta: &mut Self::Meta,
269        graph: DfirGraph,
270        extra_stmts: &[syn::Stmt],
271        sidecars: &[syn::Expr],
272    );
273}
274
275pub type DynSourceSink<Out, In, InErr> = (
276    Pin<Box<dyn Stream<Item = Out>>>,
277    Pin<Box<dyn Sink<In, Error = InErr>>>,
278);
279
280pub trait RegisterPort<'a, D>: Node + Clone
281where
282    D: Deploy<'a> + ?Sized,
283{
284    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
285
286    fn as_bytes_bidi(
287        &self,
288        external_port_id: ExternalPortId,
289    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
290
291    fn as_bincode_bidi<InT, OutT>(
292        &self,
293        external_port_id: ExternalPortId,
294    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
295    where
296        InT: Serialize + 'static,
297        OutT: DeserializeOwned + 'static;
298
299    fn as_bincode_sink<T>(
300        &self,
301        external_port_id: ExternalPortId,
302    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
303    where
304        T: Serialize + 'static;
305
306    fn as_bincode_source<T>(
307        &self,
308        external_port_id: ExternalPortId,
309    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
310    where
311        T: DeserializeOwned + 'static;
312}