hydro_lang/
deploy_runtime.rs

1use std::collections::HashMap;
2
3use hydro_deploy_integration::{
4    ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
5};
6use serde::{Deserialize, Serialize};
7use stageleft::{QuotedWithContext, RuntimeData, q};
8
9#[derive(Default, Serialize, Deserialize)]
10pub struct HydroMeta {
11    pub clusters: HashMap<usize, Vec<u32>>,
12    pub cluster_id: Option<u32>,
13    pub subgraph_id: usize,
14}
15
16pub fn cluster_members(
17    cli: RuntimeData<&DeployPorts<HydroMeta>>,
18    of_cluster: usize,
19) -> impl QuotedWithContext<'_, &[u32], ()> + Copy {
20    q!(cli
21        .meta
22        .clusters
23        .get(&of_cluster)
24        .map(|v| v.as_slice())
25        .unwrap_or(&[])) // we default to empty slice because this is the scenario where the cluster is unused in the graph
26}
27
28pub fn cluster_self_id(
29    cli: RuntimeData<&DeployPorts<HydroMeta>>,
30) -> impl QuotedWithContext<'_, u32, ()> + Copy {
31    q!(cli
32        .meta
33        .cluster_id
34        .expect("Tried to read Cluster Self ID on a non-cluster node"))
35}
36
37pub fn deploy_o2o(
38    env: RuntimeData<&DeployPorts<HydroMeta>>,
39    p1_port: &str,
40    p2_port: &str,
41) -> (syn::Expr, syn::Expr) {
42    (
43        { q!(env.port(p1_port).connect::<ConnectedDirect>().into_sink()).splice_untyped_ctx(&()) },
44        {
45            q!(env.port(p2_port).connect::<ConnectedDirect>().into_source()).splice_untyped_ctx(&())
46        },
47    )
48}
49
50pub fn deploy_o2m(
51    env: RuntimeData<&DeployPorts<HydroMeta>>,
52    p1_port: &str,
53    c2_port: &str,
54) -> (syn::Expr, syn::Expr) {
55    (
56        {
57            q!({
58                env.port(p1_port)
59                    .connect::<ConnectedDemux<ConnectedDirect>>()
60                    .into_sink()
61            })
62            .splice_untyped_ctx(&())
63        },
64        {
65            q!(env.port(c2_port).connect::<ConnectedDirect>().into_source()).splice_untyped_ctx(&())
66        },
67    )
68}
69
70pub fn deploy_m2o(
71    env: RuntimeData<&DeployPorts<HydroMeta>>,
72    c1_port: &str,
73    p2_port: &str,
74) -> (syn::Expr, syn::Expr) {
75    (
76        { q!(env.port(c1_port).connect::<ConnectedDirect>().into_sink()).splice_untyped_ctx(&()) },
77        {
78            q!({
79                env.port(p2_port)
80                    .connect::<ConnectedTagged<ConnectedDirect>>()
81                    .into_source()
82            })
83            .splice_untyped_ctx(&())
84        },
85    )
86}
87
88pub fn deploy_m2m(
89    env: RuntimeData<&DeployPorts<HydroMeta>>,
90    c1_port: &str,
91    c2_port: &str,
92) -> (syn::Expr, syn::Expr) {
93    (
94        {
95            q!({
96                env.port(c1_port)
97                    .connect::<ConnectedDemux<ConnectedDirect>>()
98                    .into_sink()
99            })
100            .splice_untyped_ctx(&())
101        },
102        {
103            q!({
104                env.port(c2_port)
105                    .connect::<ConnectedTagged<ConnectedDirect>>()
106                    .into_source()
107            })
108            .splice_untyped_ctx(&())
109        },
110    )
111}
112
113pub fn deploy_e2o(
114    env: RuntimeData<&DeployPorts<HydroMeta>>,
115    _e1_port: &str,
116    p2_port: &str,
117) -> syn::Expr {
118    q!(env.port(p2_port).connect::<ConnectedDirect>().into_source()).splice_untyped_ctx(&())
119}
120
121pub fn deploy_o2e(
122    env: RuntimeData<&DeployPorts<HydroMeta>>,
123    p1_port: &str,
124    _e2_port: &str,
125) -> syn::Expr {
126    q!(env.port(p1_port).connect::<ConnectedDirect>().into_sink()).splice_untyped_ctx(&())
127}