/// Generated code for the `embedded` flows, textually included from
/// `embedded.rs` inside Cargo's `OUT_DIR`.
///
/// Compiled only when the `test_embedded` feature is enabled. The lint
/// relaxations below exist because the included file is generated code.
#[cfg(feature = "test_embedded")]
#[expect(
    clippy::allow_attributes,
    clippy::allow_attributes_without_reason,
    reason = "generated code"
)]
#[allow(missing_docs, non_snake_case, unused_imports, unused_qualifications)]
pub mod embedded {
    include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
}
11
/// Generated code for the `singleton_input` flows, textually included from
/// `singleton_input.rs` inside Cargo's `OUT_DIR`.
///
/// Compiled only when the `test_embedded` feature is enabled. The lint
/// relaxations below exist because the included file is generated code.
#[cfg(feature = "test_embedded")]
#[expect(
    clippy::allow_attributes,
    clippy::allow_attributes_without_reason,
    reason = "generated code"
)]
#[allow(missing_docs, non_snake_case, unused_imports, unused_qualifications)]
pub mod singleton_input {
    include!(concat!(env!("OUT_DIR"), "/singleton_input.rs"));
}
22
/// Generated code for the `echo_network` flows, textually included from
/// `echo_network.rs` inside Cargo's `OUT_DIR`.
///
/// Compiled only when the `test_embedded` feature is enabled. The lint
/// relaxations below exist because the included file is generated code.
#[cfg(feature = "test_embedded")]
#[expect(
    clippy::allow_attributes,
    clippy::allow_attributes_without_reason,
    reason = "generated code"
)]
#[allow(missing_docs, non_snake_case, unused_imports, unused_qualifications)]
pub mod echo_network {
    include!(concat!(env!("OUT_DIR"), "/echo_network.rs"));
}
33
/// Generated code for the `o2m_broadcast` flows, textually included from
/// `o2m_broadcast.rs` inside Cargo's `OUT_DIR`.
///
/// Compiled only when the `test_embedded` feature is enabled. The lint
/// relaxations below exist because the included file is generated code.
#[cfg(feature = "test_embedded")]
#[expect(
    clippy::allow_attributes,
    clippy::allow_attributes_without_reason,
    reason = "generated code"
)]
#[allow(missing_docs, non_snake_case, unused_imports, unused_qualifications)]
pub mod o2m_broadcast {
    include!(concat!(env!("OUT_DIR"), "/o2m_broadcast.rs"));
}
44
/// Generated code for the `m2o_send` flows, textually included from
/// `m2o_send.rs` inside Cargo's `OUT_DIR`.
///
/// Compiled only when the `test_embedded` feature is enabled. The lint
/// relaxations below exist because the included file is generated code.
#[cfg(feature = "test_embedded")]
#[expect(
    clippy::allow_attributes,
    clippy::allow_attributes_without_reason,
    reason = "generated code"
)]
#[allow(missing_docs, non_snake_case, unused_imports, unused_qualifications)]
pub mod m2o_send {
    include!(concat!(env!("OUT_DIR"), "/m2o_send.rs"));
}
55
/// Generated code for the `m2m_broadcast` flows, textually included from
/// `m2m_broadcast.rs` inside Cargo's `OUT_DIR`.
///
/// Compiled only when the `test_embedded` feature is enabled. The lint
/// relaxations below exist because the included file is generated code.
#[cfg(feature = "test_embedded")]
#[expect(
    clippy::allow_attributes,
    clippy::allow_attributes_without_reason,
    reason = "generated code"
)]
#[allow(missing_docs, non_snake_case, unused_imports, unused_qualifications)]
pub mod m2m_broadcast {
    include!(concat!(env!("OUT_DIR"), "/m2m_broadcast.rs"));
}
66
#[cfg(all(test, feature = "test_embedded"))]
mod tests {
    use dfir_rs::bytes::{Bytes, BytesMut};
    use dfir_rs::futures::stream;
    use hydro_lang::location::MembershipEvent;
    use hydro_lang::location::member_id::TaglessMemberId;

    /// Awaits `flow.run_available()` inside a fresh tokio `LocalSet`.
    ///
    /// Takes the flow by value, so every borrow the flow holds (output
    /// closures, network sinks) is released once this returns.
    async fn run_dfir(mut flow: dfir_rs::scheduled::graph::Dfir<'_>) {
        tokio::task::LocalSet::new()
            .run_until(flow.run_available())
            .await;
    }

