// hydro_lang/test_util.rs
1use std::future::Future;
2use std::panic::{AssertUnwindSafe, catch_unwind};
3use std::pin::Pin;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::{FlowBuilder, Process, Stream, Unbounded};
9
10pub async fn multi_location_test<
11 'a,
12 O: Serialize + DeserializeOwned + 'static,
13 C: Future<Output = ()>,
14 OutOrder,
15>(
16 thunk: impl FnOnce(
17 &FlowBuilder<'a>,
18 &Process<'a, ()>,
19 ) -> Stream<O, Process<'a>, Unbounded, OutOrder>,
20 check: impl FnOnce(Pin<Box<dyn dfir_rs::futures::Stream<Item = O>>>) -> C,
21) {
22 let mut deployment = hydro_deploy::Deployment::new();
23 let flow = FlowBuilder::new();
24 let process = flow.process::<()>();
25 let external = flow.external_process::<()>();
26 let out = thunk(&flow, &process);
27 let out_port = out.send_bincode_external(&external);
28 let nodes = flow
29 .with_remaining_processes(|| deployment.Localhost())
30 .with_remaining_clusters(|| vec![deployment.Localhost(); 4])
31 .with_external(&external, deployment.Localhost())
32 .deploy(&mut deployment);
33
34 deployment.deploy().await.unwrap();
35
36 let external_out = nodes.connect_source_bincode(out_port).await;
37 deployment.start().await.unwrap();
38
39 check(external_out).await;
40}
41
42pub async fn stream_transform_test<
43 'a,
44 O: Serialize + DeserializeOwned + 'static,
45 C: Future<Output = ()>,
46 OutOrder,
47>(
48 thunk: impl FnOnce(&Process<'a>) -> Stream<O, Process<'a>, Unbounded, OutOrder>,
49 check: impl FnOnce(Pin<Box<dyn dfir_rs::futures::Stream<Item = O>>>) -> C,
50) {
51 let mut deployment = hydro_deploy::Deployment::new();
52 let flow = FlowBuilder::new();
53 let process = flow.process::<()>();
54 let external = flow.external_process::<()>();
55 let out = thunk(&process);
56 let out_port = out.send_bincode_external(&external);
57 let nodes = flow
58 .with_process(&process, deployment.Localhost())
59 .with_external(&external, deployment.Localhost())
60 .deploy(&mut deployment);
61
62 deployment.deploy().await.unwrap();
63
64 let external_out = nodes.connect_source_bincode(out_port).await;
65 deployment.start().await.unwrap();
66
67 check(external_out).await;
68}
69
/// Runs `func` and asserts that it panics with a message containing `msg`.
///
/// The panic payload is accepted when it is a `String` or a `&'static str`.
/// A `String` payload is tried first.
///
/// # Panics
///
/// - With a message starting with `Didn't panic!` if `func` returns normally.
/// - With ``Expected a panic message containing `..`; got: `..`.`` if the
///   panic message does not contain `msg`.
/// - With a message starting with `Unexpected panic type!` if the payload is
///   neither a `String` nor a `&'static str`.
pub fn assert_panics_with_message(func: impl FnOnce(), msg: &'static str) {
    // `AssertUnwindSafe` lets any closure be passed without an `UnwindSafe`
    // bound on `func`.
    let payload = catch_unwind(AssertUnwindSafe(func)).expect_err("Didn't panic!");

    // Extract the textual panic message from whichever payload type was used.
    let panic_msg: String = match payload.downcast::<String>() {
        Ok(owned) => *owned,
        Err(other) => other
            .downcast::<&'static str>()
            .map(|literal| (*literal).to_owned())
            .expect("Unexpected panic type!"),
    };

    if !panic_msg.contains(msg) {
        panic!(
            "Expected a panic message containing `{}`; got: `{}`.",
            msg, panic_msg
        );
    }
}