hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::{fmt, panic};
4use std::cell::RefCell;
5use std::collections::{HashMap, HashSet, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::graph::Dfir;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::Mutex;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimReceiver, SimSender};
27use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
28use crate::location::dynamic::LocationId;
29
/// Test-side endpoints of a simulation's external ports.
///
/// One value is installed in `CURRENT_SIM_CONNECTIONS` for the duration of a simulation run and
/// is consulted by `SimSender` / `SimReceiver` whenever they send or receive.
struct SimConnections {
    /// Senders that feed serialized messages into the simulation, keyed by port.
    input_senders: HashMap<usize, Rc<UnboundedSender<Bytes>>>,
    /// Streams of serialized messages emitted by the simulation, keyed by port. Each stream sits
    /// behind an async mutex so only one reader consumes it at a time.
    output_receivers: HashMap<usize, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Maps the id stored in a `SimSender` / `SimReceiver` (its `.0` field) to the port key used
    /// by the two maps above.
    external_registered: HashMap<usize, usize>,
}
35
tokio::task_local! {
    // Connections of the simulation instance that the current task is running under. Installed
    // with `CURRENT_SIM_CONNECTIONS.scope(...)` in `CompiledSimInstance::run_without_launching`
    // and read by `SimReceiver::with_stream` and `SimSender::with_sink`.
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
39
/// A handle to a compiled Hydro simulation, which can be instantiated and run.
pub struct CompiledSim {
    /// Temporary path that is never read in this file. NOTE(review): presumably kept so the
    /// file backing `lib` is not deleted while the library is loaded — confirm against the code
    /// that constructs `CompiledSim`.
    pub(super) _path: TempPath,
    /// Dynamic library from which the `__hydro_runtime` entry point is looked up.
    pub(super) lib: Library,
    /// Port keys for which every instance creates an input and an output channel.
    pub(super) external_ports: Vec<usize>,
    /// Maps the ids held by `SimSender` / `SimReceiver` to port keys; cloned into every instance.
    pub(super) external_registered: HashMap<usize, usize>,
}
47
#[sealed::sealed]
/// A trait implemented by closures that can instantiate a compiled simulation.
///
/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
///
/// The trait is sealed; the only implementation is the blanket impl below, which covers every
/// `RefUnwindSafe` closure (or function) of type `Fn() -> CompiledSimInstance<'a>`.
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
#[sealed::sealed]
// Blanket impl: any type satisfying the supertrait bounds is an `Instantiator`.
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
55
/// Print handler that discards its input; handed to the compiled simulation when logging is
/// disabled.
fn null_handler(_: fmt::Arguments<'_>) {}
57
/// Print handler that writes the formatted message to stdout as a single line.
fn println_handler(message: fmt::Arguments<'_>) {
    println!("{}", message);
}
61
/// Print handler that writes the formatted message to stderr as a single line.
fn eprintln_handler(message: fmt::Arguments<'_>) {
    eprintln!("{}", message);
}
65
/// Signature of the `__hydro_runtime` symbol exported by a compiled simulation library.
///
/// Arguments, in the order they are passed by `CompiledSimInstance::schedule_with_maybe_logger`:
/// - Whether output should be colorized
/// - The senders stored in the instance's `output_ports`, keyed by port
/// - The receiver streams stored in the instance's `input_ports`, keyed by port
/// - A print handler (`println_handler` when logging is enabled, `null_handler` otherwise)
/// - A print handler (`eprintln_handler` when logging is enabled, `null_handler` otherwise)
///
/// Creates a simulation instance, returning:
/// - A list of async DFIRs to run (all process / cluster logic outside a tick)
/// - A list of tick DFIRs to run (where the &'static str is for the tick location id)
/// - A mapping of hooks for non-deterministic decisions at tick-input boundaries
/// - A mapping of inline hooks for non-deterministic decisions inside ticks
///
/// Every `&'static str` location is JSON that the caller parses into a `LocationId` with
/// `serde_json`. The `Option<u32>` next to it is reported as the cluster member when a tick is
/// logged (presumably `None` for non-cluster locations — TODO confirm).
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        bool,
        HashMap<usize, UnboundedSender<Bytes>>,
        HashMap<usize, UnboundedReceiverStream<Bytes>>,
        fn(fmt::Arguments<'_>),
        fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
86
87impl CompiledSim {
88    /// Executes the given closure with a single instance of the compiled simulation.
89    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
90        self.with_instantiator(|instantiator| thunk(instantiator()), true)
91    }
92
93    /// Executes the given closure with an [`Instantiator`], which can be called to create
94    /// independent instances of the simulation. This is useful for fuzzing, where we need to
95    /// re-execute the simulation several times with different decisions.
96    ///
97    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
98    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
99    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
100    pub fn with_instantiator<T>(
101        &self,
102        thunk: impl FnOnce(&dyn Instantiator) -> T,
103        always_log: bool,
104    ) -> T {
105        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
106        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
107        thunk(
108            &(|| CompiledSimInstance {
109                func: func.clone(),
110                remaining_ports: self.external_ports.iter().cloned().collect(),
111                external_registered: self.external_registered.clone(),
112                input_ports: HashMap::new(),
113                output_ports: HashMap::new(),
114                log,
115            }),
116        )
117    }
118
    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
    /// closure will be repeatedly executed with instances of the Hydro program where the
    /// batching boundaries, order of messages, and retries are varied.
    ///
    /// During development, you should run the test that invokes this function with the `cargo sim`
    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
    /// be executed, and if no reproducer is found a small number of random executions will be
    /// performed.
    ///
    /// The reproducer path is `sim-failures/<caller>.bin` next to the source file of the calling
    /// function, where `<caller>` is the caller's function name with `::` replaced by `__`.
    ///
    /// # Panics
    /// Panics if no calling frame outside the simulator can be found in the backtrace, if that
    /// frame has no file name, or if the required directories cannot be created.
    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
        // Walk the backtrace to the first frame that is neither in this module, nor in
        // `hydro_lang::sim::flow`, nor a `fuzz<...>` frame; that frame names the reproducer.
        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
            .elements()
            .into_iter()
            .find(|e| {
                !e.fn_name.starts_with("hydro_lang::sim::compiled")
                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
                    && !e.fn_name.starts_with("fuzz<")
            })
            .unwrap();

        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
        let repro_folder = caller_path.parent().unwrap().join("sim-failures");

        let caller_fuzz_repro_path = repro_folder
            .join(caller_fn.fn_name.replace("::", "__"))
            .with_extension("bin");

        if std::env::var("BOLERO_FUZZER").is_ok() {
            // Fuzzer mode: `BOLERO_FUZZER` is set (per the docs above, by `cargo sim`).
            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
            std::fs::create_dir_all(&corpus_dir).unwrap();
            let libfuzzer_args = format!(
                "{} {} -artifact_prefix={}/ -handle_abrt=0",
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
            );

            std::fs::create_dir_all(&repro_folder).unwrap();

            // Point bolero's failure output at the reproducer path unless explicitly disabled
            // with `HYDRO_NO_FAILURE_OUTPUT=1`.
            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
                // SAFETY: NOTE(review) — `set_var` is only sound while no other thread accesses
                // the environment; nothing here enforces that, confirm at the call sites.
                unsafe {
                    std::env::set_var(
                        "BOLERO_FAILURE_OUTPUT",
                        caller_fuzz_repro_path.to_str().unwrap(),
                    );
                }
            }

            // SAFETY: NOTE(review) — same single-threaded-environment assumption as above.
            unsafe {
                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
            }

            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: "",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .run_with_replay(move |is_replay| {
                        let mut instance = instantiator();

                        if instance.log {
                            eprintln!(
                                "{}",
                                "\n==== New Simulation Instance ===="
                                    .color(colored::Color::Cyan)
                                    .bold()
                            );
                        }

                        // Replays always log, regardless of `HYDRO_SIM_LOG`.
                        if is_replay {
                            instance.log = true;
                        }

                        // Each execution gets its own single-threaded runtime.
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
            // No fuzzer, but a stored reproducer exists: replay exactly that execution.
            self.fuzz_repro(existing_bytes, async |compiled| {
                compiled.launch();
                thunk().await
            });
        } else {
            // No fuzzer and no reproducer: fall back to a fixed number of random executions.
            eprintln!(
                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
                caller_fuzz_repro_path.display()
            );
            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: ".",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .with_iterations(8192)
                    .run(move || {
                        let instance = instantiator();
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        }
    }
241
242    /// Executes the given closure with a single instance of the compiled simulation, using the
243    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
244    /// failure found during fuzzing.
245    pub fn fuzz_repro<'a>(
246        &'a self,
247        bytes: Vec<u8>,
248        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
249    ) {
250        self.with_instance(|instance| {
251            bolero::bolero_engine::any::scope::with(
252                Box::new(bolero::bolero_engine::driver::object::Object(
253                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
254                )),
255                || {
256                    tokio::runtime::Builder::new_current_thread()
257                        .build()
258                        .unwrap()
259                        .block_on(async { instance.run_without_launching(thunk).await })
260                },
261            )
262        });
263    }
264
    /// Exhaustively searches all possible executions of the simulation. The provided
    /// closure will be repeatedly executed with instances of the Hydro program where the
    /// batching boundaries, order of messages, and retries are varied.
    ///
    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
    ///
    /// Returns the number of distinct executions explored.
    ///
    /// If the `BOLERO_FUZZER` environment variable is set, an error is printed and the process
    /// is aborted.
    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
        if std::env::var("BOLERO_FUZZER").is_ok() {
            eprintln!(
                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
            );
            std::process::abort();
        }

        // The counter is incremented through a reborrow so that the `move` closure below takes
        // the reference rather than a copy of the integer.
        let mut count = 0;
        let count_mut = &mut count;

        self.with_instantiator(
            |instantiator| {
                bolero::test(bolero::TargetLocation {
                    package_name: "",
                    manifest_dir: "",
                    module_path: "",
                    file: "",
                    line: 0,
                    item_path: "<unknown>::__bolero_item_path__",
                    test_name: None,
                })
                .exhaustive()
                .run_with_replay(move |is_replay| {
                    // Counts every invocation of this closure, replays included.
                    *count_mut += 1;

                    let mut instance = instantiator();
                    if instance.log {
                        eprintln!(
                            "{}",
                            "\n==== New Simulation Instance ===="
                                .color(colored::Color::Cyan)
                                .bold()
                        );
                    }

                    // Replays always log, regardless of `HYDRO_SIM_LOG`.
                    if is_replay {
                        instance.log = true;
                    }

                    // Each execution gets its own single-threaded runtime.
                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run(&mut thunk).await })
                })
            },
            false,
        );

        count
    }
326}
327
/// A single instance of a compiled Hydro simulation, which provides methods to interactively
/// execute the simulation, feed inputs, and receive outputs.
pub struct CompiledSimInstance<'a> {
    /// Entry point of the compiled library; called once when the instance is scheduled.
    func: SimLoaded<'a>,
    /// Ports for which `run_without_launching` creates channels.
    remaining_ports: HashSet<usize>,
    /// Maps the ids held by `SimSender` / `SimReceiver` to port keys.
    external_registered: HashMap<usize, usize>,
    /// Sending halves handed to the compiled library, keyed by port. The matching receivers
    /// are exposed to the test through `SimConnections::output_receivers`.
    output_ports: HashMap<usize, UnboundedSender<Bytes>>,
    /// Receiving halves handed to the compiled library, keyed by port. The matching senders
    /// are exposed to the test through `SimConnections::input_senders`.
    input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
    /// Whether the simulation trace is logged.
    log: bool,
}
338
339impl<'a> CompiledSimInstance<'a> {
    /// Sets up the instance's connections, launches the simulation, and then drives `thunk` to
    /// completion. This is `run_without_launching` with a `launch()` call in front of `thunk`.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }
347
348    async fn run_without_launching(
349        mut self,
350        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
351    ) {
352        let mut input_senders = HashMap::new();
353        let mut output_receivers = HashMap::new();
354        for remaining in &self.remaining_ports {
355            {
356                let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
357                self.output_ports.insert(*remaining, sender);
358                output_receivers.insert(
359                    *remaining,
360                    Rc::new(Mutex::new(UnboundedReceiverStream::new(
361                        receiver.into_inner(),
362                    ))),
363                );
364            }
365
366            {
367                let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
368                self.input_ports.insert(*remaining, receiver);
369                input_senders.insert(*remaining, Rc::new(sender));
370            }
371        }
372
373        let local_set = tokio::task::LocalSet::new();
374        local_set
375            .run_until(CURRENT_SIM_CONNECTIONS.scope(
376                RefCell::new(SimConnections {
377                    input_senders,
378                    output_receivers,
379                    external_registered: self.external_registered.clone(),
380                }),
381                async move {
382                    thunk(self).await;
383                },
384            ))
385            .await;
386    }
387
    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
    /// be invoked before receiving any messages.
    ///
    /// The scheduler future is spawned with `tokio::task::spawn_local` and no custom log writer
    /// (`std::io::Empty` only fills in the writer type parameter).
    fn launch(self) {
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }
393
    /// Returns a future that schedules simulation with the given logger for reporting the
    /// simulation trace.
    ///
    /// Note that `log_writer` is only used when logging is enabled for this instance; when it
    /// is disabled the writer is ignored and nothing is logged.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }
402
403    fn schedule_with_maybe_logger<W: std::io::Write>(
404        self,
405        log_override: Option<W>,
406    ) -> impl use<W> + Future<Output = ()> {
407        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = unsafe {
408            (self.func)(
409                colored::control::SHOULD_COLORIZE.should_colorize(),
410                self.output_ports,
411                self.input_ports,
412                if self.log {
413                    println_handler
414                } else {
415                    null_handler
416                },
417                if self.log {
418                    eprintln_handler
419                } else {
420                    null_handler
421                },
422            )
423        };
424        let mut launched = LaunchedSim {
425            async_dfirs: async_dfirs
426                .into_iter()
427                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
428                .collect(),
429            possibly_ready_ticks: vec![],
430            not_ready_ticks: tick_dfirs
431                .into_iter()
432                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
433                .collect(),
434            hooks: hooks
435                .into_iter()
436                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
437                .collect(),
438            inline_hooks: inline_hooks
439                .into_iter()
440                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
441                .collect(),
442            log: if self.log {
443                if let Some(w) = log_override {
444                    LogKind::Custom(w)
445                } else {
446                    LogKind::Stderr
447                }
448            } else {
449                LogKind::Null
450            },
451        };
452
453        async move { launched.scheduler().await }
454    }
455}
456
// Hand-written rather than derived. NOTE(review): presumably so that cloning does not require
// `T`, `O` or `R` themselves to be `Clone` — confirm against the `SimReceiver` definition.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    /// Returns a bitwise copy of the handle (the type is `Copy`).
    fn clone(&self) -> Self {
        *self
    }
}
462
// `SimReceiver` handles are freely copyable; methods that take `self` by value (such as
// `assert_no_more`) can therefore be called through a shared reference.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
464
465impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
466    async fn with_stream<Out>(
467        &self,
468        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
469    ) -> Out {
470        let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
471            let connections = &mut *connections.borrow_mut();
472            connections
473                .output_receivers
474                .get(connections.external_registered.get(&self.0).unwrap())
475                .unwrap()
476                .clone()
477        });
478
479        let mut receiver_stream = receiver.lock().await;
480        thunk(&mut pin!(
481            &mut receiver_stream
482                .by_ref()
483                .map(|b| bincode::deserialize(&b).unwrap())
484        ))
485        .await
486    }
487
488    /// Asserts that the stream has ended and no more messages can possibly arrive.
489    pub fn assert_no_more(self) -> impl Future<Output = ()>
490    where
491        T: Debug,
492    {
493        FutureTrackingCaller {
494            future: async move {
495                self.with_stream(async |stream| {
496                    if let Some(next) = stream.next().await {
497                        Err(format!(
498                            "Stream yielded unexpected message: {:?}, expected termination",
499                            next
500                        ))?;
501                    }
502                    Ok(())
503                })
504                .await
505            },
506        }
507    }
508}
509
510impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
    /// Receives the next message from the external bincode stream. This will wait until a message
    /// is available, or return `None` if no more messages can possibly arrive.
    ///
    /// Each call acquires the port's stream lock for the duration of the call.
    pub async fn next(&self) -> Option<T> {
        self.with_stream(async |stream| stream.next().await).await
    }
