hydro_test/distributed/
first_ten.rs1use hydro_lang::*;
2use location::external_process::ExternalBincodeSink;
3use serde::{Deserialize, Serialize};
4
5#[derive(Serialize, Deserialize)]
6struct SendOverNetwork {
7 pub n: u32,
8}
9
10pub struct P1 {}
11pub struct P2 {}
12
13pub fn first_ten_distributed<'a>(
14 external: &External<'a, ()>,
15 process: &Process<'a, P1>,
16 second_process: &Process<'a, P2>,
17) -> ExternalBincodeSink<String> {
18 let (numbers_external_port, numbers_external) = process.source_external_bincode(external);
19 numbers_external.for_each(q!(|n| println!("hi: {:?}", n)));
20
21 let numbers = process.source_iter(q!(0..10));
22 numbers
23 .map(q!(|n| SendOverNetwork { n }))
24 .send_bincode(second_process)
25 .for_each(q!(|n| println!("{}", n.n)));
26
27 numbers_external_port
28}
29
30#[cfg(test)]
31mod tests {
32 use futures::SinkExt;
33 use hydro_deploy::Deployment;
34 use hydro_lang::deploy::DeployCrateWrapper;
35
36 #[test]
37 fn first_ten_distributed_ir() {
38 let builder = hydro_lang::FlowBuilder::new();
39 let external = builder.external();
40 let p1 = builder.process();
41 let p2 = builder.process();
42 super::first_ten_distributed(&external, &p1, &p2);
43
44 hydro_build_utils::assert_debug_snapshot!(builder.finalize().ir());
45 }
46
47 #[tokio::test]
48 async fn first_ten_distributed() {
49 let mut deployment = Deployment::new();
50
51 let builder = hydro_lang::FlowBuilder::new();
52 let external = builder.external();
53 let p1 = builder.process();
54 let p2 = builder.process();
55 let external_port = super::first_ten_distributed(&external, &p1, &p2);
56
57 let nodes = builder
58 .with_default_optimize()
59 .with_process(&p1, deployment.Localhost())
60 .with_process(&p2, deployment.Localhost())
61 .with_external(&external, deployment.Localhost())
62 .deploy(&mut deployment);
63
64 deployment.deploy().await.unwrap();
65
66 let mut external_port = nodes.connect_sink_bincode(external_port).await;
67
68 let mut first_node_stdout = nodes.get_process(&p1).stdout().await;
69 let mut second_node_stdout = nodes.get_process(&p2).stdout().await;
70
71 deployment.start().await.unwrap();
72
73 external_port
74 .send("this is some string".to_string())
75 .await
76 .unwrap();
77 assert_eq!(
78 first_node_stdout.recv().await.unwrap(),
79 "hi: \"this is some string\""
80 );
81
82 for i in 0..10 {
83 assert_eq!(second_node_stdout.recv().await.unwrap(), i.to_string());
84 }
85 }
86}