Skip to main content

hydro_test/distributed/
distributed_echo.rs

1use hydro_lang::live_collections::stream::TotalOrder;
2use hydro_lang::location::NetworkHint;
3use hydro_lang::location::external_process::{ExternalBytesPort, Many};
4use hydro_lang::prelude::*;
5use tokio_util::codec::LengthDelimitedCodec;
6
/// Marker type for the `p1` process of [`distributed_echo`]: the process that
/// opens the external port, parses client input, and receives the final result.
pub struct P1 {}
/// Marker type for the `c2` cluster of [`distributed_echo`], the first cluster
/// values are round-robined to (from `p1`).
pub struct C2 {}
/// Marker type for the `c3` cluster of [`distributed_echo`], the second cluster
/// values are round-robined to (from `c2`).
pub struct C3 {}
/// Marker type for the `p4` process of [`distributed_echo`], the last hop
/// before values are sent back to `p1`.
pub struct P4 {}
11
12pub fn distributed_echo<'a>(
13    external: &External<'a, ()>,
14    p1: &Process<'a, P1>,
15    c2: &Cluster<'a, C2>,
16    c3: &Cluster<'a, C3>,
17    p4: &Process<'a, P4>,
18) -> ExternalBytesPort<Many> {
19    let (bidi_port, raw_input, _external_membership, response_sink) = p1
20        .bidi_external_many_bytes::<_, bytes::Bytes, LengthDelimitedCodec>(
21            external,
22            NetworkHint::Auto,
23        );
24
25    // Parse incoming JSON bytes to u32
26    let input = raw_input.filter_map(q!(|data: bytes::BytesMut| {
27        let json_str = String::from_utf8_lossy(&data);
28        match serde_json::from_str::<u32>(&json_str) {
29            Ok(n) => Some(n),
30            Err(e) => {
31                println!("[P1] Failed to parse JSON input '{}': {}", json_str, e);
32                None
33            }
34        }
35    }));
36
37    let echo_result = input
38        .inspect_with_key(q!(|(client_id, n)| println!(
39            "[P1] received from external client {client_id}: {n}"
40        )))
41        // Convert keyed stream to stream of tuples, incrementing the value
42        .entries()
43        .map(q!(|(client_id, n)| (client_id, n + 1)))
44        .assume_ordering::<TotalOrder>(nondet!(/** external input order */))
45        .round_robin(c2, TCP.fail_stop().bincode().name("p1_to_c2"), nondet!(/** test */))
46        .inspect(q!(|(client_id, n)| println!(
47            "[C2] received from client {client_id}: {n}"
48        )))
49        .map(q!(|(client_id, n)| (client_id, n + 1)))
50        .round_robin(c3, TCP.fail_stop().bincode().name("c2_to_c3"), nondet!(/** test */))
51        .inspect(q!(|(client_id, n)| println!(
52            "[C3] received from client {client_id}: {n}"
53        )))
54        .map(q!(|(client_id, n)| (client_id, n + 1)))
55        .values()
56        .send(p4, TCP.fail_stop().bincode().name("c3_to_p4"))
57        .inspect(q!(|(client_id, n)| println!(
58            "[P4] received from client {client_id}: {n}"
59        )))
60        .map(q!(|(client_id, n)| (client_id, n + 1)))
61        .values()
62        .send(p1, TCP.fail_stop().bincode().name("p4_to_p1"));
63
64    let tick = p1.tick();
65
66    let all_responses = echo_result
67        .batch(&tick, nondet!(/** test */))
68        .map(q!(|(client_id, n)| {
69            let json = serde_json::to_string(&n).unwrap();
70            (client_id, bytes::Bytes::from(json))
71        }))
72        .into_keyed()
73        .all_ticks();
74
75    response_sink.complete(all_responses);
76
77    bidi_port
78}