1use core::{fmt, panic};
4use std::cell::RefCell;
5use std::collections::{HashMap, HashSet, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::graph::Dfir;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::Mutex;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimReceiver, SimSender};
27use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
28use crate::location::dynamic::LocationId;
29
/// Channel endpoints that `SimSender` / `SimReceiver` handles look up while a simulation runs.
///
/// An instance is stored in the `CURRENT_SIM_CONNECTIONS` task-local by
/// `CompiledSimInstance::run_without_launching`.
struct SimConnections {
    // Senders used by `SimSender::with_sink` to push serialized messages, keyed by port.
    input_senders: HashMap<usize, Rc<UnboundedSender<Bytes>>>,
    // Receiver streams read by `SimReceiver::with_stream`, keyed by port; the mutex is held
    // while a caller consumes from the stream.
    output_receivers: HashMap<usize, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    // Maps the id carried by a sender/receiver handle (`self.0`) to the key used in the two
    // maps above.
    external_registered: HashMap<usize, usize>,
}
35
tokio::task_local! {
    // Connections of the simulation instance running on the current task. Scoped by
    // `CompiledSimInstance::run_without_launching`; read by `SimReceiver::with_stream` and
    // `SimSender::with_sink`.
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
39
/// A simulation backed by a dynamically loaded library that exports `__hydro_runtime`.
pub struct CompiledSim {
    // Never read in this file.
    // NOTE(review): presumably held so the temporary file outlives `lib` — confirm.
    pub(super) _path: TempPath,
    // Library from which the `__hydro_runtime` entry point is resolved.
    pub(super) lib: Library,
    // Port keys for which each new instance creates an input and an output channel.
    pub(super) external_ports: Vec<usize>,
    // Copied into every instance; maps sender/receiver handle ids to port keys.
    pub(super) external_registered: HashMap<usize, usize>,
}
47
/// A callable that produces a fresh [`CompiledSimInstance`] each time it is invoked.
///
/// The trait is sealed; it is implemented only through the blanket impl below.
#[sealed::sealed]
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Blanket impl: every `RefUnwindSafe` closure returning a `CompiledSimInstance` qualifies.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
55
/// Output handler that discards everything it is given; used when logging is disabled.
fn null_handler(_: fmt::Arguments<'_>) {}
57
/// Output handler that writes the formatted message, followed by a newline, to stdout.
fn println_handler(args: fmt::Arguments) {
    println!("{args}");
}
61
/// Output handler that writes the formatted message, followed by a newline, to stderr.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{args}");
}
65
/// Type of the `__hydro_runtime` symbol resolved from a compiled simulation library.
///
/// Arguments, in the order `CompiledSimInstance::schedule_with_maybe_logger` passes them:
/// 1. whether colored output should be produced,
/// 2. the instance's `output_ports` senders, keyed by port,
/// 3. the instance's `input_ports` receiver streams, keyed by port,
/// 4. a handler that is `println_handler` when logging is enabled, otherwise `null_handler`,
/// 5. a handler that is `eprintln_handler` when logging is enabled, otherwise `null_handler`.
///
/// Returns `(async graphs, tick graphs, hooks, inline hooks)`. Each graph is tagged with a
/// string that the caller parses as a JSON-encoded `LocationId`, plus an optional `u32`
/// that the scheduler's log output labels as a cluster member.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        bool,
        HashMap<usize, UnboundedSender<Bytes>>,
        HashMap<usize, UnboundedReceiverStream<Bytes>>,
        fn(fmt::Arguments<'_>),
        fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
86
87impl CompiledSim {
    /// Creates one [`CompiledSimInstance`] and passes it to `thunk`, returning the thunk's
    /// result.
    ///
    /// Logging is unconditionally enabled for the instance (`always_log` is `true`).
    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
        self.with_instantiator(|instantiator| thunk(instantiator()), true)
    }
92
93 pub fn with_instantiator<T>(
101 &self,
102 thunk: impl FnOnce(&dyn Instantiator) -> T,
103 always_log: bool,
104 ) -> T {
105 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
106 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
107 thunk(
108 &(|| CompiledSimInstance {
109 func: func.clone(),
110 remaining_ports: self.external_ports.iter().cloned().collect(),
111 external_registered: self.external_registered.clone(),
112 input_ports: HashMap::new(),
113 output_ports: HashMap::new(),
114 log,
115 }),
116 )
117 }
118
119 pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
130 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
131 .elements()
132 .into_iter()
133 .find(|e| {
134 !e.fn_name.starts_with("hydro_lang::sim::compiled")
135 && !e.fn_name.starts_with("hydro_lang::sim::flow")
136 && !e.fn_name.starts_with("fuzz<")
137 })
138 .unwrap();
139
140 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
141 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
142
143 let caller_fuzz_repro_path = repro_folder
144 .join(caller_fn.fn_name.replace("::", "__"))
145 .with_extension("bin");
146
147 if std::env::var("BOLERO_FUZZER").is_ok() {
148 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
149 std::fs::create_dir_all(&corpus_dir).unwrap();
150 let libfuzzer_args = format!(
151 "{} {} -artifact_prefix={}/ -handle_abrt=0",
152 corpus_dir.to_str().unwrap(),
153 corpus_dir.to_str().unwrap(),
154 corpus_dir.to_str().unwrap(),
155 );
156
157 std::fs::create_dir_all(&repro_folder).unwrap();
158
159 if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
160 unsafe {
161 std::env::set_var(
162 "BOLERO_FAILURE_OUTPUT",
163 caller_fuzz_repro_path.to_str().unwrap(),
164 );
165 }
166 }
167
168 unsafe {
169 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
170 }
171
172 self.with_instantiator(
173 |instantiator| {
174 bolero::test(bolero::TargetLocation {
175 package_name: "",
176 manifest_dir: "",
177 module_path: "",
178 file: "",
179 line: 0,
180 item_path: "<unknown>::__bolero_item_path__",
181 test_name: None,
182 })
183 .run_with_replay(move |is_replay| {
184 let mut instance = instantiator();
185
186 if instance.log {
187 eprintln!(
188 "{}",
189 "\n==== New Simulation Instance ===="
190 .color(colored::Color::Cyan)
191 .bold()
192 );
193 }
194
195 if is_replay {
196 instance.log = true;
197 }
198
199 tokio::runtime::Builder::new_current_thread()
200 .build()
201 .unwrap()
202 .block_on(async { instance.run(&mut thunk).await })
203 })
204 },
205 false,
206 );
207 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
208 self.fuzz_repro(existing_bytes, async |compiled| {
209 compiled.launch();
210 thunk().await
211 });
212 } else {
213 eprintln!(
214 "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
215 caller_fuzz_repro_path.display()
216 );
217 self.with_instantiator(
218 |instantiator| {
219 bolero::test(bolero::TargetLocation {
220 package_name: "",
221 manifest_dir: "",
222 module_path: "",
223 file: ".",
224 line: 0,
225 item_path: "<unknown>::__bolero_item_path__",
226 test_name: None,
227 })
228 .with_iterations(8192)
229 .run(move || {
230 let instance = instantiator();
231 tokio::runtime::Builder::new_current_thread()
232 .build()
233 .unwrap()
234 .block_on(async { instance.run(&mut thunk).await })
235 })
236 },
237 false,
238 );
239 }
240 }
241
    /// Replays a single simulation run whose generator choices are read from `bytes`.
    ///
    /// A logging-enabled instance is created through [`Self::with_instance`]. Inside a
    /// bolero scope backed by a byte driver over `bytes`, `thunk` receives the instance
    /// *without* it having been launched, and is driven to completion on a fresh
    /// current-thread tokio runtime.
    ///
    /// # Panics
    /// Panics if the tokio runtime cannot be built.
    pub fn fuzz_repro<'a>(
        &'a self,
        bytes: Vec<u8>,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        self.with_instance(|instance| {
            bolero::bolero_engine::any::scope::with(
                // Driver with default options over the recorded bytes.
                Box::new(bolero::bolero_engine::driver::object::Object(
                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
                )),
                || {
                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run_without_launching(thunk).await })
                },
            )
        });
    }
