Skip to main content

hydro_test/maelstrom/
unique_ids.rs

1//! This implements the Maelstrom unique-ids workload.
2//!
3//! See <https://fly.io/dist-sys/2/>
4
5use hydro_lang::location::cluster::CLUSTER_SELF_ID;
6use hydro_lang::prelude::*;
7use serde::{Deserialize, Serialize};
8
9#[derive(Serialize, Deserialize)]
10pub struct GenerateMessage {
11    pub msg_id: usize,
12}
13
14pub fn unique_id_server<'a, C: 'a>(
15    input: KeyedStream<String, GenerateMessage, Cluster<'a, C>>,
16    nondet_ids: NonDet,
17) -> KeyedStream<String, serde_json::Value, Cluster<'a, C>> {
18    input
19        .entries()
20        .assume_ordering(nondet_ids)
21        .enumerate()
22        .map(q!(move |(idx, (sender, msg))| {
23            let self_id = &CLUSTER_SELF_ID;
24            (
25                sender,
26                serde_json::json!({
27                    "type": "generate_ok",
28                    "id": format!("{}-{}", self_id, idx),
29                    "in_reply_to": msg.msg_id
30                }),
31            )
32        }))
33        .into_keyed()
34}
35
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use hydro_lang::deploy::maelstrom::deploy_maelstrom::{
        MaelstromClusterSpec, MaelstromDeployment,
    };
    use hydro_lang::deploy::maelstrom::maelstrom_bidi_clients;

    use super::*;

    /// Runs [`unique_id_server`] under the Maelstrom `unique-ids` workload.
    ///
    /// Ignored unless the `maelstrom_available` cfg is set. When it runs, the
    /// `MAELSTROM_PATH` environment variable must be set; its value is passed
    /// to `MaelstromDeployment::maelstrom_path`.
    #[tokio::test]
    #[cfg_attr(not(maelstrom_available), ignore)]
    async fn test_with_maelstrom() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();

        // Wire the server between Maelstrom's client input and output.
        let (input, output_handle) = maelstrom_bidi_clients(&cluster);
        output_handle.complete(unique_id_server(
            input,
            nondet!(/** ids can be nondeterministic */),
        ));

        // `var_os` accepts paths that are not valid UTF-8, and `PathBuf::from`
        // is infallible, so the only failure left is the variable being unset.
        let maelstrom_path = PathBuf::from(
            std::env::var_os("MAELSTROM_PATH")
                .expect("MAELSTROM_PATH must be set to run the Maelstrom test"),
        );

        let mut deployment = MaelstromDeployment::new("unique-ids")
            .maelstrom_path(maelstrom_path)
            .node_count(3)
            .time_limit(30)
            .rate(1000)
            .availability("total")
            .nemesis("partition");

        let _ = flow
            .with_cluster(&cluster, MaelstromClusterSpec)
            .deploy(&mut deployment);

        deployment.run().unwrap();
    }
}