hydro_test/local/
chat_app.rs1use hydro_lang::live_collections::stream::NoOrder;
2use hydro_lang::nondet::NonDet;
3use hydro_lang::prelude::*;
4
5pub fn chat_app<'a>(
6 users_stream: Stream<u32, Process<'a>, Unbounded>,
7 messages: Stream<String, Process<'a>, Unbounded>,
8 replay_messages: bool,
9 nondet_user_arrival_broadcast: NonDet,
11) -> Stream<(u32, String), Process<'a>, Unbounded, NoOrder> {
12 let messages = messages.map(q!(|s| s.to_uppercase()));
13 if replay_messages {
14 users_stream.cross_product(messages)
15 } else {
16 let current_users = users_stream.collect_vec();
17
18 sliced! {
19 let users = use(current_users, nondet_user_arrival_broadcast);
20 let messages = use(messages, nondet_user_arrival_broadcast);
21
22 users.flatten_ordered().cross_product(messages)
23 }
24 }
25}
26
#[cfg(test)]
mod tests {
    use futures::{SinkExt, Stream, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::location::Location;
    use hydro_lang::nondet::nondet;

    /// Awaits exactly `n` items from `stream` and returns them in arrival order.
    ///
    /// # Panics
    ///
    /// Panics if the stream ends before `n` items have been received; the
    /// message reports how many items did arrive.
    async fn take_next_n<T>(stream: &mut (impl Stream<Item = T> + Unpin), n: usize) -> Vec<T> {
        let mut out = Vec::with_capacity(n);
        for _ in 0..n {
            match stream.next().await {
                Some(item) => out.push(item),
                None => panic!("stream ended after {} item(s), expected {n}", out.len()),
            }
        }
        out
    }

    /// Returns the index of the first `(user, text)` pair in `out`.
    ///
    /// # Panics
    ///
    /// Panics if the pair is absent, printing the pair and everything that was
    /// received so a failing run can be diagnosed from the message alone.
    fn index_of(out: &[(u32, String)], user: u32, text: &str) -> usize {
        out.iter()
            .position(|(u, t)| *u == user && t.as_str() == text)
            .unwrap_or_else(|| panic!("missing ({user}, {text:?}) in output {out:?}"))
    }

    /// Without replay, each message reaches only the users already sent before
    /// it: user 3, sent after "hello"/"world", receives "GOODBYE" only.
    #[tokio::test]
    async fn test_chat_app_no_replay() {
        let mut deployment = Deployment::new();

        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let external = builder.external::<()>();
        let p1 = builder.process();

        let (users_send, users) = p1.source_external_bincode(&external);
        let (messages_send, messages) = p1.source_external_bincode(&external);
        let out = super::chat_app(
            users,
            messages,
            false,
            nondet!(/** test: users are sent before the messages they are expected to receive */),
        );
        let out_recv = out.send_bincode_external(&external);

        let mut built = builder.with_default_optimize();

        // Pin the generated dataflow graph for this configuration.
        hydro_build_utils::assert_snapshot!(
            built
                .preview_compile()
                .dfir_for(&p1)
                .to_mermaid(&Default::default())
        );

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut users_send = nodes.connect(users_send).await;
        let mut messages_send = nodes.connect(messages_send).await;
        let mut out_recv = nodes.connect(out_recv).await;

        deployment.start().await.unwrap();

        users_send.send(1).await.unwrap();
        users_send.send(2).await.unwrap();

        messages_send.send("hello".to_owned()).await.unwrap();
        messages_send.send("world".to_owned()).await.unwrap();

        {
            let out = take_next_n(&mut out_recv, 4).await;
            // Presence checks: each lookup panics with the full output if the pair is missing.
            let h1 = index_of(&out, 1, "HELLO");
            let h2 = index_of(&out, 2, "HELLO");
            let w1 = index_of(&out, 1, "WORLD");
            let w2 = index_of(&out, 2, "WORLD");
            // Relative-order checks (a partial order, not one exact sequence).
            assert!(h1 < h2, "(1, HELLO) should precede (2, HELLO): {out:?}");
            assert!(h1 < w1, "(1, HELLO) should precede (1, WORLD): {out:?}");
            assert!(h2 < w2, "(2, HELLO) should precede (2, WORLD): {out:?}");
            assert!(w1 < w2, "(1, WORLD) should precede (2, WORLD): {out:?}");
        }

        users_send.send(3).await.unwrap();

        messages_send.send("goodbye".to_owned()).await.unwrap();

        // User 3 gets only the message sent after it joined.
        assert_eq!(
            take_next_n(&mut out_recv, 3).await,
            &[
                (1, "GOODBYE".to_owned()),
                (2, "GOODBYE".to_owned()),
                (3, "GOODBYE".to_owned())
            ]
        );
    }

    /// With replay, a user sent later also receives the earlier messages:
    /// user 3 gets "HELLO" and "WORLD" in addition to "GOODBYE".
    #[tokio::test]
    async fn test_chat_app_replay() {
        let mut deployment = Deployment::new();

        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let external = builder.external::<()>();
        let p1 = builder.process();

        let (users_send, users) = p1.source_external_bincode(&external);
        let (messages_send, messages) = p1.source_external_bincode(&external);
        let out = super::chat_app(
            users,
            messages,
            true,
            nondet!(/** test: unused by the replay path, which never enters the sliced region */),
        );
        let out_recv = out.send_bincode_external(&external);

        let mut built = builder.with_default_optimize();

        // Pin the generated dataflow graph for this configuration.
        hydro_build_utils::assert_snapshot!(
            built
                .preview_compile()
                .dfir_for(&p1)
                .to_mermaid(&Default::default())
        );

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut users_send = nodes.connect(users_send).await;
        let mut messages_send = nodes.connect(messages_send).await;
        let mut out_recv = nodes.connect(out_recv).await;

        deployment.start().await.unwrap();

        users_send.send(1).await.unwrap();
        users_send.send(2).await.unwrap();

        messages_send.send("hello".to_owned()).await.unwrap();
        messages_send.send("world".to_owned()).await.unwrap();

        {
            let out = take_next_n(&mut out_recv, 4).await;
            // Presence checks: each lookup panics with the full output if the pair is missing.
            let h1 = index_of(&out, 1, "HELLO");
            let h2 = index_of(&out, 2, "HELLO");
            let w1 = index_of(&out, 1, "WORLD");
            let w2 = index_of(&out, 2, "WORLD");
            // Relative-order checks (a partial order, not one exact sequence).
            assert!(h1 < h2, "(1, HELLO) should precede (2, HELLO): {out:?}");
            assert!(h1 < w1, "(1, HELLO) should precede (1, WORLD): {out:?}");
            assert!(h2 < w2, "(2, HELLO) should precede (2, WORLD): {out:?}");
            assert!(w1 < w2, "(1, WORLD) should precede (2, WORLD): {out:?}");
        }

        users_send.send(3).await.unwrap();

        messages_send.send("goodbye".to_owned()).await.unwrap();

        {
            // Five items: two replayed messages for user 3 plus "GOODBYE" for all three users.
            let out = take_next_n(&mut out_recv, 5).await;
            let h3 = index_of(&out, 3, "HELLO");
            let w3 = index_of(&out, 3, "WORLD");
            let g1 = index_of(&out, 1, "GOODBYE");
            let g2 = index_of(&out, 2, "GOODBYE");
            let g3 = index_of(&out, 3, "GOODBYE");
            assert!(g1 < g2, "(1, GOODBYE) should precede (2, GOODBYE): {out:?}");
            assert!(g2 < g3, "(2, GOODBYE) should precede (3, GOODBYE): {out:?}");
            assert!(h3 < w3, "(3, HELLO) should precede (3, WORLD): {out:?}");
            assert!(w3 < g3, "(3, WORLD) should precede (3, GOODBYE): {out:?}");
        }
    }
}