hydro_test/maelstrom/
unique_ids.rs1use hydro_lang::location::cluster::CLUSTER_SELF_ID;
6use hydro_lang::prelude::*;
7use serde::{Deserialize, Serialize};
8
9#[derive(Serialize, Deserialize)]
10pub struct GenerateMessage {
11 pub msg_id: usize,
12}
13
14pub fn unique_id_server<'a, C: 'a>(
15 input: KeyedStream<String, GenerateMessage, Cluster<'a, C>>,
16 nondet_ids: NonDet,
17) -> KeyedStream<String, serde_json::Value, Cluster<'a, C>> {
18 input
19 .entries()
20 .assume_ordering(nondet_ids)
21 .enumerate()
22 .map(q!(move |(idx, (sender, msg))| {
23 let self_id = &CLUSTER_SELF_ID;
24 (
25 sender,
26 serde_json::json!({
27 "type": "generate_ok",
28 "id": format!("{}-{}", self_id, idx),
29 "in_reply_to": msg.msg_id
30 }),
31 )
32 }))
33 .into_keyed()
34}
35
36#[cfg(test)]
37mod tests {
38 use std::path::PathBuf;
39 use std::str::FromStr;
40
41 use hydro_lang::deploy::maelstrom::deploy_maelstrom::{
42 MaelstromClusterSpec, MaelstromDeployment,
43 };
44 use hydro_lang::deploy::maelstrom::maelstrom_bidi_clients;
45
46 use super::*;
47
48 #[tokio::test]
49 #[cfg_attr(not(maelstrom_available), ignore)]
50 async fn test_with_maelstrom() {
51 let mut flow = FlowBuilder::new();
52 let cluster = flow.cluster::<()>();
53
54 let (input, output_handle) = maelstrom_bidi_clients(&cluster);
55 output_handle.complete(unique_id_server(
56 input,
57 nondet!(),
58 ));
59
60 let mut deployment = MaelstromDeployment::new("unique-ids")
61 .maelstrom_path(PathBuf::from_str(&std::env::var("MAELSTROM_PATH").unwrap()).unwrap())
62 .node_count(3)
63 .time_limit(30)
64 .rate(1000)
65 .availability("total")
66 .nemesis("partition");
67
68 let _ = flow
69 .with_cluster(&cluster, MaelstromClusterSpec)
70 .deploy(&mut deployment);
71
72 deployment.run().unwrap();
73 }
74}