264
265 pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
276 if std::env::var("BOLERO_FUZZER").is_ok() {
277 eprintln!(
278 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
279 );
280 std::process::abort();
281 }
282
283 let mut count = 0;
284 let count_mut = &mut count;
285
286 self.with_instantiator(
287 |instantiator| {
288 bolero::test(bolero::TargetLocation {
289 package_name: "",
290 manifest_dir: "",
291 module_path: "",
292 file: "",
293 line: 0,
294 item_path: "<unknown>::__bolero_item_path__",
295 test_name: None,
296 })
297 .exhaustive()
298 .run_with_replay(move |is_replay| {
299 *count_mut += 1;
300
301 let mut instance = instantiator();
302 if instance.log {
303 eprintln!(
304 "{}",
305 "\n==== New Simulation Instance ===="
306 .color(colored::Color::Cyan)
307 .bold()
308 );
309 }
310
311 if is_replay {
312 instance.log = true;
313 }
314
315 tokio::runtime::Builder::new_current_thread()
316 .build()
317 .unwrap()
318 .block_on(async { instance.run(&mut thunk).await })
319 })
320 },
321 false,
322 );
323
324 count
325 }
326}
327
/// One not-yet-started run of a compiled simulation.
pub struct CompiledSimInstance<'a> {
    // Entry point resolved from the compiled library.
    func: SimLoaded<'a>,
    // Ports for which `run_without_launching` creates an input and an output channel.
    remaining_ports: HashSet<usize>,
    // Maps sender/receiver handle ids to port keys; copied into `SimConnections`.
    external_registered: HashMap<usize, usize>,
    // Senders handed to the entry point; the matching receivers are exposed to
    // `SimReceiver` through `SimConnections::output_receivers`.
    output_ports: HashMap<usize, UnboundedSender<Bytes>>,
    // Receiver streams handed to the entry point; the matching senders are exposed to
    // `SimSender` through `SimConnections::input_senders`.
    input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
    // Whether output handlers and scheduler logging are enabled.
    log: bool,
}
338
339impl<'a> CompiledSimInstance<'a> {
    /// Sets up the instance's connections, launches its scheduler, and then awaits `thunk`.
    ///
    /// Equivalent to [`Self::run_without_launching`] with a thunk that first calls
    /// `launch` on the instance.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }
347
348 async fn run_without_launching(
349 mut self,
350 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
351 ) {
352 let mut input_senders = HashMap::new();
353 let mut output_receivers = HashMap::new();
354 for remaining in &self.remaining_ports {
355 {
356 let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
357 self.output_ports.insert(*remaining, sender);
358 output_receivers.insert(
359 *remaining,
360 Rc::new(Mutex::new(UnboundedReceiverStream::new(
361 receiver.into_inner(),
362 ))),
363 );
364 }
365
366 {
367 let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
368 self.input_ports.insert(*remaining, receiver);
369 input_senders.insert(*remaining, Rc::new(sender));
370 }
371 }
372
373 let local_set = tokio::task::LocalSet::new();
374 local_set
375 .run_until(CURRENT_SIM_CONNECTIONS.scope(
376 RefCell::new(SimConnections {
377 input_senders,
378 output_receivers,
379 external_registered: self.external_registered.clone(),
380 }),
381 async move {
382 thunk(self).await;
383 },
384 ))
385 .await;
386 }
387
    /// Spawns the instance's scheduler as a local task, with no log writer override.
    ///
    /// Uses `tokio::task::spawn_local`, so it must be called from within a `LocalSet`
    /// context (as set up by `run_without_launching`).
    fn launch(self) {
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }
393
    /// Returns a future that runs the simulation scheduler, directing the scheduler's log
    /// output to `log_writer` instead of stderr.
    ///
    /// The writer is only used when logging is enabled for this instance; otherwise the
    /// scheduler logs nothing.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }
402
403 fn schedule_with_maybe_logger<W: std::io::Write>(
404 self,
405 log_override: Option<W>,
406 ) -> impl use<W> + Future<Output = ()> {
407 let (async_dfirs, tick_dfirs, hooks, inline_hooks) = unsafe {
408 (self.func)(
409 colored::control::SHOULD_COLORIZE.should_colorize(),
410 self.output_ports,
411 self.input_ports,
412 if self.log {
413 println_handler
414 } else {
415 null_handler
416 },
417 if self.log {
418 eprintln_handler
419 } else {
420 null_handler
421 },
422 )
423 };
424 let mut launched = LaunchedSim {
425 async_dfirs: async_dfirs
426 .into_iter()
427 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
428 .collect(),
429 possibly_ready_ticks: vec![],
430 not_ready_ticks: tick_dfirs
431 .into_iter()
432 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
433 .collect(),
434 hooks: hooks
435 .into_iter()
436 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
437 .collect(),
438 inline_hooks: inline_hooks
439 .into_iter()
440 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
441 .collect(),
442 log: if self.log {
443 if let Some(w) = log_override {
444 LogKind::Custom(w)
445 } else {
446 LogKind::Stderr
447 }
448 } else {
449 LogKind::Null
450 },
451 };
452
453 async move { launched.scheduler().await }
454 }
455}
456
// `SimReceiver` is a `Copy` handle, so cloning is a plain bitwise copy. The impls are
// written by hand with the same bounds as the other `SimReceiver` impls in this file.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
464
465impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
466 async fn with_stream<Out>(
467 &self,
468 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
469 ) -> Out {
470 let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
471 let connections = &mut *connections.borrow_mut();
472 connections
473 .output_receivers
474 .get(connections.external_registered.get(&self.0).unwrap())
475 .unwrap()
476 .clone()
477 });
478
479 let mut receiver_stream = receiver.lock().await;
480 thunk(&mut pin!(
481 &mut receiver_stream
482 .by_ref()
483 .map(|b| bincode::deserialize(&b).unwrap())
484 ))
485 .await
486 }
487
488 pub fn assert_no_more(self) -> impl Future<Output = ()>
490 where
491 T: Debug,
492 {
493 FutureTrackingCaller {
494 future: async move {
495 self.with_stream(async |stream| {
496 if let Some(next) = stream.next().await {
497 Err(format!(
498 "Stream yielded unexpected message: {:?}, expected termination",
499 next
500 ))?;
501 }
502 Ok(())
503 })
504 .await
505 },
506 }
507 }
508}
509
510impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
    /// Awaits the next item from the receiver; returns `None` once the stream has ended.
    ///
    /// The receiver's channel lock is taken for the duration of this single call.
    pub async fn next(&self) -> Option<T> {
        self.with_stream(async |stream| stream.next().await).await
    }
