hydro_test/distributed/
first_ten.rs

1use hydro_lang::*;
2use location::external_process::ExternalBincodeSink;
3use serde::{Deserialize, Serialize};
4
5#[derive(Serialize, Deserialize)]
6struct SendOverNetwork {
7    pub n: u32,
8}
9
10pub struct P1 {}
11pub struct P2 {}
12
13pub fn first_ten_distributed<'a>(
14    external: &ExternalProcess<'a, ()>,
15    process: &Process<'a, P1>,
16    second_process: &Process<'a, P2>,
17) -> ExternalBincodeSink<String> {
18    let (numbers_external_port, numbers_external) = external.source_external_bincode(process);
19    numbers_external.for_each(q!(|n| println!("hi: {:?}", n)));
20
21    let numbers = process.source_iter(q!(0..10));
22    numbers
23        .map(q!(|n| SendOverNetwork { n }))
24        .send_bincode(second_process)
25        .for_each(q!(|n| println!("{}", n.n)));
26
27    numbers_external_port
28}
29
30#[cfg(test)]
31mod tests {
32    use futures::SinkExt;
33    use hydro_deploy::Deployment;
34    use hydro_lang::deploy::DeployCrateWrapper;
35
36    #[tokio::test]
37    async fn first_ten_distributed() {
38        let mut deployment = Deployment::new();
39
40        let builder = hydro_lang::FlowBuilder::new();
41        let external = builder.external_process();
42        let p1 = builder.process();
43        let p2 = builder.process();
44        let external_port = super::first_ten_distributed(&external, &p1, &p2);
45
46        let built = builder.with_default_optimize();
47
48        insta::assert_debug_snapshot!(built.ir());
49
50        let nodes = built
51            .with_process(&p1, deployment.Localhost())
52            .with_process(&p2, deployment.Localhost())
53            .with_external(&external, deployment.Localhost())
54            .deploy(&mut deployment);
55
56        deployment.deploy().await.unwrap();
57
58        let mut external_port = nodes.connect_sink_bincode(external_port).await;
59
60        let mut first_node_stdout = nodes.get_process(&p1).stdout().await;
61        let mut second_node_stdout = nodes.get_process(&p2).stdout().await;
62
63        deployment.start().await.unwrap();
64
65        external_port
66            .send("this is some string".to_string())
67            .await
68            .unwrap();
69        assert_eq!(
70            first_node_stdout.recv().await.unwrap(),
71            "hi: \"this is some string\""
72        );
73
74        for i in 0..10 {
75            assert_eq!(second_node_stdout.recv().await.unwrap(), i.to_string());
76        }
77    }
78}