Skip to main content

hydro_test_embedded/
lib.rs

1#[cfg(feature = "test_embedded")]
2#[expect(
3    clippy::allow_attributes,
4    clippy::allow_attributes_without_reason,
5    reason = "generated code"
6)]
7#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
8pub mod embedded {
9    include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
10}
11
12#[cfg(feature = "test_embedded")]
13#[expect(
14    clippy::allow_attributes,
15    clippy::allow_attributes_without_reason,
16    reason = "generated code"
17)]
18#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
19pub mod singleton_input {
20    include!(concat!(env!("OUT_DIR"), "/singleton_input.rs"));
21}
22
23#[cfg(feature = "test_embedded")]
24#[expect(
25    clippy::allow_attributes,
26    clippy::allow_attributes_without_reason,
27    reason = "generated code"
28)]
29#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
30pub mod echo_network {
31    include!(concat!(env!("OUT_DIR"), "/echo_network.rs"));
32}
33
34#[cfg(feature = "test_embedded")]
35#[expect(
36    clippy::allow_attributes,
37    clippy::allow_attributes_without_reason,
38    reason = "generated code"
39)]
40#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
41pub mod o2m_broadcast {
42    include!(concat!(env!("OUT_DIR"), "/o2m_broadcast.rs"));
43}
44
45#[cfg(feature = "test_embedded")]
46#[expect(
47    clippy::allow_attributes,
48    clippy::allow_attributes_without_reason,
49    reason = "generated code"
50)]
51#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
52pub mod m2o_send {
53    include!(concat!(env!("OUT_DIR"), "/m2o_send.rs"));
54}
55
56#[cfg(feature = "test_embedded")]
57#[expect(
58    clippy::allow_attributes,
59    clippy::allow_attributes_without_reason,
60    reason = "generated code"
61)]
62#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
63pub mod m2m_broadcast {
64    include!(concat!(env!("OUT_DIR"), "/m2m_broadcast.rs"));
65}
66
67#[cfg(all(test, feature = "test_embedded"))]
68mod tests {
69    use dfir_rs::bytes::{Bytes, BytesMut};
70    use dfir_rs::futures::stream;
71    use hydro_lang::location::MembershipEvent;
72    use hydro_lang::location::member_id::TaglessMemberId;
73
74    async fn run_dfir(mut flow: dfir_rs::scheduled::graph::Dfir<'_>) {
75        tokio::task::LocalSet::new()
76            .run_until(flow.run_available())
77            .await;
78    }
79
80    // --- capitalize (no networking) ---
81    // Order: (inputs, outputs)
82    #[tokio::test]
83    async fn test_embedded_capitalize() {
84        let input = stream::iter(vec![
85            "hello".to_owned(),
86            "world".to_owned(),
87            "hydro".to_owned(),
88        ]);
89        let mut collected = vec![];
90        let mut outputs = crate::embedded::capitalize::EmbeddedOutputs {
91            output: |s: String| collected.push(s),
92        };
93        let flow = crate::embedded::capitalize(input, &mut outputs);
94        run_dfir(flow).await;
95        assert_eq!(collected, vec!["HELLO", "WORLD", "HYDRO"]);
96    }
97
98    // --- singleton_input (singleton + stream, no networking) ---
99    // Order: (singleton_inputs, inputs, outputs)
100    #[tokio::test]
101    async fn test_embedded_singleton_input() {
102        let names = stream::iter(vec!["Alice".to_owned(), "Bob".to_owned()]);
103        let mut collected = vec![];
104        let mut outputs = crate::singleton_input::prefix_names::EmbeddedOutputs {
105            output: |s: String| collected.push(s),
106        };
107        let flow = crate::singleton_input::prefix_names("Hello".to_owned(), names, &mut outputs);
108        run_dfir(flow).await;
109        assert_eq!(collected, vec!["Hello Alice", "Hello Bob"]);
110    }
111
112    // --- echo_network (o2o) ---
113    // sender order: (inputs, network_out)
114    // receiver order: (outputs, network_in)
115    #[tokio::test]
116    async fn test_echo_network() {
117        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
118
119        // Sender: (input, net_out)
120        let input = stream::iter(vec!["hello".to_owned(), "world".to_owned()]);
121        let mut net_out = crate::echo_network::echo_sender::EmbeddedNetworkOut {
122            messages: move |bytes: Bytes| {
123                tx.send(bytes).unwrap();
124            },
125        };
126        run_dfir(crate::echo_network::echo_sender(input, &mut net_out)).await;
127
128        let mut bytes_vec = vec![];
129        while let Ok(b) = rx.try_recv() {
130            bytes_vec.push(Ok(BytesMut::from(b.as_ref())));
131        }
132        assert_eq!(bytes_vec.len(), 2);
133
134        // Receiver: (outputs, network_in)
135        let net_in = crate::echo_network::echo_receiver::EmbeddedNetworkIn {
136            messages: stream::iter(bytes_vec),
137        };
138        let mut received = vec![];
139        let mut outputs = crate::echo_network::echo_receiver::EmbeddedOutputs {
140            output: |s: String| received.push(s),
141        };
142        run_dfir(crate::echo_network::echo_receiver(&mut outputs, net_in)).await;
143        assert_eq!(received, vec!["HELLO", "WORLD"]);
144    }
145
146    // --- o2m_broadcast (process -> cluster) ---
147    // sender (process): (membership, inputs, network_out)
148    // receiver (cluster): (self_id, outputs, network_in)
149    #[tokio::test]
150    async fn test_o2m_broadcast() {
151        let member_id = TaglessMemberId::from_raw_id(0);
152        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(TaglessMemberId, Bytes)>();
153
154        // Sender (process): (membership, input, net_out)
155        let input = stream::iter(vec!["hello".to_owned(), "world".to_owned()]);
156        let membership = crate::o2m_broadcast::o2m_sender::EmbeddedMembershipStreams {
157            o2m_receiver: stream::iter(vec![(member_id.clone(), MembershipEvent::Joined)]),
158        };
159        let mut net_out = crate::o2m_broadcast::o2m_sender::EmbeddedNetworkOut {
160            o2m_data: move |item: (TaglessMemberId, Bytes)| {
161                tx.send(item).unwrap();
162            },
163        };
164        run_dfir(crate::o2m_broadcast::o2m_sender(
165            membership,
166            input,
167            &mut net_out,
168        ))
169        .await;
170
171        let mut tagged_bytes = vec![];
172        while let Ok((id, b)) = rx.try_recv() {
173            assert_eq!(id, member_id);
174            tagged_bytes.push(Ok(BytesMut::from(b.as_ref())));
175        }
176        assert_eq!(tagged_bytes.len(), 2);
177
178        // Receiver (cluster): (self_id, outputs, network_in)
179        let net_in = crate::o2m_broadcast::o2m_receiver::EmbeddedNetworkIn {
180            o2m_data: stream::iter(tagged_bytes),
181        };
182        let mut received = vec![];
183        let mut outputs = crate::o2m_broadcast::o2m_receiver::EmbeddedOutputs {
184            output: |s: String| received.push(s),
185        };
186        run_dfir(crate::o2m_broadcast::o2m_receiver(
187            &member_id,
188            &mut outputs,
189            net_in,
190        ))
191        .await;
192        assert_eq!(received, vec!["HELLO", "WORLD"]);
193    }
194
195    // --- m2o_send (cluster -> process) ---
196    // sender (cluster): (self_id, inputs, network_out)
197    // receiver (process): (outputs, network_in)
198    #[tokio::test]
199    async fn test_m2o_send() {
200        let member_id = TaglessMemberId::from_raw_id(42);
201        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
202
203        // Sender (cluster): (self_id, input, net_out)
204        let input = stream::iter(vec!["foo".to_owned(), "bar".to_owned()]);
205        let mut net_out = crate::m2o_send::m2o_sender::EmbeddedNetworkOut {
206            m2o_data: move |bytes: Bytes| {
207                tx.send(bytes).unwrap();
208            },
209        };
210        run_dfir(crate::m2o_send::m2o_sender(&member_id, input, &mut net_out)).await;
211
212        // Wrap as tagged (simulating transport tagging by member id)
213        let mut tagged_bytes = vec![];
214        while let Ok(b) = rx.try_recv() {
215            tagged_bytes.push(Ok((member_id.clone(), BytesMut::from(b.as_ref()))));
216        }
217        assert_eq!(tagged_bytes.len(), 2);
218
219        // Receiver (process): (outputs, network_in)
220        let net_in = crate::m2o_send::m2o_receiver::EmbeddedNetworkIn {
221            m2o_data: stream::iter(tagged_bytes),
222        };
223        let mut received = vec![];
224        let mut outputs = crate::m2o_send::m2o_receiver::EmbeddedOutputs {
225            output: |s| received.push(s),
226        };
227        run_dfir(crate::m2o_send::m2o_receiver(&mut outputs, net_in)).await;
228        assert_eq!(received.len(), 2);
229        // Values are uppercased; entries() gives (MemberId, String)
230        assert_eq!(received[0].1, "FOO");
231        assert_eq!(received[1].1, "BAR");
232    }
233
234    // --- m2m_broadcast (cluster -> cluster) ---
235    // sender (cluster): (self_id, membership, inputs, network_out)
236    // receiver (cluster): (self_id, outputs, network_in)
237    #[tokio::test]
238    async fn test_m2m_broadcast() {
239        let src_id = TaglessMemberId::from_raw_id(0);
240        let dst_id = TaglessMemberId::from_raw_id(0);
241        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(TaglessMemberId, Bytes)>();
242
243        // Sender (cluster): (self_id, membership, input, net_out)
244        let input = stream::iter(vec!["ping".to_owned()]);
245        let membership = crate::m2m_broadcast::m2m_sender::EmbeddedMembershipStreams {
246            m2m_receiver: stream::iter(vec![(dst_id.clone(), MembershipEvent::Joined)]),
247        };
248        let mut net_out = crate::m2m_broadcast::m2m_sender::EmbeddedNetworkOut {
249            m2m_data: move |item: (TaglessMemberId, Bytes)| {
250                tx.send(item).unwrap();
251            },
252        };
253        run_dfir(crate::m2m_broadcast::m2m_sender(
254            &src_id,
255            membership,
256            input,
257            &mut net_out,
258        ))
259        .await;
260
261        let mut tagged_bytes = vec![];
262        while let Ok((id, b)) = rx.try_recv() {
263            assert_eq!(id, dst_id);
264            tagged_bytes.push(Ok((src_id.clone(), BytesMut::from(b.as_ref()))));
265        }
266        assert_eq!(tagged_bytes.len(), 1);
267
268        // Receiver (cluster): (self_id, outputs, network_in)
269        let net_in = crate::m2m_broadcast::m2m_receiver::EmbeddedNetworkIn {
270            m2m_data: stream::iter(tagged_bytes),
271        };
272        let mut received = vec![];
273        let mut outputs = crate::m2m_broadcast::m2m_receiver::EmbeddedOutputs {
274            output: |s| received.push(s),
275        };
276        run_dfir(crate::m2m_broadcast::m2m_receiver(
277            &dst_id,
278            &mut outputs,
279            net_in,
280        ))
281        .await;
282        assert_eq!(received.len(), 1);
283        assert_eq!(received[0].1, "PING");
284    }
285}