hydro_lang/sim/
flow.rs

1//! Entrypoint for compiling and running Hydro simulations.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::panic::RefUnwindSafe;
6use std::rc::Rc;
7
8use libloading::Library;
9
10use super::builder::SimBuilder;
11use super::compiled::{CompiledSim, CompiledSimInstance};
12use super::graph::{SimDeploy, SimExternal, SimNode, compile_sim, create_sim_graph_trybuild};
13use crate::compile::ir::HydroRoot;
14use crate::location::Location;
15use crate::location::dynamic::LocationId;
16use crate::prelude::Cluster;
17use crate::sim::graph::SimExternalPortRegistry;
18use crate::staging_util::Invariant;
19
20/// A not-yet-compiled simulator for a Hydro program.
21pub struct SimFlow<'a> {
22    pub(crate) ir: Vec<HydroRoot>,
23
24    pub(crate) processes: HashMap<usize, SimNode>,
25    pub(crate) clusters: HashMap<usize, SimNode>,
26    pub(crate) externals: HashMap<usize, SimExternal>,
27
28    pub(crate) externals_port_registry: Rc<RefCell<SimExternalPortRegistry>>,
29
30    pub(crate) cluster_max_sizes: HashMap<LocationId, usize>,
31
32    /// Lists all the processes that were created in the flow, same ID as `processes`
33    /// but with the type name of the tag.
34    pub(crate) _process_id_name: Vec<(usize, String)>,
35    pub(crate) _external_id_name: Vec<(usize, String)>,
36    pub(crate) _cluster_id_name: Vec<(usize, String)>,
37
38    pub(crate) _phantom: Invariant<'a>,
39}
40
41impl<'a> SimFlow<'a> {
42    /// Sets the maximum size of the given cluster in the simulation.
43    pub fn with_cluster_size<C>(mut self, cluster: &Cluster<'a, C>, max_size: usize) -> Self {
44        self.cluster_max_sizes.insert(cluster.id(), max_size);
45        self
46    }
47
48    /// Executes the given closure with a single instance of the compiled simulation.
49    pub fn with_instance<T>(self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
50        self.compiled().with_instance(thunk)
51    }
52
53    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
54    /// closure will be repeatedly executed with instances of the Hydro program where the
55    /// batching boundaries, order of messages, and retries are varied.
56    ///
57    /// During development, you should run the test that invokes this function with the `cargo sim`
58    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
59    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
60    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
61    /// be executed, and if no reproducer is found a small number of random executions will be
62    /// performed.
63    pub fn fuzz(self, thunk: impl AsyncFn() + RefUnwindSafe) {
64        self.compiled().fuzz(thunk)
65    }
66
67    /// Exhaustively searches all possible executions of the simulation. The provided
68    /// closure will be repeatedly executed with instances of the Hydro program where the
69    /// batching boundaries, order of messages, and retries are varied.
70    ///
71    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
72    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
73    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
74    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
75    ///
76    /// Returns the number of distinct executions explored.
77    pub fn exhaustive(self, thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
78        self.compiled().exhaustive(thunk)
79    }
80
81    /// Compiles the simulation into a dynamically loadable library, and returns a handle to it.
82    pub fn compiled(mut self) -> CompiledSim {
83        use std::collections::BTreeMap;
84
85        use dfir_lang::graph::{eliminate_extra_unions_tees, partition_graph};
86
87        let mut sim_emit = SimBuilder {
88            process_graphs: BTreeMap::new(),
89            cluster_graphs: BTreeMap::new(),
90            process_tick_dfirs: BTreeMap::new(),
91            cluster_tick_dfirs: BTreeMap::new(),
92            extra_stmts_global: vec![],
93            extra_stmts_cluster: BTreeMap::new(),
94            next_hoff_id: 0,
95        };
96
97        // Ensure the default (0) external is always present.
98        self.externals.insert(
99            0,
100            SimExternal {
101                shared_inner: self.externals_port_registry.clone(),
102            },
103        );
104
105        let mut seen_tees_instantiate: HashMap<_, _> = HashMap::new();
106        self.ir.iter_mut().for_each(|leaf| {
107            leaf.compile_network::<SimDeploy>(
108                &mut BTreeMap::new(),
109                &mut seen_tees_instantiate,
110                &self.processes,
111                &self.clusters,
112                &self.externals,
113            );
114        });
115
116        let mut seen_tees = HashMap::new();
117        let mut built_tees = HashMap::new();
118        let mut next_stmt_id = 0;
119        for leaf in &mut self.ir {
120            leaf.emit::<SimDeploy>(
121                &mut sim_emit,
122                &mut seen_tees,
123                &mut built_tees,
124                &mut next_stmt_id,
125            );
126        }
127
128        let process_graphs = sim_emit
129            .process_graphs
130            .into_iter()
131            .map(|(l, g)| {
132                let (mut flat_graph, _, _) = g.build();
133                eliminate_extra_unions_tees(&mut flat_graph);
134                (
135                    l,
136                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
137                )
138            })
139            .collect::<BTreeMap<_, _>>();
140
141        let cluster_graphs = sim_emit
142            .cluster_graphs
143            .into_iter()
144            .map(|(l, g)| {
145                let (mut flat_graph, _, _) = g.build();
146                eliminate_extra_unions_tees(&mut flat_graph);
147                (
148                    l,
149                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
150                )
151            })
152            .collect::<BTreeMap<_, _>>();
153
154        let process_tick_graphs = sim_emit
155            .process_tick_dfirs
156            .into_iter()
157            .map(|(l, g)| {
158                let (mut flat_graph, _, _) = g.build();
159                eliminate_extra_unions_tees(&mut flat_graph);
160                (
161                    l,
162                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
163                )
164            })
165            .collect::<BTreeMap<_, _>>();
166
167        let cluster_tick_graphs = sim_emit
168            .cluster_tick_dfirs
169            .into_iter()
170            .map(|(l, g)| {
171                let (mut flat_graph, _, _) = g.build();
172                eliminate_extra_unions_tees(&mut flat_graph);
173                (
174                    l,
175                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
176                )
177            })
178            .collect::<BTreeMap<_, _>>();
179
180        for c in self.clusters.keys() {
181            assert!(
182                self.cluster_max_sizes
183                    .contains_key(&LocationId::Cluster(*c)),
184                "Cluster {:?} missing max size; call with_cluster_size() before compiled()",
185                c
186            );
187        }
188
189        let (bin, trybuild) = create_sim_graph_trybuild(
190            process_graphs,
191            cluster_graphs,
192            self.cluster_max_sizes,
193            process_tick_graphs,
194            cluster_tick_graphs,
195            sim_emit.extra_stmts_global,
196            sim_emit.extra_stmts_cluster,
197        );
198
199        let out = compile_sim(bin, trybuild).unwrap();
200        let lib = unsafe { Library::new(&out).unwrap() };
201
202        CompiledSim {
203            _path: out,
204            lib,
205            externals_port_registry: self.externals_port_registry.take(),
206        }
207    }
208}