hydro_test_local/local/
futures.rs

1use std::time::Duration;
2
3use dfir_rs::scheduled::graph::Dfir;
4use hydro_lang::deploy::SingleProcessGraph;
5use hydro_lang::*;
6use stageleft::{Quoted, RuntimeData, q};
7use tokio::sync::mpsc::UnboundedSender;
8
9#[stageleft::entry]
10pub fn unordered<'a>(
11    flow: FlowBuilder<'a>,
12    output: RuntimeData<&'a UnboundedSender<u32>>,
13) -> impl Quoted<'a, Dfir<'a>> {
14    let process = flow.process::<()>();
15
16    process
17        .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
18        .map(q!(|x| async move {
19            tokio::time::sleep(Duration::from_millis(10)).await;
20            x
21        }))
22        .resolve_futures()
23        .for_each(q!(|x| output.send(x).unwrap()));
24
25    flow.compile_no_network::<SingleProcessGraph>()
26}
27
28#[stageleft::entry]
29pub fn ordered<'a>(
30    flow: FlowBuilder<'a>,
31    output: RuntimeData<&'a UnboundedSender<u32>>,
32) -> impl Quoted<'a, Dfir<'a>> {
33    let process = flow.process::<()>();
34
35    process
36        .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
37        .map(q!(|x| async move {
38            // tokio::time::sleep works, import then just sleep does not, unsure why
39            tokio::time::sleep(Duration::from_millis(10)).await;
40            x
41        }))
42        .resolve_futures_ordered()
43        .for_each(q!(|x| output.send(x).unwrap()));
44
45    flow.compile_no_network::<SingleProcessGraph>()
46}
47
#[cfg(stageleft_runtime)]
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::time::Duration;

    use dfir_rs::util::{collect_ready_async, unbounded_channel};

    // Runs the `unordered` flow and checks that, one second in, exactly the
    // values 1 through 9 have been emitted. Order is ignored by comparing sets.
    #[tokio::test]
    async fn test_unordered() {
        let (sender, mut receiver) = unbounded_channel();
        let mut flow = super::unordered!(&sender);

        // Checker task: wait a second, then compare whatever is ready on the channel.
        let checker = tokio::task::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;

            let expected: HashSet<_> = (1..10).collect();
            let received = collect_ready_async::<HashSet<_>, _>(&mut receiver).await;
            assert_eq!(expected, received);
        });

        // The test expects `run_async` to still be pending when the 2s timeout fires.
        let run_outcome = tokio::time::timeout(Duration::from_secs(2), flow.run_async()).await;
        run_outcome.expect_err("Expected time out");

        // Propagate any assertion failure from the checker task.
        checker.await.unwrap();
    }

    // Runs the `ordered` flow and checks that, one second in, the emitted values
    // match the source list element-for-element, in the same order.
    #[tokio::test]
    async fn test_ordered() {
        let (sender, mut receiver) = unbounded_channel();
        let mut flow = super::ordered!(&sender);

        // Checker task: wait a second, then compare whatever is ready on the channel.
        let checker = tokio::task::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;

            let expected = vec![2, 3, 1, 9, 6, 5, 4, 7, 8];
            let received = collect_ready_async::<Vec<_>, _>(&mut receiver).await;
            assert_eq!(expected, received);
        });

        // The test expects `run_async` to still be pending when the 2s timeout fires.
        let run_outcome = tokio::time::timeout(Duration::from_secs(2), flow.run_async()).await;
        run_outcome.expect_err("Expected time out");

        // Propagate any assertion failure from the checker task.
        checker.await.unwrap();
    }
}