hydro_test/distributed/
distributed_echo.rs1use hydro_lang::live_collections::stream::TotalOrder;
2use hydro_lang::location::NetworkHint;
3use hydro_lang::location::external_process::{ExternalBytesPort, Many};
4use hydro_lang::prelude::*;
5use tokio_util::codec::LengthDelimitedCodec;
6
7pub struct P1 {}
8pub struct C2 {}
9pub struct C3 {}
10pub struct P4 {}
11
12pub fn distributed_echo<'a>(
13 external: &External<'a, ()>,
14 p1: &Process<'a, P1>,
15 c2: &Cluster<'a, C2>,
16 c3: &Cluster<'a, C3>,
17 p4: &Process<'a, P4>,
18) -> ExternalBytesPort<Many> {
19 let (bidi_port, raw_input, _external_membership, response_sink) = p1
20 .bidi_external_many_bytes::<_, bytes::Bytes, LengthDelimitedCodec>(
21 external,
22 NetworkHint::Auto,
23 );
24
25 let input = raw_input.filter_map(q!(|data: bytes::BytesMut| {
27 let json_str = String::from_utf8_lossy(&data);
28 match serde_json::from_str::<u32>(&json_str) {
29 Ok(n) => Some(n),
30 Err(e) => {
31 println!("[P1] Failed to parse JSON input '{}': {}", json_str, e);
32 None
33 }
34 }
35 }));
36
37 let echo_result = input
38 .inspect_with_key(q!(|(client_id, n)| println!(
39 "[P1] received from external client {client_id}: {n}"
40 )))
41 .entries()
43 .map(q!(|(client_id, n)| (client_id, n + 1)))
44 .assume_ordering::<TotalOrder>(nondet!())
45 .round_robin(c2, TCP.fail_stop().bincode().name("p1_to_c2"), nondet!())
46 .inspect(q!(|(client_id, n)| println!(
47 "[C2] received from client {client_id}: {n}"
48 )))
49 .map(q!(|(client_id, n)| (client_id, n + 1)))
50 .round_robin(c3, TCP.fail_stop().bincode().name("c2_to_c3"), nondet!())
51 .inspect(q!(|(client_id, n)| println!(
52 "[C3] received from client {client_id}: {n}"
53 )))
54 .map(q!(|(client_id, n)| (client_id, n + 1)))
55 .values()
56 .send(p4, TCP.fail_stop().bincode().name("c3_to_p4"))
57 .inspect(q!(|(client_id, n)| println!(
58 "[P4] received from client {client_id}: {n}"
59 )))
60 .map(q!(|(client_id, n)| (client_id, n + 1)))
61 .values()
62 .send(p1, TCP.fail_stop().bincode().name("p4_to_p1"));
63
64 let tick = p1.tick();
65
66 let all_responses = echo_result
67 .batch(&tick, nondet!())
68 .map(q!(|(client_id, n)| {
69 let json = serde_json::to_string(&n).unwrap();
70 (client_id, bytes::Bytes::from(json))
71 }))
72 .into_keyed()
73 .all_ticks();
74
75 response_sink.complete(all_responses);
76
77 bidi_port
78}