516
    /// Collects all remaining messages from the external bincode stream into a collection. This
    /// will wait until no more messages can possibly arrive.
    ///
    /// Messages are added to the collection in the order the stream yields them.
    pub async fn collect<C: Default + Extend<T>>(self) -> C {
        self.with_stream(async |stream| stream.collect().await)
            .await
    }
523
524    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
525    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
526    pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
527        &self,
528        expected: I,
529    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
530    where
531        T: Debug + PartialEq<T2>,
532    {
533        FutureTrackingCaller {
534            future: async {
535                let mut expected: VecDeque<T2> = expected.into_iter().collect();
536
537                while !expected.is_empty() {
538                    if let Some(next) = self.next().await {
539                        let next_expected = expected.pop_front().unwrap();
540                        if next != next_expected {
541                            Err(format!(
542                                "Stream yielded unexpected message: {:?}, expected: {:?}",
543                                next, next_expected
544                            ))?
545                        }
546                    } else {
547                        Err(format!(
548                            "Stream ended early, still expected: {:?}",
549                            expected
550                        ))?;
551                    }
552                }
553
554                Ok(())
555            },
556        }
557    }
558
559    /// Asserts that the stream yields only the expected sequence of messages, in order,
560    /// and then ends.
561    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
562        &self,
563        expected: I,
564    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
565    where
566        T: Debug + PartialEq<T2>,
567    {
568        ChainedFuture {
569            first: self.assert_yields(expected),
570            second: self.assert_no_more(),
571            first_done: false,
572        }
573    }
574}
575
pin_project_lite::pin_project! {
    // A future that tracks the location of the `.await` call for better panic messages.
    //
    // `#[track_caller]` is important for us to create assertion methods because it makes
    // the panic backtrace show up at that method (instead of inside the call tree within
    // that method). This is e.g. what `Option::unwrap` uses. Unfortunately, `#[track_caller]`
    // does not work correctly for async methods (or `dyn Future` either), so we have to
    // create these concrete future types that (1) have `#[track_caller]` on their `poll()`
    // method and (2) have the `panic!` triggered in their `poll()` method (or in a directly
    // nested concrete future).
    //
    // The wrapped future reports failure as `Err(message)`; the `Future` impl of this type
    // turns that into a panic carrying `message`.
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        #[pin]
        future: F,
    }
}
591
592impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
593    type Output = ();
594
595    #[track_caller]
596    fn poll(
597        mut self: Pin<&mut Self>,
598        cx: &mut std::task::Context<'_>,
599    ) -> std::task::Poll<Self::Output> {
600        match ready!(self.as_mut().project().future.poll(cx)) {
601            Ok(()) => std::task::Poll::Ready(()),
602            Err(e) => panic!("{}", e),
603        }
604    }
605}
606
pin_project_lite::pin_project! {
    // A future that first awaits the first future, then the second, propagating caller info.
    //
    // See [`FutureTrackingCaller`] for context.
    //
    // `first_done` records that `first` has completed so that it is never polled again.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        #[pin]
        first: F1,
        #[pin]
        second: F2,
        first_done: bool,
    }
}
619
620impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
621    type Output = ();
622
623    #[track_caller]
624    fn poll(
625        mut self: Pin<&mut Self>,
626        cx: &mut std::task::Context<'_>,
627    ) -> std::task::Poll<Self::Output> {
628        if !self.first_done {
629            ready!(self.as_mut().project().first.poll(cx));
630            *self.as_mut().project().first_done = true;
631        }
632
633        self.as_mut().project().second.poll(cx)
634    }
635}
636
637impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
638    /// Collects all remaining messages from the external bincode stream into a collection,
639    /// sorting them. This will wait until no more messages can possibly arrive.
640    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
641    where
642        T: Ord,
643    {
644        self.with_stream(async |stream| {
645            let mut collected: C = stream.collect().await;
646            collected.as_mut().sort();
647            collected
648        })
649        .await
650    }
651
652    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
653    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
654    pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
655        &self,
656        expected: I,
657    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
658    where
659        T: Debug + PartialEq<T2>,
660    {
661        FutureTrackingCaller {
662            future: async {
663                self.with_stream(async |stream| {
664                    let mut expected: Vec<T2> = expected.into_iter().collect();
665
666                    while !expected.is_empty() {
667                        if let Some(next) = stream.next().await {
668                            let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
669                            if let Some((i, _)) = idx {
670                                expected.swap_remove(i);
671                            } else {
672                                Err(format!("Stream yielded unexpected message: {:?}", next))?
673                            }
674                        } else {
675                            Err(format!(
676                                "Stream ended early, still expected: {:?}",
677                                expected
678                            ))?
679                        }
680                    }
681
682                    Ok(())
683                })
684                .await
685            },
686        }
687    }
688
689    /// Asserts that the stream yields only the expected sequence of messages, in some order,
690    /// and then ends.
691    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
692        &self,
693        expected: I,
694    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
695    where
696        T: Debug + PartialEq<T2>,
697    {
698        ChainedFuture {
699            first: self.assert_yields_unordered(expected),
700            second: self.assert_no_more(),
701            first_done: false,
702        }
703    }
704}
705
706impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
707    fn with_sink<Out>(
708        &self,
709        thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
710    ) -> Out {
711        let sender = CURRENT_SIM_CONNECTIONS.with(|connections| {
712            let connections = &mut *connections.borrow_mut();
713            connections
714                .input_senders
715                .get(connections.external_registered.get(&self.0).unwrap())
716                .unwrap()
717                .clone()
718        });
719
720        thunk(&move |t| sender.send(bincode::serialize(&t).unwrap().into()))
721    }
722}
723
impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
    /// Sends several messages to the external bincode sink. The messages will be asynchronously
    /// processed as part of the simulation, in non-deterministic order.
    ///
    /// # Panics
    /// Panics if a message cannot be sent on the underlying channel.
    pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
        self.with_sink(|send| {
            for t in iter {
                send(t).unwrap();
            }
        })
    }
}
735
impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
    /// Sends a message to the external bincode sink. The message will be asynchronously processed
    /// as part of the simulation.
    ///
    /// # Panics
    /// Panics if the message cannot be sent on the underlying channel.
    pub fn send(&self, t: T) {
        self.with_sink(|send| send(t)).unwrap();
    }

    /// Sends several messages to the external bincode sink. The messages will be asynchronously
    /// processed as part of the simulation.
    ///
    /// Messages are pushed onto the channel in iteration order.
    ///
    /// # Panics
    /// Panics if a message cannot be sent on the underlying channel.
    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
        self.with_sink(|send| {
            for t in iter {
                send(t).unwrap();
            }
        })
    }
}
753
/// Destination of the simulation trace.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to the process's stderr.
    Stderr,
    /// Write to a caller-supplied byte sink.
    Custom(W),
}

// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
//
// Adapts the three destinations to `std::fmt::Write` so that the trace can be produced with
// `write!` / `writeln!` regardless of where it ends up.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Forwards `s` to the selected destination. I/O failures of a custom sink are reported as
    /// `std::fmt::Error`, since `fmt::Write` cannot carry the underlying error.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        match self {
            LogKind::Custom(sink) => sink.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
            LogKind::Stderr => {
                eprint!("{}", s);
                Ok(())
            }
            LogKind::Null => Ok(()),
        }
    }
}
773
/// A running simulation, which manages the async DFIR and tick DFIRs, and makes decisions
/// about scheduling ticks and choices for non-deterministic operators like batch.
struct LaunchedSim<W: std::io::Write> {
    /// Async dataflows as `(location, cluster member id, graph)`; each is run on every
    /// scheduler iteration.
    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Ticks that may be runnable; the scheduler filters these through `hooks` and then picks
    /// one of the survivors to run.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Ticks currently considered not runnable. A tick moves to `possibly_ready_ticks` when
    /// the async dataflow at its enclosing location (and same cluster member) makes progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Hooks looked up by `(tick location, cluster member id)` to decide whether a tick has
    /// anything to do.
    hooks: Hooks<LocationId>,
    /// Inline hooks for decisions inside ticks, keyed like `hooks`.
    inline_hooks: InlineHooks<LocationId>,
    /// Where the simulation trace is written.
    log: LogKind<W>,
}
784
785impl<W: std::io::Write> LaunchedSim<W> {
786    async fn scheduler(&mut self) {
787        loop {
788            tokio::task::yield_now().await;
789            let mut any_made_progress = false;
790            for (loc, c_id, dfir) in &mut self.async_dfirs {
791                if dfir.run_tick().await {
792                    any_made_progress = true;
793                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
794                        .not_ready_ticks
795                        .drain(..)
796                        .partition(|(tick_loc, tick_c_id, _)| {
797                            let LocationId::Tick(_, outer) = tick_loc else {
798                                unreachable!()
799                            };
800                            outer.as_ref() == loc && tick_c_id == c_id
801                        });
802
803                    self.possibly_ready_ticks.extend(now_ready);
804                    self.not_ready_ticks.extend(still_not_ready);
805                }
806            }
807
808            if any_made_progress {
809                continue;
810            } else {
811                use bolero::generator::*;
812
813                let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
814                    .possibly_ready_ticks
815                    .drain(..)
816                    .partition(|(name, cid, _)| {
817                        self.hooks
818                            .get(&(name.clone(), *cid))
819                            .unwrap()
820                            .iter()
821                            .any(|hook| {
822                                hook.current_decision().unwrap_or(false)
823                                    || hook.can_make_nontrivial_decision()
824                            })
825                    });
826
827                self.possibly_ready_ticks = ready;
828                self.not_ready_ticks.append(&mut not_ready);
829
830                if self.possibly_ready_ticks.is_empty() {
831                    break;
832                } else {
833                    let next_tick = (0..self.possibly_ready_ticks.len()).any();
834                    let mut removed = self.possibly_ready_ticks.remove(next_tick);
835
836                    match &mut self.log {
837                        LogKind::Null => {}
838                        LogKind::Stderr => {
839                            if let Some(cid) = &removed.1 {
840                                eprintln!(
841                                    "\n{}",
842                                    format!("Running Tick (Cluster Member {})", cid)
843                                        .color(colored::Color::Magenta)
844                                        .bold()
845                                )
846                            } else {
847                                eprintln!(
848                                    "\n{}",
849                                    "Running Tick".color(colored::Color::Magenta).bold()
850                                )
851                            }
852                        }
853                        LogKind::Custom(writer) => {
854                            writeln!(
855                                writer,
856                                "\n{}",
857                                "Running Tick".color(colored::Color::Magenta).bold()
858                            )
859                            .unwrap();
860                        }
861                    }
862
863                    let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
864                        write.write_str(&"*".color(colored::Color::Magenta).bold())?;
865                        write.write_str(" ")
866                    };
867
868                    let mut tick_decision_writer =
869                        indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
870                            inserter: &mut asterisk_indenter,
871                        });
872
873                    let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
874                    let mut remaining_decision_count = hooks.len();
875                    let mut made_nontrivial_decision = false;
876
877                    bolero_generator::any::scope::borrow_with(|driver| {
878                        // first, scan manual decisions
879                        hooks.iter_mut().for_each(|hook| {
880                            if let Some(is_nontrivial) = hook.current_decision() {
881                                made_nontrivial_decision |= is_nontrivial;
882                                remaining_decision_count -= 1;
883                            } else if !hook.can_make_nontrivial_decision() {
884                                // if no nontrivial decision is possible, make a trivial one
885                                // (we need to do this in the first pass to force nontrivial decisions
886                                // on the remaining hooks)
887                                hook.autonomous_decision(driver, false);
888                                remaining_decision_count -= 1;
889                            }
890                        });
891
892                        hooks.iter_mut().for_each(|hook| {
893                            if hook.current_decision().is_none() {
894                                made_nontrivial_decision |= hook.autonomous_decision(
895                                    driver,
896                                    !made_nontrivial_decision && remaining_decision_count == 1,
897                                );
898                                remaining_decision_count -= 1;
899                            }
900
901                            hook.release_decision(&mut tick_decision_writer);
902                        });
903                    });
904
905                    let run_tick_future = removed.2.run_tick();
906                    if let Some(inline_hooks) =
907                        self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
908                    {
909                        let mut run_tick_future_pinned = pin!(run_tick_future);
910
911                        loop {
912                            tokio::select! {
913                                biased;
914                                r = &mut run_tick_future_pinned => {
915                                    assert!(r);
916                                    break;
917                                }
918                                _ = async {} => {
919                                    bolero_generator::any::scope::borrow_with(|driver| {
920                                        for hook in inline_hooks.iter_mut() {
921                                            if hook.pending_decision() {
922                                                if !hook.has_decision() {
923                                                    hook.autonomous_decision(driver);
924                                                }
925
926                                                hook.release_decision(&mut tick_decision_writer);
927                                            }
928                                        }
929                                    });
930                                }
931                            }
932                        }
933                    } else {
934                        assert!(run_tick_future.await);
935                    }
936
937                    self.possibly_ready_ticks.push(removed);
938                }
939            }
940        }
941    }
942}