    /// Feeds three lowercase strings into `embedded::capitalize` and checks
    /// that the output callback receives their upper-cased forms, in order.
    #[tokio::test]
    async fn test_embedded_capitalize() {
        let input = stream::iter(vec![
            "hello".to_owned(),
            "world".to_owned(),
            "hydro".to_owned(),
        ]);
        let mut collected = vec![];
        let mut outputs = crate::embedded::capitalize::EmbeddedOutputs {
            output: |s: String| collected.push(s),
        };
        let flow = crate::embedded::capitalize(input, &mut outputs);
        run_dfir(flow).await;
        assert_eq!(collected, vec!["HELLO", "WORLD", "HYDRO"]);
    }

    /// Passes a single `"Hello"` value plus a stream of names into
    /// `singleton_input::prefix_names` and checks each name comes out
    /// prefixed with that value.
    #[tokio::test]
    async fn test_embedded_singleton_input() {
        let names = stream::iter(vec!["Alice".to_owned(), "Bob".to_owned()]);
        let mut collected = vec![];
        let mut outputs = crate::singleton_input::prefix_names::EmbeddedOutputs {
            output: |s: String| collected.push(s),
        };
        let flow = crate::singleton_input::prefix_names("Hello".to_owned(), names, &mut outputs);
        run_dfir(flow).await;
        assert_eq!(collected, vec!["Hello Alice", "Hello Bob"]);
    }

    /// Runs `echo_sender`, captures the bytes it emits through a channel,
    /// replays them into `echo_receiver`, and checks the receiver outputs the
    /// upper-cased inputs.
    #[tokio::test]
    async fn test_echo_network() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();

        // Sender side: every outgoing network message is pushed into the channel.
        let input = stream::iter(vec!["hello".to_owned(), "world".to_owned()]);
        let mut net_out = crate::echo_network::echo_sender::EmbeddedNetworkOut {
            messages: move |bytes: Bytes| {
                tx.send(bytes).unwrap();
            },
        };
        run_dfir(crate::echo_network::echo_sender(input, &mut net_out)).await;

        // Drain what the sender produced and convert it into the receiver's
        // input item shape (`Ok(BytesMut)`).
        let mut bytes_vec = vec![];
        while let Ok(b) = rx.try_recv() {
            bytes_vec.push(Ok(BytesMut::from(b.as_ref())));
        }
        assert_eq!(bytes_vec.len(), 2);

