Skip to main content

hydro_test/local/
futures.rs

1use std::time::Duration;
2
3use hydro_lang::live_collections::stream::NoOrder;
4use hydro_lang::prelude::*;
5use stageleft::q;
6
7pub fn unordered<'a>(process: &Process<'a>) -> Stream<u32, Process<'a>, Unbounded, NoOrder> {
8    process
9        .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
10        .map(q!(|x| async move {
11            tokio::time::sleep(Duration::from_millis(10)).await;
12            x
13        }))
14        .resolve_futures()
15}
16
17pub fn ordered<'a>(process: &Process<'a>) -> Stream<u32, Process<'a>, Unbounded> {
18    process
19        .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
20        .map(q!(|x| async move {
21            tokio::time::sleep(Duration::from_millis(10)).await;
22            x
23        }))
24        .resolve_futures_ordered()
25}
26
27#[cfg(test)]
28mod tests {
29    use std::collections::HashSet;
30
31    use futures::StreamExt;
32    use hydro_deploy::Deployment;
33
34    #[tokio::test]
35    async fn test_unordered() {
36        let mut deployment = Deployment::new();
37
38        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
39        let external = builder.external::<()>();
40        let p1 = builder.process();
41
42        let out = super::unordered(&p1);
43        let out_recv = out.send_bincode_external(&external);
44
45        let built = builder.with_default_optimize();
46        let nodes = built
47            .with_process(&p1, deployment.Localhost())
48            .with_external(&external, deployment.Localhost())
49            .deploy(&mut deployment);
50
51        deployment.deploy().await.unwrap();
52
53        let out_recv = nodes.connect(out_recv).await;
54
55        deployment.start().await.unwrap();
56
57        let result: HashSet<u32> = out_recv
58            .take(9)
59            .collect::<Vec<_>>()
60            .await
61            .into_iter()
62            .collect();
63        assert_eq!(HashSet::from_iter(1..10), result);
64    }
65
66    #[tokio::test]
67    async fn test_ordered() {
68        let mut deployment = Deployment::new();
69
70        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
71        let external = builder.external::<()>();
72        let p1 = builder.process();
73
74        let out = super::ordered(&p1);
75        let out_recv = out.send_bincode_external(&external);
76
77        let built = builder.with_default_optimize();
78        let nodes = built
79            .with_process(&p1, deployment.Localhost())
80            .with_external(&external, deployment.Localhost())
81            .deploy(&mut deployment);
82
83        deployment.deploy().await.unwrap();
84
85        let out_recv = nodes.connect(out_recv).await;
86
87        deployment.start().await.unwrap();
88
89        assert_eq!(
90            out_recv.take(9).collect::<Vec<_>>().await,
91            &[2, 3, 1, 9, 6, 5, 4, 7, 8]
92        );
93    }
94}