1use core::{fmt, panic};
4use std::cell::RefCell;
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::graph::Dfir;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::Mutex;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31
32struct SimConnections {
33 input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
34 output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
35 external_registered: HashMap<ExternalPortId, SimExternalPort>,
36}
37
38tokio::task_local! {
39 static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
40}
41
42pub struct CompiledSim {
44 pub(super) _path: TempPath,
45 pub(super) lib: Library,
46 pub(super) externals_port_registry: SimExternalPortRegistry,
47}
48
49#[sealed::sealed]
50pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
54#[sealed::sealed]
55impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
56
57fn null_handler(_args: fmt::Arguments) {}
58
59fn println_handler(args: fmt::Arguments) {
60 println!("{}", args);
61}
62
63fn eprintln_handler(args: fmt::Arguments) {
64 eprintln!("{}", args);
65}
66
67type SimLoaded<'a> = libloading::Symbol<
73 'a,
74 unsafe extern "Rust" fn(
75 should_color: bool,
76 external_out: HashMap<usize, UnboundedSender<Bytes>>,
78 external_in: HashMap<usize, UnboundedReceiverStream<Bytes>>,
80 println_handler: fn(fmt::Arguments<'_>),
81 eprintln_handler: fn(fmt::Arguments<'_>),
82 ) -> (
83 Vec<(&'static str, Option<u32>, Dfir<'static>)>,
84 Vec<(&'static str, Option<u32>, Dfir<'static>)>,
85 Hooks<&'static str>,
86 InlineHooks<&'static str>,
87 ),
88>;
89
90impl CompiledSim {
91 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
93 self.with_instantiator(|instantiator| thunk(instantiator()), true)
94 }
95
96 pub fn with_instantiator<T>(
104 &self,
105 thunk: impl FnOnce(&dyn Instantiator) -> T,
106 always_log: bool,
107 ) -> T {
108 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
109 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
110 thunk(
111 &(|| CompiledSimInstance {
112 func: func.clone(),
113 externals_port_registry: self.externals_port_registry.clone(),
114 input_ports: HashMap::new(),
115 output_ports: HashMap::new(),
116 log,
117 }),
118 )
119 }
120
121 pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
132 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
133 .elements()
134 .into_iter()
135 .find(|e| {
136 !e.fn_name.starts_with("hydro_lang::sim::compiled")
137 && !e.fn_name.starts_with("hydro_lang::sim::flow")
138 && !e.fn_name.starts_with("fuzz<")
139 && !e.fn_name.starts_with("<hydro_lang::sim")
140 })
141 .unwrap();
142
143 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
144 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
145
146 let caller_fuzz_repro_path = repro_folder
147 .join(caller_fn.fn_name.replace("::", "__"))
148 .with_extension("bin");
149
150 if std::env::var("BOLERO_FUZZER").is_ok() {
151 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
152 std::fs::create_dir_all(&corpus_dir).unwrap();
153 let libfuzzer_args = format!(
154 "{} {} -artifact_prefix={}/ -handle_abrt=0",
155 corpus_dir.to_str().unwrap(),
156 corpus_dir.to_str().unwrap(),
157 corpus_dir.to_str().unwrap(),
158 );
159
160 std::fs::create_dir_all(&repro_folder).unwrap();
161
162 if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
163 unsafe {
164 std::env::set_var(
165 "BOLERO_FAILURE_OUTPUT",
166 caller_fuzz_repro_path.to_str().unwrap(),
167 );
168 }
169 }
170
171 unsafe {
172 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
173 }
174
175 self.with_instantiator(
176 |instantiator| {
177 bolero::test(bolero::TargetLocation {
178 package_name: "",
179 manifest_dir: "",
180 module_path: "",
181 file: "",
182 line: 0,
183 item_path: "<unknown>::__bolero_item_path__",
184 test_name: None,
185 })
186 .run_with_replay(move |is_replay| {
187 let mut instance = instantiator();
188
189 if instance.log {
190 eprintln!(
191 "{}",
192 "\n==== New Simulation Instance ===="
193 .color(colored::Color::Cyan)
194 .bold()
195 );
196 }
197
198 if is_replay {
199 instance.log = true;
200 }
201
202 tokio::runtime::Builder::new_current_thread()
203 .build()
204 .unwrap()
205 .block_on(async { instance.run(&mut thunk).await })
206 })
207 },
208 false,
209 );
210 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
211 self.fuzz_repro(existing_bytes, async |compiled| {
212 compiled.launch();
213 thunk().await
214 });
215 } else {
216 eprintln!(
217 "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
218 caller_fuzz_repro_path.display()
219 );
220 self.with_instantiator(
221 |instantiator| {
222 bolero::test(bolero::TargetLocation {
223 package_name: "",
224 manifest_dir: "",
225 module_path: "",
226 file: ".",
227 line: 0,
228 item_path: "<unknown>::__bolero_item_path__",
229 test_name: None,
230 })
231 .with_iterations(8192)
232 .run(move || {
233 let instance = instantiator();
234 tokio::runtime::Builder::new_current_thread()
235 .build()
236 .unwrap()
237 .block_on(async { instance.run(&mut thunk).await })
238 })
239 },
240 false,
241 );
242 }
243 }
244
245 pub fn fuzz_repro<'a>(
249 &'a self,
250 bytes: Vec<u8>,
251 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
252 ) {
253 self.with_instance(|instance| {
254 bolero::bolero_engine::any::scope::with(
255 Box::new(bolero::bolero_engine::driver::object::Object(
256 bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
257 )),
258 || {
259 tokio::runtime::Builder::new_current_thread()
260 .build()
261 .unwrap()
262 .block_on(async { instance.run_without_launching(thunk).await })
263 },
264 )
265 });
266 }
267
268 pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
279 if std::env::var("BOLERO_FUZZER").is_ok() {
280 eprintln!(
281 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
282 );
283 std::process::abort();
284 }
285
286 let mut count = 0;
287 let count_mut = &mut count;
288
289 self.with_instantiator(
290 |instantiator| {
291 bolero::test(bolero::TargetLocation {
292 package_name: "",
293 manifest_dir: "",
294 module_path: "",
295 file: "",
296 line: 0,
297 item_path: "<unknown>::__bolero_item_path__",
298 test_name: None,
299 })
300 .exhaustive()
301 .run_with_replay(move |is_replay| {
302 *count_mut += 1;
303
304 let mut instance = instantiator();
305 if instance.log {
306 eprintln!(
307 "{}",
308 "\n==== New Simulation Instance ===="
309 .color(colored::Color::Cyan)
310 .bold()
311 );
312 }
313
314 if is_replay {
315 instance.log = true;
316 }
317
318 tokio::runtime::Builder::new_current_thread()
319 .build()
320 .unwrap()
321 .block_on(async { instance.run(&mut thunk).await })
322 })
323 },
324 false,
325 );
326
327 count
328 }
329}
330
331pub struct CompiledSimInstance<'a> {
334 func: SimLoaded<'a>,
335 externals_port_registry: SimExternalPortRegistry,
336 output_ports: HashMap<SimExternalPort, UnboundedSender<Bytes>>,
337 input_ports: HashMap<SimExternalPort, UnboundedReceiverStream<Bytes>>,
338 log: bool,
339}
340
341impl<'a> CompiledSimInstance<'a> {
342 async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
343 self.run_without_launching(async |instance| {
344 instance.launch();
345 thunk().await;
346 })
347 .await;
348 }
349
350 async fn run_without_launching(
351 mut self,
352 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
353 ) {
354 let mut input_senders = HashMap::new();
355 let mut output_receivers = HashMap::new();
356 for registered_port in self.externals_port_registry.port_counter.range_up_to() {
357 {
358 let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
359 self.output_ports.insert(registered_port, sender);
360 output_receivers.insert(
361 registered_port,
362 Rc::new(Mutex::new(UnboundedReceiverStream::new(
363 receiver.into_inner(),
364 ))),
365 );
366 }
367
368 {
369 let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
370 self.input_ports.insert(registered_port, receiver);
371 input_senders.insert(registered_port, Rc::new(sender));
372 }
373 }
374
375 let local_set = tokio::task::LocalSet::new();
376 local_set
377 .run_until(CURRENT_SIM_CONNECTIONS.scope(
378 RefCell::new(SimConnections {
379 input_senders,
380 output_receivers,
381 external_registered: self.externals_port_registry.registered.clone(),
382 }),
383 async move {
384 thunk(self).await;
385 },
386 ))
387 .await;
388 }
389
390 fn launch(self) {
393 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
394 }
395
396 pub fn schedule_with_logger<W: std::io::Write>(
399 self,
400 log_writer: W,
401 ) -> impl use<W> + Future<Output = ()> {
402 self.schedule_with_maybe_logger(Some(log_writer))
403 }
404
405 fn schedule_with_maybe_logger<W: std::io::Write>(
406 self,
407 log_override: Option<W>,
408 ) -> impl use<W> + Future<Output = ()> {
409 let (async_dfirs, tick_dfirs, hooks, inline_hooks) = unsafe {
410 (self.func)(
411 colored::control::SHOULD_COLORIZE.should_colorize(),
412 self.output_ports
413 .into_iter()
414 .map(|(k, v)| (k.into_inner(), v))
415 .collect(),
416 self.input_ports
417 .into_iter()
418 .map(|(k, v)| (k.into_inner(), v))
419 .collect(),
420 if self.log {
421 println_handler
422 } else {
423 null_handler
424 },
425 if self.log {
426 eprintln_handler
427 } else {
428 null_handler
429 },
430 )
431 };
432 let mut launched = LaunchedSim {
433 async_dfirs: async_dfirs
434 .into_iter()
435 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
436 .collect(),
437 possibly_ready_ticks: vec![],
438 not_ready_ticks: tick_dfirs
439 .into_iter()
440 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
441 .collect(),
442 hooks: hooks
443 .into_iter()
444 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
445 .collect(),
446 inline_hooks: inline_hooks
447 .into_iter()
448 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
449 .collect(),
450 log: if self.log {
451 if let Some(w) = log_override {
452 LogKind::Custom(w)
453 } else {
454 LogKind::Stderr
455 }
456 } else {
457 LogKind::Null
458 },
459 };
460
461 async move { launched.scheduler().await }
462 }
463}
464
465impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
466 fn clone(&self) -> Self {
467 *self
468 }
469}
470
471impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
472
473impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
474 async fn with_stream<Out>(
475 &self,
476 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
477 ) -> Out {
478 let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
479 let connections = &mut *connections.borrow_mut();
480 connections
481 .output_receivers
482 .get(connections.external_registered.get(&self.0).unwrap())
483 .unwrap()
484 .clone()
485 });
486
487 let mut receiver_stream = receiver.lock().await;
488 thunk(&mut pin!(
489 &mut receiver_stream
490 .by_ref()
491 .map(|b| bincode::deserialize(&b).unwrap())
492 ))
493 .await
494 }
495
496 pub fn assert_no_more(self) -> impl Future<Output = ()>
498 where
499 T: Debug,
500 {
501 FutureTrackingCaller {
502 future: async move {
503 self.with_stream(async |stream| {
504 if let Some(next) = stream.next().await {
505 Err(format!(
506 "Stream yielded unexpected message: {:?}, expected termination",
507 next
508 ))?;
509 }
510 Ok(())
511 })
512 .await
513 },
514 }
515 }
516}
517
518impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
519 pub async fn next(&self) -> Option<T> {
522 self.with_stream(async |stream| stream.next().await).await
523 }
524
525 pub async fn collect<C: Default + Extend<T>>(self) -> C {
528 self.with_stream(async |stream| stream.collect().await)
529 .await
530 }
531
532 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
535 &self,
536 expected: I,
537 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
538 where
539 T: Debug + PartialEq<T2>,
540 {
541 FutureTrackingCaller {
542 future: async {
543 let mut expected: VecDeque<T2> = expected.into_iter().collect();
544
545 while !expected.is_empty() {
546 if let Some(next) = self.next().await {
547 let next_expected = expected.pop_front().unwrap();
548 if next != next_expected {
549 Err(format!(
550 "Stream yielded unexpected message: {:?}, expected: {:?}",
551 next, next_expected
552 ))?
553 }
554 } else {
555 Err(format!(
556 "Stream ended early, still expected: {:?}",
557 expected
558 ))?;
559 }
560 }
561
562 Ok(())
563 },
564 }
565 }
566
567 pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
570 &self,
571 expected: I,
572 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
573 where
574 T: Debug + PartialEq<T2>,
575 {
576 ChainedFuture {
577 first: self.assert_yields(expected),
578 second: self.assert_no_more(),
579 first_done: false,
580 }
581 }
582}
583
584pin_project_lite::pin_project! {
585 struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
595 #[pin]
596 future: F,
597 }
598}
599
600impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
601 type Output = ();
602
603 #[track_caller]
604 fn poll(
605 mut self: Pin<&mut Self>,
606 cx: &mut std::task::Context<'_>,
607 ) -> std::task::Poll<Self::Output> {
608 match ready!(self.as_mut().project().future.poll(cx)) {
609 Ok(()) => std::task::Poll::Ready(()),
610 Err(e) => panic!("{}", e),
611 }
612 }
613}
614
615pin_project_lite::pin_project! {
616 struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
620 #[pin]
621 first: F1,
622 #[pin]
623 second: F2,
624 first_done: bool,
625 }
626}
627
628impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
629 type Output = ();
630
631 #[track_caller]
632 fn poll(
633 mut self: Pin<&mut Self>,
634 cx: &mut std::task::Context<'_>,
635 ) -> std::task::Poll<Self::Output> {
636 if !self.first_done {
637 ready!(self.as_mut().project().first.poll(cx));
638 *self.as_mut().project().first_done = true;
639 }
640
641 self.as_mut().project().second.poll(cx)
642 }
643}
644
645impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
646 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
649 where
650 T: Ord,
651 {
652 self.with_stream(async |stream| {
653 let mut collected: C = stream.collect().await;
654 collected.as_mut().sort();
655 collected
656 })
657 .await
658 }
659
660 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
663 &self,
664 expected: I,
665 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
666 where
667 T: Debug + PartialEq<T2>,
668 {
669 FutureTrackingCaller {
670 future: async {
671 self.with_stream(async |stream| {
672 let mut expected: Vec<T2> = expected.into_iter().collect();
673
674 while !expected.is_empty() {
675 if let Some(next) = stream.next().await {
676 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
677 if let Some((i, _)) = idx {
678 expected.swap_remove(i);
679 } else {
680 Err(format!("Stream yielded unexpected message: {:?}", next))?
681 }
682 } else {
683 Err(format!(
684 "Stream ended early, still expected: {:?}",
685 expected
686 ))?
687 }
688 }
689
690 Ok(())
691 })
692 .await
693 },
694 }
695 }
696
697 pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
700 &self,
701 expected: I,
702 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
703 where
704 T: Debug + PartialEq<T2>,
705 {
706 ChainedFuture {
707 first: self.assert_yields_unordered(expected),
708 second: self.assert_no_more(),
709 first_done: false,
710 }
711 }
712}
713
714impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
715 fn with_sink<Out>(
716 &self,
717 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
718 ) -> Out {
719 let sender = CURRENT_SIM_CONNECTIONS.with(|connections| {
720 let connections = &mut *connections.borrow_mut();
721 connections
722 .input_senders
723 .get(connections.external_registered.get(&self.0).unwrap())
724 .unwrap()
725 .clone()
726 });
727
728 thunk(&move |t| sender.send(bincode::serialize(&t).unwrap().into()))
729 }
730}
731
732impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
733 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
736 self.with_sink(|send| {
737 for t in iter {
738 send(t).unwrap();
739 }
740 })
741 }
742}
743
744impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
745 pub fn send(&self, t: T) {
748 self.with_sink(|send| send(t)).unwrap();
749 }
750
751 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
754 self.with_sink(|send| {
755 for t in iter {
756 send(t).unwrap();
757 }
758 })
759 }
760}
761
762enum LogKind<W: std::io::Write> {
763 Null,
764 Stderr,
765 Custom(W),
766}
767
768impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
770 fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
771 match self {
772 LogKind::Null => Ok(()),
773 LogKind::Stderr => {
774 eprint!("{}", s);
775 Ok(())
776 }
777 LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
778 }
779 }
780}
781
782struct LaunchedSim<W: std::io::Write> {
785 async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
786 possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
787 not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
788 hooks: Hooks<LocationId>,
789 inline_hooks: InlineHooks<LocationId>,
790 log: LogKind<W>,
791}
792
793impl<W: std::io::Write> LaunchedSim<W> {
794 async fn scheduler(&mut self) {
795 loop {
796 tokio::task::yield_now().await;
797 let mut any_made_progress = false;
798 for (loc, c_id, dfir) in &mut self.async_dfirs {
799 if dfir.run_tick().await {
800 any_made_progress = true;
801 let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
802 .not_ready_ticks
803 .drain(..)
804 .partition(|(tick_loc, tick_c_id, _)| {
805 let LocationId::Tick(_, outer) = tick_loc else {
806 unreachable!()
807 };
808 outer.as_ref() == loc && tick_c_id == c_id
809 });
810
811 self.possibly_ready_ticks.extend(now_ready);
812 self.not_ready_ticks.extend(still_not_ready);
813 }
814 }
815
816 if any_made_progress {
817 continue;
818 } else {
819 use bolero::generator::*;
820
821 let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
822 .possibly_ready_ticks
823 .drain(..)
824 .partition(|(name, cid, _)| {
825 self.hooks
826 .get(&(name.clone(), *cid))
827 .unwrap()
828 .iter()
829 .any(|hook| {
830 hook.current_decision().unwrap_or(false)
831 || hook.can_make_nontrivial_decision()
832 })
833 });
834
835 self.possibly_ready_ticks = ready;
836 self.not_ready_ticks.append(&mut not_ready);
837
838 if self.possibly_ready_ticks.is_empty() {
839 break;
840 } else {
841 let next_tick = (0..self.possibly_ready_ticks.len()).any();
842 let mut removed = self.possibly_ready_ticks.remove(next_tick);
843
844 match &mut self.log {
845 LogKind::Null => {}
846 LogKind::Stderr => {
847 if let Some(cid) = &removed.1 {
848 eprintln!(
849 "\n{}",
850 format!("Running Tick (Cluster Member {})", cid)
851 .color(colored::Color::Magenta)
852 .bold()
853 )
854 } else {
855 eprintln!(
856 "\n{}",
857 "Running Tick".color(colored::Color::Magenta).bold()
858 )
859 }
860 }
861 LogKind::Custom(writer) => {
862 writeln!(
863 writer,
864 "\n{}",
865 "Running Tick".color(colored::Color::Magenta).bold()
866 )
867 .unwrap();
868 }
869 }
870
871 let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
872 write.write_str(&"*".color(colored::Color::Magenta).bold())?;
873 write.write_str(" ")
874 };
875
876 let mut tick_decision_writer =
877 indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
878 inserter: &mut asterisk_indenter,
879 });
880
881 let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
882 let mut remaining_decision_count = hooks.len();
883 let mut made_nontrivial_decision = false;
884
885 bolero_generator::any::scope::borrow_with(|driver| {
886 hooks.iter_mut().for_each(|hook| {
888 if let Some(is_nontrivial) = hook.current_decision() {
889 made_nontrivial_decision |= is_nontrivial;
890 remaining_decision_count -= 1;
891 } else if !hook.can_make_nontrivial_decision() {
892 hook.autonomous_decision(driver, false);
896 remaining_decision_count -= 1;
897 }
898 });
899
900 hooks.iter_mut().for_each(|hook| {
901 if hook.current_decision().is_none() {
902 made_nontrivial_decision |= hook.autonomous_decision(
903 driver,
904 !made_nontrivial_decision && remaining_decision_count == 1,
905 );
906 remaining_decision_count -= 1;
907 }
908
909 hook.release_decision(&mut tick_decision_writer);
910 });
911 });
912
913 let run_tick_future = removed.2.run_tick();
914 if let Some(inline_hooks) =
915 self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
916 {
917 let mut run_tick_future_pinned = pin!(run_tick_future);
918
919 loop {
920 tokio::select! {
921 biased;
922 r = &mut run_tick_future_pinned => {
923 assert!(r);
924 break;
925 }
926 _ = async {} => {
927 bolero_generator::any::scope::borrow_with(|driver| {
928 for hook in inline_hooks.iter_mut() {
929 if hook.pending_decision() {
930 if !hook.has_decision() {
931 hook.autonomous_decision(driver);
932 }
933
934 hook.release_decision(&mut tick_decision_writer);
935 }
936 }
937 });
938 }
939 }
940 }
941 } else {
942 assert!(run_tick_future.await);
943 }
944
945 self.possibly_ready_ticks.push(removed);
946 }
947 }
948 }
949 }
950}