hydro_lang/sim/
flow.rs

1//! Entrypoint for compiling and running Hydro simulations.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::panic::RefUnwindSafe;
6use std::rc::Rc;
7
8use libloading::Library;
9
10use super::builder::SimBuilder;
11use super::compiled::{CompiledSim, CompiledSimInstance};
12use super::graph::{SimDeploy, SimExternal, SimNode, compile_sim, create_sim_graph_trybuild};
13use crate::compile::ir::HydroRoot;
14use crate::location::Location;
15use crate::location::dynamic::LocationId;
16use crate::prelude::Cluster;
17use crate::staging_util::Invariant;
18
/// A not-yet-compiled simulator for a Hydro program.
///
/// Configure it with [`SimFlow::with_cluster_size`], then either call [`SimFlow::compiled`]
/// to obtain a `CompiledSim`, or use [`SimFlow::with_instance`], [`SimFlow::fuzz`], or
/// [`SimFlow::exhaustive`], which compile and run in one step.
pub struct SimFlow<'a> {
    /// The IR roots of the program. `compiled` runs `compile_network` and then `emit`
    /// on each of them.
    pub(crate) ir: Vec<HydroRoot>,

    /// Shared state describing the external ports. Only the first tuple element (the
    /// `Vec<usize>`) is handed to the `CompiledSim`.
    // NOTE(review): the second element looks like a "next port ID" counter -- confirm
    // against the code that allocates ports.
    pub(crate) external_ports: Rc<RefCell<(Vec<usize>, usize)>>,

    /// Process nodes keyed by numeric ID; passed to `compile_network`.
    pub(crate) processes: HashMap<usize, SimNode>,
    /// Cluster nodes keyed by numeric ID; passed to `compile_network`. Every key must have
    /// a matching `LocationId::Cluster` entry in `cluster_max_sizes` when `compiled` runs.
    pub(crate) clusters: HashMap<usize, SimNode>,
    /// External nodes keyed by numeric ID; passed to `compile_network`.
    pub(crate) externals: HashMap<usize, SimExternal>,

    /// A mapping from external "keys", which are used for looking up connections, to the IDs
    /// of the external channels created in the simulation.
    pub(crate) external_registered: Rc<RefCell<HashMap<usize, usize>>>,

    /// Maximum size of each cluster, as set through [`SimFlow::with_cluster_size`].
    pub(crate) cluster_max_sizes: HashMap<LocationId, usize>,

    /// Lists all the processes that were created in the flow, same ID as `processes`
    /// but with the type name of the tag.
    pub(crate) _process_id_name: Vec<(usize, String)>,
    // NOTE(review): presumably the same ID/type-name listing for externals -- confirm.
    pub(crate) _external_id_name: Vec<(usize, String)>,
    // NOTE(review): presumably the same ID/type-name listing for clusters -- confirm.
    pub(crate) _cluster_id_name: Vec<(usize, String)>,

    /// Marker that carries the `'a` lifetime, which no other field mentions.
    // NOTE(review): going by the type name this makes `SimFlow` invariant in `'a` --
    // confirm against `staging_util::Invariant`.
    pub(crate) _phantom: Invariant<'a>,
}
43
44impl<'a> SimFlow<'a> {
45    /// Sets the maximum size of the given cluster in the simulation.
46    pub fn with_cluster_size<C>(mut self, cluster: &Cluster<'a, C>, max_size: usize) -> Self {
47        self.cluster_max_sizes.insert(cluster.id(), max_size);
48        self
49    }
50
51    /// Executes the given closure with a single instance of the compiled simulation.
52    pub fn with_instance<T>(self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
53        self.compiled().with_instance(thunk)
54    }
55
56    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
57    /// closure will be repeatedly executed with instances of the Hydro program where the
58    /// batching boundaries, order of messages, and retries are varied.
59    ///
60    /// During development, you should run the test that invokes this function with the `cargo sim`
61    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
62    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
63    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
64    /// be executed, and if no reproducer is found a small number of random executions will be
65    /// performed.
66    pub fn fuzz(self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
67        self.compiled().fuzz(thunk)
68    }
69
70    /// Exhaustively searches all possible executions of the simulation. The provided
71    /// closure will be repeatedly executed with instances of the Hydro program where the
72    /// batching boundaries, order of messages, and retries are varied.
73    ///
74    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
75    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
76    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
77    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
78    ///
79    /// Returns the number of distinct executions explored.
80    pub fn exhaustive(self, thunk: impl AsyncFnMut(CompiledSimInstance) + RefUnwindSafe) -> usize {
81        self.compiled().exhaustive(thunk)
82    }
83
84    /// Compiles the simulation into a dynamically loadable library, and returns a handle to it.
85    pub fn compiled(mut self) -> CompiledSim {
86        use std::collections::BTreeMap;
87
88        use dfir_lang::graph::{eliminate_extra_unions_tees, partition_graph};
89
90        let mut sim_emit = SimBuilder {
91            process_graphs: BTreeMap::new(),
92            cluster_graphs: BTreeMap::new(),
93            process_tick_dfirs: BTreeMap::new(),
94            cluster_tick_dfirs: BTreeMap::new(),
95            extra_stmts_global: vec![],
96            extra_stmts_cluster: BTreeMap::new(),
97            next_hoff_id: 0,
98        };
99
100        let mut seen_tees_instantiate: HashMap<_, _> = HashMap::new();
101        self.ir.iter_mut().for_each(|leaf| {
102            leaf.compile_network::<SimDeploy>(
103                &(),
104                &mut BTreeMap::new(),
105                &mut seen_tees_instantiate,
106                &self.processes,
107                &self.clusters,
108                &self.externals,
109            );
110        });
111
112        let mut built_tees = HashMap::new();
113        let mut next_stmt_id = 0;
114        for leaf in &mut self.ir {
115            leaf.emit(&mut sim_emit, &mut built_tees, &mut next_stmt_id);
116        }
117
118        let process_graphs = sim_emit
119            .process_graphs
120            .into_iter()
121            .map(|(l, g)| {
122                let (mut flat_graph, _, _) = g.build();
123                eliminate_extra_unions_tees(&mut flat_graph);
124                (
125                    l,
126                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
127                )
128            })
129            .collect::<BTreeMap<_, _>>();
130
131        let cluster_graphs = sim_emit
132            .cluster_graphs
133            .into_iter()
134            .map(|(l, g)| {
135                let (mut flat_graph, _, _) = g.build();
136                eliminate_extra_unions_tees(&mut flat_graph);
137                (
138                    l,
139                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
140                )
141            })
142            .collect::<BTreeMap<_, _>>();
143
144        let process_tick_graphs = sim_emit
145            .process_tick_dfirs
146            .into_iter()
147            .map(|(l, g)| {
148                let (mut flat_graph, _, _) = g.build();
149                eliminate_extra_unions_tees(&mut flat_graph);
150                (
151                    l,
152                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
153                )
154            })
155            .collect::<BTreeMap<_, _>>();
156
157        let cluster_tick_graphs = sim_emit
158            .cluster_tick_dfirs
159            .into_iter()
160            .map(|(l, g)| {
161                let (mut flat_graph, _, _) = g.build();
162                eliminate_extra_unions_tees(&mut flat_graph);
163                (
164                    l,
165                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
166                )
167            })
168            .collect::<BTreeMap<_, _>>();
169
170        for c in self.clusters.keys() {
171            assert!(
172                self.cluster_max_sizes
173                    .contains_key(&LocationId::Cluster(*c)),
174                "Cluster {:?} missing max size; call with_cluster_size() before compiled()",
175                c
176            );
177        }
178
179        let (bin, trybuild) = create_sim_graph_trybuild(
180            process_graphs,
181            cluster_graphs,
182            self.cluster_max_sizes,
183            process_tick_graphs,
184            cluster_tick_graphs,
185            sim_emit.extra_stmts_global,
186            sim_emit.extra_stmts_cluster,
187        );
188
189        let out = compile_sim(bin, trybuild).unwrap();
190        let lib = unsafe { Library::new(&out).unwrap() };
191
192        let external_ports = self.external_ports.take().0;
193        CompiledSim {
194            _path: out,
195            lib,
196            external_ports,
197            external_registered: self.external_registered.take(),
198        }
199    }
200}