hydro_test_local/local/
futures.rs
1use std::time::Duration;
2
3use dfir_rs::scheduled::graph::Dfir;
4use hydro_lang::deploy::SingleProcessGraph;
5use hydro_lang::*;
6use stageleft::{Quoted, RuntimeData, q};
7use tokio::sync::mpsc::UnboundedSender;
8
9#[stageleft::entry]
10pub fn unordered<'a>(
11 flow: FlowBuilder<'a>,
12 output: RuntimeData<&'a UnboundedSender<u32>>,
13) -> impl Quoted<'a, Dfir<'a>> {
14 let process = flow.process::<()>();
15
16 process
17 .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
18 .map(q!(|x| async move {
19 tokio::time::sleep(Duration::from_millis(10)).await;
20 x
21 }))
22 .resolve_futures()
23 .for_each(q!(|x| output.send(x).unwrap()));
24
25 flow.compile_no_network::<SingleProcessGraph>()
26}
27
28#[stageleft::entry]
29pub fn ordered<'a>(
30 flow: FlowBuilder<'a>,
31 output: RuntimeData<&'a UnboundedSender<u32>>,
32) -> impl Quoted<'a, Dfir<'a>> {
33 let process = flow.process::<()>();
34
35 process
36 .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
37 .map(q!(|x| async move {
38 tokio::time::sleep(Duration::from_millis(10)).await;
40 x
41 }))
42 .resolve_futures_ordered()
43 .for_each(q!(|x| output.send(x).unwrap()));
44
45 flow.compile_no_network::<SingleProcessGraph>()
46}
47
#[cfg(stageleft_runtime)]
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::time::Duration;

    use dfir_rs::util::collect_ready_async;

    /// Runs the `unordered` flow and checks that exactly the values 1 through 9
    /// were emitted, ignoring arrival order (the comparison is set-based).
    #[tokio::test]
    async fn test_unordered() {
        let (sender, mut receiver) = dfir_rs::util::unbounded_channel();

        let mut dataflow = super::unordered!(&sender);

        // Checker task: give the flow one second to run, then compare whatever
        // has been received so far against the expected set.
        let checker = tokio::task::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            let expected: HashSet<u32> = (1..10).collect();
            let received: HashSet<u32> = collect_ready_async(&mut receiver).await;
            assert_eq!(expected, received);
        });

        // The flow is expected to still be running after two seconds, so the
        // timeout must be the thing that ends this await.
        tokio::time::timeout(Duration::from_secs(2), dataflow.run_async())
            .await
            .expect_err("Expected time out");

        // Surface any assertion failure (panic) from the checker task.
        checker.await.unwrap();
    }

    /// Runs the `ordered` flow and checks that the values were emitted in the
    /// same order as the source list.
    #[tokio::test]
    async fn test_ordered() {
        let (sender, mut receiver) = dfir_rs::util::unbounded_channel();

        let mut dataflow = super::ordered!(&sender);

        // Checker task: give the flow one second to run, then compare whatever
        // has been received so far against the expected sequence.
        let checker = tokio::task::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            let expected: Vec<u32> = vec![2, 3, 1, 9, 6, 5, 4, 7, 8];
            let received: Vec<u32> = collect_ready_async(&mut receiver).await;
            assert_eq!(expected, received);
        });

        // The flow is expected to still be running after two seconds, so the
        // timeout must be the thing that ends this await.
        tokio::time::timeout(Duration::from_secs(2), dataflow.run_async())
            .await
            .expect_err("Expected time out");

        // Surface any assertion failure (panic) from the checker task.
        checker.await.unwrap();
    }
}
95}