        // Receiver side: replay the captured messages.
        let net_in = crate::echo_network::echo_receiver::EmbeddedNetworkIn {
            messages: stream::iter(bytes_vec),
        };
        let mut received = vec![];
        let mut outputs = crate::echo_network::echo_receiver::EmbeddedOutputs {
            output: |s: String| received.push(s),
        };
        run_dfir(crate::echo_network::echo_receiver(&mut outputs, net_in)).await;
        assert_eq!(received, vec!["HELLO", "WORLD"]);
    }

    /// Runs `o2m_sender` with one joined member, checks every emitted message
    /// is addressed to that member, then replays the payloads into
    /// `o2m_receiver` and checks the upper-cased outputs.
    #[tokio::test]
    async fn test_o2m_broadcast() {
        let member_id = TaglessMemberId::from_raw_id(0);
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(TaglessMemberId, Bytes)>();

        // Sender side: the membership stream announces a single joined member.
        let input = stream::iter(vec!["hello".to_owned(), "world".to_owned()]);
        let membership = crate::o2m_broadcast::o2m_sender::EmbeddedMembershipStreams {
            o2m_receiver: stream::iter(vec![(member_id.clone(), MembershipEvent::Joined)]),
        };
        let mut net_out = crate::o2m_broadcast::o2m_sender::EmbeddedNetworkOut {
            o2m_data: move |item: (TaglessMemberId, Bytes)| {
                tx.send(item).unwrap();
            },
        };
        run_dfir(crate::o2m_broadcast::o2m_sender(
            membership,
            input,
            &mut net_out,
        ))
        .await;

        // The sender tags each message with its destination; the receiver's
        // input carries only the payload, so the tag is checked and dropped.
        let mut payloads = vec![];
        while let Ok((id, b)) = rx.try_recv() {
            assert_eq!(id, member_id);
            payloads.push(Ok(BytesMut::from(b.as_ref())));
        }
        assert_eq!(payloads.len(), 2);

        // Receiver side: replay the captured payloads.
        let net_in = crate::o2m_broadcast::o2m_receiver::EmbeddedNetworkIn {
            o2m_data: stream::iter(payloads),
        };
        let mut received = vec![];
        let mut outputs = crate::o2m_broadcast::o2m_receiver::EmbeddedOutputs {
            output: |s: String| received.push(s),
        };
        run_dfir(crate::o2m_broadcast::o2m_receiver(
            &member_id,
            &mut outputs,
            net_in,
        ))
        .await;
        assert_eq!(received, vec!["HELLO", "WORLD"]);
    }

    /// Runs `m2o_sender` as member 42, tags the emitted bytes with that
    /// member id, replays them into `m2o_receiver`, and checks the string
    /// component of each received pair.
    #[tokio::test]
    async fn test_m2o_send() {
        let member_id = TaglessMemberId::from_raw_id(42);
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();

        // Sender side: outgoing messages are untagged bytes.
        let input = stream::iter(vec!["foo".to_owned(), "bar".to_owned()]);
        let mut net_out = crate::m2o_send::m2o_sender::EmbeddedNetworkOut {
            m2o_data: move |bytes: Bytes| {
                tx.send(bytes).unwrap();
            },
        };
        run_dfir(crate::m2o_send::m2o_sender(&member_id, input, &mut net_out)).await;

        // The receiver's input pairs each payload with the id of the member
        // that sent it, so attach the sender id here.
        let mut tagged_bytes = vec![];
        while let Ok(b) = rx.try_recv() {
            tagged_bytes.push(Ok((member_id.clone(), BytesMut::from(b.as_ref()))));
        }
        assert_eq!(tagged_bytes.len(), 2);

        // Receiver side: replay the tagged messages.
        let net_in = crate::m2o_send::m2o_receiver::EmbeddedNetworkIn {
            m2o_data: stream::iter(tagged_bytes),
        };
        let mut received = vec![];
        let mut outputs = crate::m2o_send::m2o_receiver::EmbeddedOutputs {
            output: |s| received.push(s),
        };
        run_dfir(crate::m2o_send::m2o_receiver(&mut outputs, net_in)).await;
        assert_eq!(received.len(), 2);
        assert_eq!(received[0].1, "FOO");
        assert_eq!(received[1].1, "BAR");
    }

    /// Runs `m2m_sender` as one member broadcasting to a single joined
    /// destination member, checks the emitted message is addressed to the
    /// destination, re-tags it with the source id, replays it into
    /// `m2m_receiver`, and checks the string component of the received pair.
    #[tokio::test]
    async fn test_m2m_broadcast() {
        // Source and destination deliberately use different raw ids: with equal
        // ids the `id == dst_id` check below could not tell a correct
        // destination tag from a sender that wrongly tags with its own id.
        let src_id = TaglessMemberId::from_raw_id(0);
        let dst_id = TaglessMemberId::from_raw_id(1);
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(TaglessMemberId, Bytes)>();

        // Sender side: the membership stream announces only the destination.
        let input = stream::iter(vec!["ping".to_owned()]);
        let membership = crate::m2m_broadcast::m2m_sender::EmbeddedMembershipStreams {
            m2m_receiver: stream::iter(vec![(dst_id.clone(), MembershipEvent::Joined)]),
        };
        let mut net_out = crate::m2m_broadcast::m2m_sender::EmbeddedNetworkOut {
            m2m_data: move |item: (TaglessMemberId, Bytes)| {
                tx.send(item).unwrap();
            },
        };
        run_dfir(crate::m2m_broadcast::m2m_sender(
            &src_id,
            membership,
            input,
            &mut net_out,
        ))
        .await;

        // Outgoing messages are tagged with the destination; incoming ones are
        // tagged with the source, so swap the tag while bridging.
        let mut tagged_bytes = vec![];
        while let Ok((id, b)) = rx.try_recv() {
            assert_eq!(id, dst_id);
            tagged_bytes.push(Ok((src_id.clone(), BytesMut::from(b.as_ref()))));
        }
        assert_eq!(tagged_bytes.len(), 1);

        // Receiver side: replay the re-tagged message.
        let net_in = crate::m2m_broadcast::m2m_receiver::EmbeddedNetworkIn {
            m2m_data: stream::iter(tagged_bytes),
        };
        let mut received = vec![];
        let mut outputs = crate::m2m_broadcast::m2m_receiver::EmbeddedOutputs {
            output: |s| received.push(s),
        };
        run_dfir(crate::m2m_broadcast::m2m_receiver(
            &dst_id,
            &mut outputs,
            net_in,
        ))
        .await;
        assert_eq!(received.len(), 1);
        assert_eq!(received[0].1, "PING");
    }
}