Skip to main content

hydro_std/
request_response.rs

1use std::hash::Hash;
2
3use hydro_lang::live_collections::stream::NoOrder;
4use hydro_lang::location::{Location, NoTick};
5use hydro_lang::prelude::*;
6
/// Return type of [`join_responses`]: a `Stream` at location `L`, marked `Unbounded`
/// and `NoOrder`, whose elements are `(key, (metadata, response))` tuples.
type JoinResponses<K, M, V, L> = Stream<(K, (M, V)), L, Unbounded, NoOrder>;
8
9/// Given an incoming stream of request-response responses, joins with metadata generated
10/// at request time that is stored in-memory.
11///
12/// The metadata must be generated in the same or a previous tick than the response,
13/// typically at request time. Only one response element should be produced with a given
14/// key, same for the metadata stream.
15pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>(
16    responses: Stream<(K, V), L, Unbounded, NoOrder>,
17    metadata: Stream<(K, M), Tick<L>, Bounded, NoOrder>,
18) -> JoinResponses<K, M, V, L> {
19    sliced! {
20        let mut remaining_to_join = use::state_null::<Stream<(K, M), _, _, NoOrder>>();
21
22        let response_batch = use(responses, nondet!(
23            /// Because we persist the metadata, delays resulting from
24            /// batching boundaries do not affect the output contents.
25        ));
26        let metadata_batch = use::atomic(metadata.all_ticks_atomic(), nondet!(
27            /// Metadata is synchronized with the tick input.
28        ));
29
30        let remaining_and_new = remaining_to_join.chain(metadata_batch);
31
32        // TODO(shadaj): we should have a "split-join" operator
33        // that returns both join and anti-join without cloning
34        let joined_this_tick = remaining_and_new
35            .clone()
36            .join(response_batch.clone())
37            .map(q!(|(key, (meta, resp))| (key, (meta, resp))));
38
39        remaining_to_join = remaining_and_new.anti_join(response_batch.map(q!(|(key, _)| key)));
40
41        joined_this_tick
42    }
43}
44
#[cfg(test)]
mod tests {
    use hydro_lang::prelude::*;

    use super::*;

    /// A response whose key already has metadata is emitted together with that metadata.
    #[test]
    fn test_join_responses_basic() {
        let mut builder = FlowBuilder::new();
        let node = builder.process::<()>();

        // Simulated input ports for responses and for request-time metadata.
        let (resp_tx, resp_stream) = node.sim_input::<(u32, String), _, _>();
        let (meta_tx, meta_stream) = node.sim_input::<(u32, i32), _, _>();

        // Branch the metadata inside an atomic section: one branch is exposed as an
        // acknowledgement, the other is batched into a tick for `join_responses`.
        let meta_atomic = meta_stream.atomic();
        let meta_acks = meta_atomic.clone().end_atomic();
        let meta_batches = meta_atomic
            .batch_atomic(&node.tick(), nondet!(/** test */))
            .weaken_ordering();

        // `join_responses` takes `NoOrder` streams, hence the `weaken_ordering()` calls.
        let paired = join_responses(resp_stream.weaken_ordering(), meta_batches);

        // Simulated output ports.
        let meta_ack_rx = meta_acks.sim_output();
        let paired_rx = paired.sim_output();

        builder.sim().exhaustive(async || {
            // Metadata goes in first; waiting on the ack ensures it has been processed.
            meta_tx.send((1, 42));
            meta_ack_rx.assert_yields([(1, 42)]).await;

            // The response for the same key must come out joined with the metadata.
            resp_tx.send((1, "hello".to_owned()));
            paired_rx
                .assert_yields_unordered([(1, (42, "hello".to_owned()))])
                .await;
        });
    }

