1use std::hash::Hash;
2
3use hydro_lang::live_collections::stream::NoOrder;
4use hydro_lang::location::{Location, NoTick};
5use hydro_lang::prelude::*;
6
7type JoinResponses<K, M, V, L> = Stream<(K, (M, V)), L, Unbounded, NoOrder>;
8
9pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>(
16 responses: Stream<(K, V), L, Unbounded, NoOrder>,
17 metadata: Stream<(K, M), Tick<L>, Bounded, NoOrder>,
18) -> JoinResponses<K, M, V, L> {
19 sliced! {
20 let mut remaining_to_join = use::state_null::<Stream<(K, M), _, _, NoOrder>>();
21
22 let response_batch = use(responses, nondet!(
23 ));
26 let metadata_batch = use::atomic(metadata.all_ticks_atomic(), nondet!(
27 ));
29
30 let remaining_and_new = remaining_to_join.chain(metadata_batch);
31
32 let joined_this_tick = remaining_and_new
35 .clone()
36 .join(response_batch.clone())
37 .map(q!(|(key, (meta, resp))| (key, (meta, resp))));
38
39 remaining_to_join = remaining_and_new.anti_join(response_batch.map(q!(|(key, _)| key)));
40
41 joined_this_tick
42 }
43}
44
45#[cfg(test)]
46mod tests {
47 use hydro_lang::prelude::*;
48
49 use super::*;
50
51 #[test]
53 fn test_join_responses_basic() {
54 let mut flow = FlowBuilder::new();
55 let process = flow.process::<()>();
56
57 let (response_send, responses) = process.sim_input::<(u32, String), _, _>();
59 let (metadata_send, metadata_input) = process.sim_input::<(u32, i32), _, _>();
60
61 let metadata_processing = metadata_input.atomic();
63 let metadata_ack = metadata_processing.clone().end_atomic();
64 let metadata = metadata_processing
65 .batch_atomic(&process.tick(), nondet!())
66 .weaken_ordering();
67
68 let joined = join_responses(responses.weaken_ordering(), metadata);
70
71 let metadata_ack_recv = metadata_ack.sim_output();
73 let joined_recv = joined.sim_output();
74
75 flow.sim().exhaustive(async || {
76 metadata_send.send((1, 42));
78 metadata_ack_recv.assert_yields([(1, 42)]).await;
80
81 response_send.send((1, "hello".to_owned()));
83 joined_recv
85 .assert_yields_unordered([(1, (42, "hello".to_owned()))])
86 .await;
87 });
88 }
89
90 #[test]
92 fn test_join_responses_metadata_persists() {
93 let mut flow = FlowBuilder::new();
94 let process = flow.process::<()>();
95
96 let (response_send, responses) = process.sim_input::<(u32, String), _, _>();
97 let (metadata_send, metadata_input) = process.sim_input::<(u32, i32), _, _>();
98
99 let metadata_processing = metadata_input.atomic();
100 let metadata_ack = metadata_processing.clone().end_atomic();
101 let metadata = metadata_processing
102 .batch_atomic(&process.tick(), nondet!())
103 .weaken_ordering();
104
105 let joined = join_responses(responses.weaken_ordering(), metadata);
106
107 let metadata_ack_recv = metadata_ack.sim_output();
108 let joined_recv = joined.sim_output();
109
110 flow.sim().exhaustive(async || {
111 metadata_send.send_many([(1, 10), (2, 20)]);
113 metadata_ack_recv.assert_yields([(1, 10), (2, 20)]).await;
114
115 response_send.send_many([(2, "two".to_owned()), (1, "one".to_owned())]);
117 joined_recv
118 .assert_yields_only_unordered([
119 (1, (10, "one".to_owned())),
120 (2, (20, "two".to_owned())),
121 ])
122 .await;
123 });
124 }
125
126 #[test]
128 fn test_join_responses_no_metadata() {
129 let mut flow = FlowBuilder::new();
130 let process = flow.process::<()>();
131
132 let (response_send, responses) = process.sim_input::<(u32, String), _, _>();
133 let (metadata_send, metadata_input) = process.sim_input::<(u32, i32), _, _>();
134
135 let metadata_processing = metadata_input.atomic();
136 let metadata_ack = metadata_processing.clone().end_atomic();
137 let metadata = metadata_processing
138 .batch_atomic(&process.tick(), nondet!())
139 .weaken_ordering();
140
141 let joined = join_responses(responses.weaken_ordering(), metadata);
142
143 let metadata_ack_recv = metadata_ack.sim_output();
144 let joined_recv = joined.sim_output();
145
146 flow.sim().exhaustive(async || {
147 metadata_send.send((1, 42));
149 metadata_ack_recv.assert_yields([(1, 42)]).await;
150
151 response_send.send_many([(1, "matched".to_owned()), (2, "unmatched".to_owned())]);
153
154 joined_recv
156 .assert_yields_only_unordered([(1, (42, "matched".to_owned()))])
157 .await;
158 });
159 }
160
161 #[test]
163 fn test_join_responses_metadata_removed_after_match() {
164 let mut flow = FlowBuilder::new();
165 let process = flow.process::<()>();
166
167 let (response_send, responses) = process.sim_input::<(u32, String), _, _>();
168 let (metadata_send, metadata_input) = process.sim_input::<(u32, i32), _, _>();
169
170 let metadata_processing = metadata_input.atomic();
171 let metadata_ack = metadata_processing.clone().end_atomic();
172 let metadata = metadata_processing
173 .batch_atomic(&process.tick(), nondet!())
174 .weaken_ordering();
175
176 let joined = join_responses(responses.weaken_ordering(), metadata);
177
178 let metadata_ack_recv = metadata_ack.sim_output();
179 let joined_recv = joined.sim_output();
180
181 flow.sim().exhaustive(async || {
182 metadata_send.send((1, 42));
184 metadata_ack_recv.assert_yields([(1, 42)]).await;
185
186 response_send.send((1, "first".to_owned()));
188 joined_recv
189 .assert_yields_unordered([(1, (42, "first".to_owned()))])
190 .await;
191
192 response_send.send((1, "second".to_owned()));
194 joined_recv.assert_no_more().await;
195 });
196 }
197}