// hydro_test/maelstrom/broadcast.rs
use std::collections::HashSet;
6use std::time::Duration;
7
8use hydro_lang::live_collections::stream::{AtLeastOnce, NoOrder};
9use hydro_lang::prelude::*;
10use serde::{Deserialize, Serialize};
11
12#[derive(Serialize, Deserialize, Clone)]
13pub struct Broadcast {
14 pub msg_id: usize,
15 pub message: u32,
16}
17
18#[derive(Serialize, Deserialize, Clone)]
19pub struct Read {
20 pub msg_id: usize,
21}
22
23#[derive(Serialize, Deserialize, Clone)]
24pub struct Topology {
25 pub msg_id: usize,
26}
27
28#[derive(Serialize, Deserialize, Clone)]
29#[serde(tag = "type")]
30pub enum Request {
31 #[serde(alias = "broadcast")]
32 Broadcast(Broadcast),
33 #[serde(alias = "read")]
34 Read(Read),
35 #[serde(alias = "topology")]
36 Topology(Topology),
37}
38
39fn broadcast_core<'a, C: 'a>(
40 cluster: &Cluster<'a, C>,
41 writes: Stream<u32, Cluster<'a, C>, Unbounded, NoOrder>,
42) -> Singleton<HashSet<u32>, Cluster<'a, C>, Unbounded> {
43 let (broadcasted_forward, broadcasted) =
44 cluster.forward_ref::<Stream<_, _, Unbounded, NoOrder, AtLeastOnce>>();
45
46 let cur_state = sliced! {
47 let new_writes = use(writes, nondet!());
48 let recv_broadcast = use(broadcasted, nondet!());
49 let mut local_state = use::state_null::<Stream<_, _, _, NoOrder>>();
50
51 local_state = local_state.chain(new_writes).weaken_retries::<AtLeastOnce>()
52 .chain(recv_broadcast.flatten_unordered())
53 .unique();
54 local_state.clone().fold(q!(|| HashSet::new()), q!(|set, v| {
55 set.insert(v);
56 }, commutative = manual_proof!()))
57 };
58
59 broadcasted_forward.complete(
60 cur_state
61 .clone()
62 .sample_every(q!(Duration::from_millis(50)), nondet!())
63 .broadcast(
64 cluster,
65 TCP.lossy(nondet!()).bincode(),
66 nondet!(),
67 )
68 .values(),
69 );
70
71 cur_state
72}
73
74pub fn broadcast_server<'a, C: 'a>(
75 cluster: &Cluster<'a, C>,
76 input: KeyedStream<String, Request, Cluster<'a, C>>,
77) -> KeyedStream<String, serde_json::Value, Cluster<'a, C>, Unbounded, NoOrder> {
78 let broadcast_requests = input.clone().filter_map(q!(|body| {
79 if let Request::Broadcast(b) = body {
80 Some(b)
81 } else {
82 None
83 }
84 }));
85
86 let broadcast_response = broadcast_requests.clone().map(q!(|req| {
87 serde_json::json!({
88 "type": "broadcast_ok",
89 "in_reply_to": req.msg_id
90 })
91 }));
92
93 let written_data = broadcast_requests.values().map(q!(|t| t.message));
94 let current_state = broadcast_core(cluster, written_data);
95
96 let read_requests = input.clone().filter_map(q!(|body| {
97 if let Request::Read(r) = body {
98 Some(r)
99 } else {
100 None
101 }
102 }));
103
104 let read_response = sliced! {
105 let req = use(read_requests, nondet!());
106 let data = use(
107 current_state,
108 nondet!()
109 );
110
111 req.cross_singleton(data).map(q!(|(req, data)| {
112 serde_json::json!({
113 "type": "read_ok",
114 "messages": data.into_iter().collect::<Vec<_>>(),
115 "in_reply_to": req.msg_id
116 })
117 }))
118 };
119
120 let topology_requests = input.filter_map(q!(|body| {
121 if let Request::Topology(t) = body {
122 Some(t)
123 } else {
124 None
125 }
126 }));
127
128 let topology_response = topology_requests.map(q!(|req| {
129 serde_json::json!({
130 "type": "topology_ok",
131 "in_reply_to": req.msg_id
132 })
133 }));
134
135 broadcast_response
136 .merge_unordered(read_response)
137 .merge_unordered(topology_response)
138}
139
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use hydro_lang::deploy::maelstrom::deploy_maelstrom::{
        MaelstromClusterSpec, MaelstromDeployment,
    };
    use hydro_lang::deploy::maelstrom::maelstrom_bidi_clients;

    use super::*;

    /// Starts a deployment of the `broadcast` workload that uses the Maelstrom
    /// installation named by the `MAELSTROM_PATH` environment variable.
    ///
    /// Each test adds its own node count, time limit, rate, and nemesis on top.
    ///
    /// # Panics
    ///
    /// Panics if `MAELSTROM_PATH` is unset or not valid Unicode.
    fn broadcast_deployment() -> MaelstromDeployment {
        let maelstrom_path = std::env::var("MAELSTROM_PATH")
            .expect("MAELSTROM_PATH must be set to the Maelstrom installation path");
        MaelstromDeployment::new("broadcast").maelstrom_path(PathBuf::from(maelstrom_path))
    }

    /// Wires `broadcast_server` to Maelstrom clients on a fresh single-cluster
    /// flow, deploys it with `deployment`, and runs the workload.
    ///
    /// # Panics
    ///
    /// Panics if `deployment.run()` does not succeed.
    fn run_broadcast_server(mut deployment: MaelstromDeployment) {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();

        let (input, output_handle) = maelstrom_bidi_clients(&cluster);
        output_handle.complete(broadcast_server(&cluster, input).assume_ordering(nondet!()));

        let _ = flow
            .with_cluster(&cluster, MaelstromClusterSpec)
            .deploy(&mut deployment);

        deployment.run().unwrap();
    }

    /// Single node, no nemesis.
    #[tokio::test]
    #[cfg_attr(not(maelstrom_available), ignore)]
    async fn broadcast_3a_maelstrom() {
        run_broadcast_server(
            broadcast_deployment()
                .node_count(1)
                .time_limit(20)
                .rate(10),
        );
    }

    /// Five nodes, no nemesis.
    #[tokio::test]
    #[cfg_attr(not(maelstrom_available), ignore)]
    async fn broadcast_3b_maelstrom() {
        run_broadcast_server(
            broadcast_deployment()
                .node_count(5)
                .time_limit(20)
                .rate(10),
        );
    }

    /// Five nodes with the `partition` nemesis enabled.
    #[tokio::test]
    #[cfg_attr(not(maelstrom_available), ignore)]
    async fn broadcast_3c_maelstrom() {
        run_broadcast_server(
            broadcast_deployment()
                .node_count(5)
                .time_limit(20)
                .rate(10)
                .nemesis("partition"),
        );
    }
}