hydro_test/external_client/
echo.rs1use std::time::Duration;
2
3use hydro_lang::keyed_stream::KeyedStream;
4use hydro_lang::location::MembershipEvent;
5use hydro_lang::*;
6use hydro_std::membership::track_membership;
7
8pub fn echo_server<'a, P>(
9 in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
10 membership: KeyedStream<u64, MembershipEvent, Process<'a, P>, Unbounded, TotalOrder>,
11) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder> {
12 let current_connections = track_membership(membership);
13
14 current_connections
15 .key_count()
16 .sample_every(q!(Duration::from_secs(1)), nondet!())
17 .for_each(q!(|count| {
18 println!("Current connections: {}", count);
19 }));
20
21 in_stream.inspect_with_key(q!(|(id, t)| println!(
22 "...received request {} from client #{}, echoing back...",
23 t, id
24 )))
25}