hydro_test/maelstrom/
echo.rs1use hydro_lang::prelude::*;
6use serde::{Deserialize, Serialize};
7
8#[derive(Serialize, Deserialize)]
9pub struct EchoMessage {
10 pub msg_id: usize,
11 pub echo: String,
12}
13
14pub fn echo_server<'a, C>(
15 input: KeyedStream<String, EchoMessage, Cluster<'a, C>>,
16) -> KeyedStream<String, serde_json::Value, Cluster<'a, C>> {
17 input.map(q!(|msg| {
18 serde_json::json!({
19 "type": "echo_ok",
20 "echo": msg.echo,
21 "in_reply_to": msg.msg_id
22 })
23 }))
24}
25
26#[cfg(test)]
27mod tests {
28 use std::path::PathBuf;
29 use std::str::FromStr;
30
31 use hydro_lang::deploy::maelstrom::deploy_maelstrom::{
32 MaelstromClusterSpec, MaelstromDeployment,
33 };
34 use hydro_lang::deploy::maelstrom::maelstrom_bidi_clients;
35
36 use super::*;
37
38 #[tokio::test]
39 #[cfg_attr(not(maelstrom_available), ignore)]
40 async fn test_with_maelstrom() {
41 let mut flow = FlowBuilder::new();
42 let cluster = flow.cluster::<()>();
43
44 let (input, output_handle) = maelstrom_bidi_clients(&cluster);
45 output_handle.complete(echo_server(input));
46
47 let mut deployment = MaelstromDeployment::new("echo")
48 .maelstrom_path(PathBuf::from_str(&std::env::var("MAELSTROM_PATH").unwrap()).unwrap())
49 .node_count(1)
50 .time_limit(10);
51
52 let _ = flow
53 .with_cluster(&cluster, MaelstromClusterSpec)
54 .deploy(&mut deployment);
55
56 deployment.run().unwrap();
57 }
58}