hydro_test/external_client/
echo.rs1use std::time::Duration;
2
3use hydro_lang::live_collections::stream::{ExactlyOnce, TotalOrder};
4use hydro_lang::location::MembershipEvent;
5use hydro_lang::prelude::*;
6use hydro_std::membership::track_membership;
7
8pub fn echo_server<'a, P>(
9 in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
10 membership: KeyedStream<u64, MembershipEvent, Process<'a, P>, Unbounded, TotalOrder>,
11) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder> {
12 sliced! {
13 let conns = use(track_membership(membership), nondet!());
14 conns.filter(q!(|b| *b)).key_count()
15 }
16 .sample_every(q!(Duration::from_secs(1)), nondet!())
17 .assume_retries::<ExactlyOnce>(nondet!())
18 .for_each(q!(|count| {
19 println!("Current connections: {}", count);
20 }));
21
22 in_stream.inspect_with_key(q!(|(id, t)| println!(
23 "...received request {} from client #{}, echoing back...",
24 t, id
25 )))
26}