hydro_test/local/
futures.rs1use std::time::Duration;
2
3use hydro_lang::*;
4use stageleft::q;
5
6pub fn unordered<'a>(process: &Process<'a>) -> Stream<u32, Process<'a>, Unbounded, NoOrder> {
7 process
8 .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
9 .map(q!(|x| async move {
10 tokio::time::sleep(Duration::from_millis(10)).await;
11 x
12 }))
13 .resolve_futures()
14}
15
16pub fn ordered<'a>(process: &Process<'a>) -> Stream<u32, Process<'a>, Unbounded> {
17 process
18 .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
19 .map(q!(|x| async move {
20 tokio::time::sleep(Duration::from_millis(10)).await;
21 x
22 }))
23 .resolve_futures_ordered()
24}
25
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use futures::StreamExt;
    use hydro_deploy::Deployment;

    /// Deploys `super::unordered` on localhost and checks that the first nine
    /// received values are exactly the set 1..=9, ignoring arrival order.
    #[tokio::test]
    async fn test_unordered() {
        let mut deployment = Deployment::new();

        // Describe the dataflow: one process whose output is sent to an external client.
        let flow = hydro_lang::FlowBuilder::new();
        let external = flow.external::<()>();
        let process = flow.process();

        let stream = super::unordered(&process);
        let port = stream.send_bincode_external(&external);

        // Place both the process and the external endpoint on the local machine.
        let optimized = flow.with_default_optimize();
        let nodes = optimized
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The receiving side is connected before the deployment is started.
        let receiver = nodes.connect_source_bincode(port).await;

        deployment.start().await.unwrap();

        let received: Vec<u32> = receiver.take(9).collect().await;
        let actual: HashSet<u32> = received.into_iter().collect();
        let expected: HashSet<u32> = (1..10).collect();
        assert_eq!(expected, actual);
    }

    /// Deploys `super::ordered` on localhost and checks that the first nine
    /// received values arrive in the same order as the source array.
    #[tokio::test]
    async fn test_ordered() {
        let mut deployment = Deployment::new();

        // Describe the dataflow: one process whose output is sent to an external client.
        let flow = hydro_lang::FlowBuilder::new();
        let external = flow.external::<()>();
        let process = flow.process();

        let stream = super::ordered(&process);
        let port = stream.send_bincode_external(&external);

        // Place both the process and the external endpoint on the local machine.
        let optimized = flow.with_default_optimize();
        let nodes = optimized
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The receiving side is connected before the deployment is started.
        let receiver = nodes.connect_source_bincode(port).await;

        deployment.start().await.unwrap();

        let received = receiver.take(9).collect::<Vec<_>>().await;
        assert_eq!(received, &[2, 3, 1, 9, 6, 5, 4, 7, 8]);
    }
}