hydro_lang/
runtime_context.rs
1use dfir_rs::scheduled::context::Context;
2use proc_macro2::TokenStream;
3use quote::quote;
4use stageleft::runtime_support::FreeVariableWithContext;
5
6use crate::Location;
7
8pub static RUNTIME_CONTEXT: RuntimeContext = RuntimeContext { _private: &() };
9
/// Zero-data handle standing in for the runtime context inside staged code.
///
/// The only field is private, so code outside this module cannot build a value
/// with a struct literal. The lifetime `'a` is the one the
/// `FreeVariableWithContext` implementation hands out for the resulting context
/// reference.
#[derive(Debug, Clone, Copy)]
pub struct RuntimeContext<'a> {
    /// Carries the lifetime `'a` and keeps the struct non-constructible from
    /// outside this module; the referenced unit value is never read.
    _private: &'a (),
}
14
15impl<'a, L: Location<'a>> FreeVariableWithContext<L> for RuntimeContext<'a> {
16 type O = &'a Context;
17
18 fn to_tokens(self, _ctx: &L) -> (Option<TokenStream>, Option<TokenStream>) {
19 (None, Some(quote!(&context)))
20 }
21}
22
#[cfg(test)]
mod tests {
    use dfir_rs::futures::StreamExt;
    use hydro_deploy::Deployment;

    use crate::*;

    /// Marker type tagging the single process used by the test flow.
    struct P1 {}

    /// Builds a flow that pairs each of `0..5` with the tick number read through
    /// `RUNTIME_CONTEXT`, runs it on localhost, and checks that every element
    /// arrives in order with tick `0`.
    #[tokio::test]
    async fn runtime_context() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<P1>();
        let outside = builder.external_process::<()>();

        // Tag every source element with the tick observed inside the closure.
        let tick_port = process
            .source_iter(q!(0..5))
            .map(q!(|v| (v, RUNTIME_CONTEXT.current_tick().0)))
            .send_bincode_external(&outside);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The external port is connected after deploying but before starting.
        let mut received = deployed.connect_source_bincode(tick_port).await;

        deployment.start().await.unwrap();

        for expected in 0..5 {
            let item = received.next().await.unwrap();
            assert_eq!(item, (expected, 0));
        }
    }
}