hydro_lang/
runtime_context.rs

1use dfir_rs::scheduled::context::Context;
2use proc_macro2::TokenStream;
3use quote::quote;
4use stageleft::runtime_support::FreeVariableWithContext;
5
6use crate::Location;
7
8pub static RUNTIME_CONTEXT: RuntimeContext = RuntimeContext { _private: &() };
9
10#[derive(Clone, Copy)]
11pub struct RuntimeContext<'a> {
12    _private: &'a (),
13}
14
15impl<'a, L: Location<'a>> FreeVariableWithContext<L> for RuntimeContext<'a> {
16    type O = &'a Context;
17
18    fn to_tokens(self, _ctx: &L) -> (Option<TokenStream>, Option<TokenStream>) {
19        (None, Some(quote!(&context)))
20    }
21}
22
23#[cfg(test)]
24mod tests {
25    use dfir_rs::futures::StreamExt;
26    use hydro_deploy::Deployment;
27
28    use crate::*;
29
30    struct P1 {}
31
32    #[tokio::test]
33    async fn runtime_context() {
34        let mut deployment = Deployment::new();
35
36        let flow = FlowBuilder::new();
37        let node = flow.process::<P1>();
38        let external = flow.external_process::<()>();
39
40        let out_port = node
41            .source_iter(q!(0..5))
42            .map(q!(|v| (v, RUNTIME_CONTEXT.current_tick().0)))
43            .send_bincode_external(&external);
44
45        let nodes = flow
46            .with_process(&node, deployment.Localhost())
47            .with_external(&external, deployment.Localhost())
48            .deploy(&mut deployment);
49
50        deployment.deploy().await.unwrap();
51
52        let mut external_out = nodes.connect_source_bincode(out_port).await;
53
54        deployment.start().await.unwrap();
55
56        for i in 0..5 {
57            assert_eq!(external_out.next().await.unwrap(), (i, 0));
58        }
59    }
60}