516
    /// Collects every remaining item into a `C`, completing only when the stream ends.
    pub async fn collect<C: Default + Extend<T>>(self) -> C {
        self.with_stream(async |stream| stream.collect().await)
            .await
    }
523
524 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
527 &self,
528 expected: I,
529 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
530 where
531 T: Debug + PartialEq<T2>,
532 {
533 FutureTrackingCaller {
534 future: async {
535 let mut expected: VecDeque<T2> = expected.into_iter().collect();
536
537 while !expected.is_empty() {
538 if let Some(next) = self.next().await {
539 let next_expected = expected.pop_front().unwrap();
540 if next != next_expected {
541 Err(format!(
542 "Stream yielded unexpected message: {:?}, expected: {:?}",
543 next, next_expected
544 ))?
545 }
546 } else {
547 Err(format!(
548 "Stream ended early, still expected: {:?}",
549 expected
550 ))?;
551 }
552 }
553
554 Ok(())
555 },
556 }
557 }
558
559 pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
562 &self,
563 expected: I,
564 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
565 where
566 T: Debug + PartialEq<T2>,
567 {
568 ChainedFuture {
569 first: self.assert_yields(expected),
570 second: self.assert_no_more(),
571 first_done: false,
572 }
573 }
574}
575
pin_project_lite::pin_project! {
    // Adapter that turns a future resolving to `Result<(), String>` into one resolving to
    // `()`, panicking with the error string on `Err`. Its `poll` is marked
    // `#[track_caller]`.
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        // The wrapped assertion future.
        #[pin]
        future: F,
    }
}
591
592impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
593 type Output = ();
594
595 #[track_caller]
596 fn poll(
597 mut self: Pin<&mut Self>,
598 cx: &mut std::task::Context<'_>,
599 ) -> std::task::Poll<Self::Output> {
600 match ready!(self.as_mut().project().future.poll(cx)) {
601 Ok(()) => std::task::Poll::Ready(()),
602 Err(e) => panic!("{}", e),
603 }
604 }
605}
606
pin_project_lite::pin_project! {
    // Runs `first` to completion and then `second`, as a single future.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        #[pin]
        first: F1,
        #[pin]
        second: F2,
        // Set once `first` has completed so it is never polled again.
        first_done: bool,
    }
}
619
620impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
621 type Output = ();
622
623 #[track_caller]
624 fn poll(
625 mut self: Pin<&mut Self>,
626 cx: &mut std::task::Context<'_>,
627 ) -> std::task::Poll<Self::Output> {
628 if !self.first_done {
629 ready!(self.as_mut().project().first.poll(cx));
630 *self.as_mut().project().first_done = true;
631 }
632
633 self.as_mut().project().second.poll(cx)
634 }
635}
636
637impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
638 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
641 where
642 T: Ord,
643 {
644 self.with_stream(async |stream| {
645 let mut collected: C = stream.collect().await;
646 collected.as_mut().sort();
647 collected
648 })
649 .await
650 }
651
652 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
655 &self,
656 expected: I,
657 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
658 where
659 T: Debug + PartialEq<T2>,
660 {
661 FutureTrackingCaller {
662 future: async {
663 self.with_stream(async |stream| {
664 let mut expected: Vec<T2> = expected.into_iter().collect();
665
666 while !expected.is_empty() {
667 if let Some(next) = stream.next().await {
668 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
669 if let Some((i, _)) = idx {
670 expected.swap_remove(i);
671 } else {
672 Err(format!("Stream yielded unexpected message: {:?}", next))?
673 }
674 } else {
675 Err(format!(
676 "Stream ended early, still expected: {:?}",
677 expected
678 ))?
679 }
680 }
681
682 Ok(())
683 })
684 .await
685 },
686 }
687 }
688
689 pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
692 &self,
693 expected: I,
694 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
695 where
696 T: Debug + PartialEq<T2>,
697 {
698 ChainedFuture {
699 first: self.assert_yields_unordered(expected),
700 second: self.assert_no_more(),
701 first_done: false,
702 }
703 }
704}
705
706impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
707 fn with_sink<Out>(
708 &self,
709 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
710 ) -> Out {
711 let sender = CURRENT_SIM_CONNECTIONS.with(|connections| {
712 let connections = &mut *connections.borrow_mut();
713 connections
714 .input_senders
715 .get(connections.external_registered.get(&self.0).unwrap())
716 .unwrap()
717 .clone()
718 });
719
720 thunk(&move |t| sender.send(bincode::serialize(&t).unwrap().into()))
721 }
722}
723
724impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
725 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
728 self.with_sink(|send| {
729 for t in iter {
730 send(t).unwrap();
731 }
732 })
733 }
734}
735
736impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
    /// Serializes `t` and sends it on this sender's input channel.
    ///
    /// # Panics
    /// Panics if the send fails.
    pub fn send(&self, t: T) {
        self.with_sink(|send| send(t)).unwrap();
    }
