1#[cfg(feature = "test_embedded")]
2#[expect(
3 clippy::allow_attributes,
4 clippy::allow_attributes_without_reason,
5 reason = "generated code"
6)]
7#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
8pub mod embedded {
9 include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
10}
11
12#[cfg(feature = "test_embedded")]
13#[expect(
14 clippy::allow_attributes,
15 clippy::allow_attributes_without_reason,
16 reason = "generated code"
17)]
18#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
19pub mod singleton_input {
20 include!(concat!(env!("OUT_DIR"), "/singleton_input.rs"));
21}
22
23#[cfg(feature = "test_embedded")]
24#[expect(
25 clippy::allow_attributes,
26 clippy::allow_attributes_without_reason,
27 reason = "generated code"
28)]
29#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
30pub mod echo_network {
31 include!(concat!(env!("OUT_DIR"), "/echo_network.rs"));
32}
33
34#[cfg(feature = "test_embedded")]
35#[expect(
36 clippy::allow_attributes,
37 clippy::allow_attributes_without_reason,
38 reason = "generated code"
39)]
40#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
41pub mod o2m_broadcast {
42 include!(concat!(env!("OUT_DIR"), "/o2m_broadcast.rs"));
43}
44
45#[cfg(feature = "test_embedded")]
46#[expect(
47 clippy::allow_attributes,
48 clippy::allow_attributes_without_reason,
49 reason = "generated code"
50)]
51#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
52pub mod m2o_send {
53 include!(concat!(env!("OUT_DIR"), "/m2o_send.rs"));
54}
55
56#[cfg(feature = "test_embedded")]
57#[expect(
58 clippy::allow_attributes,
59 clippy::allow_attributes_without_reason,
60 reason = "generated code"
61)]
62#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
63pub mod m2m_broadcast {
64 include!(concat!(env!("OUT_DIR"), "/m2m_broadcast.rs"));
65}
66
67#[cfg(all(test, feature = "test_embedded"))]
68mod tests {
69 use dfir_rs::bytes::{Bytes, BytesMut};
70 use dfir_rs::futures::stream;
71 use hydro_lang::location::MembershipEvent;
72 use hydro_lang::location::member_id::TaglessMemberId;
73
74 async fn run_flow(
75 flow: &mut dfir_rs::scheduled::context::Dfir<impl dfir_rs::scheduled::context::TickClosure>,
76 ) {
77 tokio::task::LocalSet::new()
78 .run_until(flow.run_tick())
79 .await;
80 }
81
82 #[tokio::test]
85 async fn test_embedded_capitalize() {
86 let input = stream::iter(vec![
87 "hello".to_owned(),
88 "world".to_owned(),
89 "hydro".to_owned(),
90 ]);
91 let mut collected = vec![];
92 let mut outputs = crate::embedded::capitalize::EmbeddedOutputs {
93 output: |s: String| collected.push(s),
94 };
95 let mut flow = crate::embedded::capitalize(input, &mut outputs);
96 run_flow(&mut flow).await;
97 drop(flow);
98 assert_eq!(collected, vec!["HELLO", "WORLD", "HYDRO"]);
99 }
100
101 #[tokio::test]
104 async fn test_embedded_singleton_input() {
105 let names = stream::iter(vec!["Alice".to_owned(), "Bob".to_owned()]);
106 let mut collected = vec![];
107 let mut outputs = crate::singleton_input::prefix_names::EmbeddedOutputs {
108 output: |s: String| collected.push(s),
109 };
110 let mut flow =
111 crate::singleton_input::prefix_names("Hello".to_owned(), names, &mut outputs);
112 run_flow(&mut flow).await;
113 drop(flow);
114 assert_eq!(collected, vec!["Hello Alice", "Hello Bob"]);
115 }
116
117 #[tokio::test]
121 async fn test_echo_network() {
122 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
123
124 let input = stream::iter(vec!["hello".to_owned(), "world".to_owned()]);
126 let mut net_out = crate::echo_network::echo_sender::EmbeddedNetworkOut {
127 messages: move |bytes: Bytes| {
128 tx.send(bytes).unwrap();
129 },
130 };
131 let mut flow_sender = crate::echo_network::echo_sender(input, &mut net_out);
132 run_flow(&mut flow_sender).await;
133 drop(flow_sender);
134
135 let mut bytes_vec = vec![];
136 while let Ok(b) = rx.try_recv() {
137 bytes_vec.push(Ok(BytesMut::from(b.as_ref())));
138 }
139 assert_eq!(bytes_vec.len(), 2);
140
141 let net_in = crate::echo_network::echo_receiver::EmbeddedNetworkIn {
143 messages: stream::iter(bytes_vec),
144 };
145 let mut received = vec![];
146 let mut outputs = crate::echo_network::echo_receiver::EmbeddedOutputs {
147 output: |s: String| received.push(s),
148 };
149 let mut flow_receiver = crate::echo_network::echo_receiver(&mut outputs, net_in);
150 run_flow(&mut flow_receiver).await;
151 drop(flow_receiver);
152 assert_eq!(received, vec!["HELLO", "WORLD"]);
153 }
154
155 #[tokio::test]
159 async fn test_o2m_broadcast() {
160 let member_id = TaglessMemberId::from_raw_id(0);
161 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(TaglessMemberId, Bytes)>();
162
163 let input = stream::iter(vec!["hello".to_owned(), "world".to_owned()]);
165 let membership = crate::o2m_broadcast::o2m_sender::EmbeddedMembershipStreams {
166 o2m_receiver: stream::iter(vec![(member_id.clone(), MembershipEvent::Joined)]),
167 };
168 let mut net_out = crate::o2m_broadcast::o2m_sender::EmbeddedNetworkOut {
169 o2m_data: move |item: (TaglessMemberId, Bytes)| {
170 tx.send(item).unwrap();
171 },
172 };
173 let mut flow_sender = crate::o2m_broadcast::o2m_sender(membership, input, &mut net_out);
174 run_flow(&mut flow_sender).await;
175 drop(flow_sender);
176
177 let mut tagged_bytes = vec![];
178 while let Ok((id, b)) = rx.try_recv() {
179 assert_eq!(id, member_id);
180 tagged_bytes.push(Ok(BytesMut::from(b.as_ref())));
181 }
182 assert_eq!(tagged_bytes.len(), 2);
183
184 let net_in = crate::o2m_broadcast::o2m_receiver::EmbeddedNetworkIn {
186 o2m_data: stream::iter(tagged_bytes),
187 };
188 let mut received = vec![];
189 let mut outputs = crate::o2m_broadcast::o2m_receiver::EmbeddedOutputs {
190 output: |s: String| received.push(s),
191 };
192 let mut flow_receiver =
193 crate::o2m_broadcast::o2m_receiver(&member_id, &mut outputs, net_in);
194 run_flow(&mut flow_receiver).await;
195 drop(flow_receiver);
196 assert_eq!(received, vec!["HELLO", "WORLD"]);
197 }
198
199 #[tokio::test]
203 async fn test_m2o_send() {
204 let member_id = TaglessMemberId::from_raw_id(42);
205 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
206
207 let input = stream::iter(vec!["foo".to_owned(), "bar".to_owned()]);
209 let mut net_out = crate::m2o_send::m2o_sender::EmbeddedNetworkOut {
210 m2o_data: move |bytes: Bytes| {
211 tx.send(bytes).unwrap();
212 },
213 };
214 let mut flow_sender = crate::m2o_send::m2o_sender(&member_id, input, &mut net_out);
215 run_flow(&mut flow_sender).await;
216 drop(flow_sender);
217
218 let mut tagged_bytes = vec![];
219 while let Ok(b) = rx.try_recv() {
221 tagged_bytes.push(Ok((member_id.clone(), BytesMut::from(b.as_ref()))));
222 }
223 assert_eq!(tagged_bytes.len(), 2);
224
225 let net_in = crate::m2o_send::m2o_receiver::EmbeddedNetworkIn {
227 m2o_data: stream::iter(tagged_bytes),
228 };
229 let mut received = vec![];
230 let mut outputs = crate::m2o_send::m2o_receiver::EmbeddedOutputs {
231 output: |s| received.push(s),
232 };
233 let mut flow_receiver = crate::m2o_send::m2o_receiver(&mut outputs, net_in);
234 run_flow(&mut flow_receiver).await;
235 drop(flow_receiver);
236 assert_eq!(received.len(), 2);
237 assert_eq!(received[0].1, "FOO");
239 assert_eq!(received[1].1, "BAR");
240 }
241
242 #[tokio::test]
246 async fn test_m2m_broadcast() {
247 let src_id = TaglessMemberId::from_raw_id(0);
248 let dst_id = TaglessMemberId::from_raw_id(0);
249 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(TaglessMemberId, Bytes)>();
250
251 let input = stream::iter(vec!["ping".to_owned()]);
253 let membership = crate::m2m_broadcast::m2m_sender::EmbeddedMembershipStreams {
254 m2m_receiver: stream::iter(vec![(dst_id.clone(), MembershipEvent::Joined)]),
255 };
256 let mut net_out = crate::m2m_broadcast::m2m_sender::EmbeddedNetworkOut {
257 m2m_data: move |item: (TaglessMemberId, Bytes)| {
258 tx.send(item).unwrap();
259 },
260 };
261 let mut flow_sender =
262 crate::m2m_broadcast::m2m_sender(&src_id, membership, input, &mut net_out);
263 run_flow(&mut flow_sender).await;
264 drop(flow_sender);
265
266 let mut tagged_bytes = vec![];
267 while let Ok((id, b)) = rx.try_recv() {
268 assert_eq!(id, dst_id);
269 tagged_bytes.push(Ok((src_id.clone(), BytesMut::from(b.as_ref()))));
270 }
271 assert_eq!(tagged_bytes.len(), 1);
272
273 let net_in = crate::m2m_broadcast::m2m_receiver::EmbeddedNetworkIn {
275 m2m_data: stream::iter(tagged_bytes),
276 };
277 let mut received = vec![];
278 let mut outputs = crate::m2m_broadcast::m2m_receiver::EmbeddedOutputs {
279 output: |s| received.push(s),
280 };
281 let mut flow_receiver = crate::m2m_broadcast::m2m_receiver(&dst_id, &mut outputs, net_in);
282 run_flow(&mut flow_receiver).await;
283 drop(flow_receiver);
284 assert_eq!(received.len(), 1);
285 assert_eq!(received[0].1, "PING");
286 }
287}