Skip to main content

hydro_test/local/
chat_app.rs

1use hydro_lang::live_collections::stream::NoOrder;
2use hydro_lang::nondet::NonDet;
3use hydro_lang::prelude::*;
4
5pub fn chat_app<'a>(
6    users_stream: Stream<u32, Process<'a>, Unbounded>,
7    messages: Stream<String, Process<'a>, Unbounded>,
8    replay_messages: bool,
9    // intentionally non-deterministic to not send messages to users that joined after the message was sent
10    nondet_user_arrival_broadcast: NonDet,
11) -> Stream<(u32, String), Process<'a>, Unbounded, NoOrder> {
12    let messages = messages.map(q!(|s| s.to_uppercase()));
13    if replay_messages {
14        users_stream.cross_product(messages)
15    } else {
16        let current_users = users_stream.collect_vec();
17
18        sliced! {
19            let users = use(current_users, nondet_user_arrival_broadcast);
20            let messages = use(messages, nondet_user_arrival_broadcast);
21
22            users.flatten_ordered().cross_product(messages)
23        }
24    }
25}
26
#[cfg(test)]
mod tests {
    use futures::{SinkExt, Stream, StreamExt};
    use hydro_deploy::Deployment;
    use hydro_lang::location::Location;
    use hydro_lang::nondet::nondet;

    /// Pulls exactly `n` items from `stream` and returns them in arrival order.
    ///
    /// # Panics
    ///
    /// Panics if the stream ends before `n` items have been received.
    async fn take_next_n<T>(stream: &mut (impl Stream<Item = T> + Unpin), n: usize) -> Vec<T> {
        let mut out = Vec::with_capacity(n);
        while out.len() < n {
            match stream.next().await {
                Some(item) => out.push(item),
                None => panic!("stream ended after {} of {n} expected items", out.len()),
            }
        }
        out
    }

    /// Returns the index of the pair `(user, text)` within `received`.
    ///
    /// # Panics
    ///
    /// Panics (reporting the caller's location and the whole batch) if the pair is absent,
    /// so a lookup doubles as an existence assertion.
    #[track_caller]
    fn position_of(received: &[(u32, String)], user: u32, text: &str) -> usize {
        let found = received
            .iter()
            .position(|(u, t)| *u == user && t.as_str() == text);
        match found {
            Some(index) => index,
            None => panic!("expected ({user}, {text:?}) in output, got {received:?}"),
        }
    }

    /// Without replay, a user only receives messages sent after they joined: user 3 joins
    /// after "hello"/"world" and is expected to receive only "GOODBYE".
    ///
    /// NOTE(review): users and messages are sent over separate external connections; the
    /// expected outputs presumably rely on the users being observed before the messages
    /// that follow them - confirm this cannot flake.
    #[tokio::test]
    async fn test_chat_app_no_replay() {
        let mut deployment = Deployment::new();

        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let external = builder.external::<()>();
        let p1 = builder.process();

        let (users_send, users) = p1.source_external_bincode(&external);
        let (messages_send, messages) = p1.source_external_bincode(&external);
        let out = super::chat_app(users, messages, false, nondet!(/** test */));
        let out_recv = out.send_bincode_external(&external);

        let mut built = builder.with_default_optimize();

        hydro_build_utils::assert_snapshot!(
            built
                .preview_compile()
                .dfir_for(&p1)
                .to_mermaid(&Default::default())
        );

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut users_send = nodes.connect(users_send).await;
        let mut messages_send = nodes.connect(messages_send).await;
        let mut out_recv = nodes.connect(out_recv).await;

        deployment.start().await.unwrap();

        users_send.send(1).await.unwrap();
        users_send.send(2).await.unwrap();

        messages_send.send("hello".to_owned()).await.unwrap();
        messages_send.send("world".to_owned()).await.unwrap();

        {
            let out = take_next_n(&mut out_recv, 4).await;
            // Each lookup also asserts that the pair exists.
            let h1 = position_of(&out, 1, "HELLO");
            let h2 = position_of(&out, 2, "HELLO");
            let w1 = position_of(&out, 1, "WORLD");
            let w2 = position_of(&out, 2, "WORLD");
            // Assert partial order is preserved
            assert!(h1 < h2);
            assert!(h1 < w1);
            assert!(h2 < w2);
            assert!(w1 < w2);
        }

        users_send.send(3).await.unwrap();

        messages_send.send("goodbye".to_owned()).await.unwrap();

        assert_eq!(
            take_next_n(&mut out_recv, 3).await,
            &[
                (1, "GOODBYE".to_owned()),
                (2, "GOODBYE".to_owned()),
                (3, "GOODBYE".to_owned())
            ]
        );
    }

    /// With replay, a late joiner also receives earlier messages: after user 3 joins and
    /// "goodbye" is sent, the next five outputs are expected to contain "HELLO" and
    /// "WORLD" for user 3 plus "GOODBYE" for all three users.
    ///
    /// NOTE(review): users and messages are sent over separate external connections; the
    /// expected outputs presumably rely on the users being observed before the messages
    /// that follow them - confirm this cannot flake.
    #[tokio::test]
    async fn test_chat_app_replay() {
        let mut deployment = Deployment::new();

        let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
        let external = builder.external::<()>();
        let p1 = builder.process();

        let (users_send, users) = p1.source_external_bincode(&external);
        let (messages_send, messages) = p1.source_external_bincode(&external);
        let out = super::chat_app(users, messages, true, nondet!(/** test */));
        let out_recv = out.send_bincode_external(&external);

        let mut built = builder.with_default_optimize();

        hydro_build_utils::assert_snapshot!(
            built
                .preview_compile()
                .dfir_for(&p1)
                .to_mermaid(&Default::default())
        );

        let nodes = built
            .with_process(&p1, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut users_send = nodes.connect(users_send).await;
        let mut messages_send = nodes.connect(messages_send).await;
        let mut out_recv = nodes.connect(out_recv).await;

        deployment.start().await.unwrap();

        users_send.send(1).await.unwrap();
        users_send.send(2).await.unwrap();

        messages_send.send("hello".to_owned()).await.unwrap();
        messages_send.send("world".to_owned()).await.unwrap();

        {
            let out = take_next_n(&mut out_recv, 4).await;
            // Each lookup also asserts that the pair exists.
            let h1 = position_of(&out, 1, "HELLO");
            let h2 = position_of(&out, 2, "HELLO");
            let w1 = position_of(&out, 1, "WORLD");
            let w2 = position_of(&out, 2, "WORLD");
            // Assert partial order is preserved
            assert!(h1 < h2);
            assert!(h1 < w1);
            assert!(h2 < w2);
            assert!(w1 < w2);
        }

        users_send.send(3).await.unwrap();

        messages_send.send("goodbye".to_owned()).await.unwrap();

        {
            let out = take_next_n(&mut out_recv, 5).await;
            // Each lookup also asserts that the pair exists.
            let h3 = position_of(&out, 3, "HELLO");
            let w3 = position_of(&out, 3, "WORLD");
            let g1 = position_of(&out, 1, "GOODBYE");
            let g2 = position_of(&out, 2, "GOODBYE");
            let g3 = position_of(&out, 3, "GOODBYE");
            // Assert partial order is preserved
            assert!(g1 < g2);
            assert!(g2 < g3);
            assert!(h3 < w3);
            assert!(w3 < g3);
        }
    }
}