Skip to main content

hydro_lang/deploy/maelstrom/
deploy_maelstrom.rs

1//! Deployment backend for Hydro that targets Maelstrom for distributed systems testing.
2//!
3//! Maelstrom is a workbench for learning distributed systems by writing your own.
4//! This backend compiles Hydro programs to binaries that communicate via Maelstrom's
5//! stdin/stdout JSON protocol.
6
7use std::cell::RefCell;
8use std::future::Future;
9use std::io::{BufRead, BufReader, Error};
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::process::Stdio;
13use std::rc::Rc;
14
15use bytes::{Bytes, BytesMut};
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, Stream};
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use stageleft::{QuotedWithContext, RuntimeData};
21
22use super::deploy_runtime_maelstrom::*;
23use crate::compile::builder::ExternalPortId;
24use crate::compile::deploy_provider::{ClusterSpec, Deploy, Node, RegisterPort};
25use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
26use crate::location::dynamic::LocationId;
27use crate::location::member_id::TaglessMemberId;
28use crate::location::{LocationKey, MembershipEvent, NetworkHint};
29
30/// Deployment backend that targets Maelstrom for distributed systems testing.
31///
32/// This backend compiles Hydro programs to binaries that communicate via Maelstrom's
33/// stdin/stdout JSON protocol. It is restricted to programs with:
34/// - Exactly one cluster (no processes)
35/// - A single external input channel for client communication
36pub enum MaelstromDeploy {}
37
38impl<'a> Deploy<'a> for MaelstromDeploy {
39    type Meta = ();
40    type InstantiateEnv = MaelstromDeployment;
41
42    type Process = MaelstromProcess;
43    type Cluster = MaelstromCluster;
44    type External = MaelstromExternal;
45
46    fn o2o_sink_source(
47        _env: &mut Self::InstantiateEnv,
48        _p1: &Self::Process,
49        _p1_port: &<Self::Process as Node>::Port,
50        _p2: &Self::Process,
51        _p2_port: &<Self::Process as Node>::Port,
52        _name: Option<&str>,
53        _networking_info: &crate::networking::NetworkingInfo,
54    ) -> (syn::Expr, syn::Expr) {
55        panic!("Maelstrom deployment does not support processes, only clusters")
56    }
57
58    fn o2o_connect(
59        _p1: &Self::Process,
60        _p1_port: &<Self::Process as Node>::Port,
61        _p2: &Self::Process,
62        _p2_port: &<Self::Process as Node>::Port,
63    ) -> Box<dyn FnOnce()> {
64        panic!("Maelstrom deployment does not support processes, only clusters")
65    }
66
67    fn o2m_sink_source(
68        _env: &mut Self::InstantiateEnv,
69        _p1: &Self::Process,
70        _p1_port: &<Self::Process as Node>::Port,
71        _c2: &Self::Cluster,
72        _c2_port: &<Self::Cluster as Node>::Port,
73        _name: Option<&str>,
74        _networking_info: &crate::networking::NetworkingInfo,
75    ) -> (syn::Expr, syn::Expr) {
76        panic!("Maelstrom deployment does not support processes, only clusters")
77    }
78
79    fn o2m_connect(
80        _p1: &Self::Process,
81        _p1_port: &<Self::Process as Node>::Port,
82        _c2: &Self::Cluster,
83        _c2_port: &<Self::Cluster as Node>::Port,
84    ) -> Box<dyn FnOnce()> {
85        panic!("Maelstrom deployment does not support processes, only clusters")
86    }
87
88    fn m2o_sink_source(
89        _env: &mut Self::InstantiateEnv,
90        _c1: &Self::Cluster,
91        _c1_port: &<Self::Cluster as Node>::Port,
92        _p2: &Self::Process,
93        _p2_port: &<Self::Process as Node>::Port,
94        _name: Option<&str>,
95        _networking_info: &crate::networking::NetworkingInfo,
96    ) -> (syn::Expr, syn::Expr) {
97        panic!("Maelstrom deployment does not support processes, only clusters")
98    }
99
100    fn m2o_connect(
101        _c1: &Self::Cluster,
102        _c1_port: &<Self::Cluster as Node>::Port,
103        _p2: &Self::Process,
104        _p2_port: &<Self::Process as Node>::Port,
105    ) -> Box<dyn FnOnce()> {
106        panic!("Maelstrom deployment does not support processes, only clusters")
107    }
108
109    fn m2m_sink_source(
110        env: &mut Self::InstantiateEnv,
111        _c1: &Self::Cluster,
112        _c1_port: &<Self::Cluster as Node>::Port,
113        _c2: &Self::Cluster,
114        _c2_port: &<Self::Cluster as Node>::Port,
115        _name: Option<&str>,
116        networking_info: &crate::networking::NetworkingInfo,
117    ) -> (syn::Expr, syn::Expr) {
118        use crate::networking::{NetworkingInfo, TcpFault};
119        match networking_info {
120            NetworkingInfo::Tcp { fault } => match (fault, env.nemesis.as_deref()) {
121                (TcpFault::Lossy, _) => {} // lossy is always allowed
122                (_, None) => {}            // no nemesis means any fault model is fine
123                (TcpFault::FailStop, Some("partition")) => {
124                    panic!(
125                        "Maelstrom partition nemesis requires lossy networking, but fail_stop was used. \
126                         Use `TCP.lossy().bincode()` instead of `TCP.fail_stop().bincode()`."
127                    );
128                }
129                (TcpFault::FailStop, Some(_)) => {} // other nemeses are fine with fail_stop
130            },
131        }
132        deploy_maelstrom_m2m(RuntimeData::new("__hydro_lang_maelstrom_meta"))
133    }
134
135    fn m2m_connect(
136        _c1: &Self::Cluster,
137        _c1_port: &<Self::Cluster as Node>::Port,
138        _c2: &Self::Cluster,
139        _c2_port: &<Self::Cluster as Node>::Port,
140    ) -> Box<dyn FnOnce()> {
141        // No runtime connection needed for Maelstrom - all routing is via stdin/stdout
142        Box::new(|| {})
143    }
144
145    fn e2o_many_source(
146        _extra_stmts: &mut Vec<syn::Stmt>,
147        _p2: &Self::Process,
148        _p2_port: &<Self::Process as Node>::Port,
149        _codec_type: &syn::Type,
150        _shared_handle: String,
151    ) -> syn::Expr {
152        panic!("Maelstrom deployment does not support processes, only clusters")
153    }
154
155    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
156        panic!("Maelstrom deployment does not support processes, only clusters")
157    }
158
159    fn e2o_source(
160        _extra_stmts: &mut Vec<syn::Stmt>,
161        _p1: &Self::External,
162        _p1_port: &<Self::External as Node>::Port,
163        _p2: &Self::Process,
164        _p2_port: &<Self::Process as Node>::Port,
165        _codec_type: &syn::Type,
166        _shared_handle: String,
167    ) -> syn::Expr {
168        panic!("Maelstrom deployment does not support processes, only clusters")
169    }
170
171    fn e2o_connect(
172        _p1: &Self::External,
173        _p1_port: &<Self::External as Node>::Port,
174        _p2: &Self::Process,
175        _p2_port: &<Self::Process as Node>::Port,
176        _many: bool,
177        _server_hint: NetworkHint,
178    ) -> Box<dyn FnOnce()> {
179        panic!("Maelstrom deployment does not support processes, only clusters")
180    }
181
182    fn o2e_sink(
183        _p1: &Self::Process,
184        _p1_port: &<Self::Process as Node>::Port,
185        _p2: &Self::External,
186        _p2_port: &<Self::External as Node>::Port,
187        _shared_handle: String,
188    ) -> syn::Expr {
189        panic!("Maelstrom deployment does not support processes, only clusters")
190    }
191
192    fn cluster_ids(
193        _of_cluster: LocationKey,
194    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
195        cluster_members(RuntimeData::new("__hydro_lang_maelstrom_meta"), _of_cluster)
196    }
197
198    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
199        cluster_self_id(RuntimeData::new("__hydro_lang_maelstrom_meta"))
200    }
201
202    fn cluster_membership_stream(
203        _env: &mut Self::InstantiateEnv,
204        _at_location: &LocationId,
205        location_id: &LocationId,
206    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
207    {
208        cluster_membership_stream(location_id)
209    }
210}
211
212/// A dummy process type for Maelstrom (processes are not supported).
213#[derive(Clone)]
214pub struct MaelstromProcess {
215    _private: (),
216}
217
218impl Node for MaelstromProcess {
219    type Port = String;
220    type Meta = ();
221    type InstantiateEnv = MaelstromDeployment;
222
223    fn next_port(&self) -> Self::Port {
224        panic!("Maelstrom deployment does not support processes")
225    }
226
227    fn update_meta(&self, _meta: &Self::Meta) {}
228
229    fn instantiate(
230        &self,
231        _env: &mut Self::InstantiateEnv,
232        _meta: &mut Self::Meta,
233        _graph: DfirGraph,
234        _extra_stmts: &[syn::Stmt],
235        _sidecars: &[syn::Expr],
236    ) {
237        panic!("Maelstrom deployment does not support processes")
238    }
239}
240
241/// Represents a cluster in Maelstrom deployment.
242#[derive(Clone)]
243pub struct MaelstromCluster {
244    next_port: Rc<RefCell<usize>>,
245    name_hint: Option<String>,
246}
247
248impl Node for MaelstromCluster {
249    type Port = String;
250    type Meta = ();
251    type InstantiateEnv = MaelstromDeployment;
252
253    fn next_port(&self) -> Self::Port {
254        let next_port = *self.next_port.borrow();
255        *self.next_port.borrow_mut() += 1;
256        format!("port_{}", next_port)
257    }
258
259    fn update_meta(&self, _meta: &Self::Meta) {}
260
261    fn instantiate(
262        &self,
263        env: &mut Self::InstantiateEnv,
264        _meta: &mut Self::Meta,
265        graph: DfirGraph,
266        extra_stmts: &[syn::Stmt],
267        sidecars: &[syn::Expr],
268    ) {
269        let (bin_name, config) = create_graph_trybuild(
270            graph,
271            extra_stmts,
272            sidecars,
273            self.name_hint.as_deref(),
274            crate::compile::trybuild::generate::DeployMode::Maelstrom,
275            LinkingMode::Static,
276        );
277
278        env.bin_name = Some(bin_name);
279        env.project_dir = Some(config.project_dir);
280        env.target_dir = Some(config.target_dir);
281        env.features = config.features;
282    }
283}
284
285/// Represents an external client in Maelstrom deployment.
286#[derive(Clone)]
287pub enum MaelstromExternal {}
288
289impl Node for MaelstromExternal {
290    type Port = String;
291    type Meta = ();
292    type InstantiateEnv = MaelstromDeployment;
293
294    fn next_port(&self) -> Self::Port {
295        unreachable!()
296    }
297
298    fn update_meta(&self, _meta: &Self::Meta) {}
299
300    fn instantiate(
301        &self,
302        _env: &mut Self::InstantiateEnv,
303        _meta: &mut Self::Meta,
304        _graph: DfirGraph,
305        _extra_stmts: &[syn::Stmt],
306        _sidecars: &[syn::Expr],
307    ) {
308        unreachable!()
309    }
310}
311
312impl<'a> RegisterPort<'a, MaelstromDeploy> for MaelstromExternal {
313    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
314        unreachable!()
315    }
316
317    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
318    fn as_bytes_bidi(
319        &self,
320        _external_port_id: ExternalPortId,
321    ) -> impl Future<
322        Output = (
323            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
324            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
325        ),
326    > + 'a {
327        async move { unreachable!() }
328    }
329
330    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
331    fn as_bincode_bidi<InT, OutT>(
332        &self,
333        _external_port_id: ExternalPortId,
334    ) -> impl Future<
335        Output = (
336            Pin<Box<dyn Stream<Item = OutT>>>,
337            Pin<Box<dyn Sink<InT, Error = Error>>>,
338        ),
339    > + 'a
340    where
341        InT: Serialize + 'static,
342        OutT: DeserializeOwned + 'static,
343    {
344        async move { unreachable!() }
345    }
346
347    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
348    fn as_bincode_sink<T: Serialize + 'static>(
349        &self,
350        _external_port_id: ExternalPortId,
351    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
352        async move { unreachable!() }
353    }
354
355    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
356    fn as_bincode_source<T: DeserializeOwned + 'static>(
357        &self,
358        _external_port_id: ExternalPortId,
359    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
360        async move { unreachable!() }
361    }
362}
363
364/// Specification for building a Maelstrom cluster.
365#[derive(Clone)]
366pub struct MaelstromClusterSpec;
367
368impl<'a> ClusterSpec<'a, MaelstromDeploy> for MaelstromClusterSpec {
369    fn build(self, key: LocationKey, name_hint: &str) -> MaelstromCluster {
370        assert_eq!(
371            key,
372            LocationKey::FIRST,
373            "there should only be one location for a Maelstrom deployment"
374        );
375        MaelstromCluster {
376            next_port: Rc::new(RefCell::new(0)),
377            name_hint: Some(name_hint.to_owned()),
378        }
379    }
380}
381
/// The Maelstrom deployment environment.
///
/// This holds configuration for the Maelstrom run and accumulates
/// compilation artifacts during deployment.
pub struct MaelstromDeployment {
    /// Number of nodes in the cluster.
    pub node_count: usize,
    /// Path to the maelstrom binary.
    pub maelstrom_path: PathBuf,
    /// Workload to run (e.g., "echo", "broadcast", "g-counter").
    pub workload: String,
    /// Time limit in seconds.
    pub time_limit: Option<u64>,
    /// Rate of requests per second.
    pub rate: Option<u64>,
    /// The availability of nodes.
    pub availability: Option<String>,
    /// Nemesis to run during tests.
    pub nemesis: Option<String>,
    /// Additional maelstrom arguments.
    pub extra_args: Vec<String>,

