hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::fmt;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::marker::PhantomData;
6use std::panic::RefUnwindSafe;
7use std::path::Path;
8use std::pin::Pin;
9use std::task::{Context, Poll, Waker};
10
11use bytes::Bytes;
12use colored::Colorize;
13use dfir_rs::scheduled::graph::Dfir;
14use futures::{FutureExt, Stream, StreamExt};
15use libloading::Library;
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use tempfile::TempPath;
19use tokio::sync::mpsc::UnboundedSender;
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22use super::runtime::SimHook;
23use crate::compile::deploy::ConnectableAsync;
24use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
25use crate::location::external_process::{ExternalBincodeSink, ExternalBincodeStream};
26
27/// A handle to a compiled Hydro simulation, which can be instantiated and run.
28pub struct CompiledSim {
29    pub(super) _path: TempPath,
30    pub(super) lib: Library,
31    pub(super) external_ports: Vec<usize>,
32}
33
34#[sealed::sealed]
35/// A trait implemented by closures that can instantiate a compiled simulation.
36///
37/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
38pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
39#[sealed::sealed]
40impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
41
42fn null_handler(_args: fmt::Arguments) {}
43
44fn println_handler(args: fmt::Arguments) {
45    println!("{}", args);
46}
47
48fn eprintln_handler(args: fmt::Arguments) {
49    eprintln!("{}", args);
50}
51
52type SimLoaded<'a> = libloading::Symbol<
53    'a,
54    unsafe extern "Rust" fn(
55        bool,
56        HashMap<usize, UnboundedSender<Bytes>>,
57        HashMap<usize, UnboundedReceiverStream<Bytes>>,
58        fn(fmt::Arguments<'_>),
59        fn(fmt::Arguments<'_>),
60    ) -> (
61        Dfir<'static>,
62        Vec<(&'static str, Dfir<'static>)>,
63        HashMap<&'static str, Vec<Box<dyn SimHook>>>,
64    ),
65>;
66
67impl CompiledSim {
68    /// Executes the given closure with a single instance of the compiled simulation.
69    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
70        self.with_instantiator(|instantiator| thunk(instantiator()), true)
71    }
72
73    /// Executes the given closure with an [`Instantiator`], which can be called to create
74    /// independent instances of the simulation. This is useful for fuzzing, where we need to
75    /// re-execute the simulation several times with different decisions.
76    ///
77    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
78    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
79    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
80    pub fn with_instantiator<T>(
81        &self,
82        thunk: impl FnOnce(&dyn Instantiator) -> T,
83        always_log: bool,
84    ) -> T {
85        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
86        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
87        thunk(
88            &(|| CompiledSimInstance {
89                func: func.clone(),
90                remaining_ports: self.external_ports.iter().cloned().collect(),
91                input_ports: HashMap::new(),
92                output_ports: HashMap::new(),
93                log,
94            }),
95        )
96    }
97
98    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
99    /// closure will be repeatedly executed with instances of the Hydro program where the
100    /// batching boundaries, order of messages, and retries are varied.
101    ///
102    /// During development, you should run the test that invokes this function with the `cargo sim`
103    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
104    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
105    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
106    /// be executed, and if no reproducer is found a small number of random executions will be
107    /// performed.
108    pub fn fuzz<'a>(&'a self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
109        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
110            .elements()
111            .into_iter()
112            .find(|e| {
113                !e.fn_name.starts_with("hydro_lang::sim::compiled")
114                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
115                    && !e.fn_name.starts_with("fuzz<")
116            })
117            .unwrap();
118
119        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
120        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
121
122        let caller_fuzz_repro_path = repro_folder
123            .join(caller_fn.fn_name.replace("::", "__"))
124            .with_extension("bin");
125
126        if std::env::var("BOLERO_FUZZER").is_ok() {
127            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
128            std::fs::create_dir_all(&corpus_dir).unwrap();
129            let libfuzzer_args = format!(
130                "{} {} -artifact_prefix={}/ -handle_abrt=0",
131                corpus_dir.to_str().unwrap(),
132                corpus_dir.to_str().unwrap(),
133                corpus_dir.to_str().unwrap(),
134            );
135
136            std::fs::create_dir_all(&repro_folder).unwrap();
137
138            unsafe {
139                std::env::set_var(
140                    "BOLERO_FAILURE_OUTPUT",
141                    caller_fuzz_repro_path.to_str().unwrap(),
142                );
143
144                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
145            }
146
147            self.with_instantiator(
148                |instantiator| {
149                    bolero::test(bolero::TargetLocation {
150                        package_name: "",
151                        manifest_dir: "",
152                        module_path: "",
153                        file: "",
154                        line: 0,
155                        item_path: "<unknown>::__bolero_item_path__",
156                        test_name: None,
157                    })
158                    .run_with_replay(move |is_replay| {
159                        let mut instance = instantiator();
160
161                        if instance.log {
162                            eprintln!(
163                                "{}",
164                                "\n==== New Simulation Instance ====\n"
165                                    .color(colored::Color::Cyan)
166                                    .bold()
167                            );
168                        }
169
170                        if is_replay {
171                            instance.log = true;
172                        }
173
174                        tokio::runtime::Builder::new_current_thread()
175                            .build()
176                            .unwrap()
177                            .block_on(async {
178                                let local_set = tokio::task::LocalSet::new();
179                                local_set.run_until(thunk(instance)).await
180                            })
181                    })
182                },
183                false,
184            );
185        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
186            self.fuzz_repro(existing_bytes, thunk);
187        } else {
188            eprintln!(
189                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
190                caller_fuzz_repro_path.display()
191            );
192            self.with_instantiator(
193                |instantiator| {
194                    bolero::test(bolero::TargetLocation {
195                        package_name: "",
196                        manifest_dir: "",
197                        module_path: "",
198                        file: ".",
199                        line: 0,
200                        item_path: "<unknown>::__bolero_item_path__",
201                        test_name: None,
202                    })
203                    .with_iterations(8192)
204                    .run(move || {
205                        let instance = instantiator();
206                        tokio::runtime::Builder::new_current_thread()
207                            .build()
208                            .unwrap()
209                            .block_on(async {
210                                let local_set = tokio::task::LocalSet::new();
211                                local_set.run_until(thunk(instance)).await
212                            })
213                    })
214                },
215                false,
216            );
217        }
218    }
219
220    /// Executes the given closure with a single instance of the compiled simulation, using the
221    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
222    /// failure found during fuzzing.
223    pub fn fuzz_repro<'a>(
224        &'a self,
225        bytes: Vec<u8>,
226        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
227    ) {
228        self.with_instance(|instance| {
229            bolero::bolero_engine::any::scope::with(
230                Box::new(bolero::bolero_engine::driver::object::Object(
231                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
232                )),
233                || {
234                    tokio::runtime::Builder::new_current_thread()
235                        .build()
236                        .unwrap()
237                        .block_on(async {
238                            let local_set = tokio::task::LocalSet::new();
239                            local_set.run_until(thunk(instance)).await
240                        })
241                },
242            )
243        });
244    }
245
246    /// Exhaustively searches all possible executions of the simulation. The provided
247    /// closure will be repeatedly executed with instances of the Hydro program where the
248    /// batching boundaries, order of messages, and retries are varied.
249    ///
250    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
251    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
252    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
253    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
254    ///
255    /// Returns the number of distinct executions explored.
256    pub fn exhaustive<'a>(
257        &'a self,
258        thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe,
259    ) -> usize {
260        if std::env::var("BOLERO_FUZZER").is_ok() {
261            eprintln!(
262                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
263            );
264            std::process::abort();
265        }
266
267        let mut count = 0;
268        let count_mut = &mut count;
269
270        self.with_instantiator(
271            |instantiator| {
272                bolero::test(bolero::TargetLocation {
273                    package_name: "",
274                    manifest_dir: "",
275                    module_path: "",
276                    file: "",
277                    line: 0,
278                    item_path: "<unknown>::__bolero_item_path__",
279                    test_name: None,
280                })
281                .exhaustive()
282                .run_with_replay(move |is_replay| {
283                    *count_mut += 1;
284
285                    let mut instance = instantiator();
286                    if instance.log {
287                        eprintln!(
288                            "{}",
289                            "\n==== New Simulation Instance ====\n"
290                                .color(colored::Color::Cyan)
291                                .bold()
292                        );
293                    }
294
295                    if is_replay {
296                        instance.log = true;
297                    }
298
299                    tokio::runtime::Builder::new_current_thread()
300                        .build()
301                        .unwrap()
302                        .block_on(async {
303                            let local_set = tokio::task::LocalSet::new();
304                            local_set.run_until(thunk(instance)).await;
305                        })
306                })
307            },
308            false,
309        );
310
311        count
312    }
313}
314
315/// A single instance of a compiled Hydro simulation, which provides methods to interactively
316/// execute the simulation, feed inputs, and receive outputs.
317pub struct CompiledSimInstance<'a> {
318    func: SimLoaded<'a>,
319    remaining_ports: HashSet<usize>,
320    output_ports: HashMap<usize, UnboundedSender<Bytes>>,
321    input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
322    log: bool,
323}
324
325impl<'a> CompiledSimInstance<'a> {
326    #[deprecated(note = "Use `connect` instead")]
327    /// Like the corresponding method on [`crate::compile::deploy::DeployResult`], connects to the
328    /// given input port, and returns a closure that can be used to send messages to it.
329    pub fn connect_sink_bincode<T: Serialize + 'static, M, O: Ordering, R: Retries>(
330        &mut self,
331        port: &ExternalBincodeSink<T, M, O, R>,
332    ) -> SimSender<T, O, R> {
333        self.connect(port)
334    }
335
336    #[deprecated(note = "Use `connect` instead")]
337    /// Like the corresponding method on [`crate::compile::deploy::DeployResult`], connects to the
338    /// given output port, and returns a stream that can be used to receive messages from it.
339    pub fn connect_source_bincode<T: DeserializeOwned + 'static, O: Ordering, R: Retries>(
340        &mut self,
341        port: &ExternalBincodeStream<T, O, R>,
342    ) -> SimReceiver<'a, T, O, R> {
343        self.connect(port)
344    }
345
346    /// Establishes a connection to the given input or output port, returning either a
347    /// [`SimSender`] (for input ports) or a stream (for output ports). This should be invoked
348    /// before calling [`Self::launch`], and should only be invoked once per port.
349    pub fn connect<'b, P: ConnectableAsync<&'b mut Self>>(
350        &'b mut self,
351        port: P,
352    ) -> <P as ConnectableAsync<&'b mut Self>>::Output {
353        let mut pinned = std::pin::pin!(port.connect(self));
354        if let Poll::Ready(v) = pinned.poll_unpin(&mut Context::from_waker(Waker::noop())) {
355            v
356        } else {
357            panic!("Connect impl should not have used any async operations");
358        }
359    }
360
361    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
362    /// be invoked after connecting all inputs and outputs, but before receiving any messages.
363    pub fn launch(self) {
364        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
365    }
366
367    /// Returns a future that schedules simulation with the given logger for reporting the
368    /// simulation trace.
369    ///
370    /// See [`Self::launch`] for more details.
371    pub fn schedule_with_logger<W: std::io::Write>(
372        self,
373        log_writer: W,
374    ) -> impl use<W> + Future<Output = ()> {
375        self.schedule_with_maybe_logger(Some(log_writer))
376    }
377
378    fn schedule_with_maybe_logger<W: std::io::Write>(
379        self,
380        log_override: Option<W>,
381    ) -> impl use<W> + Future<Output = ()> {
382        if !self.remaining_ports.is_empty() {
383            panic!(
384                "Cannot launch DFIR because some of the inputs / outputs have not been connected."
385            )
386        }
387
388        let (async_dfir, ticks, hooks) = unsafe {
389            (self.func)(
390                colored::control::SHOULD_COLORIZE.should_colorize(),
391                self.output_ports,
392                self.input_ports,
393                if self.log {
394                    println_handler
395                } else {
396                    null_handler
397                },
398                if self.log {
399                    eprintln_handler
400                } else {
401                    null_handler
402                },
403            )
404        };
405        let mut launched = LaunchedSim {
406            async_dfir,
407            possibly_ready_ticks: vec![],
408            not_ready_ticks: ticks.into_iter().collect(),
409            hooks,
410            log: if self.log {
411                if let Some(w) = log_override {
412                    LogKind::Custom(w)
413                } else {
414                    LogKind::Stderr
415                }
416            } else {
417                LogKind::Null
418            },
419        };
420
421        async move { launched.scheduler().await }
422    }
423}
424
425/// A receiver for an external bincode stream in a simulation.
426pub struct SimReceiver<'a, T, O: Ordering, R: Retries>(
427    Pin<Box<dyn Stream<Item = T> + 'a>>,
428    PhantomData<(O, R)>,
429);
430
431impl<'a, T, O: Ordering, R: Retries> SimReceiver<'a, T, O, R> {
432    /// Asserts that the stream has ended and no more messages can possibly arrive.
433    pub async fn assert_no_more(mut self)
434    where
435        T: std::fmt::Debug,
436    {
437        if let Some(next) = self.0.next().await {
438            panic!("Stream yielded unexpected message: {:?}", next);
439        }
440    }
441}
442
443impl<'a, T> SimReceiver<'a, T, TotalOrder, ExactlyOnce> {
444    /// Receives the next message from the external bincode stream. This will wait until a message
445    /// is available, or return `None` if no more messages can possibly arrive.
446    pub async fn next(&mut self) -> Option<T> {
447        self.0.next().await
448    }
449
450    /// Collects all remaining messages from the external bincode stream into a collection. This
451    /// will wait until no more messages can possibly arrive.
452    pub async fn collect<C: Default + Extend<T>>(self) -> C {
453        self.0.collect().await
454    }
455
456    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
457    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
458    pub async fn assert_yields(&mut self, expected: impl IntoIterator<Item = T>)
459    where
460        T: std::fmt::Debug + PartialEq,
461    {
462        let mut expected: VecDeque<T> = expected.into_iter().collect();
463
464        while !expected.is_empty() {
465            if let Some(next) = self.next().await {
466                assert_eq!(next, expected.pop_front().unwrap());
467            } else {
468                panic!("Stream ended early, still expected: {:?}", expected);
469            }
470        }
471    }
472
473    /// Asserts that the stream yields only the expected sequence of messages, in order,
474    /// and then ends.
475    pub async fn assert_yields_only(mut self, expected: impl IntoIterator<Item = T>)
476    where
477        T: std::fmt::Debug + PartialEq,
478    {
479        self.assert_yields(expected).await;
480        self.assert_no_more().await;
481    }
482}
483
484impl<'a, T> SimReceiver<'a, T, NoOrder, ExactlyOnce> {
485    /// Collects all remaining messages from the external bincode stream into a collection,
486    /// sorting them. This will wait until no more messages can possibly arrive.
487    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
488    where
489        T: Ord,
490    {
491        let mut collected: C = self.0.collect().await;
492        collected.as_mut().sort();
493        collected
494    }
495
496    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
497    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
498    pub async fn assert_yields_unordered(&mut self, expected: impl IntoIterator<Item = T>)
499    where
500        T: std::fmt::Debug + PartialEq,
501    {
502        let mut expected: Vec<T> = expected.into_iter().collect();
503
504        while !expected.is_empty() {
505            if let Some(next) = self.0.next().await {
506                let prev_length = expected.len();
507                expected.retain(|e| e != &next);
508                if expected.len() == prev_length {
509                    panic!("Stream yielded unexpected message: {:?}", next);
510                }
511            } else {
512                panic!("Stream ended early, still expected: {:?}", expected);
513            }
514        }
515    }
516
517    /// Asserts that the stream yields only the expected sequence of messages, in some order,
518    /// and then ends.
519    pub async fn assert_yields_only_unordered(mut self, expected: impl IntoIterator<Item = T>)
520    where
521        T: std::fmt::Debug + PartialEq,
522    {
523        self.assert_yields_unordered(expected).await;
524        self.assert_no_more().await;
525    }
526}
527
528impl<'a, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
529    ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeStream<T, O, R>
530{
531    type Output = SimReceiver<'a, T, O, R>;
532
533    async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
534        assert!(ctx.remaining_ports.remove(&self.port_id));
535        let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
536        ctx.output_ports.insert(self.port_id, sink);
537
538        SimReceiver(
539            Box::pin(source.map(|b| bincode::deserialize(&b).unwrap())),
540            PhantomData,
541        )
542    }
543}
544
545/// A sender to an external bincode sink in a simulation.
546pub struct SimSender<T, O: Ordering, R: Retries>(
547    Box<dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>>,
548    PhantomData<(O, R)>,
549);
550impl<T> SimSender<T, TotalOrder, ExactlyOnce> {
551    /// Sends a message to the external bincode sink. The message will be asynchronously processed
552    /// as part of the simulation.
553    pub fn send(&self, t: T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
554        (self.0)(t)
555    }
556
557    /// Sends several messages to the external bincode sink. The messages will be asynchronously
558    /// processed as part of the simulation.
559    pub fn send_many<I: IntoIterator<Item = T>>(
560        &self,
561        iter: I,
562    ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
563        for t in iter {
564            (self.0)(t)?;
565        }
566        Ok(())
567    }
568}
569
570impl<T> SimSender<T, NoOrder, ExactlyOnce> {
571    /// Sends several messages to the external bincode sink. The messages will be asynchronously
572    /// processed as part of the simulation, in non-determinstic order.
573    pub fn send_many_unordered<I: IntoIterator<Item = T>>(
574        &self,
575        iter: I,
576    ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
577        for t in iter {
578            (self.0)(t)?;
579        }
580        Ok(())
581    }
582}
583
584impl<'a, T: Serialize + 'static, M, O: Ordering, R: Retries>
585    ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeSink<T, M, O, R>
586{
587    type Output = SimSender<T, O, R>;
588
589    async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
590        assert!(ctx.remaining_ports.remove(&self.port_id));
591        let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
592        ctx.input_ports.insert(self.port_id, source);
593        SimSender(
594            Box::new(move |t| sink.send(bincode::serialize(&t).unwrap().into())),
595            PhantomData,
596        )
597    }
598}
599
600enum LogKind<W: std::io::Write> {
601    Null,
602    Stderr,
603    Custom(W),
604}
605
606// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
607impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
608    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
609        match self {
610            LogKind::Null => Ok(()),
611            LogKind::Stderr => {
612                eprint!("{}", s);
613                Ok(())
614            }
615            LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
616        }
617    }
618}
619/// A running simulation, which manages the async DFIR and tick DFIRs, and makes decisions
620/// about scheduling ticks and choices for non-deterministic operators like batch.
621struct LaunchedSim<W: std::io::Write> {
622    async_dfir: Dfir<'static>,
623    possibly_ready_ticks: Vec<(&'static str, Dfir<'static>)>,
624    not_ready_ticks: Vec<(&'static str, Dfir<'static>)>,
625    hooks: HashMap<&'static str, Vec<Box<dyn SimHook>>>,
626    log: LogKind<W>,
627}
628
629impl<W: std::io::Write> LaunchedSim<W> {
630    async fn scheduler(&mut self) {
631        loop {
632            tokio::task::yield_now().await;
633            if self.async_dfir.run_available().await {
634                self.possibly_ready_ticks.append(&mut self.not_ready_ticks);
635                continue;
636            } else {
637                use bolero::generator::*;
638
639                let (ready, mut not_ready): (Vec<_>, Vec<_>) =
640                    self.possibly_ready_ticks.drain(..).partition(|(name, _)| {
641                        self.hooks.get(name).unwrap().iter().any(|hook| {
642                            hook.current_decision().unwrap_or(false)
643                                || hook.can_make_nontrivial_decision()
644                        })
645                    });
646
647                self.possibly_ready_ticks = ready;
648                self.not_ready_ticks.append(&mut not_ready);
649
650                if self.possibly_ready_ticks.is_empty() {
651                    break;
652                } else {
653                    let next_tick = (0..self.possibly_ready_ticks.len()).any();
654                    let mut removed: (&'static str, Dfir<'static>) =
655                        self.possibly_ready_ticks.remove(next_tick);
656
657                    match &mut self.log {
658                        LogKind::Null => {}
659                        LogKind::Stderr => {
660                            eprintln!("\n{}", "Running Tick".color(colored::Color::Magenta).bold())
661                        }
662                        LogKind::Custom(writer) => {
663                            writeln!(
664                                writer,
665                                "\n{}",
666                                "Running Tick".color(colored::Color::Magenta).bold()
667                            )
668                            .unwrap();
669                        }
670                    }
671
672                    let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
673                        write.write_str(&"*".color(colored::Color::Magenta).bold())?;
674                        write.write_str(" ")
675                    };
676
677                    let mut tick_decision_writer =
678                        indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
679                            inserter: &mut asterisk_indenter,
680                        });
681
682                    let hooks = self.hooks.get_mut(removed.0).unwrap();
683                    let mut remaining_decision_count = hooks.len();
684                    let mut made_nontrivial_decision = false;
685
686                    bolero_generator::any::scope::borrow_with(|driver| {
687                        // first, scan manual decisions
688                        hooks.iter_mut().for_each(|hook| {
689                            if let Some(is_nontrivial) = hook.current_decision() {
690                                made_nontrivial_decision |= is_nontrivial;
691                                remaining_decision_count -= 1;
692                            } else if !hook.can_make_nontrivial_decision() {
693                                // if no nontrivial decision is possible, make a trivial one
694                                // (we need to do this in the first pass to force nontrivial decisions
695                                // on the remaining hooks)
696                                hook.autonomous_decision(driver, false);
697                                remaining_decision_count -= 1;
698                            }
699                        });
700
701                        hooks.iter_mut().for_each(|hook| {
702                            if hook.current_decision().is_none() {
703                                made_nontrivial_decision |= hook.autonomous_decision(
704                                    driver,
705                                    !made_nontrivial_decision && remaining_decision_count == 1,
706                                );
707                                remaining_decision_count -= 1;
708                            }
709
710                            hook.release_decision(&mut tick_decision_writer);
711                        });
712                    });
713
714                    assert!(removed.1.run_tick().await);
715                    self.possibly_ready_ticks.push(removed);
716                }
717            }
718        }
719    }
720}