hydro_lang/
deploy_runtime.rs

1use std::collections::HashMap;
2
3use dfir_rs::util::deploy::{
4    ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
5};
6use serde::{Deserialize, Serialize};
7use stageleft::{QuotedWithContext, RuntimeData, q};
8
9#[derive(Default, Serialize, Deserialize)]
10pub struct HydroMeta {
11    pub clusters: HashMap<usize, Vec<u32>>,
12    pub cluster_id: Option<u32>,
13    pub subgraph_id: usize,
14}
15
16pub fn cluster_members(
17    cli: RuntimeData<&DeployPorts<HydroMeta>>,
18    of_cluster: usize,
19) -> impl QuotedWithContext<&[u32], ()> + Copy {
20    q!(cli
21        .meta
22        .clusters
23        .get(&of_cluster)
24        .map(|v| v.as_slice())
25        .unwrap_or(&[])) // we default to empty slice because this is the scenario where the cluster is unused in the graph
26}
27
28pub fn cluster_self_id(
29    cli: RuntimeData<&DeployPorts<HydroMeta>>,
30) -> impl QuotedWithContext<u32, ()> + Copy {
31    q!(cli
32        .meta
33        .cluster_id
34        .expect("Tried to read Cluster ID on a non-cluster node"))
35}
36
37pub fn deploy_o2o(
38    env: RuntimeData<&DeployPorts<HydroMeta>>,
39    p1_port: &str,
40    p2_port: &str,
41) -> (syn::Expr, syn::Expr) {
42    (
43        {
44            q!({
45                env.port(p1_port)
46                    .connect_local_blocking::<ConnectedDirect>()
47                    .into_sink()
48            })
49            .splice_untyped_ctx(&())
50        },
51        {
52            q!({
53                env.port(p2_port)
54                    .connect_local_blocking::<ConnectedDirect>()
55                    .into_source()
56            })
57            .splice_untyped_ctx(&())
58        },
59    )
60}
61
62pub fn deploy_o2m(
63    env: RuntimeData<&DeployPorts<HydroMeta>>,
64    p1_port: &str,
65    c2_port: &str,
66) -> (syn::Expr, syn::Expr) {
67    (
68        {
69            q!({
70                env.port(p1_port)
71                    .connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
72                    .into_sink()
73            })
74            .splice_untyped_ctx(&())
75        },
76        {
77            q!({
78                env.port(c2_port)
79                    .connect_local_blocking::<ConnectedDirect>()
80                    .into_source()
81            })
82            .splice_untyped_ctx(&())
83        },
84    )
85}
86
87pub fn deploy_m2o(
88    env: RuntimeData<&DeployPorts<HydroMeta>>,
89    c1_port: &str,
90    p2_port: &str,
91) -> (syn::Expr, syn::Expr) {
92    (
93        {
94            q!({
95                env.port(c1_port)
96                    .connect_local_blocking::<ConnectedDirect>()
97                    .into_sink()
98            })
99            .splice_untyped_ctx(&())
100        },
101        {
102            q!({
103                env.port(p2_port)
104                    .connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
105                    .into_source()
106            })
107            .splice_untyped_ctx(&())
108        },
109    )
110}
111
112pub fn deploy_m2m(
113    env: RuntimeData<&DeployPorts<HydroMeta>>,
114    c1_port: &str,
115    c2_port: &str,
116) -> (syn::Expr, syn::Expr) {
117    (
118        {
119            q!({
120                env.port(c1_port)
121                    .connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
122                    .into_sink()
123            })
124            .splice_untyped_ctx(&())
125        },
126        {
127            q!({
128                env.port(c2_port)
129                    .connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
130                    .into_source()
131            })
132            .splice_untyped_ctx(&())
133        },
134    )
135}
136
137pub fn deploy_e2o(
138    env: RuntimeData<&DeployPorts<HydroMeta>>,
139    _e1_port: &str,
140    p2_port: &str,
141) -> syn::Expr {
142    q!({
143        env.port(p2_port)
144            .connect_local_blocking::<ConnectedDirect>()
145            .into_source()
146    })
147    .splice_untyped_ctx(&())
148}
149
150pub fn deploy_o2e(
151    env: RuntimeData<&DeployPorts<HydroMeta>>,
152    p1_port: &str,
153    _e2_port: &str,
154) -> syn::Expr {
155    q!({
156        env.port(p1_port)
157            .connect_local_blocking::<ConnectedDirect>()
158            .into_sink()
159    })
160    .splice_untyped_ctx(&())
161}