hydro_lang/
test_util.rs

1//! Various utilities for testing short Hydro programs, especially in doctests.
2
3use std::future::Future;
4use std::panic::{AssertUnwindSafe, catch_unwind};
5use std::pin::Pin;
6
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
10use crate::compile::builder::FlowBuilder;
11use crate::live_collections::boundedness::Unbounded;
12use crate::live_collections::stream::{Ordering, Retries, Stream};
13use crate::location::Process;
14
15/// Sets up a test with multiple processes / clusters declared in the test logic (`thunk`). The test logic must return
16/// a single streaming output, which can then be read in `check` (an async closure) to perform assertions.
17///
18/// Each declared process is deployed as a single local process, and each cluster is deployed as four local processes.
19pub async fn multi_location_test<'a, T, C, O: Ordering, R: Retries>(
20    thunk: impl FnOnce(&FlowBuilder<'a>, &Process<'a, ()>) -> Stream<T, Process<'a>, Unbounded, O, R>,
21    check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
22) where
23    T: Serialize + DeserializeOwned + 'static,
24    C: Future<Output = ()>,
25{
26    let mut deployment = hydro_deploy::Deployment::new();
27    let flow = FlowBuilder::new();
28    let process = flow.process::<()>();
29    let external = flow.external::<()>();
30    let out = thunk(&flow, &process);
31    let out_port = out.send_bincode_external(&external);
32    let nodes = flow
33        .with_remaining_processes(|| deployment.Localhost())
34        .with_remaining_clusters(|| vec![deployment.Localhost(); 4])
35        .with_external(&external, deployment.Localhost())
36        .deploy(&mut deployment);
37
38    deployment.deploy().await.unwrap();
39
40    let external_out = nodes.connect(out_port).await;
41    deployment.start().await.unwrap();
42
43    check(external_out).await;
44}
45
46/// Sets up a test declared in `thunk` that executes on a single [`Process`], returning a streaming output
47/// that can be read in `check` (an async closure) to perform assertions.
48pub async fn stream_transform_test<'a, T, C, O: Ordering, R: Retries>(
49    thunk: impl FnOnce(&Process<'a>) -> Stream<T, Process<'a>, Unbounded, O, R>,
50    check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
51) where
52    T: Serialize + DeserializeOwned + 'static,
53    C: Future<Output = ()>,
54{
55    let mut deployment = hydro_deploy::Deployment::new();
56    let flow = FlowBuilder::new();
57    let process = flow.process::<()>();
58    let external = flow.external::<()>();
59    let out = thunk(&process);
60    let out_port = out.send_bincode_external(&external);
61    let nodes = flow
62        .with_process(&process, deployment.Localhost())
63        .with_external(&external, deployment.Localhost())
64        .deploy(&mut deployment);
65
66    deployment.deploy().await.unwrap();
67
68    let external_out = nodes.connect(out_port).await;
69    deployment.start().await.unwrap();
70
71    check(external_out).await;
72}
73
74// from https://users.rust-lang.org/t/how-to-write-doctest-that-panic-with-an-expected-message/58650
75/// Asserts that running the given closure results in a panic with a message containing `msg`.
76pub fn assert_panics_with_message(func: impl FnOnce(), msg: &'static str) {
77    let err = catch_unwind(AssertUnwindSafe(func)).expect_err("Didn't panic!");
78
79    let chk = |panic_msg: &'_ str| {
80        if !panic_msg.contains(msg) {
81            panic!(
82                "Expected a panic message containing `{}`; got: `{}`.",
83                msg, panic_msg
84            );
85        }
86    };
87
88    err.downcast::<String>()
89        .map(|s| chk(&s))
90        .or_else(|err| err.downcast::<&'static str>().map(|s| chk(*s)))
91        .expect("Unexpected panic type!");
92}