Skip to main content

hydro_test/maelstrom/
broadcast.rs

1//! This implements the Maelstrom broadcast workload.
2//!
3//! See <https://fly.io/dist-sys/3a/> and <https://fly.io/dist-sys/3b/>
4
5use std::collections::HashSet;
6use std::time::Duration;
7
8use hydro_lang::live_collections::stream::{AtLeastOnce, NoOrder};
9use hydro_lang::prelude::*;
10use serde::{Deserialize, Serialize};
11
12#[derive(Serialize, Deserialize, Clone)]
13pub struct Broadcast {
14    pub msg_id: usize,
15    pub message: u32,
16}
17
18#[derive(Serialize, Deserialize, Clone)]
19pub struct Read {
20    pub msg_id: usize,
21}
22
23#[derive(Serialize, Deserialize, Clone)]
24pub struct Topology {
25    pub msg_id: usize,
26}
27
28#[derive(Serialize, Deserialize, Clone)]
29#[serde(tag = "type")]
30pub enum Request {
31    #[serde(alias = "broadcast")]
32    Broadcast(Broadcast),
33    #[serde(alias = "read")]
34    Read(Read),
35    #[serde(alias = "topology")]
36    Topology(Topology),
37}
38
39fn broadcast_core<'a, C: 'a>(
40    cluster: &Cluster<'a, C>,
41    writes: Stream<u32, Cluster<'a, C>, Unbounded, NoOrder>,
42) -> Singleton<HashSet<u32>, Cluster<'a, C>, Unbounded> {
43    let (broadcasted_forward, broadcasted) =
44        cluster.forward_ref::<Stream<_, _, Unbounded, NoOrder, AtLeastOnce>>();
45
46    let cur_state = sliced! {
47        let new_writes = use(writes, nondet!(/** TODO */));
48        let recv_broadcast = use(broadcasted, nondet!(/** TODO */));
49        let mut local_state = use::state_null::<Stream<_, _, _, NoOrder>>();
50
51        local_state = local_state.chain(new_writes).weaken_retries::<AtLeastOnce>()
52            .chain(recv_broadcast.flatten_unordered())
53            .unique();
54        local_state.clone().fold(q!(|| HashSet::new()), q!(|set, v| {
55            set.insert(v);
56        }, commutative = manual_proof!(/** TODO */)))
57    };
58
59    broadcasted_forward.complete(
60        cur_state
61            .clone()
62            .sample_every(q!(Duration::from_millis(50)), nondet!(/** TODO */))
63            .broadcast(
64                cluster,
65                TCP.lossy(nondet!(/** TODO */)).bincode(),
66                nondet!(/** TODO */),
67            )
68            .values(),
69    );
70
71    cur_state
72}
73
74pub fn broadcast_server<'a, C: 'a>(
75    cluster: &Cluster<'a, C>,
76    input: KeyedStream<String, Request, Cluster<'a, C>>,
77) -> KeyedStream<String, serde_json::Value, Cluster<'a, C>, Unbounded, NoOrder> {
78    let broadcast_requests = input.clone().filter_map(q!(|body| {
79        if let Request::Broadcast(b) = body {
80            Some(b)
81        } else {
82            None
83        }
84    }));
85
86    let broadcast_response = broadcast_requests.clone().map(q!(|req| {
87        serde_json::json!({
88            "type": "broadcast_ok",
89            "in_reply_to": req.msg_id
90        })
91    }));
92
93    let written_data = broadcast_requests.values().map(q!(|t| t.message));
94    let current_state = broadcast_core(cluster, written_data);
95
96    let read_requests = input.clone().filter_map(q!(|body| {
97        if let Request::Read(r) = body {
98            Some(r)
99        } else {
100            None
101        }
102    }));
103
104    let read_response = sliced! {
105        let req = use(read_requests, nondet!(/** batching of requests does not matter */));
106        let data = use(
107            current_state,
108            nondet!(/** we only guarantee eventual consistency */)
109        );
110
111        req.cross_singleton(data).map(q!(|(req, data)| {
112            serde_json::json!({
113                "type": "read_ok",
114                "messages": data.into_iter().collect::<Vec<_>>(),
115                "in_reply_to": req.msg_id
116            })
117        }))
118    };
119
120    let topology_requests = input.filter_map(q!(|body| {
121        if let Request::Topology(t) = body {
122            Some(t)
123        } else {
124            None
125        }
126    }));
127
128    let topology_response = topology_requests.map(q!(|req| {
129        serde_json::json!({
130            "type": "topology_ok",
131            "in_reply_to": req.msg_id
132        })
133    }));
134
135    broadcast_response
136        .merge_unordered(read_response)
137        .merge_unordered(topology_response)
138}
139
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use hydro_lang::deploy::maelstrom::deploy_maelstrom::{
        MaelstromClusterSpec, MaelstromDeployment,
    };
    use hydro_lang::deploy::maelstrom::maelstrom_bidi_clients;

    use super::*;

    /// Wires [`broadcast_server`] to Maelstrom clients and runs the
    /// `broadcast` workload against it.
    ///
    /// The settings shared by every test (Maelstrom path taken from the
    /// `MAELSTROM_PATH` environment variable, time limit 20, rate 10) are
    /// applied here. `configure` receives the partially built deployment and
    /// adds the test-specific settings such as the node count or a nemesis.
    ///
    /// # Panics
    ///
    /// Panics if `MAELSTROM_PATH` is not set or if the Maelstrom run fails.
    fn run_broadcast_workload(
        configure: impl FnOnce(MaelstromDeployment) -> MaelstromDeployment,
    ) {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();

        let (input, output_handle) = maelstrom_bidi_clients(&cluster);
        output_handle
            .complete(broadcast_server(&cluster, input).assume_ordering(nondet!(/** test */)));

        let maelstrom_path = PathBuf::from(
            std::env::var("MAELSTROM_PATH")
                .expect("MAELSTROM_PATH must be set to run the Maelstrom tests"),
        );
        let mut deployment = configure(
            MaelstromDeployment::new("broadcast")
                .maelstrom_path(maelstrom_path)
                .time_limit(20)
                .rate(10),
        );

        let _ = flow
            .with_cluster(&cluster, MaelstromClusterSpec)
            .deploy(&mut deployment);

        deployment.run().unwrap();
    }

    /// Challenge 3a: a single node.
    #[tokio::test]
    #[cfg_attr(not(maelstrom_available), ignore)]
    async fn broadcast_3a_maelstrom() {
        run_broadcast_workload(|deployment| deployment.node_count(1));
    }

    /// Challenge 3b: five nodes.
    #[tokio::test]
    #[cfg_attr(not(maelstrom_available), ignore)]
    async fn broadcast_3b_maelstrom() {
        run_broadcast_workload(|deployment| deployment.node_count(5));
    }

    /// Challenge 3c: five nodes with the `partition` nemesis enabled.
    #[tokio::test]
    #[cfg_attr(not(maelstrom_available), ignore)]
    async fn broadcast_3c_maelstrom() {
        run_broadcast_workload(|deployment| deployment.node_count(5).nemesis("partition"));
    }
}