hydro_test_local/local/
chat_app.rs
1use dfir_rs::tokio::sync::mpsc::UnboundedSender;
2use dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream;
3use hydro_lang::deploy::SingleProcessGraph;
4use hydro_lang::dfir_rs::scheduled::graph::Dfir;
5use hydro_lang::*;
6use stageleft::{Quoted, RuntimeData};
7
8#[stageleft::entry]
9pub fn chat_app<'a>(
10 flow: FlowBuilder<'a>,
11 users_stream: RuntimeData<UnboundedReceiverStream<u32>>,
12 messages: RuntimeData<UnboundedReceiverStream<String>>,
13 output: RuntimeData<&'a UnboundedSender<(u32, String)>>,
14 replay_messages: bool,
15) -> impl Quoted<'a, Dfir<'a>> {
16 let process = flow.process::<()>();
17 let tick = process.tick();
18
19 let users = unsafe {
20 process.source_stream(users_stream).tick_batch(&tick)
23 }
24 .persist();
25 let messages = process.source_stream(messages);
26 let messages = if replay_messages {
27 unsafe {
28 messages.tick_batch(&tick)
30 }
31 .persist()
32 } else {
33 unsafe {
34 messages.tick_batch(&tick)
36 }
37 };
38
39 let messages = messages.map(q!(|s| s.to_uppercase()));
41
42 let mut joined = users.cross_product(messages);
43 if replay_messages {
44 joined = joined.delta();
45 }
46
47 joined.all_ticks().for_each(q!(|t| {
48 output.send(t).unwrap();
49 }));
50
51 flow.compile_no_network::<SingleProcessGraph>()
52}
53
#[cfg(stageleft_runtime)]
#[cfg(test)]
mod tests {
    use dfir_rs::assert_graphvis_snapshots;
    use dfir_rs::util::collect_ready;

    /// Without replay, a message is only paired with the users known in the
    /// tick it arrives in; earlier messages are not re-sent to a later user.
    #[test]
    fn test_chat_app_no_replay() {
        let (users_send, users) = dfir_rs::util::unbounded_channel();
        let (messages_send, messages) = dfir_rs::util::unbounded_channel();
        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();

        let mut chat_server = super::chat_app!(users, messages, &out, false);
        assert_graphvis_snapshots!(chat_server);

        // First tick: two users, two messages.
        for user_id in [1, 2] {
            users_send.send(user_id).unwrap();
        }
        for text in ["hello", "world"] {
            messages_send.send(text.to_string()).unwrap();
        }

        chat_server.run_tick();

        let first_tick: Vec<(u32, String)> = collect_ready(&mut out_recv);
        assert_eq!(
            first_tick,
            [
                (1, "HELLO".to_string()),
                (2, "HELLO".to_string()),
                (1, "WORLD".to_string()),
                (2, "WORLD".to_string()),
            ]
        );

        // Second tick: one more user and one more message.
        users_send.send(3).unwrap();
        messages_send.send("goodbye".to_string()).unwrap();

        chat_server.run_tick();

        // Only the new message is delivered, to all three users.
        let second_tick: Vec<(u32, String)> = collect_ready(&mut out_recv);
        assert_eq!(
            second_tick,
            [
                (1, "GOODBYE".to_string()),
                (2, "GOODBYE".to_string()),
                (3, "GOODBYE".to_string()),
            ]
        );
    }

    /// With replay, a user who joins later also receives the earlier
    /// messages, and no pair is emitted twice.
    #[test]
    fn test_chat_app_replay() {
        let (users_send, users) = dfir_rs::util::unbounded_channel();
        let (messages_send, messages) = dfir_rs::util::unbounded_channel();
        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();

        let mut chat_server = super::chat_app!(users, messages, &out, true);
        assert_graphvis_snapshots!(chat_server);

        // First tick: two users, two messages.
        for user_id in [1, 2] {
            users_send.send(user_id).unwrap();
        }
        for text in ["hello", "world"] {
            messages_send.send(text.to_string()).unwrap();
        }

        chat_server.run_tick();

        let first_tick: Vec<(u32, String)> = collect_ready(&mut out_recv);
        assert_eq!(
            first_tick,
            [
                (1, "HELLO".to_string()),
                (2, "HELLO".to_string()),
                (1, "WORLD".to_string()),
                (2, "WORLD".to_string()),
            ]
        );

        // Second tick: one more user and one more message.
        users_send.send(3).unwrap();
        messages_send.send("goodbye".to_string()).unwrap();

        chat_server.run_tick();

        // User 3 catches up on the old messages, and everyone gets the new
        // one; pairs already emitted in the first tick do not reappear.
        let second_tick: Vec<(u32, String)> = collect_ready(&mut out_recv);
        assert_eq!(
            second_tick,
            [
                (3, "HELLO".to_string()),
                (3, "WORLD".to_string()),
                (1, "GOODBYE".to_string()),
                (2, "GOODBYE".to_string()),
                (3, "GOODBYE".to_string()),
            ]
        );
    }
}