hydro_std/
request_response.rs1use std::hash::Hash;
2
3use hydro_lang::*;
4use location::NoTick;
5
6type JoinResponses<K, M, V, L> = Stream<(K, (M, V)), Atomic<L>, Unbounded, NoOrder>;
7
8pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>(
15 tick: &Tick<L>,
16 responses: Stream<(K, V), Atomic<L>, Unbounded, NoOrder>,
17 metadata: Stream<(K, M), Tick<L>, Bounded, NoOrder>,
18) -> JoinResponses<K, M, V, L> {
19 let (remaining_to_join_complete_cycle, remaining_to_join) =
20 tick.cycle::<Stream<_, _, _, NoOrder>>();
21
22 let remaining_and_new: Stream<(K, M), Tick<L>, Bounded, _> = remaining_to_join.chain(metadata);
23
24 let responses = responses.batch(nondet!(
25 ));
28
29 let joined_this_tick =
32 remaining_and_new
33 .clone()
34 .join(responses.clone())
35 .map(q!(|(key, (meta, resp))| (key, (meta, resp))));
36
37 remaining_to_join_complete_cycle
38 .complete_next_tick(remaining_and_new.anti_join(responses.map(q!(|(key, _)| key))));
39
40 joined_this_tick.all_ticks_atomic()
41}