hydro_test/local/
chat_app.rs

1use hydro_lang::unsafety::NonDet;
2use hydro_lang::*;
3
4pub fn chat_app<'a>(
5    process: &Process<'a>,
6    users_stream: Stream<u32, Process<'a>, Unbounded>,
7    messages: Stream<String, Process<'a>, Unbounded>,
8    replay_messages: bool,
9    // intentionally non-deterministic to not send messages to users that joined after the message was sent
10    nondet_user_arrival_broadcast: NonDet,
11) -> Stream<(u32, String), Process<'a>, Unbounded, NoOrder> {
12    let tick = process.tick();
13
14    let users = users_stream
15        .batch(&tick, nondet_user_arrival_broadcast)
16        .persist();
17
18    let messages = if replay_messages {
19        messages
20            .batch(&tick, nondet_user_arrival_broadcast)
21            .persist()
22    } else {
23        messages.batch(&tick, nondet_user_arrival_broadcast)
24    };
25
26    // do this after the persist to test pullup
27    let messages = messages.map(q!(|s| s.to_uppercase()));
28
29    let mut joined = users.cross_product(messages);
30    if replay_messages {
31        joined = joined.delta();
32    }
33
34    joined.all_ticks()
35}
36
37#[cfg(test)]
38mod tests {
39    use futures::{SinkExt, Stream, StreamExt};
40    use hydro_deploy::Deployment;
41    use hydro_lang::{Location, nondet};
42
43    async fn take_next_n<T>(stream: &mut (impl Stream<Item = T> + Unpin), n: usize) -> Vec<T> {
44        let mut out = Vec::with_capacity(n);
45        for _ in 0..n {
46            if let Some(item) = stream.next().await {
47                out.push(item);
48            } else {
49                panic!();
50            }
51        }
52        out
53    }
54
55    #[tokio::test]
56    async fn test_chat_app_no_replay() {
57        let mut deployment = Deployment::new();
58
59        let builder = hydro_lang::FlowBuilder::new();
60        let external = builder.external::<()>();
61        let p1 = builder.process();
62
63        let (users_send, users) = p1.source_external_bincode(&external);
64        let (messages_send, messages) = p1.source_external_bincode(&external);
65        let out = super::chat_app(&p1, users, messages, false, nondet!(/** test */));
66        let out_recv = out.send_bincode_external(&external);
67
68        let built = builder.with_default_optimize();
69
70        hydro_build_utils::assert_snapshot!(
71            built
72                .preview_compile()
73                .dfir_for(&p1)
74                .to_mermaid(&Default::default())
75        );
76
77        let nodes = built
78            .with_process(&p1, deployment.Localhost())
79            .with_external(&external, deployment.Localhost())
80            .deploy(&mut deployment);
81
82        deployment.deploy().await.unwrap();
83
84        let mut users_send = nodes.connect_sink_bincode(users_send).await;
85        let mut messages_send = nodes.connect_sink_bincode(messages_send).await;
86        let mut out_recv = nodes.connect_source_bincode(out_recv).await;
87
88        deployment.start().await.unwrap();
89
90        users_send.send(1).await.unwrap();
91        users_send.send(2).await.unwrap();
92
93        messages_send.send("hello".to_string()).await.unwrap();
94        messages_send.send("world".to_string()).await.unwrap();
95
96        assert_eq!(
97            take_next_n(&mut out_recv, 4).await,
98            &[
99                (1, "HELLO".to_string()),
100                (2, "HELLO".to_string()),
101                (1, "WORLD".to_string()),
102                (2, "WORLD".to_string())
103            ]
104        );
105
106        users_send.send(3).await.unwrap();
107
108        messages_send.send("goodbye".to_string()).await.unwrap();
109
110        assert_eq!(
111            take_next_n(&mut out_recv, 3).await,
112            &[
113                (1, "GOODBYE".to_string()),
114                (2, "GOODBYE".to_string()),
115                (3, "GOODBYE".to_string())
116            ]
117        );
118    }
119
120    #[tokio::test]
121    async fn test_chat_app_replay() {
122        let mut deployment = Deployment::new();
123
124        let builder = hydro_lang::FlowBuilder::new();
125        let external = builder.external::<()>();
126        let p1 = builder.process();
127
128        let (users_send, users) = p1.source_external_bincode(&external);
129        let (messages_send, messages) = p1.source_external_bincode(&external);
130        let out = super::chat_app(&p1, users, messages, true, nondet!(/** test */));
131        let out_recv = out.send_bincode_external(&external);
132
133        let built = builder.with_default_optimize();
134
135        hydro_build_utils::assert_snapshot!(
136            built
137                .preview_compile()
138                .dfir_for(&p1)
139                .to_mermaid(&Default::default())
140        );
141
142        let nodes = built
143            .with_process(&p1, deployment.Localhost())
144            .with_external(&external, deployment.Localhost())
145            .deploy(&mut deployment);
146
147        deployment.deploy().await.unwrap();
148
149        let mut users_send = nodes.connect_sink_bincode(users_send).await;
150        let mut messages_send = nodes.connect_sink_bincode(messages_send).await;
151        let mut out_recv = nodes.connect_source_bincode(out_recv).await;
152
153        deployment.start().await.unwrap();
154
155        users_send.send(1).await.unwrap();
156        users_send.send(2).await.unwrap();
157
158        messages_send.send("hello".to_string()).await.unwrap();
159        messages_send.send("world".to_string()).await.unwrap();
160
161        assert_eq!(
162            take_next_n(&mut out_recv, 4).await,
163            &[
164                (1, "HELLO".to_string()),
165                (2, "HELLO".to_string()),
166                (1, "WORLD".to_string()),
167                (2, "WORLD".to_string())
168            ]
169        );
170
171        users_send.send(3).await.unwrap();
172
173        messages_send.send("goodbye".to_string()).await.unwrap();
174
175        assert_eq!(
176            take_next_n(&mut out_recv, 5).await,
177            &[
178                (3, "HELLO".to_string()),
179                (3, "WORLD".to_string()),
180                (1, "GOODBYE".to_string()),
181                (2, "GOODBYE".to_string()),
182                (3, "GOODBYE".to_string())
183            ]
184        );
185    }
186}