hydro_lang/rewrites/
decoupler.rs

1use proc_macro2::Span;
2
3use crate::ir::*;
4use crate::location::LocationId;
5use crate::stream::{deserialize_bincode_with_type, serialize_bincode_with_type};
6
7pub struct Decoupler {
8    pub nodes_to_decouple: Vec<usize>,
9    pub new_location: LocationId,
10}
11
12fn decouple_node(node: &mut HydroNode, decoupler: &Decoupler, next_stmt_id: &mut usize) {
13    let metadata = node.metadata().clone();
14    if decoupler.nodes_to_decouple.contains(next_stmt_id) {
15        println!("Decoupling node {} {}", next_stmt_id, node.print_root());
16
17        let output_debug_type = metadata.output_type.clone().unwrap();
18
19        let parent_id = match metadata.location_kind {
20            LocationId::Cluster(id) => id,
21            _ => std::panic!(
22                "Expected parent location to be a cluster, got {:?}",
23                metadata.location_kind
24            ),
25        };
26        let node_content = std::mem::replace(node, HydroNode::Placeholder);
27
28        // Map from b to (ClusterId, b), where ClusterId is the id of the decoupled node we're sending to
29        let ident = syn::Ident::new(
30            &format!("__hydro_lang_cluster_self_id_{}", parent_id),
31            Span::call_site(),
32        );
33        let f: syn::Expr = syn::parse_quote!(|b| (
34            ClusterId::<()>::from_raw(#ident),
35            b
36        ));
37        let mapped_node = HydroNode::Map {
38            f: f.into(),
39            input: Box::new(node_content),
40            metadata: metadata.clone(),
41        };
42
43        // Set up the network node
44        let network_metadata = HydroIrMetadata {
45            location_kind: decoupler.new_location.clone(),
46            output_type: Some(output_debug_type.clone()),
47            cardinality: None,
48            cpu_usage: None,
49        };
50        let output_type = output_debug_type.0;
51        let network_node = HydroNode::Network {
52            from_key: None,
53            to_location: decoupler.new_location.clone(),
54            to_key: None,
55            serialize_fn: Some(serialize_bincode_with_type(true, output_type.clone()))
56                .map(|e| e.into()),
57            instantiate_fn: DebugInstantiate::Building,
58            deserialize_fn: Some(deserialize_bincode_with_type(
59                Some(stageleft::quote_type::<()>()),
60                output_type.clone(),
61            ))
62            .map(|e| e.into()),
63            input: Box::new(mapped_node),
64            metadata: network_metadata.clone(),
65        };
66
67        // Map again to remove the cluster Id (mimicking send_anonymous)
68        let f: syn::Expr = syn::parse_quote!(|(_, b)| b);
69        let mapped_node = HydroNode::Map {
70            f: f.into(),
71            input: Box::new(network_node),
72            metadata: network_metadata,
73        };
74        *node = mapped_node;
75    }
76}
77
78/// Limitations: Cannot decouple across a cycle. Can only decouple clusters (not processes).
79pub fn decouple(ir: &mut [HydroLeaf], decoupler: &Decoupler) {
80    traverse_dfir(
81        ir,
82        |_, _| {},
83        |node, next_stmt_id| {
84            decouple_node(node, decoupler, next_stmt_id);
85        },
86    );
87}