hydro_std/
request_response.rs

1use std::hash::Hash;
2
3use hydro_lang::*;
4use location::NoTick;
5
6type JoinResponses<K, M, V, L> = Stream<(K, (M, V)), Atomic<L>, Unbounded, NoOrder>;
7
8/// Given an incoming stream of request-response responses, joins with metadata generated
9/// at request time that is stored in-memory.
10///
11/// The metadata must be generated in the same or a previous tick than the response,
12/// typically at request time. Only one response element should be produced with a given
13/// key, same for the metadata stream.
14pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>(
15    tick: &Tick<L>,
16    responses: Stream<(K, V), Atomic<L>, Unbounded, NoOrder>,
17    metadata: Stream<(K, M), Tick<L>, Bounded, NoOrder>,
18) -> JoinResponses<K, M, V, L> {
19    let (remaining_to_join_complete_cycle, remaining_to_join) =
20        tick.cycle::<Stream<_, _, _, NoOrder>>();
21
22    let remaining_and_new: Stream<(K, M), Tick<L>, Bounded, _> = remaining_to_join.chain(metadata);
23
24    let responses = unsafe {
25        // SAFETY: because we persist the metadata, delays resulting from
26        // batching boundaries do not affect the output contents.
27        responses.tick_batch()
28    };
29
30    // TODO(shadaj): we should have a "split-join" operator
31    // that returns both join and anti-join without cloning
32    let joined_this_tick =
33        remaining_and_new
34            .clone()
35            .join(responses.clone())
36            .map(q!(|(key, (meta, resp))| (key, (meta, resp))));
37
38    remaining_to_join_complete_cycle
39        .complete_next_tick(remaining_and_new.anti_join(responses.map(q!(|(key, _)| key))));
40
41    joined_this_tick.all_ticks_atomic()
42}