hydro_test/distributed/
first_ten.rs

1use hydro_lang::*;
2use location::external_process::ExternalBincodeSink;
3use serde::{Deserialize, Serialize};
4
5#[derive(Serialize, Deserialize)]
6struct SendOverNetwork {
7    pub n: u32,
8}
9
10pub struct P1 {}
11pub struct P2 {}
12
13pub fn first_ten_distributed<'a>(
14    external: &External<'a, ()>,
15    process: &Process<'a, P1>,
16    second_process: &Process<'a, P2>,
17) -> ExternalBincodeSink<String> {
18    let (numbers_external_port, numbers_external) = process.source_external_bincode(external);
19    numbers_external.for_each(q!(|n| println!("hi: {:?}", n)));
20
21    let numbers = process.source_iter(q!(0..10));
22    numbers
23        .map(q!(|n| SendOverNetwork { n }))
24        .send_bincode(second_process)
25        .for_each(q!(|n| println!("{}", n.n)));
26
27    numbers_external_port
28}
29
30#[cfg(test)]
31mod tests {
32    use futures::SinkExt;
33    use hydro_deploy::Deployment;
34    use hydro_lang::deploy::DeployCrateWrapper;
35
36    #[test]
37    fn first_ten_distributed_ir() {
38        let builder = hydro_lang::FlowBuilder::new();
39        let external = builder.external();
40        let p1 = builder.process();
41        let p2 = builder.process();
42        super::first_ten_distributed(&external, &p1, &p2);
43
44        hydro_build_utils::assert_debug_snapshot!(builder.finalize().ir());
45    }
46
47    #[tokio::test]
48    async fn first_ten_distributed() {
49        let mut deployment = Deployment::new();
50
51        let builder = hydro_lang::FlowBuilder::new();
52        let external = builder.external();
53        let p1 = builder.process();
54        let p2 = builder.process();
55        let external_port = super::first_ten_distributed(&external, &p1, &p2);
56
57        let nodes = builder
58            .with_default_optimize()
59            .with_process(&p1, deployment.Localhost())
60            .with_process(&p2, deployment.Localhost())
61            .with_external(&external, deployment.Localhost())
62            .deploy(&mut deployment);
63
64        deployment.deploy().await.unwrap();
65
66        let mut external_port = nodes.connect_sink_bincode(external_port).await;
67
68        let mut first_node_stdout = nodes.get_process(&p1).stdout().await;
69        let mut second_node_stdout = nodes.get_process(&p2).stdout().await;
70
71        deployment.start().await.unwrap();
72
73        external_port
74            .send("this is some string".to_string())
75            .await
76            .unwrap();
77        assert_eq!(
78            first_node_stdout.recv().await.unwrap(),
79            "hi: \"this is some string\""
80        );
81
82        for i in 0..10 {
83            assert_eq!(second_node_stdout.recv().await.unwrap(), i.to_string());
84        }
85    }
86}