Skip to main content

hydro_test/maelstrom/
echo.rs

1//! This implements the Maelstrom echo workload.
2//!
3//! See <https://fly.io/dist-sys/1/>
4
5use hydro_lang::prelude::*;
6use serde::{Deserialize, Serialize};
7
8#[derive(Serialize, Deserialize)]
9pub struct EchoMessage {
10    pub msg_id: usize,
11    pub echo: String,
12}
13
14pub fn echo_server<'a, C>(
15    input: KeyedStream<String, EchoMessage, Cluster<'a, C>>,
16) -> KeyedStream<String, serde_json::Value, Cluster<'a, C>> {
17    input.map(q!(|msg| {
18        serde_json::json!({
19            "type": "echo_ok",
20            "echo": msg.echo,
21            "in_reply_to": msg.msg_id
22        })
23    }))
24}
25
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use hydro_lang::deploy::maelstrom::deploy_maelstrom::{
        MaelstromClusterSpec, MaelstromDeployment,
    };
    use hydro_lang::deploy::maelstrom::maelstrom_bidi_clients;

    use super::*;

    /// Deploys [`echo_server`] on a one-node cluster and runs the Maelstrom
    /// `echo` workload against it.
    ///
    /// The test is ignored unless the `maelstrom_available` cfg is set, and it
    /// reads the Maelstrom location from the `MAELSTROM_PATH` environment
    /// variable.
    #[tokio::test]
    #[cfg_attr(not(maelstrom_available), ignore)]
    async fn test_with_maelstrom() {
        // `var_os` keeps non-UTF-8 paths intact, and the `expect` message names
        // the missing variable instead of surfacing a bare `unwrap` panic.
        let maelstrom_path = PathBuf::from(
            std::env::var_os("MAELSTROM_PATH")
                .expect("MAELSTROM_PATH must be set to run the Maelstrom tests"),
        );

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();

        // Wire the Maelstrom client input through the echo server and back out.
        let (input, output_handle) = maelstrom_bidi_clients(&cluster);
        output_handle.complete(echo_server(input));

        let mut deployment = MaelstromDeployment::new("echo")
            .maelstrom_path(maelstrom_path)
            .node_count(1)
            .time_limit(10);

        let _ = flow
            .with_cluster(&cluster, MaelstromClusterSpec)
            .deploy(&mut deployment);

        deployment
            .run()
            .expect("Maelstrom echo workload did not complete successfully");
    }
}