hydro_lang/
test_util.rs

1use std::future::Future;
2use std::panic::{AssertUnwindSafe, catch_unwind};
3use std::pin::Pin;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::{FlowBuilder, Process, Stream, Unbounded};
9
10pub async fn multi_location_test<
11    'a,
12    O: Serialize + DeserializeOwned + 'static,
13    C: Future<Output = ()>,
14    OutOrder,
15>(
16    thunk: impl FnOnce(
17        &FlowBuilder<'a>,
18        &Process<'a, ()>,
19    ) -> Stream<O, Process<'a>, Unbounded, OutOrder>,
20    check: impl FnOnce(Pin<Box<dyn dfir_rs::futures::Stream<Item = O>>>) -> C,
21) {
22    let mut deployment = hydro_deploy::Deployment::new();
23    let flow = FlowBuilder::new();
24    let process = flow.process::<()>();
25    let external = flow.external_process::<()>();
26    let out = thunk(&flow, &process);
27    let out_port = out.send_bincode_external(&external);
28    let nodes = flow
29        .with_remaining_processes(|| deployment.Localhost())
30        .with_remaining_clusters(|| vec![deployment.Localhost(); 4])
31        .with_external(&external, deployment.Localhost())
32        .deploy(&mut deployment);
33
34    deployment.deploy().await.unwrap();
35
36    let external_out = nodes.connect_source_bincode(out_port).await;
37    deployment.start().await.unwrap();
38
39    check(external_out).await;
40}
41
42pub async fn stream_transform_test<
43    'a,
44    O: Serialize + DeserializeOwned + 'static,
45    C: Future<Output = ()>,
46    OutOrder,
47>(
48    thunk: impl FnOnce(&Process<'a>) -> Stream<O, Process<'a>, Unbounded, OutOrder>,
49    check: impl FnOnce(Pin<Box<dyn dfir_rs::futures::Stream<Item = O>>>) -> C,
50) {
51    let mut deployment = hydro_deploy::Deployment::new();
52    let flow = FlowBuilder::new();
53    let process = flow.process::<()>();
54    let external = flow.external_process::<()>();
55    let out = thunk(&process);
56    let out_port = out.send_bincode_external(&external);
57    let nodes = flow
58        .with_process(&process, deployment.Localhost())
59        .with_external(&external, deployment.Localhost())
60        .deploy(&mut deployment);
61
62    deployment.deploy().await.unwrap();
63
64    let external_out = nodes.connect_source_bincode(out_port).await;
65    deployment.start().await.unwrap();
66
67    check(external_out).await;
68}
69
/// Runs `func` and asserts that it panics with a message containing `msg`.
///
/// The panic payload must be either a `String` (as produced by a formatted
/// `panic!`) or a `&'static str` (as produced by a literal `panic!`); the
/// payload text is checked with a substring match against `msg`.
///
/// `msg` may be any borrowed string, so callers can pass an expected message
/// built at runtime (e.g. with `format!`) as well as a string literal.
///
/// # Panics
///
/// - With `Didn't panic!` if `func` returns without panicking.
/// - If the panic message does not contain `msg`.
/// - With `Unexpected panic type!` if the panic payload is neither a
///   `String` nor a `&'static str`.
// from https://users.rust-lang.org/t/how-to-write-doctest-that-panic-with-an-expected-message/58650
pub fn assert_panics_with_message(func: impl FnOnce(), msg: &str) {
    // `AssertUnwindSafe` lets arbitrary closures be passed; this helper does
    // not touch any state captured by `func` after the panic is caught.
    let payload = catch_unwind(AssertUnwindSafe(func)).expect_err("Didn't panic!");

    let check_message = |panic_msg: &str| {
        if !panic_msg.contains(msg) {
            panic!(
                "Expected a panic message containing `{}`; got: `{}`.",
                msg, panic_msg
            );
        }
    };

    // Try the `String` payload first, then fall back to `&'static str`.
    payload
        .downcast::<String>()
        .map(|text| check_message(&text))
        .or_else(|other| {
            other
                .downcast::<&'static str>()
                .map(|text| check_message(*text))
        })
        .expect("Unexpected panic type!");
}