hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::{fmt, panic};
4use std::cell::RefCell;
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::graph::Dfir;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::Mutex;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31
/// Channel endpoints that test code uses to exchange serialized messages with a running
/// simulation. One value is installed in `CURRENT_SIM_CONNECTIONS` for each simulation run.
struct SimConnections {
    /// Sending halves for feeding serialized messages into the simulation, keyed by port.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Receiving halves for serialized simulation output, keyed by port. Each stream sits
    /// behind `Rc<Mutex<..>>` so a handle can be cloned out of the map and then locked
    /// while the stream is being consumed.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Translates an external port id into the simulation port used as the key of the two
    /// maps above.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
}
37
tokio::task_local! {
    // Connections of the simulation run executing on the current task. The value is
    // installed with `CURRENT_SIM_CONNECTIONS.scope(..)` in `run_without_launching` and
    // looked up by `SimReceiver::with_stream` and `SimSender::with_sink`.
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
41
/// A handle to a compiled Hydro simulation, which can be instantiated and run.
pub struct CompiledSim {
    // Temporary path owned by this handle; presumably the on-disk location of the compiled
    // library, kept so the file outlives `lib` — TODO confirm.
    pub(super) _path: TempPath,
    // Dynamically loaded library from which the `__hydro_runtime` entry point is resolved.
    pub(super) lib: Library,
    // Registry of external ports; a clone is handed to every instance created from this
    // handle.
    pub(super) externals_port_registry: SimExternalPortRegistry,
}
48
#[sealed::sealed]
/// A trait implemented by closures that can instantiate a compiled simulation.
///
/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
/// The trait is sealed; the blanket implementation that follows covers every closure with
/// a matching signature.
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Blanket implementation: any `RefUnwindSafe` closure that returns a `CompiledSimInstance`
// is an `Instantiator`.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
56
/// Output handler that silently discards the formatted message.
fn null_handler(_discarded: fmt::Arguments<'_>) {}
58
/// Output handler that writes the formatted message to standard output, followed by a
/// newline.
fn println_handler(message: fmt::Arguments<'_>) {
    println!("{}", message);
}
62
/// Output handler that writes the formatted message to standard error, followed by a
/// newline.
fn eprintln_handler(message: fmt::Arguments<'_>) {
    eprintln!("{}", message);
}
66
/// Creates a simulation instance, returning:
/// - A list of async DFIRs to run (all process / cluster logic outside a tick)
/// - A list of tick DFIRs to run (where the &'static str is for the tick location id)
/// - A mapping of hooks for non-deterministic decisions at tick-input boundaries
/// - A mapping of inline hooks for non-deterministic decisions inside ticks
///
/// This is the type of the `__hydro_runtime` symbol looked up in the compiled library.
/// The `&'static str` location ids are JSON text; the host parses them with
/// `serde_json::from_str` after the call returns.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        // The host passes `colored::control::SHOULD_COLORIZE.should_colorize()` here.
        should_color: bool,
        // usize: SimExternalPort
        // Senders for messages flowing out of the simulation to the test.
        external_out: HashMap<usize, UnboundedSender<Bytes>>,
        // usize: SimExternalPort
        // Streams of messages flowing from the test into the simulation.
        external_in: HashMap<usize, UnboundedReceiverStream<Bytes>>,
        // Handlers the compiled code calls for its stdout / stderr style output; the host
        // passes no-op handlers when logging is disabled.
        println_handler: fn(fmt::Arguments<'_>),
        eprintln_handler: fn(fmt::Arguments<'_>),
    ) -> (
        // Async DFIRs: (location id, optional numeric id, dataflow). The `Option<u32>` is
        // presumably a cluster member id — TODO confirm.
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        // Tick DFIRs, in the same layout as the async DFIRs.
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        // Hooks for decisions at tick-input boundaries.
        Hooks<&'static str>,
        // Hooks for decisions made inside ticks.
        InlineHooks<&'static str>,
    ),
>;
89
90impl CompiledSim {
91    /// Executes the given closure with a single instance of the compiled simulation.
92    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
93        self.with_instantiator(|instantiator| thunk(instantiator()), true)
94    }
95
96    /// Executes the given closure with an [`Instantiator`], which can be called to create
97    /// independent instances of the simulation. This is useful for fuzzing, where we need to
98    /// re-execute the simulation several times with different decisions.
99    ///
100    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
101    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
102    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
103    pub fn with_instantiator<T>(
104        &self,
105        thunk: impl FnOnce(&dyn Instantiator) -> T,
106        always_log: bool,
107    ) -> T {
108        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
109        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
110        thunk(
111            &(|| CompiledSimInstance {
112                func: func.clone(),
113                externals_port_registry: self.externals_port_registry.clone(),
114                input_ports: HashMap::new(),
115                output_ports: HashMap::new(),
116                log,
117            }),
118        )
119    }
120
121    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
122    /// closure will be repeatedly executed with instances of the Hydro program where the
123    /// batching boundaries, order of messages, and retries are varied.
124    ///
125    /// During development, you should run the test that invokes this function with the `cargo sim`
126    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
127    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
128    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
129    /// be executed, and if no reproducer is found a small number of random executions will be
130    /// performed.
131    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
132        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
133            .elements()
134            .into_iter()
135            .find(|e| {
136                !e.fn_name.starts_with("hydro_lang::sim::compiled")
137                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
138                    && !e.fn_name.starts_with("fuzz<")
139                    && !e.fn_name.starts_with("<hydro_lang::sim")
140            })
141            .unwrap();
142
143        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
144        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
145
146        let caller_fuzz_repro_path = repro_folder
147            .join(caller_fn.fn_name.replace("::", "__"))
148            .with_extension("bin");
149
150        if std::env::var("BOLERO_FUZZER").is_ok() {
151            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
152            std::fs::create_dir_all(&corpus_dir).unwrap();
153            let libfuzzer_args = format!(
154                "{} {} -artifact_prefix={}/ -handle_abrt=0",
155                corpus_dir.to_str().unwrap(),
156                corpus_dir.to_str().unwrap(),
157                corpus_dir.to_str().unwrap(),
158            );
159
160            std::fs::create_dir_all(&repro_folder).unwrap();
161
162            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
163                unsafe {
164                    std::env::set_var(
165                        "BOLERO_FAILURE_OUTPUT",
166                        caller_fuzz_repro_path.to_str().unwrap(),
167                    );
168                }
169            }
170
171            unsafe {
172                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
173            }
174
175            self.with_instantiator(
176                |instantiator| {
177                    bolero::test(bolero::TargetLocation {
178                        package_name: "",
179                        manifest_dir: "",
180                        module_path: "",
181                        file: "",
182                        line: 0,
183                        item_path: "<unknown>::__bolero_item_path__",
184                        test_name: None,
185                    })
186                    .run_with_replay(move |is_replay| {
187                        let mut instance = instantiator();
188
189                        if instance.log {
190                            eprintln!(
191                                "{}",
192                                "\n==== New Simulation Instance ===="
193                                    .color(colored::Color::Cyan)
194                                    .bold()
195                            );
196                        }
197
198                        if is_replay {
199                            instance.log = true;
200                        }
201
202                        tokio::runtime::Builder::new_current_thread()
203                            .build()
204                            .unwrap()
205                            .block_on(async { instance.run(&mut thunk).await })
206                    })
207                },
208                false,
209            );
210        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
211            self.fuzz_repro(existing_bytes, async |compiled| {
212                compiled.launch();
213                thunk().await
214            });
215        } else {
216            eprintln!(
217                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
218                caller_fuzz_repro_path.display()
219            );
220            self.with_instantiator(
221                |instantiator| {
222                    bolero::test(bolero::TargetLocation {
223                        package_name: "",
224                        manifest_dir: "",
225                        module_path: "",
226                        file: ".",
227                        line: 0,
228                        item_path: "<unknown>::__bolero_item_path__",
229                        test_name: None,
230                    })
231                    .with_iterations(8192)
232                    .run(move || {
233                        let instance = instantiator();
234                        tokio::runtime::Builder::new_current_thread()
235                            .build()
236                            .unwrap()
237                            .block_on(async { instance.run(&mut thunk).await })
238                    })
239                },
240                false,
241            );
242        }
243    }
244
245    /// Executes the given closure with a single instance of the compiled simulation, using the
246    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
247    /// failure found during fuzzing.
248    pub fn fuzz_repro<'a>(
249        &'a self,
250        bytes: Vec<u8>,
251        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
252    ) {
253        self.with_instance(|instance| {
254            bolero::bolero_engine::any::scope::with(
255                Box::new(bolero::bolero_engine::driver::object::Object(
256                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
257                )),
258                || {
259                    tokio::runtime::Builder::new_current_thread()
260                        .build()
261                        .unwrap()
262                        .block_on(async { instance.run_without_launching(thunk).await })
263                },
264            )
265        });
266    }
267
268    /// Exhaustively searches all possible executions of the simulation. The provided
269    /// closure will be repeatedly executed with instances of the Hydro program where the
270    /// batching boundaries, order of messages, and retries are varied.
271    ///
272    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
273    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
274    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
275    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
276    ///
277    /// Returns the number of distinct executions explored.
278    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
279        if std::env::var("BOLERO_FUZZER").is_ok() {
280            eprintln!(
281                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
282            );
283            std::process::abort();
284        }
285
286        let mut count = 0;
287        let count_mut = &mut count;
288
289        self.with_instantiator(
290            |instantiator| {
291                bolero::test(bolero::TargetLocation {
292                    package_name: "",
293                    manifest_dir: "",
294                    module_path: "",
295                    file: "",
296                    line: 0,
297                    item_path: "<unknown>::__bolero_item_path__",
298                    test_name: None,
299                })
300                .exhaustive()
301                .run_with_replay(move |is_replay| {
302                    *count_mut += 1;
303
304                    let mut instance = instantiator();
305                    if instance.log {
306                        eprintln!(
307                            "{}",
308                            "\n==== New Simulation Instance ===="
309                                .color(colored::Color::Cyan)
310                                .bold()
311                        );
312                    }
313
314                    if is_replay {
315                        instance.log = true;
316                    }
317
318                    tokio::runtime::Builder::new_current_thread()
319                        .build()
320                        .unwrap()
321                        .block_on(async { instance.run(&mut thunk).await })
322                })
323            },
324            false,
325        );
326
327        count
328    }
329}
330
/// A single instance of a compiled Hydro simulation, which provides methods to interactively
/// execute the simulation, feed inputs, and receive outputs.
pub struct CompiledSimInstance<'a> {
    // Entry point of the compiled simulation, borrowed from the loaded library.
    func: SimLoaded<'a>,
    // External ports known to this simulation; one channel pair is created per port when
    // the instance is run.
    externals_port_registry: SimExternalPortRegistry,
    // Senders handed to the compiled code for messages leaving the simulation.
    output_ports: HashMap<SimExternalPort, UnboundedSender<Bytes>>,
    // Streams handed to the compiled code for messages entering the simulation.
    input_ports: HashMap<SimExternalPort, UnboundedReceiverStream<Bytes>>,
    // Whether simulation output and the scheduling trace are logged.
    log: bool,
}
340
341impl<'a> CompiledSimInstance<'a> {
342    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
343        self.run_without_launching(async |instance| {
344            instance.launch();
345            thunk().await;
346        })
347        .await;
348    }
349
350    async fn run_without_launching(
351        mut self,
352        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
353    ) {
354        let mut input_senders = HashMap::new();
355        let mut output_receivers = HashMap::new();
356        for registered_port in self.externals_port_registry.port_counter.range_up_to() {
357            {
358                let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
359                self.output_ports.insert(registered_port, sender);
360                output_receivers.insert(
361                    registered_port,
362                    Rc::new(Mutex::new(UnboundedReceiverStream::new(
363                        receiver.into_inner(),
364                    ))),
365                );
366            }
367
368            {
369                let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
370                self.input_ports.insert(registered_port, receiver);
371                input_senders.insert(registered_port, Rc::new(sender));
372            }
373        }
374
375        let local_set = tokio::task::LocalSet::new();
376        local_set
377            .run_until(CURRENT_SIM_CONNECTIONS.scope(
378                RefCell::new(SimConnections {
379                    input_senders,
380                    output_receivers,
381                    external_registered: self.externals_port_registry.registered.clone(),
382                }),
383                async move {
384                    thunk(self).await;
385                },
386            ))
387            .await;
388    }
389
390    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
391    /// be invoked but before receiving any messages.
392    fn launch(self) {
393        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
394    }
395
396    /// Returns a future that schedules simulation with the given logger for reporting the
397    /// simulation trace.
398    pub fn schedule_with_logger<W: std::io::Write>(
399        self,
400        log_writer: W,
401    ) -> impl use<W> + Future<Output = ()> {
402        self.schedule_with_maybe_logger(Some(log_writer))
403    }
404
405    fn schedule_with_maybe_logger<W: std::io::Write>(
406        self,
407        log_override: Option<W>,
408    ) -> impl use<W> + Future<Output = ()> {
409        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = unsafe {
410            (self.func)(
411                colored::control::SHOULD_COLORIZE.should_colorize(),
412                self.output_ports
413                    .into_iter()
414                    .map(|(k, v)| (k.into_inner(), v))
415                    .collect(),
416                self.input_ports
417                    .into_iter()
418                    .map(|(k, v)| (k.into_inner(), v))
419                    .collect(),
420                if self.log {
421                    println_handler
422                } else {
423                    null_handler
424                },
425                if self.log {
426                    eprintln_handler
427                } else {
428                    null_handler
429                },
430            )
431        };
432        let mut launched = LaunchedSim {
433            async_dfirs: async_dfirs
434                .into_iter()
435                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
436                .collect(),
437            possibly_ready_ticks: vec![],
438            not_ready_ticks: tick_dfirs
439                .into_iter()
440                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
441                .collect(),
442            hooks: hooks
443                .into_iter()
444                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
445                .collect(),
446            inline_hooks: inline_hooks
447                .into_iter()
448                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
449                .collect(),
450            log: if self.log {
451                if let Some(w) = log_override {
452                    LogKind::Custom(w)
453                } else {
454                    LogKind::Stderr
455                }
456            } else {
457                LogKind::Null
458            },
459        };
460
461        async move { launched.scheduler().await }
462    }
463}
464
// `Clone` and `Copy` are implemented by hand with the same bounds as the other
// `SimReceiver` impls, so a receiver handle can be copied for any message type `T`.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    fn clone(&self) -> Self {
        // The handle is `Copy`, so cloning is a plain copy.
        *self
    }
}

impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
472
473impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
474    async fn with_stream<Out>(
475        &self,
476        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
477    ) -> Out {
478        let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
479            let connections = &mut *connections.borrow_mut();
480            connections
481                .output_receivers
482                .get(connections.external_registered.get(&self.0).unwrap())
483                .unwrap()
484                .clone()
485        });
486
487        let mut receiver_stream = receiver.lock().await;
488        thunk(&mut pin!(
489            &mut receiver_stream
490                .by_ref()
491                .map(|b| bincode::deserialize(&b).unwrap())
492        ))
493        .await
494    }
495
496    /// Asserts that the stream has ended and no more messages can possibly arrive.
497    pub fn assert_no_more(self) -> impl Future<Output = ()>
498    where
499        T: Debug,
500    {
501        FutureTrackingCaller {
502            future: async move {
503                self.with_stream(async |stream| {
504                    if let Some(next) = stream.next().await {
505                        Err(format!(
506                            "Stream yielded unexpected message: {:?}, expected termination",
507                            next
508                        ))?;
509                    }
510                    Ok(())
511                })
512                .await
513            },
514        }
515    }
516}
517
518impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
519    /// Receives the next message from the external bincode stream. This will wait until a message
520    /// is available, or return `None` if no more messages can possibly arrive.
521    pub async fn next(&self) -> Option<T> {
522        self.with_stream(async |stream| stream.next().await).await
523    }
524
525    /// Collects all remaining messages from the external bincode stream into a collection. This
526    /// will wait until no more messages can possibly arrive.
527    pub async fn collect<C: Default + Extend<T>>(self) -> C {
528        self.with_stream(async |stream| stream.collect().await)
529            .await
530    }
531
532    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
533    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
534    pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
535        &self,
536        expected: I,
537    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
538    where
539        T: Debug + PartialEq<T2>,
540    {
541        FutureTrackingCaller {
542            future: async {
543                let mut expected: VecDeque<T2> = expected.into_iter().collect();
544
545                while !expected.is_empty() {
546                    if let Some(next) = self.next().await {
547                        let next_expected = expected.pop_front().unwrap();
548                        if next != next_expected {
549                            Err(format!(
550                                "Stream yielded unexpected message: {:?}, expected: {:?}",
551                                next, next_expected
552                            ))?
553                        }
554                    } else {
555                        Err(format!(
556                            "Stream ended early, still expected: {:?}",
557                            expected
558                        ))?;
559                    }
560                }
561
562                Ok(())
563            },
564        }
565    }
566
567    /// Asserts that the stream yields only the expected sequence of messages, in order,
568    /// and then ends.
569    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
570        &self,
571        expected: I,
572    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
573    where
574        T: Debug + PartialEq<T2>,
575    {
576        ChainedFuture {
577            first: self.assert_yields(expected),
578            second: self.assert_no_more(),
579            first_done: false,
580        }
581    }
582}
583
pin_project_lite::pin_project! {
    // A future that tracks the location of the `.await` call for better panic messages.
    //
    // `#[track_caller]` is important for us to create assertion methods because it makes
    // the panic backtrace show up at that method (instead of inside the call tree within
    // that method). This is e.g. what `Option::unwrap` uses. Unfortunately, `#[track_caller]`
    // does not work correctly for async methods (or `dyn Future` either), so we have to
    // create these concrete future types that (1) have `#[track_caller]` on their `poll()`
    // method and (2) have the `panic!` triggered in their `poll()` method (or in a directly
    // nested concrete future).
    //
    // The wrapped future reports failure as `Err(message)`; the wrapper's `poll()` turns
    // that into a panic carrying the message.
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        // The assertion logic; structurally pinned so it can be polled in place.
        #[pin]
        future: F,
    }
}
599
600impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
601    type Output = ();
602
603    #[track_caller]
604    fn poll(
605        mut self: Pin<&mut Self>,
606        cx: &mut std::task::Context<'_>,
607    ) -> std::task::Poll<Self::Output> {
608        match ready!(self.as_mut().project().future.poll(cx)) {
609            Ok(()) => std::task::Poll::Ready(()),
610            Err(e) => panic!("{}", e),
611        }
612    }
613}
614
pin_project_lite::pin_project! {
    // A future that first awaits the first future, then the second, propagating caller info.
    //
    // See [`FutureTrackingCaller`] for context.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        // Polled until it completes; never polled again afterwards.
        #[pin]
        first: F1,
        // Polled only once `first` has completed; its result is the chain's result.
        #[pin]
        second: F2,
        // Set once `first` has completed, so later polls skip straight to `second`.
        first_done: bool,
    }
}
627
628impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
629    type Output = ();
630
631    #[track_caller]
632    fn poll(
633        mut self: Pin<&mut Self>,
634        cx: &mut std::task::Context<'_>,
635    ) -> std::task::Poll<Self::Output> {
636        if !self.first_done {
637            ready!(self.as_mut().project().first.poll(cx));
638            *self.as_mut().project().first_done = true;
639        }
640
641        self.as_mut().project().second.poll(cx)
642    }
643}
644
645impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
646    /// Collects all remaining messages from the external bincode stream into a collection,
647    /// sorting them. This will wait until no more messages can possibly arrive.
648    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
649    where
650        T: Ord,
651    {
652        self.with_stream(async |stream| {
653            let mut collected: C = stream.collect().await;
654            collected.as_mut().sort();
655            collected
656        })
657        .await
658    }
659
660    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
661    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
662    pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
663        &self,
664        expected: I,
665    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
666    where
667        T: Debug + PartialEq<T2>,
668    {
669        FutureTrackingCaller {
670            future: async {
671                self.with_stream(async |stream| {
672                    let mut expected: Vec<T2> = expected.into_iter().collect();
673
674                    while !expected.is_empty() {
675                        if let Some(next) = stream.next().await {
676                            let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
677                            if let Some((i, _)) = idx {
678                                expected.swap_remove(i);
679                            } else {
680                                Err(format!("Stream yielded unexpected message: {:?}", next))?
681                            }
682                        } else {
683                            Err(format!(
684                                "Stream ended early, still expected: {:?}",
685                                expected
686                            ))?
687                        }
688                    }
689
690                    Ok(())
691                })
692                .await
693            },
694        }
695    }
696
697    /// Asserts that the stream yields only the expected sequence of messages, in some order,
698    /// and then ends.
699    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
700        &self,
701        expected: I,
702    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
703    where
704        T: Debug + PartialEq<T2>,
705    {
706        ChainedFuture {
707            first: self.assert_yields_unordered(expected),
708            second: self.assert_no_more(),
709            first_done: false,
710        }
711    }
712}
713
714impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
715    fn with_sink<Out>(
716        &self,
717        thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
718    ) -> Out {
719        let sender = CURRENT_SIM_CONNECTIONS.with(|connections| {
720            let connections = &mut *connections.borrow_mut();
721            connections
722                .input_senders
723                .get(connections.external_registered.get(&self.0).unwrap())
724                .unwrap()
725                .clone()
726        });
727
728        thunk(&move |t| sender.send(bincode::serialize(&t).unwrap().into()))
729    }
730}
731
732impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
733    /// Sends several messages to the external bincode sink. The messages will be asynchronously
734    /// processed as part of the simulation, in non-determinstic order.
735    pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
736        self.with_sink(|send| {
737            for t in iter {
738                send(t).unwrap();
739            }
740        })
741    }
742}
743
744impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
745    /// Sends a message to the external bincode sink. The message will be asynchronously processed
746    /// as part of the simulation.
747    pub fn send(&self, t: T) {
748        self.with_sink(|send| send(t)).unwrap();
749    }
750
751    /// Sends several messages to the external bincode sink. The messages will be asynchronously
752    /// processed as part of the simulation.
753    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
754        self.with_sink(|send| {
755            for t in iter {
756                send(t).unwrap();
757            }
758        })
759    }
760}
761
/// Destination for the simulation trace.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to standard error.
    Stderr,
    /// Write to a caller-supplied writer.
    Custom(W),
}

// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
//
// Implementing `fmt::Write` lets all three destinations be targeted with `write!`, even
// though the custom destination is an `io::Write`.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
        match self {
            Self::Custom(writer) => writer
                .write_all(s.as_bytes())
                // `fmt::Error` carries no payload, so the I/O error detail is dropped.
                .map_err(|_io_error| std::fmt::Error),
            Self::Stderr => {
                eprint!("{}", s);
                Ok(())
            }
            Self::Null => Ok(()),
        }
    }
}
781
/// A running simulation, which manages the async DFIR and tick DFIRs, and makes decisions
/// about scheduling ticks and choices for non-deterministic operators like batch.
struct LaunchedSim<W: std::io::Write> {
    // Dataflows for logic outside ticks, as (location, optional numeric id, dataflow); the
    // scheduler runs each of them on every pass.
    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Tick dataflows that may have work to do; the scheduler picks the next tick from here.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Tick dataflows currently considered idle. They move to `possibly_ready_ticks` when
    // the async dataflow of their enclosing location makes progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Decision hooks at tick-input boundaries, looked up by (location, id).
    hooks: Hooks<LocationId>,
    // Decision hooks used inside ticks.
    inline_hooks: InlineHooks<LocationId>,
    // Where the simulation trace is written.
    log: LogKind<W>,
}
792
793impl<W: std::io::Write> LaunchedSim<W> {
794    async fn scheduler(&mut self) {
795        loop {
796            tokio::task::yield_now().await;
797            let mut any_made_progress = false;
798            for (loc, c_id, dfir) in &mut self.async_dfirs {
799                if dfir.run_tick().await {
800                    any_made_progress = true;
801                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
802                        .not_ready_ticks
803                        .drain(..)
804                        .partition(|(tick_loc, tick_c_id, _)| {
805                            let LocationId::Tick(_, outer) = tick_loc else {
806                                unreachable!()
807                            };
808                            outer.as_ref() == loc && tick_c_id == c_id
809                        });
810
811                    self.possibly_ready_ticks.extend(now_ready);
812                    self.not_ready_ticks.extend(still_not_ready);
813                }
814            }
815
816            if any_made_progress {
817                continue;
818            } else {
819                use bolero::generator::*;
820
821                let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
822                    .possibly_ready_ticks
823                    .drain(..)
824                    .partition(|(name, cid, _)| {
825                        self.hooks
826                            .get(&(name.clone(), *cid))
827                            .unwrap()
828                            .iter()
829                            .any(|hook| {
830                                hook.current_decision().unwrap_or(false)
831                                    || hook.can_make_nontrivial_decision()
832                            })
833                    });
834
835                self.possibly_ready_ticks = ready;
836                self.not_ready_ticks.append(&mut not_ready);
837
838                if self.possibly_ready_ticks.is_empty() {
839                    break;
840                } else {
841                    let next_tick = (0..self.possibly_ready_ticks.len()).any();
842                    let mut removed = self.possibly_ready_ticks.remove(next_tick);
843
844                    match &mut self.log {
845                        LogKind::Null => {}
846                        LogKind::Stderr => {
847                            if let Some(cid) = &removed.1 {
848                                eprintln!(
849                                    "\n{}",
850                                    format!("Running Tick (Cluster Member {})", cid)
851                                        .color(colored::Color::Magenta)
852                                        .bold()
853                                )
854                            } else {
855                                eprintln!(
856                                    "\n{}",
857                                    "Running Tick".color(colored::Color::Magenta).bold()
858                                )
859                            }
860                        }
861                        LogKind::Custom(writer) => {
862                            writeln!(
863                                writer,
864                                "\n{}",
865                                "Running Tick".color(colored::Color::Magenta).bold()
866                            )
867                            .unwrap();
868                        }
869                    }
870
871                    let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
872                        write.write_str(&"*".color(colored::Color::Magenta).bold())?;
873                        write.write_str(" ")
874                    };
875
876                    let mut tick_decision_writer =
877                        indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
878                            inserter: &mut asterisk_indenter,
879                        });
880
881                    let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
882                    let mut remaining_decision_count = hooks.len();
883                    let mut made_nontrivial_decision = false;
884
885                    bolero_generator::any::scope::borrow_with(|driver| {
886                        // first, scan manual decisions
887                        hooks.iter_mut().for_each(|hook| {
888                            if let Some(is_nontrivial) = hook.current_decision() {
889                                made_nontrivial_decision |= is_nontrivial;
890                                remaining_decision_count -= 1;
891                            } else if !hook.can_make_nontrivial_decision() {
892                                // if no nontrivial decision is possible, make a trivial one
893                                // (we need to do this in the first pass to force nontrivial decisions
894                                // on the remaining hooks)
895                                hook.autonomous_decision(driver, false);
896                                remaining_decision_count -= 1;
897                            }
898                        });
899
900                        hooks.iter_mut().for_each(|hook| {
901                            if hook.current_decision().is_none() {
902                                made_nontrivial_decision |= hook.autonomous_decision(
903                                    driver,
904                                    !made_nontrivial_decision && remaining_decision_count == 1,
905                                );
906                                remaining_decision_count -= 1;
907                            }
908
909                            hook.release_decision(&mut tick_decision_writer);
910                        });
911                    });
912
913                    let run_tick_future = removed.2.run_tick();
914                    if let Some(inline_hooks) =
915                        self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
916                    {
917                        let mut run_tick_future_pinned = pin!(run_tick_future);
918
919                        loop {
920                            tokio::select! {
921                                biased;
922                                r = &mut run_tick_future_pinned => {
923                                    assert!(r);
924                                    break;
925                                }
926                                _ = async {} => {
927                                    bolero_generator::any::scope::borrow_with(|driver| {
928                                        for hook in inline_hooks.iter_mut() {
929                                            if hook.pending_decision() {
930                                                if !hook.has_decision() {
931                                                    hook.autonomous_decision(driver);
932                                                }
933
934                                                hook.release_decision(&mut tick_decision_writer);
935                                            }
936                                        }
937                                    });
938                                }
939                            }
940                        }
941                    } else {
942                        assert!(run_tick_future.await);
943                    }
944
945                    self.possibly_ready_ticks.push(removed);
946                }
947            }
948        }
949    }
950}