hydro_lang/
test_util.rs

1use std::future::Future;
2use std::panic::{AssertUnwindSafe, catch_unwind};
3use std::pin::Pin;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::{FlowBuilder, Process, Stream, Unbounded};
9
10pub async fn multi_location_test<'a, T, C, O>(
11    thunk: impl FnOnce(&FlowBuilder<'a>, &Process<'a, ()>) -> Stream<T, Process<'a>, Unbounded, O>,
12    check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
13) where
14    T: Serialize + DeserializeOwned + 'static,
15    C: Future<Output = ()>,
16{
17    let mut deployment = hydro_deploy::Deployment::new();
18    let flow = FlowBuilder::new();
19    let process = flow.process::<()>();
20    let external = flow.external_process::<()>();
21    let out = thunk(&flow, &process);
22    let out_port = out.send_bincode_external(&external);
23    let nodes = flow
24        .with_remaining_processes(|| deployment.Localhost())
25        .with_remaining_clusters(|| vec![deployment.Localhost(); 4])
26        .with_external(&external, deployment.Localhost())
27        .deploy(&mut deployment);
28
29    deployment.deploy().await.unwrap();
30
31    let external_out = nodes.connect_source_bincode(out_port).await;
32    deployment.start().await.unwrap();
33
34    check(external_out).await;
35}
36
37pub async fn stream_transform_test<'a, T, C, O>(
38    thunk: impl FnOnce(&Process<'a>) -> Stream<T, Process<'a>, Unbounded, O>,
39    check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
40) where
41    T: Serialize + DeserializeOwned + 'static,
42    C: Future<Output = ()>,
43{
44    let mut deployment = hydro_deploy::Deployment::new();
45    let flow = FlowBuilder::new();
46    let process = flow.process::<()>();
47    let external = flow.external_process::<()>();
48    let out = thunk(&process);
49    let out_port = out.send_bincode_external(&external);
50    let nodes = flow
51        .with_process(&process, deployment.Localhost())
52        .with_external(&external, deployment.Localhost())
53        .deploy(&mut deployment);
54
55    deployment.deploy().await.unwrap();
56
57    let external_out = nodes.connect_source_bincode(out_port).await;
58    deployment.start().await.unwrap();
59
60    check(external_out).await;
61}
62
/// Asserts that `func` panics and that its panic message contains `msg`.
///
/// Adapted from
/// <https://users.rust-lang.org/t/how-to-write-doctest-that-panic-with-an-expected-message/58650>.
///
/// `msg` only needs to live for the duration of the call, so expected messages built at
/// runtime (e.g. with `format!`) are accepted as well as string literals.
///
/// # Panics
///
/// * if `func` returns without panicking,
/// * if the panic payload is neither a `String` nor a `&'static str`,
/// * if the panic message does not contain `msg`.
pub fn assert_panics_with_message(func: impl FnOnce(), msg: &str) {
    // `AssertUnwindSafe` lets callers pass closures that capture non-`UnwindSafe` state.
    let payload = catch_unwind(AssertUnwindSafe(func)).expect_err("Didn't panic!");

    // Panic payloads are a `String` or a `&'static str` depending on how `panic!` was invoked.
    let panic_msg: &str = if let Some(owned) = payload.downcast_ref::<String>() {
        owned.as_str()
    } else if let Some(literal) = payload.downcast_ref::<&'static str>() {
        *literal
    } else {
        panic!("Unexpected panic type!: {:?}", payload);
    };

    assert!(
        panic_msg.contains(msg),
        "Expected a panic message containing `{}`; got: `{}`.",
        msg,
        panic_msg
    );
}