Skip to main content

hydro_lang/sim/
flow.rs

1//! Entrypoint for compiling and running Hydro simulations.
2
3use std::cell::RefCell;
4use std::collections::{HashMap, HashSet};
5use std::panic::RefUnwindSafe;
6use std::rc::Rc;
7
8use dfir_lang::graph::{DfirGraph, FlatGraphBuilder, FlatGraphBuilderOutput};
9use libloading::Library;
10use slotmap::SparseSecondaryMap;
11
12use super::builder::SimBuilder;
13use super::compiled::{CompiledSim, CompiledSimInstance};
14use super::graph::{SimDeploy, SimExternal, SimNode, compile_sim, create_sim_graph_trybuild};
15use crate::compile::builder::{HandoffId, StmtId};
16use crate::compile::ir::HydroRoot;
17use crate::location::LocationKey;
18use crate::location::dynamic::LocationId;
19use crate::prelude::Cluster;
20use crate::sim::graph::SimExternalPortRegistry;
21use crate::staging_util::Invariant;
22
/// A not-yet-compiled simulator for a Hydro program.
///
/// Holds the program's IR together with the per-location simulation nodes and the
/// settings chosen through the builder-style methods on this type. Calling `compiled`
/// consumes it and produces a [`CompiledSim`].
pub struct SimFlow<'a> {
    /// IR roots of the Hydro program; `compiled` runs network compilation and emission
    /// over each of them.
    pub(crate) ir: Vec<HydroRoot>,

    /// SimNode for each Process.
    pub(crate) processes: SparseSecondaryMap<LocationKey, SimNode>,
    /// SimNode for each Cluster.
    pub(crate) clusters: SparseSecondaryMap<LocationKey, SimNode>,
    /// SimExternal for each External.
    pub(crate) externals: SparseSecondaryMap<LocationKey, SimExternal>,

    /// Max size of each cluster, keyed by the cluster's location.
    /// `compiled` panics if a cluster in `clusters` has no entry here.
    pub(crate) cluster_max_sizes: SparseSecondaryMap<LocationKey, usize>,
    /// Handle to state handling `external`s' ports.
    /// Shared (via `Rc`) with the `SimExternal` that `compiled` registers.
    pub(crate) externals_port_registry: Rc<RefCell<SimExternalPortRegistry>>,

    /// When true, the simulator only tests safety properties (not liveness).
    pub(crate) test_safety_only: bool,

    /// When true, consistency assertions are skipped (treated as identity no-ops).
    /// When false (default), encountering a consistency assertion panics because
    /// validating consistency assertions is not yet supported in the simulator.
    pub(crate) skip_consistency_assertions: bool,

    /// Number of iterations to use for fuzzing; defaults to 8192.
    pub(crate) unit_test_fuzz_iterations: usize,

    /// Marker that ties this value to the `'a` lifetime; it carries no simulation state
    /// that this file reads.
    pub(crate) _phantom: Invariant<'a>,
}
52
53impl<'a> SimFlow<'a> {
54    /// Sets the maximum size of the given cluster in the simulation.
55    pub fn with_cluster_size<C>(mut self, cluster: &Cluster<'a, C>, max_size: usize) -> Self {
56        self.cluster_max_sizes.insert(cluster.key, max_size);
57        self
58    }
59
60    /// Opts in to safety-only testing, which is required when using
61    /// [`lossy_delayed_forever`](crate::networking::NetworkingConfig::lossy_delayed_forever)
62    /// networking.
63    ///
64    /// The simulator models dropped messages as indefinitely delayed, which means
65    /// it only tests safety properties—not liveness—since messages may never arrive.
66    /// Calling this method acknowledges that the simulation will not verify that the
67    /// program eventually makes progress.
68    pub fn test_safety_only(mut self) -> Self {
69        self.test_safety_only = true;
70        self
71    }
72
73    /// Opts in to skipping consistency assertions. When enabled, `assert_is_consistent`
74    /// nodes are treated as identity no-ops in the simulator. When disabled (the default),
75    /// encountering a consistency assertion will panic because validating consistency
76    /// assertions is not yet supported in the simulator.
77    pub fn skip_consistency_assertions(mut self) -> Self {
78        self.skip_consistency_assertions = true;
79        self
80    }
81
82    /// Sets the number of fuzz iterations for this test. Overrides the
83    /// the default value of 8192
84    pub fn unit_test_fuzz_iterations(mut self, iterations: usize) -> Self {
85        self.unit_test_fuzz_iterations = iterations;
86        self
87    }
88
89    /// Executes the given closure with a single instance of the compiled simulation.
90    pub fn with_instance<T>(self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
91        self.compiled().with_instance(thunk)
92    }
93
94    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
95    /// closure will be repeatedly executed with instances of the Hydro program where the
96    /// batching boundaries, order of messages, and retries are varied.
97    ///
98    /// During development, you should run the test that invokes this function with the `cargo sim`
99    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
100    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
101    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
102    /// be executed, and if no reproducer is found a small number of random executions will be
103    /// performed.
104    pub fn fuzz(self, thunk: impl AsyncFn() + RefUnwindSafe) {
105        self.compiled().fuzz(thunk)
106    }
107
108    /// Exhaustively searches all possible executions of the simulation. The provided
109    /// closure will be repeatedly executed with instances of the Hydro program where the
110    /// batching boundaries, order of messages, and retries are varied.
111    ///
112    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
113    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
114    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
115    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
116    ///
117    /// Returns the number of distinct executions explored.
118    pub fn exhaustive(self, thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
119        self.compiled().exhaustive(thunk)
120    }
121
122    /// Compiles the simulation into a dynamically loadable library, and returns a handle to it.
123    pub fn compiled(mut self) -> CompiledSim {
124        use std::collections::BTreeMap;
125
126        use dfir_lang::graph::{eliminate_extra_unions_tees, partition_graph};
127
128        let mut sim_emit = SimBuilder {
129            process_graphs: BTreeMap::new(),
130            cluster_graphs: BTreeMap::new(),
131            process_tick_dfirs: BTreeMap::new(),
132            cluster_tick_dfirs: BTreeMap::new(),
133            extra_stmts_global: vec![],
134            extra_stmts_cluster: BTreeMap::new(),
135            next_hoff_id: HandoffId::default(),
136            test_safety_only: self.test_safety_only,
137            skip_consistency_assertions: self.skip_consistency_assertions,
138        };
139
140        // Ensure the default (0) external is always present.
141        self.externals.insert(
142            LocationKey::FIRST,
143            SimExternal {
144                shared_inner: self.externals_port_registry.clone(),
145            },
146        );
147
148        let mut seen_tees_instantiate: HashMap<_, _> = HashMap::new();
149        let mut seen_cluster_members = HashSet::new();
150        self.ir.iter_mut().for_each(|leaf| {
151            leaf.compile_network::<SimDeploy>(
152                &mut SparseSecondaryMap::new(),
153                &mut seen_tees_instantiate,
154                &mut seen_cluster_members,
155                &self.processes,
156                &self.clusters,
157                &self.externals,
158                &mut (),
159            );
160        });
161
162        let mut seen_tees = HashMap::new();
163        let mut built_tees = HashMap::new();
164        let mut next_stmt_id = StmtId::default();
165        let mut fold_hooked_idents = HashSet::new();
166        for leaf in &mut self.ir {
167            leaf.emit(
168                &mut sim_emit,
169                &mut seen_tees,
170                &mut built_tees,
171                &mut next_stmt_id,
172                &mut fold_hooked_idents,
173            );
174        }
175
176        fn build_graphs(
177            graphs: BTreeMap<LocationId, FlatGraphBuilder>,
178        ) -> BTreeMap<LocationId, DfirGraph> {
179            graphs
180                .into_iter()
181                .map(|(l, g)| {
182                    let FlatGraphBuilderOutput { mut flat_graph, .. } =
183                        g.build().expect("Failed to build DFIR flat graph.");
184                    eliminate_extra_unions_tees(&mut flat_graph);
185                    (
186                        l,
187                        partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
188                    )
189                })
190                .collect()
191        }
192
193        let process_graphs = build_graphs(sim_emit.process_graphs);
194        let cluster_graphs = build_graphs(sim_emit.cluster_graphs);
195        let process_tick_graphs = build_graphs(sim_emit.process_tick_dfirs);
196        let cluster_tick_graphs = build_graphs(sim_emit.cluster_tick_dfirs);
197
198        #[expect(
199            clippy::disallowed_methods,
200            reason = "nondeterministic iteration order, fine for checks"
201        )]
202        for c in self.clusters.keys() {
203            assert!(
204                self.cluster_max_sizes.contains_key(c),
205                "Cluster {:?} missing max size; call with_cluster_size() before compiled()",
206                c
207            );
208        }
209
210        let (bin, trybuild) = create_sim_graph_trybuild(
211            process_graphs,
212            cluster_graphs,
213            self.cluster_max_sizes,
214            process_tick_graphs,
215            cluster_tick_graphs,
216            sim_emit.extra_stmts_global,
217            sim_emit.extra_stmts_cluster,
218        );
219
220        let out = compile_sim(bin, trybuild).unwrap();
221        let lib = unsafe { Library::new(&out).unwrap() };
222
223        CompiledSim {
224            _path: out,
225            lib,
226            externals_port_registry: self.externals_port_registry.take(),
227            unit_test_fuzz_iterations: self.unit_test_fuzz_iterations,
228        }
229    }
230}