hydro_test_local_macro/local/
futures.rs
1use std::time::Duration;
2
3use dfir_rs::tokio;
4use dfir_rs::tokio::sync::mpsc::UnboundedSender;
5use hydro_lang::deploy::SingleProcessGraph;
6use hydro_lang::dfir_rs::scheduled::graph::Dfir;
7use hydro_lang::*;
8use stageleft::{Quoted, RuntimeData, q};
9
10#[stageleft::entry]
11pub fn unordered<'a>(
12 flow: FlowBuilder<'a>,
13 output: RuntimeData<&'a UnboundedSender<u32>>,
14) -> impl Quoted<'a, Dfir<'a>> {
15 let process = flow.process::<()>();
16
17 process
18 .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
19 .map(q!(|x| async move {
20 tokio::time::sleep(Duration::from_millis(10)).await;
22 x
23 }))
24 .resolve_futures()
25 .for_each(q!(|x| output.send(x).unwrap()));
26
27 flow.compile_no_network::<SingleProcessGraph>()
28}
29
30#[stageleft::entry]
31pub fn ordered<'a>(
32 flow: FlowBuilder<'a>,
33 output: RuntimeData<&'a UnboundedSender<u32>>,
34) -> impl Quoted<'a, Dfir<'a>> {
35 let process = flow.process::<()>();
36
37 process
38 .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
39 .map(q!(|x| async move {
40 tokio::time::sleep(Duration::from_millis(10)).await;
42 x
43 }))
44 .resolve_futures_ordered()
45 .for_each(q!(|x| output.send(x).unwrap()));
46
47 flow.compile_no_network::<SingleProcessGraph>()
48}
49
50#[cfg(stageleft_runtime)]
51#[cfg(test)]
52mod tests {
53 use std::collections::HashSet;
54 use std::time::Duration;
55
56 use dfir_rs::util::collect_ready_async;
57
58 #[tokio::test]
59 async fn test_unordered() {
60 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
61
62 let mut flow = super::unordered!(&out);
63 let handle = tokio::task::spawn(async move {
64 tokio::time::sleep(Duration::from_secs(1)).await;
65 assert_eq!(
66 HashSet::from_iter(1..10),
67 collect_ready_async::<HashSet<_>, _>(&mut out_recv).await
68 );
69 });
70
71 tokio::time::timeout(Duration::from_secs(2), flow.run_async())
72 .await
73 .expect_err("Expected time out");
74
75 handle.await.unwrap();
76 }
77
78 #[tokio::test]
79 async fn test_ordered() {
80 let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
81
82 let mut flow = super::ordered!(&out);
83 let handle = tokio::task::spawn(async move {
84 tokio::time::sleep(Duration::from_secs(1)).await;
85 assert_eq!(
86 Vec::from_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]),
87 collect_ready_async::<Vec<_>, _>(&mut out_recv).await
88 );
89 });
90
91 tokio::time::timeout(Duration::from_secs(2), flow.run_async())
92 .await
93 .expect_err("Expected time out");
94
95 handle.await.unwrap();
96 }
97}