Skip to main content

hydro_test_embedded/
lib.rs

1#[cfg(feature = "test_embedded")]
2#[expect(
3    clippy::allow_attributes,
4    clippy::allow_attributes_without_reason,
5    reason = "generated code"
6)]
7#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
8pub mod embedded {
9    include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
10}
11
12#[cfg(feature = "test_embedded")]
13#[expect(
14    clippy::allow_attributes,
15    clippy::allow_attributes_without_reason,
16    reason = "generated code"
17)]
18#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
19pub mod singleton_input {
20    include!(concat!(env!("OUT_DIR"), "/singleton_input.rs"));
21}
22
23#[cfg(feature = "test_embedded")]
24#[expect(
25    clippy::allow_attributes,
26    clippy::allow_attributes_without_reason,
27    reason = "generated code"
28)]
29#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
30pub mod echo_network {
31    include!(concat!(env!("OUT_DIR"), "/echo_network.rs"));
32}
33
34#[cfg(feature = "test_embedded")]
35#[expect(
36    clippy::allow_attributes,
37    clippy::allow_attributes_without_reason,
38    reason = "generated code"
39)]
40#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
41pub mod o2m_broadcast {
42    include!(concat!(env!("OUT_DIR"), "/o2m_broadcast.rs"));
43}
44
45#[cfg(feature = "test_embedded")]
46#[expect(
47    clippy::allow_attributes,
48    clippy::allow_attributes_without_reason,
49    reason = "generated code"
50)]
51#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
52pub mod m2o_send {
53    include!(concat!(env!("OUT_DIR"), "/m2o_send.rs"));
54}
55
56#[cfg(feature = "test_embedded")]
57#[expect(
58    clippy::allow_attributes,
59    clippy::allow_attributes_without_reason,
60    reason = "generated code"
61)]
62#[allow(unused_imports, unused_qualifications, missing_docs, non_snake_case)]
63pub mod m2m_broadcast {
64    include!(concat!(env!("OUT_DIR"), "/m2m_broadcast.rs"));
65}
66
67#[cfg(all(test, feature = "test_embedded"))]
68mod tests {
69    use dfir_rs::bytes::{Bytes, BytesMut};
70    use dfir_rs::futures::stream;
71    use hydro_lang::location::MembershipEvent;
72    use hydro_lang::location::member_id::TaglessMemberId;
73
74    async fn run_flow(
75        flow: &mut dfir_rs::scheduled::context::Dfir<impl dfir_rs::scheduled::context::TickClosure>,
76    ) {
77        tokio::task::LocalSet::new()
78            .run_until(flow.run_tick())
79            .await;
80    }
81
82    // --- capitalize (no networking) ---
83    // Order: (inputs, outputs)
84    #[tokio::test]
85    async fn test_embedded_capitalize() {
86        let input = stream::iter(vec![
87            "hello".to_owned(),
88            "world".to_owned(),
89            "hydro".to_owned(),
90        ]);
91        let mut collected = vec![];
92        let mut outputs = crate::embedded::capitalize::EmbeddedOutputs {
93            output: |s: String| collected.push(s),
94        };
95        let mut flow = crate::embedded::capitalize(input, &mut outputs);
96        run_flow(&mut flow).await;
97        drop(flow);
98        assert_eq!(collected, vec!["HELLO", "WORLD", "HYDRO"]);
99    }
100
101    // --- singleton_input (singleton + stream, no networking) ---
102    // Order: (singleton_inputs, inputs, outputs)
103    #[tokio::test]
104    async fn test_embedded_singleton_input() {
105        let names = stream::iter(vec!["Alice".to_owned(), "Bob".to_owned()]);
106        let mut collected = vec![];
107        let mut outputs = crate::singleton_input::prefix_names::EmbeddedOutputs {
108            output: |s: String| collected.push(s),
109        };
110        let mut flow =
111            crate::singleton_input::prefix_names("Hello".to_owned(), names, &mut outputs);
112        run_flow(&mut flow).await;
113        drop(flow);
114        assert_eq!(collected, vec!["Hello Alice", "Hello Bob"]);
115    }
116
117    // --- echo_network (o2o) ---
118    // sender order: (inputs, network_out)
119    // receiver order: (outputs, network_in)
120    #[tokio::test]
121    async fn test_echo_network() {
122        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
123
124        // Sender: (input, net_out)
125        let input = stream::iter(vec!["hello".to_owned(), "world".to_owned()]);
126        let mut net_out = crate::echo_network::echo_sender::EmbeddedNetworkOut {
127            messages: move |bytes: Bytes| {
128                tx.send(bytes).unwrap();
129            },
130        };
131        let mut flow_sender = crate::echo_network::echo_sender(input, &mut net_out);
132        run_flow(&mut flow_sender).await;
133        drop(flow_sender);
134
135        let mut bytes_vec = vec![];
136        while let Ok(b) = rx.try_recv() {
137            bytes_vec.push(Ok(BytesMut::from(b.as_ref())));
138        }
139        assert_eq!(bytes_vec.len(), 2);
140
141        // Receiver: (outputs, network_in)
142        let net_in = crate::echo_network::echo_receiver::EmbeddedNetworkIn {
143            messages: stream::iter(bytes_vec),
144        };
145        let mut received = vec![];
146        let mut outputs = crate::echo_network::echo_receiver::EmbeddedOutputs {
147            output: |s: String| received.push(s),
148        };
149        let mut flow_receiver = crate::echo_network::echo_receiver(&mut outputs, net_in);
150        run_flow(&mut flow_receiver).await;
151        drop(flow_receiver);
152        assert_eq!(received, vec!["HELLO", "WORLD"]);
153    }
154
155    // --- o2m_broadcast (process -> cluster) ---
156    // sender (process): (membership, inputs, network_out)
157    // receiver (cluster): (self_id, outputs, network_in)
158    #[tokio::test]
159    async fn test_o2m_broadcast() {
160        let member_id = TaglessMemberId::from_raw_id(0);
161        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(TaglessMemberId, Bytes)>();
162
163        // Sender (process): (membership, input, net_out)
164        let input = stream::iter(vec!["hello".to_owned(), "world".to_owned()]);
165        let membership = crate::o2m_broadcast::o2m_sender::EmbeddedMembershipStreams {
166            o2m_receiver: stream::iter(vec![(member_id.clone(), MembershipEvent::Joined)]),
167        };
168        let mut net_out = crate::o2m_broadcast::o2m_sender::EmbeddedNetworkOut {
169            o2m_data: move |item: (TaglessMemberId, Bytes)| {
170                tx.send(item).unwrap();
171            },
172        };
173        let mut flow_sender = crate::o2m_broadcast::o2m_sender(membership, input, &mut net_out);
174        run_flow(&mut flow_sender).await;
175        drop(flow_sender);
176
177        let mut tagged_bytes = vec![];
178        while let Ok((id, b)) = rx.try_recv() {
179            assert_eq!(id, member_id);
180            tagged_bytes.push(Ok(BytesMut::from(b.as_ref())));
181        }
182        assert_eq!(tagged_bytes.len(), 2);
183
184        // Receiver (cluster): (self_id, outputs, network_in)
185        let net_in = crate::o2m_broadcast::o2m_receiver::EmbeddedNetworkIn {
186            o2m_data: stream::iter(tagged_bytes),
187        };
188        let mut received = vec![];
189        let mut outputs = crate::o2m_broadcast::o2m_receiver::EmbeddedOutputs {
190            output: |s: String| received.push(s),
191        };
192        let mut flow_receiver =
193            crate::o2m_broadcast::o2m_receiver(&member_id, &mut outputs, net_in);
194        run_flow(&mut flow_receiver).await;
195        drop(flow_receiver);
196        assert_eq!(received, vec!["HELLO", "WORLD"]);
197    }
198
199    // --- m2o_send (cluster -> process) ---
200    // sender (cluster): (self_id, inputs, network_out)
201    // receiver (process): (outputs, network_in)
202    #[tokio::test]
203    async fn test_m2o_send() {
204        let member_id = TaglessMemberId::from_raw_id(42);
205        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
206
207        // Sender (cluster): (self_id, input, net_out)
208        let input = stream::iter(vec!["foo".to_owned(), "bar".to_owned()]);
209        let mut net_out = crate::m2o_send::m2o_sender::EmbeddedNetworkOut {
210            m2o_data: move |bytes: Bytes| {
211                tx.send(bytes).unwrap();
212            },
213        };
214        let mut flow_sender = crate::m2o_send::m2o_sender(&member_id, input, &mut net_out);
215        run_flow(&mut flow_sender).await;
216        drop(flow_sender);
217
218        let mut tagged_bytes = vec![];
219        // Wrap as tagged (simulating transport tagging by member id)
220        while let Ok(b) = rx.try_recv() {
221            tagged_bytes.push(Ok((member_id.clone(), BytesMut::from(b.as_ref()))));
222        }
223        assert_eq!(tagged_bytes.len(), 2);
224
225        // Receiver (process): (outputs, network_in)
226        let net_in = crate::m2o_send::m2o_receiver::EmbeddedNetworkIn {
227            m2o_data: stream::iter(tagged_bytes),
228        };
229        let mut received = vec![];
230        let mut outputs = crate::m2o_send::m2o_receiver::EmbeddedOutputs {
231            output: |s| received.push(s),
232        };
233        let mut flow_receiver = crate::m2o_send::m2o_receiver(&mut outputs, net_in);
234        run_flow(&mut flow_receiver).await;
235        drop(flow_receiver);
236        assert_eq!(received.len(), 2);
237        // Values are uppercased; entries() gives (MemberId, String)
238        assert_eq!(received[0].1, "FOO");
239        assert_eq!(received[1].1, "BAR");
240    }
241
242    // --- m2m_broadcast (cluster -> cluster) ---
243    // sender (cluster): (self_id, membership, inputs, network_out)
244    // receiver (cluster): (self_id, outputs, network_in)
245    #[tokio::test]
246    async fn test_m2m_broadcast() {
247        let src_id = TaglessMemberId::from_raw_id(0);
248        let dst_id = TaglessMemberId::from_raw_id(0);
249        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(TaglessMemberId, Bytes)>();
250
251        // Sender (cluster): (self_id, membership, input, net_out)
252        let input = stream::iter(vec!["ping".to_owned()]);
253        let membership = crate::m2m_broadcast::m2m_sender::EmbeddedMembershipStreams {
254            m2m_receiver: stream::iter(vec![(dst_id.clone(), MembershipEvent::Joined)]),
255        };
256        let mut net_out = crate::m2m_broadcast::m2m_sender::EmbeddedNetworkOut {
257            m2m_data: move |item: (TaglessMemberId, Bytes)| {
258                tx.send(item).unwrap();
259            },
260        };
261        let mut flow_sender =
262            crate::m2m_broadcast::m2m_sender(&src_id, membership, input, &mut net_out);
263        run_flow(&mut flow_sender).await;
264        drop(flow_sender);
265
266        let mut tagged_bytes = vec![];
267        while let Ok((id, b)) = rx.try_recv() {
268            assert_eq!(id, dst_id);
269            tagged_bytes.push(Ok((src_id.clone(), BytesMut::from(b.as_ref()))));
270        }
271        assert_eq!(tagged_bytes.len(), 1);
272
273        // Receiver (cluster): (self_id, outputs, network_in)
274        let net_in = crate::m2m_broadcast::m2m_receiver::EmbeddedNetworkIn {
275            m2m_data: stream::iter(tagged_bytes),
276        };
277        let mut received = vec![];
278        let mut outputs = crate::m2m_broadcast::m2m_receiver::EmbeddedOutputs {
279            output: |s| received.push(s),
280        };
281        let mut flow_receiver = crate::m2m_broadcast::m2m_receiver(&dst_id, &mut outputs, net_in);
282        run_flow(&mut flow_receiver).await;
283        drop(flow_receiver);
284        assert_eq!(received.len(), 1);
285        assert_eq!(received[0].1, "PING");
286    }
287}