Skip to main content

hydro_test/external_client/
echo.rs

1use std::time::Duration;
2
3use hydro_lang::live_collections::stream::{ExactlyOnce, TotalOrder};
4use hydro_lang::location::MembershipEvent;
5use hydro_lang::prelude::*;
6use hydro_std::membership::track_membership;
7
8pub fn echo_server<'a, P>(
9    in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
10    membership: KeyedStream<u64, MembershipEvent, Process<'a, P>, Unbounded, TotalOrder>,
11) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder> {
12    sliced! {
13        let conns = use(track_membership(membership), nondet!(/** logging */));
14        conns.filter(q!(|b| *b)).key_count()
15    }
16    .sample_every(q!(Duration::from_secs(1)), nondet!(/** logging */))
17    .assume_retries::<ExactlyOnce>(nondet!(/** extra logs due to duplicate samples are okay */))
18    .for_each(q!(|count| {
19        println!("Current connections: {}", count);
20    }));
21
22    in_stream.inspect_with_key(q!(|(id, t)| println!(
23        "...received request {} from client #{}, echoing back...",
24        t, id
25    )))
26}