// hydro_test/local/chat_app.rs
use hydro_lang::unsafety::NonDet;
use hydro_lang::*;
3
4pub fn chat_app<'a>(
5 process: &Process<'a>,
6 users_stream: Stream<u32, Process<'a>, Unbounded>,
7 messages: Stream<String, Process<'a>, Unbounded>,
8 replay_messages: bool,
9 nondet_user_arrival_broadcast: NonDet,
11) -> Stream<(u32, String), Process<'a>, Unbounded, NoOrder> {
12 let tick = process.tick();
13
14 let users = users_stream
15 .batch(&tick, nondet_user_arrival_broadcast)
16 .persist();
17
18 let messages = if replay_messages {
19 messages
20 .batch(&tick, nondet_user_arrival_broadcast)
21 .persist()
22 } else {
23 messages.batch(&tick, nondet_user_arrival_broadcast)
24 };
25
26 let messages = messages.map(q!(|s| s.to_uppercase()));
28
29 let mut joined = users.cross_product(messages);
30 if replay_messages {
31 joined = joined.delta();
32 }
33
34 joined.all_ticks()
35}
36
#[cfg(test)]
mod tests {
    use futures::{SinkExt, Stream, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::{Location, nondet};

    /// Awaits exactly `n` items from `stream` and returns them in arrival order.
    ///
    /// # Panics
    /// Panics if the stream terminates before `n` items have been produced; the
    /// message reports how many items were received so a failing test shows how
    /// far it got.
    async fn take_next_n<T>(stream: &mut (impl Stream<Item = T> + Unpin), n: usize) -> Vec<T> {
        let mut out = Vec::with_capacity(n);
        for _ in 0..n {
            match stream.next().await {
                Some(item) => out.push(item),
                None => panic!(
                    "stream ended after {} item(s), expected {}",
                    out.len(),
                    n
                ),
            }
        }
        out
    }

    /// Runs `chat_app` with `replay_messages = false` on a localhost deployment.
    ///
    /// Expects each message to reach only the users sent before it: user 3,
    /// sent after "hello"/"world", receives only "GOODBYE".
    ///
    /// NOTE(review): the assertions compare against an exact sequence although
    /// `chat_app` returns a `NoOrder` stream — presumably the order is stable
    /// for this single local process; confirm before relying on it elsewhere.
    #[tokio::test]
    async fn test_chat_app_no_replay() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let external = builder.external::<()>();
        let p1 = builder.process();

        let (users_send, users) = p1.source_external_bincode(&external);
        let (messages_send, messages) = p1.source_external_bincode(&external);
        let out = super::chat_app(&p1, users, messages, false, nondet!());
        let out_recv = out.send_bincode_external(&external);

        let built = builder.with_default_optimize();

        // Snapshot the compiled dataflow graph for `p1`. This stays inline in
        // the test function (snapshot naming presumably depends on the caller).
        hydro_build_utils::assert_snapshot!(
            built
                .preview_compile()
                .dfir_for(&p1)
                .to_mermaid(&Default::default())
        );

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut users_send = nodes.connect_sink_bincode(users_send).await;
        let mut messages_send = nodes.connect_sink_bincode(messages_send).await;
        let mut out_recv = nodes.connect_source_bincode(out_recv).await;

        deployment.start().await.unwrap();

        users_send.send(1).await.unwrap();
        users_send.send(2).await.unwrap();

        messages_send.send("hello".to_string()).await.unwrap();
        messages_send.send("world".to_string()).await.unwrap();

        // Two users x two messages.
        assert_eq!(
            take_next_n(&mut out_recv, 4).await,
            &[
                (1, "HELLO".to_string()),
                (2, "HELLO".to_string()),
                (1, "WORLD".to_string()),
                (2, "WORLD".to_string())
            ]
        );

        users_send.send(3).await.unwrap();

        messages_send.send("goodbye".to_string()).await.unwrap();

        // Without replay, user 3 gets only the message sent after it joined.
        assert_eq!(
            take_next_n(&mut out_recv, 3).await,
            &[
                (1, "GOODBYE".to_string()),
                (2, "GOODBYE".to_string()),
                (3, "GOODBYE".to_string())
            ]
        );
    }

    /// Runs `chat_app` with `replay_messages = true` on a localhost deployment.
    ///
    /// Expects a late-joining user to be caught up: user 3 receives "HELLO" and
    /// "WORLD" (sent before it joined) in addition to "GOODBYE", while users 1
    /// and 2 do not receive the old messages a second time.
    ///
    /// NOTE(review): the assertions compare against an exact sequence although
    /// `chat_app` returns a `NoOrder` stream — presumably the order is stable
    /// for this single local process; confirm before relying on it elsewhere.
    #[tokio::test]
    async fn test_chat_app_replay() {
        let mut deployment = Deployment::new();

        let builder = hydro_lang::FlowBuilder::new();
        let external = builder.external::<()>();
        let p1 = builder.process();

        let (users_send, users) = p1.source_external_bincode(&external);
        let (messages_send, messages) = p1.source_external_bincode(&external);
        let out = super::chat_app(&p1, users, messages, true, nondet!());
        let out_recv = out.send_bincode_external(&external);

        let built = builder.with_default_optimize();

        // Snapshot the compiled dataflow graph for `p1`. This stays inline in
        // the test function (snapshot naming presumably depends on the caller).
        hydro_build_utils::assert_snapshot!(
            built
                .preview_compile()
                .dfir_for(&p1)
                .to_mermaid(&Default::default())
        );

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut users_send = nodes.connect_sink_bincode(users_send).await;
        let mut messages_send = nodes.connect_sink_bincode(messages_send).await;
        let mut out_recv = nodes.connect_source_bincode(out_recv).await;

        deployment.start().await.unwrap();

        users_send.send(1).await.unwrap();
        users_send.send(2).await.unwrap();

        messages_send.send("hello".to_string()).await.unwrap();
        messages_send.send("world".to_string()).await.unwrap();

        // Two users x two messages.
        assert_eq!(
            take_next_n(&mut out_recv, 4).await,
            &[
                (1, "HELLO".to_string()),
                (2, "HELLO".to_string()),
                (1, "WORLD".to_string()),
                (2, "WORLD".to_string())
            ]
        );

        users_send.send(3).await.unwrap();

        messages_send.send("goodbye".to_string()).await.unwrap();

        // With replay, user 3 first receives the two earlier messages, then
        // everyone receives the new one.
        assert_eq!(
            take_next_n(&mut out_recv, 5).await,
            &[
                (3, "HELLO".to_string()),
                (3, "WORLD".to_string()),
                (1, "GOODBYE".to_string()),
                (2, "GOODBYE".to_string()),
                (3, "GOODBYE".to_string())
            ]
        );
    }
}