hydro_test/local/
futures.rs1use std::time::Duration;
2
3use hydro_lang::live_collections::stream::NoOrder;
4use hydro_lang::prelude::*;
5use stageleft::q;
6
7pub fn unordered<'a>(process: &Process<'a>) -> Stream<u32, Process<'a>, Unbounded, NoOrder> {
8 process
9 .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
10 .map(q!(|x| async move {
11 tokio::time::sleep(Duration::from_millis(10)).await;
12 x
13 }))
14 .resolve_futures()
15}
16
17pub fn ordered<'a>(process: &Process<'a>) -> Stream<u32, Process<'a>, Unbounded> {
18 process
19 .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
20 .map(q!(|x| async move {
21 tokio::time::sleep(Duration::from_millis(10)).await;
22 x
23 }))
24 .resolve_futures_ordered()
25}
26
27#[cfg(test)]
28mod tests {
29 use std::collections::HashSet;
30
31 use futures::StreamExt;
32 use hydro_deploy::Deployment;
33
34 #[tokio::test]
35 async fn test_unordered() {
36 let mut deployment = Deployment::new();
37
38 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
39 let external = builder.external::<()>();
40 let p1 = builder.process();
41
42 let out = super::unordered(&p1);
43 let out_recv = out.send_bincode_external(&external);
44
45 let built = builder.with_default_optimize();
46 let nodes = built
47 .with_process(&p1, deployment.Localhost())
48 .with_external(&external, deployment.Localhost())
49 .deploy(&mut deployment);
50
51 deployment.deploy().await.unwrap();
52
53 let out_recv = nodes.connect(out_recv).await;
54
55 deployment.start().await.unwrap();
56
57 let result: HashSet<u32> = out_recv
58 .take(9)
59 .collect::<Vec<_>>()
60 .await
61 .into_iter()
62 .collect();
63 assert_eq!(HashSet::from_iter(1..10), result);
64 }
65
66 #[tokio::test]
67 async fn test_ordered() {
68 let mut deployment = Deployment::new();
69
70 let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
71 let external = builder.external::<()>();
72 let p1 = builder.process();
73
74 let out = super::ordered(&p1);
75 let out_recv = out.send_bincode_external(&external);
76
77 let built = builder.with_default_optimize();
78 let nodes = built
79 .with_process(&p1, deployment.Localhost())
80 .with_external(&external, deployment.Localhost())
81 .deploy(&mut deployment);
82
83 deployment.deploy().await.unwrap();
84
85 let out_recv = nodes.connect(out_recv).await;
86
87 deployment.start().await.unwrap();
88
89 assert_eq!(
90 out_recv.take(9).collect::<Vec<_>>().await,
91 &[2, 3, 1, 9, 6, 5, 4, 7, 8]
92 );
93 }
94}