hydro_lang/
deploy_runtime.rs
1use std::collections::HashMap;
2
3use dfir_rs::util::deploy::{
4 ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
5};
6use serde::{Deserialize, Serialize};
7use stageleft::{QuotedWithContext, RuntimeData, q};
8
9#[derive(Default, Serialize, Deserialize)]
10pub struct HydroMeta {
11 pub clusters: HashMap<usize, Vec<u32>>,
12 pub cluster_id: Option<u32>,
13 pub subgraph_id: usize,
14}
15
16pub fn cluster_members(
17 cli: RuntimeData<&DeployPorts<HydroMeta>>,
18 of_cluster: usize,
19) -> impl QuotedWithContext<&[u32], ()> + Copy {
20 q!(cli
21 .meta
22 .clusters
23 .get(&of_cluster)
24 .map(|v| v.as_slice())
25 .unwrap_or(&[])) }
27
28pub fn cluster_self_id(
29 cli: RuntimeData<&DeployPorts<HydroMeta>>,
30) -> impl QuotedWithContext<u32, ()> + Copy {
31 q!(cli
32 .meta
33 .cluster_id
34 .expect("Tried to read Cluster ID on a non-cluster node"))
35}
36
37pub fn deploy_o2o(
38 env: RuntimeData<&DeployPorts<HydroMeta>>,
39 p1_port: &str,
40 p2_port: &str,
41) -> (syn::Expr, syn::Expr) {
42 (
43 {
44 q!({
45 env.port(p1_port)
46 .connect_local_blocking::<ConnectedDirect>()
47 .into_sink()
48 })
49 .splice_untyped_ctx(&())
50 },
51 {
52 q!({
53 env.port(p2_port)
54 .connect_local_blocking::<ConnectedDirect>()
55 .into_source()
56 })
57 .splice_untyped_ctx(&())
58 },
59 )
60}
61
62pub fn deploy_o2m(
63 env: RuntimeData<&DeployPorts<HydroMeta>>,
64 p1_port: &str,
65 c2_port: &str,
66) -> (syn::Expr, syn::Expr) {
67 (
68 {
69 q!({
70 env.port(p1_port)
71 .connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
72 .into_sink()
73 })
74 .splice_untyped_ctx(&())
75 },
76 {
77 q!({
78 env.port(c2_port)
79 .connect_local_blocking::<ConnectedDirect>()
80 .into_source()
81 })
82 .splice_untyped_ctx(&())
83 },
84 )
85}
86
87pub fn deploy_m2o(
88 env: RuntimeData<&DeployPorts<HydroMeta>>,
89 c1_port: &str,
90 p2_port: &str,
91) -> (syn::Expr, syn::Expr) {
92 (
93 {
94 q!({
95 env.port(c1_port)
96 .connect_local_blocking::<ConnectedDirect>()
97 .into_sink()
98 })
99 .splice_untyped_ctx(&())
100 },
101 {
102 q!({
103 env.port(p2_port)
104 .connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
105 .into_source()
106 })
107 .splice_untyped_ctx(&())
108 },
109 )
110}
111
112pub fn deploy_m2m(
113 env: RuntimeData<&DeployPorts<HydroMeta>>,
114 c1_port: &str,
115 c2_port: &str,
116) -> (syn::Expr, syn::Expr) {
117 (
118 {
119 q!({
120 env.port(c1_port)
121 .connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
122 .into_sink()
123 })
124 .splice_untyped_ctx(&())
125 },
126 {
127 q!({
128 env.port(c2_port)
129 .connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
130 .into_source()
131 })
132 .splice_untyped_ctx(&())
133 },
134 )
135}
136
137pub fn deploy_e2o(
138 env: RuntimeData<&DeployPorts<HydroMeta>>,
139 _e1_port: &str,
140 p2_port: &str,
141) -> syn::Expr {
142 q!({
143 env.port(p2_port)
144 .connect_local_blocking::<ConnectedDirect>()
145 .into_source()
146 })
147 .splice_untyped_ctx(&())
148}
149
150pub fn deploy_o2e(
151 env: RuntimeData<&DeployPorts<HydroMeta>>,
152 p1_port: &str,
153 _e2_port: &str,
154) -> syn::Expr {
155 q!({
156 env.port(p1_port)
157 .connect_local_blocking::<ConnectedDirect>()
158 .into_sink()
159 })
160 .splice_untyped_ctx(&())
161}