hydro_test/local/
futures.rs

1use std::time::Duration;
2
3use hydro_lang::*;
4use stageleft::q;
5
6pub fn unordered<'a>(process: &Process<'a>) -> Stream<u32, Process<'a>, Unbounded, NoOrder> {
7    process
8        .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
9        .map(q!(|x| async move {
10            tokio::time::sleep(Duration::from_millis(10)).await;
11            x
12        }))
13        .resolve_futures()
14}
15
16pub fn ordered<'a>(process: &Process<'a>) -> Stream<u32, Process<'a>, Unbounded> {
17    process
18        .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
19        .map(q!(|x| async move {
20            tokio::time::sleep(Duration::from_millis(10)).await;
21            x
22        }))
23        .resolve_futures_ordered()
24}
25
26#[cfg(test)]
27mod tests {
28    use std::collections::HashSet;
29
30    use futures::StreamExt;
31    use hydro_deploy::Deployment;
32
33    #[tokio::test]
34    async fn test_unordered() {
35        let mut deployment = Deployment::new();
36
37        let builder = hydro_lang::FlowBuilder::new();
38        let external = builder.external::<()>();
39        let p1 = builder.process();
40
41        let out = super::unordered(&p1);
42        let out_recv = out.send_bincode_external(&external);
43
44        let built = builder.with_default_optimize();
45        let nodes = built
46            .with_process(&p1, deployment.Localhost())
47            .with_external(&external, deployment.Localhost())
48            .deploy(&mut deployment);
49
50        deployment.deploy().await.unwrap();
51
52        let out_recv = nodes.connect_source_bincode(out_recv).await;
53
54        deployment.start().await.unwrap();
55
56        let result: HashSet<u32> = out_recv
57            .take(9)
58            .collect::<Vec<_>>()
59            .await
60            .into_iter()
61            .collect();
62        assert_eq!(HashSet::from_iter(1..10), result);
63    }
64
65    #[tokio::test]
66    async fn test_ordered() {
67        let mut deployment = Deployment::new();
68
69        let builder = hydro_lang::FlowBuilder::new();
70        let external = builder.external::<()>();
71        let p1 = builder.process();
72
73        let out = super::ordered(&p1);
74        let out_recv = out.send_bincode_external(&external);
75
76        let built = builder.with_default_optimize();
77        let nodes = built
78            .with_process(&p1, deployment.Localhost())
79            .with_external(&external, deployment.Localhost())
80            .deploy(&mut deployment);
81
82        deployment.deploy().await.unwrap();
83
84        let out_recv = nodes.connect_source_bincode(out_recv).await;
85
86        deployment.start().await.unwrap();
87
88        assert_eq!(
89            out_recv.take(9).collect::<Vec<_>>().await,
90            &[2, 3, 1, 9, 6, 5, 4, 7, 8]
91        );
92    }
93}