1use std::hash::Hash;
23use hydro_lang::*;
4use location::NoTick;
/// Return type of [`join_responses`]: an unbounded, unordered stream located in
/// `Atomic<L>` whose elements are `(K, (M, V))` tuples, i.e. a key together with
/// the metadata and the response value that were joined on that key.
type JoinResponses<K, M, V, L> = Stream<(K, (M, V)), Atomic<L>, Unbounded, NoOrder>;
78/// Given an incoming stream of request-response responses, joins with metadata generated
9/// at request time that is stored in-memory.
10///
11/// The metadata must be generated in the same or a previous tick than the response,
12/// typically at request time. Only one response element should be produced with a given
13/// key, same for the metadata stream.
14pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>(
15 tick: &Tick<L>,
16 responses: Stream<(K, V), Atomic<L>, Unbounded, NoOrder>,
17 metadata: Stream<(K, M), Tick<L>, Bounded, NoOrder>,
18) -> JoinResponses<K, M, V, L> {
19let (remaining_to_join_complete_cycle, remaining_to_join) =
20 tick.cycle::<Stream<_, _, _, NoOrder>>();
2122let remaining_and_new: Stream<(K, M), Tick<L>, Bounded, _> = remaining_to_join.chain(metadata);
2324let responses = unsafe {
25// SAFETY: because we persist the metadata, delays resulting from
26 // batching boundaries do not affect the output contents.
27responses.tick_batch()
28 };
2930// TODO(shadaj): we should have a "split-join" operator
31 // that returns both join and anti-join without cloning
32let joined_this_tick =
33 remaining_and_new
34 .clone()
35 .join(responses.clone())
36 .map(q!(|(key, (meta, resp))| (key, (meta, resp))));
3738 remaining_to_join_complete_cycle
39 .complete_next_tick(remaining_and_new.anti_join(responses.map(q!(|(key, _)| key))));
4041 joined_this_tick.all_ticks_atomic()
42}