1use std::cell::RefCell;
8use std::future::Future;
9use std::io::{BufRead, BufReader, Error};
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::process::Stdio;
13use std::rc::Rc;
14
15use bytes::{Bytes, BytesMut};
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, Stream};
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use stageleft::{QuotedWithContext, RuntimeData};
21
22use super::deploy_runtime_maelstrom::*;
23use crate::compile::builder::ExternalPortId;
24use crate::compile::deploy_provider::{ClusterSpec, Deploy, Node, RegisterPort};
25use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
26use crate::location::dynamic::LocationId;
27use crate::location::member_id::TaglessMemberId;
28use crate::location::{LocationKey, MembershipEvent, NetworkHint};
29
/// Type-level marker selecting the Maelstrom deployment backend.
///
/// The enum has no variants, so it can never be instantiated; it exists only
/// to carry the [`Deploy`] implementation below.
pub enum MaelstromDeploy {}
37
38impl<'a> Deploy<'a> for MaelstromDeploy {
39 type Meta = ();
40 type InstantiateEnv = MaelstromDeployment;
41
42 type Process = MaelstromProcess;
43 type Cluster = MaelstromCluster;
44 type External = MaelstromExternal;
45
46 fn o2o_sink_source(
47 _env: &mut Self::InstantiateEnv,
48 _p1: &Self::Process,
49 _p1_port: &<Self::Process as Node>::Port,
50 _p2: &Self::Process,
51 _p2_port: &<Self::Process as Node>::Port,
52 _name: Option<&str>,
53 _networking_info: &crate::networking::NetworkingInfo,
54 ) -> (syn::Expr, syn::Expr) {
55 panic!("Maelstrom deployment does not support processes, only clusters")
56 }
57
58 fn o2o_connect(
59 _p1: &Self::Process,
60 _p1_port: &<Self::Process as Node>::Port,
61 _p2: &Self::Process,
62 _p2_port: &<Self::Process as Node>::Port,
63 ) -> Box<dyn FnOnce()> {
64 panic!("Maelstrom deployment does not support processes, only clusters")
65 }
66
67 fn o2m_sink_source(
68 _env: &mut Self::InstantiateEnv,
69 _p1: &Self::Process,
70 _p1_port: &<Self::Process as Node>::Port,
71 _c2: &Self::Cluster,
72 _c2_port: &<Self::Cluster as Node>::Port,
73 _name: Option<&str>,
74 _networking_info: &crate::networking::NetworkingInfo,
75 ) -> (syn::Expr, syn::Expr) {
76 panic!("Maelstrom deployment does not support processes, only clusters")
77 }
78
79 fn o2m_connect(
80 _p1: &Self::Process,
81 _p1_port: &<Self::Process as Node>::Port,
82 _c2: &Self::Cluster,
83 _c2_port: &<Self::Cluster as Node>::Port,
84 ) -> Box<dyn FnOnce()> {
85 panic!("Maelstrom deployment does not support processes, only clusters")
86 }
87
88 fn m2o_sink_source(
89 _env: &mut Self::InstantiateEnv,
90 _c1: &Self::Cluster,
91 _c1_port: &<Self::Cluster as Node>::Port,
92 _p2: &Self::Process,
93 _p2_port: &<Self::Process as Node>::Port,
94 _name: Option<&str>,
95 _networking_info: &crate::networking::NetworkingInfo,
96 ) -> (syn::Expr, syn::Expr) {
97 panic!("Maelstrom deployment does not support processes, only clusters")
98 }
99
100 fn m2o_connect(
101 _c1: &Self::Cluster,
102 _c1_port: &<Self::Cluster as Node>::Port,
103 _p2: &Self::Process,
104 _p2_port: &<Self::Process as Node>::Port,
105 ) -> Box<dyn FnOnce()> {
106 panic!("Maelstrom deployment does not support processes, only clusters")
107 }
108
109 fn m2m_sink_source(
110 env: &mut Self::InstantiateEnv,
111 _c1: &Self::Cluster,
112 _c1_port: &<Self::Cluster as Node>::Port,
113 _c2: &Self::Cluster,
114 _c2_port: &<Self::Cluster as Node>::Port,
115 _name: Option<&str>,
116 networking_info: &crate::networking::NetworkingInfo,
117 ) -> (syn::Expr, syn::Expr) {
118 use crate::networking::{NetworkingInfo, TcpFault};
119 match networking_info {
120 NetworkingInfo::Tcp { fault } => match (fault, env.nemesis.as_deref()) {
121 (TcpFault::Lossy, _) => {} (_, None) => {} (TcpFault::FailStop, Some("partition")) => {
124 panic!(
125 "Maelstrom partition nemesis requires lossy networking, but fail_stop was used. \
126 Use `TCP.lossy().bincode()` instead of `TCP.fail_stop().bincode()`."
127 );
128 }
129 (TcpFault::FailStop, Some(_)) => {} },
131 }
132 deploy_maelstrom_m2m(RuntimeData::new("__hydro_lang_maelstrom_meta"))
133 }
134
135 fn m2m_connect(
136 _c1: &Self::Cluster,
137 _c1_port: &<Self::Cluster as Node>::Port,
138 _c2: &Self::Cluster,
139 _c2_port: &<Self::Cluster as Node>::Port,
140 ) -> Box<dyn FnOnce()> {
141 Box::new(|| {})
143 }
144
145 fn e2o_many_source(
146 _extra_stmts: &mut Vec<syn::Stmt>,
147 _p2: &Self::Process,
148 _p2_port: &<Self::Process as Node>::Port,
149 _codec_type: &syn::Type,
150 _shared_handle: String,
151 ) -> syn::Expr {
152 panic!("Maelstrom deployment does not support processes, only clusters")
153 }
154
155 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
156 panic!("Maelstrom deployment does not support processes, only clusters")
157 }
158
159 fn e2o_source(
160 _extra_stmts: &mut Vec<syn::Stmt>,
161 _p1: &Self::External,
162 _p1_port: &<Self::External as Node>::Port,
163 _p2: &Self::Process,
164 _p2_port: &<Self::Process as Node>::Port,
165 _codec_type: &syn::Type,
166 _shared_handle: String,
167 ) -> syn::Expr {
168 panic!("Maelstrom deployment does not support processes, only clusters")
169 }
170
171 fn e2o_connect(
172 _p1: &Self::External,
173 _p1_port: &<Self::External as Node>::Port,
174 _p2: &Self::Process,
175 _p2_port: &<Self::Process as Node>::Port,
176 _many: bool,
177 _server_hint: NetworkHint,
178 ) -> Box<dyn FnOnce()> {
179 panic!("Maelstrom deployment does not support processes, only clusters")
180 }
181
182 fn o2e_sink(
183 _p1: &Self::Process,
184 _p1_port: &<Self::Process as Node>::Port,
185 _p2: &Self::External,
186 _p2_port: &<Self::External as Node>::Port,
187 _shared_handle: String,
188 ) -> syn::Expr {
189 panic!("Maelstrom deployment does not support processes, only clusters")
190 }
191
192 fn cluster_ids(
193 _of_cluster: LocationKey,
194 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
195 cluster_members(RuntimeData::new("__hydro_lang_maelstrom_meta"), _of_cluster)
196 }
197
198 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
199 cluster_self_id(RuntimeData::new("__hydro_lang_maelstrom_meta"))
200 }
201
202 fn cluster_membership_stream(
203 _env: &mut Self::InstantiateEnv,
204 _at_location: &LocationId,
205 location_id: &LocationId,
206 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
207 {
208 cluster_membership_stream(location_id)
209 }
210}
211
#[derive(Clone)]
/// Placeholder process node for the Maelstrom backend.
///
/// Its [`Node`] implementation panics in `next_port` and `instantiate`,
/// because Maelstrom deployments do not support processes.
pub struct MaelstromProcess {
    // Private unit field: prevents construction via a struct literal outside this module.
    _private: (),
}
217
218impl Node for MaelstromProcess {
219 type Port = String;
220 type Meta = ();
221 type InstantiateEnv = MaelstromDeployment;
222
223 fn next_port(&self) -> Self::Port {
224 panic!("Maelstrom deployment does not support processes")
225 }
226
227 fn update_meta(&self, _meta: &Self::Meta) {}
228
229 fn instantiate(
230 &self,
231 _env: &mut Self::InstantiateEnv,
232 _meta: &mut Self::Meta,
233 _graph: DfirGraph,
234 _extra_stmts: &[syn::Stmt],
235 _sidecars: &[syn::Expr],
236 ) {
237 panic!("Maelstrom deployment does not support processes")
238 }
239}
240
#[derive(Clone)]
/// The cluster node of a Maelstrom deployment.
pub struct MaelstromCluster {
    // Counter behind `Node::next_port`; it lives in an `Rc`, so clones share the same counter.
    next_port: Rc<RefCell<usize>>,
    // Optional name forwarded to `create_graph_trybuild` when the cluster is instantiated.
    name_hint: Option<String>,
}
247
248impl Node for MaelstromCluster {
249 type Port = String;
250 type Meta = ();
251 type InstantiateEnv = MaelstromDeployment;
252
253 fn next_port(&self) -> Self::Port {
254 let next_port = *self.next_port.borrow();
255 *self.next_port.borrow_mut() += 1;
256 format!("port_{}", next_port)
257 }
258
259 fn update_meta(&self, _meta: &Self::Meta) {}
260
261 fn instantiate(
262 &self,
263 env: &mut Self::InstantiateEnv,
264 _meta: &mut Self::Meta,
265 graph: DfirGraph,
266 extra_stmts: &[syn::Stmt],
267 sidecars: &[syn::Expr],
268 ) {
269 let (bin_name, config) = create_graph_trybuild(
270 graph,
271 extra_stmts,
272 sidecars,
273 self.name_hint.as_deref(),
274 crate::compile::trybuild::generate::DeployMode::Maelstrom,
275 LinkingMode::Static,
276 );
277
278 env.bin_name = Some(bin_name);
279 env.project_dir = Some(config.project_dir);
280 env.target_dir = Some(config.target_dir);
281 env.features = config.features;
282 }
283}
284
#[derive(Clone)]
/// External-location node type for the Maelstrom backend.
///
/// The enum has no variants, so no value of this type can exist and none of
/// its `&self` methods can ever be called.
pub enum MaelstromExternal {}
288
289impl Node for MaelstromExternal {
290 type Port = String;
291 type Meta = ();
292 type InstantiateEnv = MaelstromDeployment;
293
294 fn next_port(&self) -> Self::Port {
295 unreachable!()
296 }
297
298 fn update_meta(&self, _meta: &Self::Meta) {}
299
300 fn instantiate(
301 &self,
302 _env: &mut Self::InstantiateEnv,
303 _meta: &mut Self::Meta,
304 _graph: DfirGraph,
305 _extra_stmts: &[syn::Stmt],
306 _sidecars: &[syn::Expr],
307 ) {
308 unreachable!()
309 }
310}
311
312impl<'a> RegisterPort<'a, MaelstromDeploy> for MaelstromExternal {
313 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
314 unreachable!()
315 }
316
317 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
318 fn as_bytes_bidi(
319 &self,
320 _external_port_id: ExternalPortId,
321 ) -> impl Future<
322 Output = (
323 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
324 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
325 ),
326 > + 'a {
327 async move { unreachable!() }
328 }
329
330 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
331 fn as_bincode_bidi<InT, OutT>(
332 &self,
333 _external_port_id: ExternalPortId,
334 ) -> impl Future<
335 Output = (
336 Pin<Box<dyn Stream<Item = OutT>>>,
337 Pin<Box<dyn Sink<InT, Error = Error>>>,
338 ),
339 > + 'a
340 where
341 InT: Serialize + 'static,
342 OutT: DeserializeOwned + 'static,
343 {
344 async move { unreachable!() }
345 }
346
347 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
348 fn as_bincode_sink<T: Serialize + 'static>(
349 &self,
350 _external_port_id: ExternalPortId,
351 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
352 async move { unreachable!() }
353 }
354
355 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
356 fn as_bincode_source<T: DeserializeOwned + 'static>(
357 &self,
358 _external_port_id: ExternalPortId,
359 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
360 async move { unreachable!() }
361 }
362}
363
#[derive(Clone)]
/// Cluster specification for the Maelstrom backend.
///
/// A unit struct: it carries no configuration and only serves to build a
/// [`MaelstromCluster`] through [`ClusterSpec`].
pub struct MaelstromClusterSpec;
367
368impl<'a> ClusterSpec<'a, MaelstromDeploy> for MaelstromClusterSpec {
369 fn build(self, key: LocationKey, name_hint: &str) -> MaelstromCluster {
370 assert_eq!(
371 key,
372 LocationKey::FIRST,
373 "there should only be one location for a Maelstrom deployment"
374 );
375 MaelstromCluster {
376 next_port: Rc::new(RefCell::new(0)),
377 name_hint: Some(name_hint.to_owned()),
378 }
379 }
380}
381
/// Configuration and build state for running a compiled program under Maelstrom.
///
/// The public fields configure the `maelstrom test` invocation made by
/// [`MaelstromDeployment::run`]; the crate-private fields are filled in when
/// the cluster is instantiated and tell [`MaelstromDeployment::build`] what to
/// compile.
pub struct MaelstromDeployment {
    /// Number of nodes, passed as `--node-count`.
    pub node_count: usize,
    /// Path of the Maelstrom executable to launch.
    pub maelstrom_path: PathBuf,
    /// Workload name, passed as `-w`.
    pub workload: String,
    /// Optional value for `--time-limit`.
    pub time_limit: Option<u64>,
    /// Optional value for `--rate`.
    pub rate: Option<u64>,
    /// Optional value for `--availability`.
    pub availability: Option<String>,
    /// Optional value for `--nemesis`.
    pub nemesis: Option<String>,
    /// Additional arguments appended verbatim after all other arguments.
    pub extra_args: Vec<String>,

