1use std::future::Future;
4use std::panic::{AssertUnwindSafe, catch_unwind};
5use std::pin::Pin;
6
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
10use crate::compile::builder::FlowBuilder;
11use crate::live_collections::boundedness::Unbounded;
12use crate::live_collections::stream::{Ordering, Retries, Stream};
13use crate::location::Process;
14
15pub async fn multi_location_test<'a, T, C, O: Ordering, R: Retries>(
20 thunk: impl FnOnce(&FlowBuilder<'a>, &Process<'a, ()>) -> Stream<T, Process<'a>, Unbounded, O, R>,
21 check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
22) where
23 T: Serialize + DeserializeOwned + 'static,
24 C: Future<Output = ()>,
25{
26 let mut deployment = hydro_deploy::Deployment::new();
27 let flow = FlowBuilder::new();
28 let process = flow.process::<()>();
29 let external = flow.external::<()>();
30 let out = thunk(&flow, &process);
31 let out_port = out.send_bincode_external(&external);
32 let nodes = flow
33 .with_remaining_processes(|| deployment.Localhost())
34 .with_remaining_clusters(|| vec![deployment.Localhost(); 4])
35 .with_external(&external, deployment.Localhost())
36 .deploy(&mut deployment);
37
38 deployment.deploy().await.unwrap();
39
40 let external_out = nodes.connect(out_port).await;
41 deployment.start().await.unwrap();
42
43 check(external_out).await;
44}
45
46pub async fn stream_transform_test<'a, T, C, O: Ordering, R: Retries>(
49 thunk: impl FnOnce(&Process<'a>) -> Stream<T, Process<'a>, Unbounded, O, R>,
50 check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
51) where
52 T: Serialize + DeserializeOwned + 'static,
53 C: Future<Output = ()>,
54{
55 let mut deployment = hydro_deploy::Deployment::new();
56 let flow = FlowBuilder::new();
57 let process = flow.process::<()>();
58 let external = flow.external::<()>();
59 let out = thunk(&process);
60 let out_port = out.send_bincode_external(&external);
61 let nodes = flow
62 .with_process(&process, deployment.Localhost())
63 .with_external(&external, deployment.Localhost())
64 .deploy(&mut deployment);
65
66 deployment.deploy().await.unwrap();
67
68 let external_out = nodes.connect(out_port).await;
69 deployment.start().await.unwrap();
70
71 check(external_out).await;
72}
73
74pub fn assert_panics_with_message(func: impl FnOnce(), msg: &'static str) {
77 let err = catch_unwind(AssertUnwindSafe(func)).expect_err("Didn't panic!");
78
79 let chk = |panic_msg: &'_ str| {
80 if !panic_msg.contains(msg) {
81 panic!(
82 "Expected a panic message containing `{}`; got: `{}`.",
83 msg, panic_msg
84 );
85 }
86 };
87
88 err.downcast::<String>()
89 .map(|s| chk(&s))
90 .or_else(|err| err.downcast::<&'static str>().map(|s| chk(*s)))
91 .expect("Unexpected panic type!");
92}