742
743 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
746 self.with_sink(|send| {
747 for t in iter {
748 send(t).unwrap();
749 }
750 })
751 }
752}
753
/// Destination for the scheduler's log output.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to the process's stderr.
    Stderr,
    /// Forward the bytes to a caller-supplied writer.
    Custom(W),
}

impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Routes `s` to the selected destination. An I/O failure of a custom writer is
    /// reported as `std::fmt::Error`; the other destinations always succeed.
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
        match self {
            Self::Custom(sink) => sink
                .write_all(s.as_bytes())
                .map_err(|_| std::fmt::Error),
            Self::Stderr => {
                eprint!("{s}");
                Ok(())
            }
            Self::Null => Ok(()),
        }
    }
}
773
/// State of a simulation whose entry point has been invoked and which is being driven by
/// `scheduler`.
struct LaunchedSim<W: std::io::Write> {
    // Graphs that the scheduler runs on every iteration, before considering any tick.
    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Tick graphs that are candidates to run next.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Tick graphs waiting for progress at their enclosing location.
    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Hooks looked up by a tick's `(location, Option<u32>)` key; given decisions before the
    // tick runs.
    hooks: Hooks<LocationId>,
    // Hooks looked up by the same key; serviced while the tick is running.
    inline_hooks: InlineHooks<LocationId>,
    // Destination for scheduler log output.
    log: LogKind<W>,
}
784
785impl<W: std::io::Write> LaunchedSim<W> {
    /// Drives the simulation until no tick remains that could run.
    ///
    /// Each iteration yields to the tokio scheduler, then runs every async graph once. If
    /// any of them made progress the loop restarts. Otherwise one candidate tick is chosen
    /// through the bolero generator, its hooks are given decisions, and the tick is run.
    ///
    /// # Panics
    /// Panics if a tick has no entry in `hooks`, if writing to a custom log writer fails,
    /// or if a selected tick's `run_tick` returns `false`.
    async fn scheduler(&mut self) {
        loop {
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Ticks whose enclosing location and member id match the graph that
                    // just made progress become candidates again.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);
                }
            }

            if any_made_progress {
                continue;
            } else {
                use bolero::generator::*;

                // Keep as candidates only the ticks with at least one hook that either
                // already holds a `true` decision or reports it can make a nontrivial one.
                let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .unwrap()
                            .iter()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready;
                self.not_ready_ticks.append(&mut not_ready);

                if self.possibly_ready_ticks.is_empty() {
                    // Nothing can run: the simulation is finished.
                    break;
                } else {
                    // The index of the tick to run is drawn from the bolero generator.
                    let next_tick = (0..self.possibly_ready_ticks.len()).any();
                    let mut removed = self.possibly_ready_ticks.remove(next_tick);

                    match &mut self.log {
                        LogKind::Null => {}
                        LogKind::Stderr => {
                            if let Some(cid) = &removed.1 {
                                eprintln!(
                                    "\n{}",
                                    format!("Running Tick (Cluster Member {})", cid)
                                        .color(colored::Color::Magenta)
                                        .bold()
                                )
                            } else {
                                eprintln!(
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                            }
                        }
                        // NOTE(review): unlike the stderr branch, this banner never
                        // includes the cluster member id — confirm whether intentional.
                        LogKind::Custom(writer) => {
                            writeln!(
                                writer,
                                "\n{}",
                                "Running Tick".color(colored::Color::Magenta).bold()
                            )
                            .unwrap();
                        }
                    }

                    // Prefix used for each line written through `tick_decision_writer`:
                    // a bold magenta asterisk followed by a space.
                    let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                        write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                        write.write_str(" ")
                    };

                    let mut tick_decision_writer =
                        indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
                            inserter: &mut asterisk_indenter,
                        });

                    let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                    let mut remaining_decision_count = hooks.len();
                    let mut made_nontrivial_decision = false;

                    bolero_generator::any::scope::borrow_with(|driver| {
                        // First pass: account for hooks that already hold a decision, and
                        // settle (with `false` as the second argument) the hooks that
                        // report they cannot make a nontrivial decision.
                        hooks.iter_mut().for_each(|hook| {
                            if let Some(is_nontrivial) = hook.current_decision() {
                                made_nontrivial_decision |= is_nontrivial;
                                remaining_decision_count -= 1;
                            } else if !hook.can_make_nontrivial_decision() {
                                hook.autonomous_decision(driver, false);
                                remaining_decision_count -= 1;
                            }
                        });

                        // Second pass: decide the still-undecided hooks. The flag passed is
                        // `true` only for the last undecided hook when no nontrivial
                        // decision has been made so far.
                        // NOTE(review): presumably this forces a nontrivial decision —
                        // confirm against the hook implementation.
                        hooks.iter_mut().for_each(|hook| {
                            if hook.current_decision().is_none() {
                                made_nontrivial_decision |= hook.autonomous_decision(
                                    driver,
                                    !made_nontrivial_decision && remaining_decision_count == 1,
                                );
                                remaining_decision_count -= 1;
                            }

                            hook.release_decision(&mut tick_decision_writer);
                        });
                    });

                    let run_tick_future = removed.2.run_tick();
                    if let Some(inline_hooks) =
                        self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
                    {
                        let mut run_tick_future_pinned = pin!(run_tick_future);

                        loop {
                            tokio::select! {
                                // `biased` makes `select!` poll the branches in order, so
                                // the tick future is always polled first.
                                biased;
                                r = &mut run_tick_future_pinned => {
                                    assert!(r);
                                    break;
                                }
                                // Immediately-ready branch, taken whenever the tick future
                                // is pending: service inline hooks awaiting a decision.
                                _ = async {} => {
                                    bolero_generator::any::scope::borrow_with(|driver| {
                                        for hook in inline_hooks.iter_mut() {
                                            if hook.pending_decision() {
                                                if !hook.has_decision() {
                                                    hook.autonomous_decision(driver);
                                                }

                                                hook.release_decision(&mut tick_decision_writer);
                                            }
                                        }
                                    });
                                }
                            }
                        }
                    } else {
                        assert!(run_tick_future.await);
                    }

                    // The tick stays a candidate; the filter at the top of the next
                    // tick-selection round decides whether it can run again.
                    self.possibly_ready_ticks.push(removed);
                }
            }
        }
    }
942}