hydro_test_local/local/
chat_app.rs

1use dfir_rs::tokio::sync::mpsc::UnboundedSender;
2use dfir_rs::tokio_stream::wrappers::UnboundedReceiverStream;
3use hydro_lang::deploy::SingleProcessGraph;
4use hydro_lang::dfir_rs::scheduled::graph::Dfir;
5use hydro_lang::*;
6use stageleft::{Quoted, RuntimeData};
7
8#[stageleft::entry]
9pub fn chat_app<'a>(
10    flow: FlowBuilder<'a>,
11    users_stream: RuntimeData<UnboundedReceiverStream<u32>>,
12    messages: RuntimeData<UnboundedReceiverStream<String>>,
13    output: RuntimeData<&'a UnboundedSender<(u32, String)>>,
14    replay_messages: bool,
15) -> impl Quoted<'a, Dfir<'a>> {
16    let process = flow.process::<()>();
17    let tick = process.tick();
18
19    let users = unsafe {
20        // SAFETY: intentionally non-deterministic to not send messaged
21        // to users that joined after the message was sent
22        process.source_stream(users_stream).tick_batch(&tick)
23    }
24    .persist();
25    let messages = process.source_stream(messages);
26    let messages = if replay_messages {
27        unsafe {
28            // SAFETY: see above
29            messages.tick_batch(&tick)
30        }
31        .persist()
32    } else {
33        unsafe {
34            // SAFETY: see above
35            messages.tick_batch(&tick)
36        }
37    };
38
39    // do this after the persist to test pullup
40    let messages = messages.map(q!(|s| s.to_uppercase()));
41
42    let mut joined = users.cross_product(messages);
43    if replay_messages {
44        joined = joined.delta();
45    }
46
47    joined.all_ticks().for_each(q!(|t| {
48        output.send(t).unwrap();
49    }));
50
51    flow.compile_no_network::<SingleProcessGraph>()
52}
53
54#[cfg(stageleft_runtime)]
55#[cfg(test)]
56mod tests {
57    use dfir_rs::assert_graphvis_snapshots;
58    use dfir_rs::util::collect_ready;
59
60    #[test]
61    fn test_chat_app_no_replay() {
62        let (users_send, users) = dfir_rs::util::unbounded_channel();
63        let (messages_send, messages) = dfir_rs::util::unbounded_channel();
64        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
65
66        let mut chat_server = super::chat_app!(users, messages, &out, false);
67        assert_graphvis_snapshots!(chat_server);
68
69        users_send.send(1).unwrap();
70        users_send.send(2).unwrap();
71
72        messages_send.send("hello".to_string()).unwrap();
73        messages_send.send("world".to_string()).unwrap();
74
75        chat_server.run_tick();
76
77        assert_eq!(
78            &*collect_ready::<Vec<_>, _>(&mut out_recv),
79            &[
80                (1, "HELLO".to_string()),
81                (2, "HELLO".to_string()),
82                (1, "WORLD".to_string()),
83                (2, "WORLD".to_string())
84            ]
85        );
86
87        users_send.send(3).unwrap();
88
89        messages_send.send("goodbye".to_string()).unwrap();
90
91        chat_server.run_tick();
92
93        assert_eq!(
94            &*collect_ready::<Vec<_>, _>(&mut out_recv),
95            &[
96                (1, "GOODBYE".to_string()),
97                (2, "GOODBYE".to_string()),
98                (3, "GOODBYE".to_string())
99            ]
100        );
101    }
102
103    #[test]
104    fn test_chat_app_replay() {
105        let (users_send, users) = dfir_rs::util::unbounded_channel();
106        let (messages_send, messages) = dfir_rs::util::unbounded_channel();
107        let (out, mut out_recv) = dfir_rs::util::unbounded_channel();
108
109        let mut chat_server = super::chat_app!(users, messages, &out, true);
110        assert_graphvis_snapshots!(chat_server);
111
112        users_send.send(1).unwrap();
113        users_send.send(2).unwrap();
114
115        messages_send.send("hello".to_string()).unwrap();
116        messages_send.send("world".to_string()).unwrap();
117
118        chat_server.run_tick();
119
120        assert_eq!(
121            &*collect_ready::<Vec<_>, _>(&mut out_recv),
122            &[
123                (1, "HELLO".to_string()),
124                (2, "HELLO".to_string()),
125                (1, "WORLD".to_string()),
126                (2, "WORLD".to_string())
127            ]
128        );
129
130        users_send.send(3).unwrap();
131
132        messages_send.send("goodbye".to_string()).unwrap();
133
134        chat_server.run_tick();
135
136        assert_eq!(
137            &*collect_ready::<Vec<_>, _>(&mut out_recv),
138            &[
139                (3, "HELLO".to_string()),
140                (3, "WORLD".to_string()),
141                (1, "GOODBYE".to_string()),
142                (2, "GOODBYE".to_string()),
143                (3, "GOODBYE".to_string())
144            ]
145        );
146    }
147}