    // Populated during deployment
    pub(crate) bin_name: Option<String>,
    pub(crate) project_dir: Option<PathBuf>,
    pub(crate) target_dir: Option<PathBuf>,
    pub(crate) features: Option<Vec<String>>,
}

impl MaelstromDeployment {
    /// Create a new Maelstrom deployment that runs the given workload.
    ///
    /// Defaults: a single node, `maelstrom` as the binary path, and no time
    /// limit, rate, availability, nemesis or extra arguments.
    pub fn new(workload: impl Into<String>) -> Self {
        Self {
            node_count: 1,
            maelstrom_path: PathBuf::from("maelstrom"),
            workload: workload.into(),
            time_limit: None,
            rate: None,
            availability: None,
            nemesis: None,
            extra_args: vec![],
            bin_name: None,
            project_dir: None,
            target_dir: None,
            features: None,
        }
    }

    /// Set the node count.
    pub fn node_count(mut self, count: usize) -> Self {
        self.node_count = count;
        self
    }

    /// Set the path to the maelstrom binary.
    pub fn maelstrom_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.maelstrom_path = path.into();
        self
    }

    /// Set the time limit in seconds.
    pub fn time_limit(mut self, seconds: u64) -> Self {
        self.time_limit = Some(seconds);
        self
    }

    /// Set the request rate per second.
    pub fn rate(mut self, rate: u64) -> Self {
        self.rate = Some(rate);
        self
    }

    /// Set the availability for the test.
    pub fn availability(mut self, availability: impl Into<String>) -> Self {
        self.availability = Some(availability.into());
        self
    }

    /// Set the nemesis for the test.
    pub fn nemesis(mut self, nemesis: impl Into<String>) -> Self {
        self.nemesis = Some(nemesis.into());
        self
    }

    /// Add extra arguments to pass to maelstrom.
    ///
    /// Arguments are appended to any that were added by earlier calls.
    pub fn extra_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.extra_args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Location of the example binary `bin_name` inside a dev-profile `target_dir`.
    fn example_path(target_dir: &std::path::Path, bin_name: &str) -> PathBuf {
        target_dir.join("debug").join("examples").join(bin_name)
    }

    /// Echo Maelstrom's output to stderr line by line and stop at the first
    /// verdict line.
    ///
    /// Returns `Ok(())` for a passing verdict, and an error for an invalid
    /// analysis, an unreadable line, or output that ends without any verdict.
    fn scan_output(output: impl BufRead) -> Result<(), Error> {
        for line in output.lines() {
            let line = line?;
            eprintln!("{}", &line);

            if line.starts_with("Analysis invalid!") {
                return Err(Error::other("Analysis was invalid"));
            }
            if line.starts_with("Errors occurred during analysis, but no anomalies found.")
                || line.starts_with("Everything looks good!")
            {
                return Ok(());
            }
        }

        Err(Error::other("Maelstrom produced an unexpected result"))
    }

    /// Build the compiled binary in dev mode.
    /// Returns the path to the compiled binary.
    ///
    /// # Errors
    /// Returns an error if `cargo` cannot be started or exits unsuccessfully.
    ///
    /// # Panics
    /// Panics if the binary name, project dir or target dir has not been
    /// populated by deployment yet.
    pub fn build(&self) -> Result<PathBuf, Error> {
        let bin_name = self
            .bin_name
            .as_ref()
            .expect("No binary name set - did you call deploy?");
        let project_dir = self.project_dir.as_ref().expect("No project dir set");
        let target_dir = self.target_dir.as_ref().expect("No target dir set");

        // Always include maelstrom_runtime feature for runtime support, so the
        // feature list is never empty and `--features` is always passed.
        let mut all_features = vec!["hydro___feature_maelstrom_runtime".to_owned()];
        if let Some(features) = &self.features {
            all_features.extend(features.iter().cloned());
        }

        let status = std::process::Command::new("cargo")
            .arg("build")
            .arg("--example")
            .arg(bin_name)
            .arg("--no-default-features")
            .arg("--features")
            .arg(all_features.join(","))
            .current_dir(project_dir)
            .env("CARGO_TARGET_DIR", target_dir)
            .env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1")
            .status()?;
        if !status.success() {
            return Err(Error::other(format!(
                "cargo build failed with status: {}",
                status
            )));
        }

        Ok(Self::example_path(target_dir, bin_name))
    }

    /// Run Maelstrom with the compiled binary, return Ok(()) if all checks pass.
    ///
    /// This will block until Maelstrom completes: once a verdict line has been
    /// seen (or the output ends), the Maelstrom process is waited on so it is
    /// reaped rather than left behind.
    ///
    /// # Errors
    /// Returns an error if the build fails, Maelstrom cannot be started, its
    /// output cannot be read, the analysis is reported invalid, or no verdict
    /// line is printed at all.
    pub fn run(self) -> Result<(), Error> {
        let binary_path = self.build()?;

        let mut cmd = std::process::Command::new(&self.maelstrom_path);
        cmd.arg("test")
            .arg("-w")
            .arg(&self.workload)
            .arg("--bin")
            .arg(&binary_path)
            .arg("--node-count")
            .arg(self.node_count.to_string())
            .stdout(Stdio::piped());

        if let Some(time_limit) = self.time_limit {
            cmd.arg("--time-limit").arg(time_limit.to_string());
        }

        if let Some(rate) = self.rate {
            cmd.arg("--rate").arg(rate.to_string());
        }

        if let Some(availability) = &self.availability {
            cmd.arg("--availability").arg(availability);
        }

        if let Some(nemesis) = &self.nemesis {
            cmd.arg("--nemesis").arg(nemesis);
        }

        cmd.args(&self.extra_args);

        let mut child = cmd.spawn()?;
        // `take` leaves the `Child` intact so it can still be waited on below.
        let stdout = child
            .stdout
            .take()
            .expect("stdout was configured as piped");

        // The reader is moved into the scan and dropped when it returns, so the
        // pipe is closed before waiting and the child cannot block on a full pipe.
        let verdict = Self::scan_output(BufReader::new(stdout));

        // Reap the child. The verdict comes from the output, not the exit
        // status, so a failure to wait is reported but does not override it.
        if let Err(err) = child.wait() {
            eprintln!("failed to wait for maelstrom to exit: {}", err);
        }

        verdict
    }

    /// Get the path at which [`Self::build`] places the compiled binary.
    ///
    /// Returns `None` until deployment has recorded both the binary name and
    /// the target dir. The path is computed only; this does not check that the
    /// binary has actually been built.
    pub fn binary_path(&self) -> Option<PathBuf> {
        let bin_name = self.bin_name.as_deref()?;
        let target_dir = self.target_dir.as_deref()?;
        Some(Self::example_path(target_dir, bin_name))
    }
}