    /// Name of the generated cargo example; set during cluster instantiation.
    pub(crate) bin_name: Option<String>,
    /// Directory in which `cargo build` is run; set during cluster instantiation.
    pub(crate) project_dir: Option<PathBuf>,
    /// Value used for `CARGO_TARGET_DIR`; set during cluster instantiation.
    pub(crate) target_dir: Option<PathBuf>,
    /// Extra cargo features to enable; set during cluster instantiation.
    pub(crate) features: Option<Vec<String>>,
}

impl MaelstromDeployment {
    /// Creates a deployment for `workload` with one node, the executable
    /// `maelstrom` (resolved by the OS when spawned) and no optional settings.
    pub fn new(workload: impl Into<String>) -> Self {
        Self {
            node_count: 1,
            maelstrom_path: PathBuf::from("maelstrom"),
            workload: workload.into(),
            time_limit: None,
            rate: None,
            availability: None,
            nemesis: None,
            extra_args: vec![],
            bin_name: None,
            project_dir: None,
            target_dir: None,
            features: None,
        }
    }

    /// Sets the number of nodes (`--node-count`).
    pub fn node_count(mut self, count: usize) -> Self {
        self.node_count = count;
        self
    }

    /// Sets the path of the Maelstrom executable.
    pub fn maelstrom_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.maelstrom_path = path.into();
        self
    }

    /// Sets the value passed as `--time-limit`.
    pub fn time_limit(mut self, seconds: u64) -> Self {
        self.time_limit = Some(seconds);
        self
    }

    /// Sets the value passed as `--rate`.
    pub fn rate(mut self, rate: u64) -> Self {
        self.rate = Some(rate);
        self
    }

    /// Sets the value passed as `--availability`.
    pub fn availability(mut self, availability: impl Into<String>) -> Self {
        self.availability = Some(availability.into());
        self
    }

    /// Sets the value passed as `--nemesis`.
    pub fn nemesis(mut self, nemesis: impl Into<String>) -> Self {
        self.nemesis = Some(nemesis.into());
        self
    }

    /// Appends extra command-line arguments; repeated calls accumulate.
    pub fn extra_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.extra_args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Location of the debug example binary `bin_name` under `target_dir`.
    fn example_path(target_dir: &std::path::Path, bin_name: &str) -> PathBuf {
        target_dir.join("debug").join("examples").join(bin_name)
    }

    /// Classifies one line of Maelstrom output.
    ///
    /// Returns `Some(Ok(()))` for a passing verdict, `Some(Err(_))` for an
    /// invalid analysis, and `None` when the line carries no verdict. Only the
    /// start of the line is inspected.
    fn verdict_for_line(line: &str) -> Option<Result<(), Error>> {
        if line.starts_with("Analysis invalid!") {
            Some(Err(Error::other("Analysis was invalid")))
        } else if line.starts_with("Errors occurred during analysis, but no anomalies found.")
            || line.starts_with("Everything looks good!")
        {
            Some(Ok(()))
        } else {
            None
        }
    }

    /// Compiles the generated example with `cargo build` and returns the path
    /// of the resulting binary.
    ///
    /// The feature `hydro___feature_maelstrom_runtime` is always enabled, in
    /// addition to any recorded features.
    ///
    /// # Errors
    ///
    /// Returns an error if `cargo` cannot be started or exits unsuccessfully.
    ///
    /// # Panics
    ///
    /// Panics if the binary name, project directory or target directory has
    /// not been recorded yet.
    pub fn build(&self) -> Result<PathBuf, Error> {
        let bin_name = self
            .bin_name
            .as_deref()
            .expect("No binary name set - did you call deploy?");
        let project_dir = self.project_dir.as_deref().expect("No project dir set");
        let target_dir = self.target_dir.as_deref().expect("No target dir set");

        // The runtime feature is mandatory, so the list is never empty.
        let all_features: Vec<&str> = std::iter::once("hydro___feature_maelstrom_runtime")
            .chain(self.features.iter().flatten().map(String::as_str))
            .collect();

        let mut cmd = std::process::Command::new("cargo");
        cmd.arg("build")
            .arg("--example")
            .arg(bin_name)
            .arg("--no-default-features")
            .current_dir(project_dir)
            .env("CARGO_TARGET_DIR", target_dir)
            .env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1")
            .arg("--features")
            .arg(all_features.join(","));

        let status = cmd.status()?;
        if !status.success() {
            return Err(Error::other(format!(
                "cargo build failed with status: {}",
                status
            )));
        }

        Ok(Self::example_path(target_dir, bin_name))
    }

    /// Builds the binary, runs `maelstrom test` against it and reports the verdict.
    ///
    /// Maelstrom's standard output is echoed line by line to standard error.
    /// The first line that carries a verdict decides the result; the remaining
    /// output is still drained and the child process is waited on, so no
    /// Maelstrom process is left behind unreaped when this function returns.
    ///
    /// # Errors
    ///
    /// Returns an error if the build fails, Maelstrom cannot be spawned, the
    /// analysis is reported invalid, the output cannot be read before a
    /// verdict appears, or the output ends without a verdict.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`MaelstromDeployment::build`].
    pub fn run(self) -> Result<(), Error> {
        let binary_path = self.build()?;

        let mut cmd = std::process::Command::new(&self.maelstrom_path);
        cmd.arg("test")
            .arg("-w")
            .arg(&self.workload)
            .arg("--bin")
            .arg(&binary_path)
            .arg("--node-count")
            .arg(self.node_count.to_string())
            .stdout(Stdio::piped());

        if let Some(time_limit) = self.time_limit {
            cmd.arg("--time-limit").arg(time_limit.to_string());
        }
        if let Some(rate) = self.rate {
            cmd.arg("--rate").arg(rate.to_string());
        }
        if let Some(availability) = &self.availability {
            cmd.arg("--availability").arg(availability);
        }
        if let Some(nemesis) = &self.nemesis {
            cmd.arg("--nemesis").arg(nemesis);
        }
        cmd.args(&self.extra_args);

        let mut child = cmd.spawn()?;
        let stdout = child
            .stdout
            .take()
            .expect("child stdout is captured because it was configured as piped");

        let mut verdict = None;
        let mut read_error = None;
        for line in BufReader::new(stdout).lines() {
            match line {
                Ok(line) => {
                    eprintln!("{}", line);
                    // The first verdict wins; later lines are only echoed.
                    if verdict.is_none() {
                        verdict = Self::verdict_for_line(&line);
                    }
                }
                Err(err) => {
                    read_error = Some(err);
                    break;
                }
            }
        }

        if read_error.is_some() {
            // The pipe is no longer drained, so the child may still be running;
            // stop it rather than block in `wait`. `kill` only fails if the
            // process is already gone, which is fine here.
            let _ = child.kill();
        }
        // Dropping a `Child` does not wait for it; reap it explicitly.
        let reaped = child.wait();

        match (verdict, read_error) {
            (Some(verdict), _) => verdict,
            (None, Some(err)) => Err(err),
            (None, None) => {
                reaped?;
                Err(Error::other("Maelstrom produced an unexpected result"))
            }
        }
    }

    /// Path where the built binary is expected, or `None` if the binary name
    /// or target directory has not been recorded yet. Does not check that the
    /// file exists.
    pub fn binary_path(&self) -> Option<PathBuf> {
        let bin_name = self.bin_name.as_deref()?;
        let target_dir = self.target_dir.as_deref()?;
        Some(Self::example_path(target_dir, bin_name))
    }
}