hydro_test/distributed/
first_ten.rs
1use hydro_lang::*;
2use location::external_process::ExternalBincodeSink;
3use serde::{Deserialize, Serialize};
4
5#[derive(Serialize, Deserialize)]
6struct SendOverNetwork {
7 pub n: u32,
8}
9
/// Type tag for the first process: the `process` parameter of
/// `first_ten_distributed` is a `Process<'a, P1>`.
pub struct P1 {}
/// Type tag for the second process: the `second_process` parameter of
/// `first_ten_distributed` is a `Process<'a, P2>`.
pub struct P2 {}
12
13pub fn first_ten_distributed<'a>(
14 external: &ExternalProcess<'a, ()>,
15 process: &Process<'a, P1>,
16 second_process: &Process<'a, P2>,
17) -> ExternalBincodeSink<String> {
18 let (numbers_external_port, numbers_external) = external.source_external_bincode(process);
19 numbers_external.for_each(q!(|n| println!("hi: {:?}", n)));
20
21 let numbers = process.source_iter(q!(0..10));
22 numbers
23 .map(q!(|n| SendOverNetwork { n }))
24 .send_bincode(second_process)
25 .for_each(q!(|n| println!("{}", n.n)));
26
27 numbers_external_port
28}
29
#[cfg(test)]
mod tests {
    use futures::SinkExt;
    use hydro_deploy::Deployment;
    use hydro_lang::deploy::DeployCrateWrapper;

    /// End-to-end check of `super::first_ten_distributed` on localhost.
    ///
    /// Builds the flow, snapshots its IR, deploys both processes and the
    /// external process locally, then verifies that:
    ///
    /// * a string pushed through the external sink is echoed on the first
    ///   process's stdout with the `hi: ` prefix, and
    /// * the second process prints `0` through `9`, one per line, in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut hosts = Deployment::new();

        // Declare the locations and build the dataflow graph.
        let flow = hydro_lang::FlowBuilder::new();
        let outside = flow.external_process();
        let first = flow.process();
        let second = flow.process();
        let sink_handle = super::first_ten_distributed(&outside, &first, &second);

        let built = flow.with_default_optimize();

        insta::assert_debug_snapshot!(built.ir());

        // Place every location on the local machine.
        let cluster = built
            .with_process(&first, hosts.Localhost())
            .with_process(&second, hosts.Localhost())
            .with_external(&outside, hosts.Localhost())
            .deploy(&mut hosts);

        hosts.deploy().await.unwrap();

        let mut sink = cluster.connect_sink_bincode(sink_handle).await;

        // Stdout receivers are obtained before `start()` — presumably so that
        // no early output is missed.
        let mut first_stdout = cluster.get_process(&first).stdout().await;
        let mut second_stdout = cluster.get_process(&second).stdout().await;

        hosts.start().await.unwrap();

        // External input is echoed by the first process.
        sink.send(String::from("this is some string"))
            .await
            .unwrap();
        let echoed = first_stdout.recv().await.unwrap();
        assert_eq!(echoed, "hi: \"this is some string\"");

        // The second process prints the ten numbers in order.
        for expected in (0..10).map(|i: i32| i.to_string()) {
            let line = second_stdout.recv().await.unwrap();
            assert_eq!(line, expected);
        }
    }
}