hydro_lang/
deploy_runtime.rs1use std::collections::HashMap;
2
3use hydro_deploy_integration::{
4 ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
5};
6use serde::{Deserialize, Serialize};
7use stageleft::{QuotedWithContext, RuntimeData, q};
8
9#[derive(Default, Serialize, Deserialize)]
10pub struct HydroMeta {
11 pub clusters: HashMap<usize, Vec<u32>>,
12 pub cluster_id: Option<u32>,
13 pub subgraph_id: usize,
14}
15
16pub fn cluster_members(
17 cli: RuntimeData<&DeployPorts<HydroMeta>>,
18 of_cluster: usize,
19) -> impl QuotedWithContext<'_, &[u32], ()> + Copy {
20 q!(cli
21 .meta
22 .clusters
23 .get(&of_cluster)
24 .map(|v| v.as_slice())
25 .unwrap_or(&[])) }
27
28pub fn cluster_self_id(
29 cli: RuntimeData<&DeployPorts<HydroMeta>>,
30) -> impl QuotedWithContext<'_, u32, ()> + Copy {
31 q!(cli
32 .meta
33 .cluster_id
34 .expect("Tried to read Cluster Self ID on a non-cluster node"))
35}
36
37pub fn deploy_o2o(
38 env: RuntimeData<&DeployPorts<HydroMeta>>,
39 p1_port: &str,
40 p2_port: &str,
41) -> (syn::Expr, syn::Expr) {
42 (
43 { q!(env.port(p1_port).connect::<ConnectedDirect>().into_sink()).splice_untyped_ctx(&()) },
44 {
45 q!(env.port(p2_port).connect::<ConnectedDirect>().into_source()).splice_untyped_ctx(&())
46 },
47 )
48}
49
50pub fn deploy_o2m(
51 env: RuntimeData<&DeployPorts<HydroMeta>>,
52 p1_port: &str,
53 c2_port: &str,
54) -> (syn::Expr, syn::Expr) {
55 (
56 {
57 q!({
58 env.port(p1_port)
59 .connect::<ConnectedDemux<ConnectedDirect>>()
60 .into_sink()
61 })
62 .splice_untyped_ctx(&())
63 },
64 {
65 q!(env.port(c2_port).connect::<ConnectedDirect>().into_source()).splice_untyped_ctx(&())
66 },
67 )
68}
69
70pub fn deploy_m2o(
71 env: RuntimeData<&DeployPorts<HydroMeta>>,
72 c1_port: &str,
73 p2_port: &str,
74) -> (syn::Expr, syn::Expr) {
75 (
76 { q!(env.port(c1_port).connect::<ConnectedDirect>().into_sink()).splice_untyped_ctx(&()) },
77 {
78 q!({
79 env.port(p2_port)
80 .connect::<ConnectedTagged<ConnectedDirect>>()
81 .into_source()
82 })
83 .splice_untyped_ctx(&())
84 },
85 )
86}
87
88pub fn deploy_m2m(
89 env: RuntimeData<&DeployPorts<HydroMeta>>,
90 c1_port: &str,
91 c2_port: &str,
92) -> (syn::Expr, syn::Expr) {
93 (
94 {
95 q!({
96 env.port(c1_port)
97 .connect::<ConnectedDemux<ConnectedDirect>>()
98 .into_sink()
99 })
100 .splice_untyped_ctx(&())
101 },
102 {
103 q!({
104 env.port(c2_port)
105 .connect::<ConnectedTagged<ConnectedDirect>>()
106 .into_source()
107 })
108 .splice_untyped_ctx(&())
109 },
110 )
111}
112
113pub fn deploy_e2o(
114 env: RuntimeData<&DeployPorts<HydroMeta>>,
115 _e1_port: &str,
116 p2_port: &str,
117) -> syn::Expr {
118 q!(env.port(p2_port).connect::<ConnectedDirect>().into_source()).splice_untyped_ctx(&())
119}
120
121pub fn deploy_o2e(
122 env: RuntimeData<&DeployPorts<HydroMeta>>,
123 p1_port: &str,
124 _e2_port: &str,
125) -> syn::Expr {
126 q!(env.port(p1_port).connect::<ConnectedDirect>().into_sink()).splice_untyped_ctx(&())
127}