hydro_test/external_client/
echo.rs

1use std::time::Duration;
2
3use hydro_lang::keyed_stream::KeyedStream;
4use hydro_lang::location::MembershipEvent;
5use hydro_lang::*;
6use hydro_std::membership::track_membership;
7
8pub fn echo_server<'a, P>(
9    in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
10    membership: KeyedStream<u64, MembershipEvent, Process<'a, P>, Unbounded, TotalOrder>,
11) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder> {
12    let current_connections = track_membership(membership);
13
14    current_connections
15        .key_count()
16        .sample_every(q!(Duration::from_secs(1)), nondet!(/** logging */))
17        .for_each(q!(|count| {
18            println!("Current connections: {}", count);
19        }));
20
21    in_stream.inspect_with_key(q!(|(id, t)| println!(
22        "...received request {} from client #{}, echoing back...",
23        t, id
24    )))
25}