    /// Metadata stays available across ticks until a response with its key arrives.
    #[test]
    fn test_join_responses_metadata_persists() {
        let mut builder = FlowBuilder::new();
        let node = builder.process::<()>();

        let (resp_tx, resp_stream) = node.sim_input::<(u32, String), _, _>();
        let (meta_tx, meta_stream) = node.sim_input::<(u32, i32), _, _>();

        let meta_atomic = meta_stream.atomic();
        let meta_acks = meta_atomic.clone().end_atomic();
        let meta_batches = meta_atomic
            .batch_atomic(&node.tick(), nondet!(/** test */))
            .weaken_ordering();

        let paired = join_responses(resp_stream.weaken_ordering(), meta_batches);

        let meta_ack_rx = meta_acks.sim_output();
        let paired_rx = paired.sim_output();

        builder.sim().exhaustive(async || {
            // Register metadata for two keys and wait until both are acknowledged.
            meta_tx.send_many([(1, 10), (2, 20)]);
            meta_ack_rx.assert_yields([(1, 10), (2, 20)]).await;

            // Answer both keys, in the opposite order from the metadata.
            resp_tx.send_many([(2, "two".to_owned()), (1, "one".to_owned())]);
            paired_rx
                .assert_yields_only_unordered([
                    (1, (10, "one".to_owned())),
                    (2, (20, "two".to_owned())),
                ])
                .await;
        });
    }

    /// A response whose key has no metadata produces no output.
    #[test]
    fn test_join_responses_no_metadata() {
        let mut builder = FlowBuilder::new();
        let node = builder.process::<()>();

        let (resp_tx, resp_stream) = node.sim_input::<(u32, String), _, _>();
        let (meta_tx, meta_stream) = node.sim_input::<(u32, i32), _, _>();

        let meta_atomic = meta_stream.atomic();
        let meta_acks = meta_atomic.clone().end_atomic();
        let meta_batches = meta_atomic
            .batch_atomic(&node.tick(), nondet!(/** test */))
            .weaken_ordering();

        let paired = join_responses(resp_stream.weaken_ordering(), meta_batches);

        let meta_ack_rx = meta_acks.sim_output();
        let paired_rx = paired.sim_output();

        builder.sim().exhaustive(async || {
            // Only key 1 gets metadata.
            meta_tx.send((1, 42));
            meta_ack_rx.assert_yields([(1, 42)]).await;

            // Key 1 has metadata to pair with; key 2 has none.
            resp_tx.send_many([(1, "matched".to_owned()), (2, "unmatched".to_owned())]);

            // The key 1 pairing is the only expected output.
            paired_rx
                .assert_yields_only_unordered([(1, (42, "matched".to_owned()))])
                .await;
        });
    }

    /// Once metadata has been paired with a response it is no longer available.
    #[test]
    fn test_join_responses_metadata_removed_after_match() {
        let mut builder = FlowBuilder::new();
        let node = builder.process::<()>();

        let (resp_tx, resp_stream) = node.sim_input::<(u32, String), _, _>();
        let (meta_tx, meta_stream) = node.sim_input::<(u32, i32), _, _>();

        let meta_atomic = meta_stream.atomic();
        let meta_acks = meta_atomic.clone().end_atomic();
        let meta_batches = meta_atomic
            .batch_atomic(&node.tick(), nondet!(/** test */))
            .weaken_ordering();

        let paired = join_responses(resp_stream.weaken_ordering(), meta_batches);

        let meta_ack_rx = meta_acks.sim_output();
        let paired_rx = paired.sim_output();

        builder.sim().exhaustive(async || {
            // A single metadata entry for key 1.
            meta_tx.send((1, 42));
            meta_ack_rx.assert_yields([(1, 42)]).await;

            // The first response for key 1 uses up that entry.
            resp_tx.send((1, "first".to_owned()));
            paired_rx
                .assert_yields_unordered([(1, (42, "first".to_owned()))])
                .await;

            // A second response for key 1 has nothing left to pair with, so no output.
            resp_tx.send((1, "second".to_owned()));
            paired_rx.assert_no_more().await;
        });
    }
}