hydro_test_local_macro/local/
futures.rs

1use std::time::Duration;
2
3use dfir_rs::tokio;
4use dfir_rs::tokio::sync::mpsc::UnboundedSender;
5use hydro_lang::deploy::SingleProcessGraph;
6use hydro_lang::dfir_rs::scheduled::graph::Dfir;
7use hydro_lang::*;
8use stageleft::{Quoted, RuntimeData, q};
9
10#[stageleft::entry]
11pub fn unordered<'a>(
12    flow: FlowBuilder<'a>,
13    output: RuntimeData<&'a UnboundedSender<u32>>,
14) -> impl Quoted<'a, Dfir<'a>> {
15    let process = flow.process::<()>();
16
17    process
18        .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
19        .map(q!(|x| async move {
20            // tokio::time::sleep works, import then just sleep does not, unsure why
21            tokio::time::sleep(Duration::from_millis(10)).await;
22            x
23        }))
24        .resolve_futures()
25        .for_each(q!(|x| output.send(x).unwrap()));
26
27    flow.compile_no_network::<SingleProcessGraph>()
28}
29
30#[stageleft::entry]
31pub fn ordered<'a>(
32    flow: FlowBuilder<'a>,
33    output: RuntimeData<&'a UnboundedSender<u32>>,
34) -> impl Quoted<'a, Dfir<'a>> {
35    let process = flow.process::<()>();
36
37    process
38        .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
39        .map(q!(|x| async move {
40            // tokio::time::sleep works, import then just sleep does not, unsure why
41            tokio::time::sleep(Duration::from_millis(10)).await;
42            x
43        }))
44        .resolve_futures_ordered()
45        .for_each(q!(|x| output.send(x).unwrap()));
46
47    flow.compile_no_network::<SingleProcessGraph>()
48}
49
50#[cfg(stageleft_runtime)]
51#[cfg(test)]
52mod tests {
53    use std::collections::HashSet;
54    use std::time::Duration;
55
56    use dfir_rs::util::collect_ready_async;
57
58    #[tokio::test]
59    async fn test_unordered() {
60        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
61
62        let mut flow = super::unordered!(&out);
63        let handle = tokio::task::spawn(async move {
64            tokio::time::sleep(Duration::from_secs(1)).await;
65            assert_eq!(
66                HashSet::from_iter(1..10),
67                collect_ready_async::<HashSet<_>, _>(&mut out_recv).await
68            );
69        });
70
71        tokio::time::timeout(Duration::from_secs(2), flow.run_async())
72            .await
73            .expect_err("Expected time out");
74
75        handle.await.unwrap();
76    }
77
78    #[tokio::test]
79    async fn test_ordered() {
80        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
81
82        let mut flow = super::ordered!(&out);
83        let handle = tokio::task::spawn(async move {
84            tokio::time::sleep(Duration::from_secs(1)).await;
85            assert_eq!(
86                Vec::from_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]),
87                collect_ready_async::<Vec<_>, _>(&mut out_recv).await
88            );
89        });
90
91        tokio::time::timeout(Duration::from_secs(2), flow.run_async())
92            .await
93            .expect_err("Expected time out");
94
95        handle.await.unwrap();
96    }
97}