hydro_lang/
test_util.rs
1use std::future::Future;
2use std::panic::{AssertUnwindSafe, catch_unwind};
3use std::pin::Pin;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::{FlowBuilder, Process, Stream, Unbounded};
9
10pub async fn multi_location_test<'a, T, C, O>(
11 thunk: impl FnOnce(&FlowBuilder<'a>, &Process<'a, ()>) -> Stream<T, Process<'a>, Unbounded, O>,
12 check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
13) where
14 T: Serialize + DeserializeOwned + 'static,
15 C: Future<Output = ()>,
16{
17 let mut deployment = hydro_deploy::Deployment::new();
18 let flow = FlowBuilder::new();
19 let process = flow.process::<()>();
20 let external = flow.external_process::<()>();
21 let out = thunk(&flow, &process);
22 let out_port = out.send_bincode_external(&external);
23 let nodes = flow
24 .with_remaining_processes(|| deployment.Localhost())
25 .with_remaining_clusters(|| vec![deployment.Localhost(); 4])
26 .with_external(&external, deployment.Localhost())
27 .deploy(&mut deployment);
28
29 deployment.deploy().await.unwrap();
30
31 let external_out = nodes.connect_source_bincode(out_port).await;
32 deployment.start().await.unwrap();
33
34 check(external_out).await;
35}
36
37pub async fn stream_transform_test<'a, T, C, O>(
38 thunk: impl FnOnce(&Process<'a>) -> Stream<T, Process<'a>, Unbounded, O>,
39 check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
40) where
41 T: Serialize + DeserializeOwned + 'static,
42 C: Future<Output = ()>,
43{
44 let mut deployment = hydro_deploy::Deployment::new();
45 let flow = FlowBuilder::new();
46 let process = flow.process::<()>();
47 let external = flow.external_process::<()>();
48 let out = thunk(&process);
49 let out_port = out.send_bincode_external(&external);
50 let nodes = flow
51 .with_process(&process, deployment.Localhost())
52 .with_external(&external, deployment.Localhost())
53 .deploy(&mut deployment);
54
55 deployment.deploy().await.unwrap();
56
57 let external_out = nodes.connect_source_bincode(out_port).await;
58 deployment.start().await.unwrap();
59
60 check(external_out).await;
61}
62
63pub fn assert_panics_with_message(func: impl FnOnce(), msg: &'static str) {
65 let err = catch_unwind(AssertUnwindSafe(func)).expect_err("Didn't panic!");
66
67 let chk = |panic_msg: &'_ str| {
68 if !panic_msg.contains(msg) {
69 panic!(
70 "Expected a panic message containing `{}`; got: `{}`.",
71 msg, panic_msg
72 );
73 }
74 };
75
76 err.downcast::<String>()
77 .map(|s| chk(&s))
78 .or_else(|err| err.downcast::<&'static str>().map(|s| chk(*s)))
79 .expect("Unexpected panic type!");
80}