hydro_lang/compile/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17    type Meta: Default;
18    type InstantiateEnv;
19
20    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23        + RegisterPort<'a, Self>;
24
25    fn o2o_sink_source(
26        p1: &Self::Process,
27        p1_port: &<Self::Process as Node>::Port,
28        p2: &Self::Process,
29        p2_port: &<Self::Process as Node>::Port,
30    ) -> (syn::Expr, syn::Expr);
31    fn o2o_connect(
32        p1: &Self::Process,
33        p1_port: &<Self::Process as Node>::Port,
34        p2: &Self::Process,
35        p2_port: &<Self::Process as Node>::Port,
36    ) -> Box<dyn FnOnce()>;
37
38    fn o2m_sink_source(
39        p1: &Self::Process,
40        p1_port: &<Self::Process as Node>::Port,
41        c2: &Self::Cluster,
42        c2_port: &<Self::Cluster as Node>::Port,
43    ) -> (syn::Expr, syn::Expr);
44    fn o2m_connect(
45        p1: &Self::Process,
46        p1_port: &<Self::Process as Node>::Port,
47        c2: &Self::Cluster,
48        c2_port: &<Self::Cluster as Node>::Port,
49    ) -> Box<dyn FnOnce()>;
50
51    fn m2o_sink_source(
52        c1: &Self::Cluster,
53        c1_port: &<Self::Cluster as Node>::Port,
54        p2: &Self::Process,
55        p2_port: &<Self::Process as Node>::Port,
56    ) -> (syn::Expr, syn::Expr);
57    fn m2o_connect(
58        c1: &Self::Cluster,
59        c1_port: &<Self::Cluster as Node>::Port,
60        p2: &Self::Process,
61        p2_port: &<Self::Process as Node>::Port,
62    ) -> Box<dyn FnOnce()>;
63
64    fn m2m_sink_source(
65        c1: &Self::Cluster,
66        c1_port: &<Self::Cluster as Node>::Port,
67        c2: &Self::Cluster,
68        c2_port: &<Self::Cluster as Node>::Port,
69    ) -> (syn::Expr, syn::Expr);
70    fn m2m_connect(
71        c1: &Self::Cluster,
72        c1_port: &<Self::Cluster as Node>::Port,
73        c2: &Self::Cluster,
74        c2_port: &<Self::Cluster as Node>::Port,
75    ) -> Box<dyn FnOnce()>;
76
77    fn e2o_many_source(
78        extra_stmts: &mut Vec<syn::Stmt>,
79        p2: &Self::Process,
80        p2_port: &<Self::Process as Node>::Port,
81        codec_type: &syn::Type,
82        shared_handle: String,
83    ) -> syn::Expr;
84    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
85
86    fn e2o_source(
87        extra_stmts: &mut Vec<syn::Stmt>,
88        p1: &Self::External,
89        p1_port: &<Self::External as Node>::Port,
90        p2: &Self::Process,
91        p2_port: &<Self::Process as Node>::Port,
92        codec_type: &syn::Type,
93        shared_handle: String,
94    ) -> syn::Expr;
95    fn e2o_connect(
96        p1: &Self::External,
97        p1_port: &<Self::External as Node>::Port,
98        p2: &Self::Process,
99        p2_port: &<Self::Process as Node>::Port,
100        many: bool,
101        server_hint: NetworkHint,
102    ) -> Box<dyn FnOnce()>;
103
104    fn o2e_sink(
105        p1: &Self::Process,
106        p1_port: &<Self::Process as Node>::Port,
107        p2: &Self::External,
108        p2_port: &<Self::External as Node>::Port,
109        shared_handle: String,
110    ) -> syn::Expr;
111
112    fn cluster_ids(
113        of_cluster: usize,
114    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
115
116    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
117
118    fn cluster_membership_stream(
119        location_id: &LocationId,
120    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
121}
122
123pub trait ProcessSpec<'a, D>
124where
125    D: Deploy<'a> + ?Sized,
126{
127    fn build(self, id: usize, name_hint: &str) -> D::Process;
128}
129
130pub trait IntoProcessSpec<'a, D>
131where
132    D: Deploy<'a> + ?Sized,
133{
134    type ProcessSpec: ProcessSpec<'a, D>;
135    fn into_process_spec(self) -> Self::ProcessSpec;
136}
137
138impl<'a, D, T> IntoProcessSpec<'a, D> for T
139where
140    D: Deploy<'a> + ?Sized,
141    T: ProcessSpec<'a, D>,
142{
143    type ProcessSpec = T;
144    fn into_process_spec(self) -> Self::ProcessSpec {
145        self
146    }
147}
148
149pub trait ClusterSpec<'a, D>
150where
151    D: Deploy<'a> + ?Sized,
152{
153    fn build(self, id: usize, name_hint: &str) -> D::Cluster;
154}
155
156pub trait ExternalSpec<'a, D>
157where
158    D: Deploy<'a> + ?Sized,
159{
160    fn build(self, id: usize, name_hint: &str) -> D::External;
161}
162
163pub trait Node {
164    /// A logical communication endpoint for this node.
165    ///
166    /// Implementors are free to choose the concrete representation (for example,
167    /// a handle or identifier), but it must be `Clone` so that a single logical
168    /// port can be duplicated and passed to multiple consumers. New ports are
169    /// allocated via [`Self::next_port`].
170    type Port: Clone;
171    type Meta: Default;
172    type InstantiateEnv;
173
174    /// Allocates and returns a new port.
175    fn next_port(&self) -> Self::Port;
176
177    fn update_meta(&self, meta: &Self::Meta);
178
179    fn instantiate(
180        &self,
181        env: &mut Self::InstantiateEnv,
182        meta: &mut Self::Meta,
183        graph: DfirGraph,
184        extra_stmts: Vec<syn::Stmt>,
185    );
186}
187
188pub type DynSourceSink<Out, In, InErr> = (
189    Pin<Box<dyn Stream<Item = Out>>>,
190    Pin<Box<dyn Sink<In, Error = InErr>>>,
191);
192
193pub trait RegisterPort<'a, D>: Node + Clone
194where
195    D: Deploy<'a> + ?Sized,
196{
197    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
198
199    fn as_bytes_bidi(
200        &self,
201        external_port_id: ExternalPortId,
202    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
203
204    fn as_bincode_bidi<InT, OutT>(
205        &self,
206        external_port_id: ExternalPortId,
207    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
208    where
209        InT: Serialize + 'static,
210        OutT: DeserializeOwned + 'static;
211
212    fn as_bincode_sink<T>(
213        &self,
214        external_port_id: ExternalPortId,
215    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
216    where
217        T: Serialize + 'static;
218
219    fn as_bincode_source<T>(
220        &self,
221        external_port_id: ExternalPortId,
222    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
223    where
224        T: DeserializeOwned + 'static;
225}