use std::hash::Hash;
use hydro_lang::*;
use location::NoTick;
type JoinResponses<K, M, V, L> = Stream<(K, (M, V)), Timestamped<L>, Unbounded, NoOrder>;
pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>(
tick: &Tick<L>,
responses: Stream<(K, V), Timestamped<L>, Unbounded, NoOrder>,
metadata: Stream<(K, M), Tick<L>, Bounded, NoOrder>,
) -> JoinResponses<K, M, V, L> {
let (remaining_to_join_complete_cycle, remaining_to_join) =
tick.cycle::<Stream<_, _, _, NoOrder>>();
let remaining_and_new: Stream<(K, M), Tick<L>, Bounded, _> = remaining_to_join.union(metadata);
let responses = unsafe {
responses.tick_batch()
};
let joined_this_tick =
remaining_and_new
.clone()
.join(responses.clone())
.map(q!(|(key, (meta, resp))| (key, (meta, resp))));
remaining_to_join_complete_cycle
.complete_next_tick(remaining_and_new.anti_join(responses.map(q!(|(key, _)| key))));
joined_this_tick.all_ticks()
}