hydro_lang/
runtime_context.rs

1use dfir_rs::scheduled::context::Context;
2use quote::quote;
3use stageleft::runtime_support::{FreeVariableWithContext, QuoteTokens};
4
5use crate::Location;
6
7pub static RUNTIME_CONTEXT: RuntimeContext = RuntimeContext { _private: &() };
8
/// Marker type standing in for the runtime context inside staged code.
///
/// The value carries no data of its own, only the lifetime `'a`. Its single field is
/// private, so code outside this module cannot build one with a struct literal and
/// has to go through a value exported from here instead.
///
/// `Debug` is derived so that types embedding this marker can derive `Debug` too.
#[derive(Clone, Copy, Debug)]
pub struct RuntimeContext<'a> {
    /// Private placeholder that ties the marker to `'a` and blocks outside construction.
    _private: &'a (),
}
13
14impl<'a, L> FreeVariableWithContext<L> for RuntimeContext<'a>
15where
16    L: Location<'a>,
17{
18    type O = &'a Context;
19
20    fn to_tokens(self, _ctx: &L) -> QuoteTokens {
21        QuoteTokens {
22            prelude: None,
23            expr: Some(quote!(&context)),
24        }
25    }
26}
27
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;

    use crate::*;

    /// Type tag for the single process used in the test flow.
    struct P1 {}

    /// Checks that `RUNTIME_CONTEXT` is usable inside `q!` and that every element
    /// emitted by `source_iter` is paired with tick number 0.
    #[tokio::test]
    async fn runtime_context() {
        let mut deployment = Deployment::new();

        // Describe the dataflow: one process plus an external endpoint that observes it.
        let builder = FlowBuilder::new();
        let process = builder.process::<P1>();
        let observer = builder.external_process::<()>();

        // Tag each of 0..5 with the tick reported by the runtime context.
        let tagged_port = process
            .source_iter(q!(0..5))
            .map(q!(|value| (value, RUNTIME_CONTEXT.current_tick().0)))
            .send_bincode_external(&observer);

        // Place both the process and the external endpoint on localhost.
        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&observer, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The output stream is connected after deploying and before starting.
        let mut received = deployed.connect_source_bincode(tagged_port).await;

        deployment.start().await.unwrap();

        for i in 0..5 {
            let item = received.next().await.unwrap();
            assert_eq!(item, (i, 0));
        }
    }
}