1use core::fmt;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::marker::PhantomData;
6use std::panic::RefUnwindSafe;
7use std::path::Path;
8use std::pin::Pin;
9use std::task::{Context, Poll, Waker};
10
11use bytes::Bytes;
12use colored::Colorize;
13use dfir_rs::scheduled::graph::Dfir;
14use futures::{FutureExt, Stream, StreamExt};
15use libloading::Library;
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use tempfile::TempPath;
19use tokio::sync::mpsc::UnboundedSender;
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22use super::runtime::SimHook;
23use crate::compile::deploy::ConnectableAsync;
24use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
25use crate::location::external_process::{ExternalBincodeSink, ExternalBincodeStream};
26
27pub struct CompiledSim {
29 pub(super) _path: TempPath,
30 pub(super) lib: Library,
31 pub(super) external_ports: Vec<usize>,
32}
33
34#[sealed::sealed]
35pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
39#[sealed::sealed]
40impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
41
42fn null_handler(_args: fmt::Arguments) {}
43
44fn println_handler(args: fmt::Arguments) {
45 println!("{}", args);
46}
47
48fn eprintln_handler(args: fmt::Arguments) {
49 eprintln!("{}", args);
50}
51
52type SimLoaded<'a> = libloading::Symbol<
53 'a,
54 unsafe extern "Rust" fn(
55 bool,
56 HashMap<usize, UnboundedSender<Bytes>>,
57 HashMap<usize, UnboundedReceiverStream<Bytes>>,
58 fn(fmt::Arguments<'_>),
59 fn(fmt::Arguments<'_>),
60 ) -> (
61 Dfir<'static>,
62 Vec<(&'static str, Dfir<'static>)>,
63 HashMap<&'static str, Vec<Box<dyn SimHook>>>,
64 ),
65>;
66
67impl CompiledSim {
68 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
70 self.with_instantiator(|instantiator| thunk(instantiator()), true)
71 }
72
73 pub fn with_instantiator<T>(
81 &self,
82 thunk: impl FnOnce(&dyn Instantiator) -> T,
83 always_log: bool,
84 ) -> T {
85 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
86 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
87 thunk(
88 &(|| CompiledSimInstance {
89 func: func.clone(),
90 remaining_ports: self.external_ports.iter().cloned().collect(),
91 input_ports: HashMap::new(),
92 output_ports: HashMap::new(),
93 log,
94 }),
95 )
96 }
97
98 pub fn fuzz<'a>(&'a self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
109 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
110 .elements()
111 .into_iter()
112 .find(|e| {
113 !e.fn_name.starts_with("hydro_lang::sim::compiled")
114 && !e.fn_name.starts_with("hydro_lang::sim::flow")
115 && !e.fn_name.starts_with("fuzz<")
116 })
117 .unwrap();
118
119 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
120 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
121
122 let caller_fuzz_repro_path = repro_folder
123 .join(caller_fn.fn_name.replace("::", "__"))
124 .with_extension("bin");
125
126 if std::env::var("BOLERO_FUZZER").is_ok() {
127 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
128 std::fs::create_dir_all(&corpus_dir).unwrap();
129 let libfuzzer_args = format!(
130 "{} {} -artifact_prefix={}/ -handle_abrt=0",
131 corpus_dir.to_str().unwrap(),
132 corpus_dir.to_str().unwrap(),
133 corpus_dir.to_str().unwrap(),
134 );
135
136 std::fs::create_dir_all(&repro_folder).unwrap();
137
138 unsafe {
139 std::env::set_var(
140 "BOLERO_FAILURE_OUTPUT",
141 caller_fuzz_repro_path.to_str().unwrap(),
142 );
143
144 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
145 }
146
147 self.with_instantiator(
148 |instantiator| {
149 bolero::test(bolero::TargetLocation {
150 package_name: "",
151 manifest_dir: "",
152 module_path: "",
153 file: "",
154 line: 0,
155 item_path: "<unknown>::__bolero_item_path__",
156 test_name: None,
157 })
158 .run_with_replay(move |is_replay| {
159 let mut instance = instantiator();
160
161 if instance.log {
162 eprintln!(
163 "{}",
164 "\n==== New Simulation Instance ====\n"
165 .color(colored::Color::Cyan)
166 .bold()
167 );
168 }
169
170 if is_replay {
171 instance.log = true;
172 }
173
174 tokio::runtime::Builder::new_current_thread()
175 .build()
176 .unwrap()
177 .block_on(async {
178 let local_set = tokio::task::LocalSet::new();
179 local_set.run_until(thunk(instance)).await
180 })
181 })
182 },
183 false,
184 );
185 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
186 self.fuzz_repro(existing_bytes, thunk);
187 } else {
188 eprintln!(
189 "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
190 caller_fuzz_repro_path.display()
191 );
192 self.with_instantiator(
193 |instantiator| {
194 bolero::test(bolero::TargetLocation {
195 package_name: "",
196 manifest_dir: "",
197 module_path: "",
198 file: ".",
199 line: 0,
200 item_path: "<unknown>::__bolero_item_path__",
201 test_name: None,
202 })
203 .with_iterations(8192)
204 .run(move || {
205 let instance = instantiator();
206 tokio::runtime::Builder::new_current_thread()
207 .build()
208 .unwrap()
209 .block_on(async {
210 let local_set = tokio::task::LocalSet::new();
211 local_set.run_until(thunk(instance)).await
212 })
213 })
214 },
215 false,
216 );
217 }
218 }
219
220 pub fn fuzz_repro<'a>(
224 &'a self,
225 bytes: Vec<u8>,
226 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
227 ) {
228 self.with_instance(|instance| {
229 bolero::bolero_engine::any::scope::with(
230 Box::new(bolero::bolero_engine::driver::object::Object(
231 bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
232 )),
233 || {
234 tokio::runtime::Builder::new_current_thread()
235 .build()
236 .unwrap()
237 .block_on(async {
238 let local_set = tokio::task::LocalSet::new();
239 local_set.run_until(thunk(instance)).await
240 })
241 },
242 )
243 });
244 }
245
246 pub fn exhaustive<'a>(
257 &'a self,
258 thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe,
259 ) -> usize {
260 if std::env::var("BOLERO_FUZZER").is_ok() {
261 eprintln!(
262 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
263 );
264 std::process::abort();
265 }
266
267 let mut count = 0;
268 let count_mut = &mut count;
269
270 self.with_instantiator(
271 |instantiator| {
272 bolero::test(bolero::TargetLocation {
273 package_name: "",
274 manifest_dir: "",
275 module_path: "",
276 file: "",
277 line: 0,
278 item_path: "<unknown>::__bolero_item_path__",
279 test_name: None,
280 })
281 .exhaustive()
282 .run_with_replay(move |is_replay| {
283 *count_mut += 1;
284
285 let mut instance = instantiator();
286 if instance.log {
287 eprintln!(
288 "{}",
289 "\n==== New Simulation Instance ====\n"
290 .color(colored::Color::Cyan)
291 .bold()
292 );
293 }
294
295 if is_replay {
296 instance.log = true;
297 }
298
299 tokio::runtime::Builder::new_current_thread()
300 .build()
301 .unwrap()
302 .block_on(async {
303 let local_set = tokio::task::LocalSet::new();
304 local_set.run_until(thunk(instance)).await;
305 })
306 })
307 },
308 false,
309 );
310
311 count
312 }
313}
314
315pub struct CompiledSimInstance<'a> {
318 func: SimLoaded<'a>,
319 remaining_ports: HashSet<usize>,
320 output_ports: HashMap<usize, UnboundedSender<Bytes>>,
321 input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
322 log: bool,
323}
324
325impl<'a> CompiledSimInstance<'a> {
326 #[deprecated(note = "Use `connect` instead")]
327 pub fn connect_sink_bincode<T: Serialize + 'static, M, O: Ordering, R: Retries>(
330 &mut self,
331 port: &ExternalBincodeSink<T, M, O, R>,
332 ) -> SimSender<T, O, R> {
333 self.connect(port)
334 }
335
336 #[deprecated(note = "Use `connect` instead")]
337 pub fn connect_source_bincode<T: DeserializeOwned + 'static, O: Ordering, R: Retries>(
340 &mut self,
341 port: &ExternalBincodeStream<T, O, R>,
342 ) -> SimReceiver<'a, T, O, R> {
343 self.connect(port)
344 }
345
346 pub fn connect<'b, P: ConnectableAsync<&'b mut Self>>(
350 &'b mut self,
351 port: P,
352 ) -> <P as ConnectableAsync<&'b mut Self>>::Output {
353 let mut pinned = std::pin::pin!(port.connect(self));
354 if let Poll::Ready(v) = pinned.poll_unpin(&mut Context::from_waker(Waker::noop())) {
355 v
356 } else {
357 panic!("Connect impl should not have used any async operations");
358 }
359 }
360
361 pub fn launch(self) {
364 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
365 }
366
367 pub fn schedule_with_logger<W: std::io::Write>(
372 self,
373 log_writer: W,
374 ) -> impl use<W> + Future<Output = ()> {
375 self.schedule_with_maybe_logger(Some(log_writer))
376 }
377
378 fn schedule_with_maybe_logger<W: std::io::Write>(
379 self,
380 log_override: Option<W>,
381 ) -> impl use<W> + Future<Output = ()> {
382 if !self.remaining_ports.is_empty() {
383 panic!(
384 "Cannot launch DFIR because some of the inputs / outputs have not been connected."
385 )
386 }
387
388 let (async_dfir, ticks, hooks) = unsafe {
389 (self.func)(
390 colored::control::SHOULD_COLORIZE.should_colorize(),
391 self.output_ports,
392 self.input_ports,
393 if self.log {
394 println_handler
395 } else {
396 null_handler
397 },
398 if self.log {
399 eprintln_handler
400 } else {
401 null_handler
402 },
403 )
404 };
405 let mut launched = LaunchedSim {
406 async_dfir,
407 possibly_ready_ticks: vec![],
408 not_ready_ticks: ticks.into_iter().collect(),
409 hooks,
410 log: if self.log {
411 if let Some(w) = log_override {
412 LogKind::Custom(w)
413 } else {
414 LogKind::Stderr
415 }
416 } else {
417 LogKind::Null
418 },
419 };
420
421 async move { launched.scheduler().await }
422 }
423}
424
425pub struct SimReceiver<'a, T, O: Ordering, R: Retries>(
427 Pin<Box<dyn Stream<Item = T> + 'a>>,
428 PhantomData<(O, R)>,
429);
430
431impl<'a, T, O: Ordering, R: Retries> SimReceiver<'a, T, O, R> {
432 pub async fn assert_no_more(mut self)
434 where
435 T: std::fmt::Debug,
436 {
437 if let Some(next) = self.0.next().await {
438 panic!("Stream yielded unexpected message: {:?}", next);
439 }
440 }
441}
442
443impl<'a, T> SimReceiver<'a, T, TotalOrder, ExactlyOnce> {
444 pub async fn next(&mut self) -> Option<T> {
447 self.0.next().await
448 }
449
450 pub async fn collect<C: Default + Extend<T>>(self) -> C {
453 self.0.collect().await
454 }
455
456 pub async fn assert_yields(&mut self, expected: impl IntoIterator<Item = T>)
459 where
460 T: std::fmt::Debug + PartialEq,
461 {
462 let mut expected: VecDeque<T> = expected.into_iter().collect();
463
464 while !expected.is_empty() {
465 if let Some(next) = self.next().await {
466 assert_eq!(next, expected.pop_front().unwrap());
467 } else {
468 panic!("Stream ended early, still expected: {:?}", expected);
469 }
470 }
471 }
472
473 pub async fn assert_yields_only(mut self, expected: impl IntoIterator<Item = T>)
476 where
477 T: std::fmt::Debug + PartialEq,
478 {
479 self.assert_yields(expected).await;
480 self.assert_no_more().await;
481 }
482}
483
484impl<'a, T> SimReceiver<'a, T, NoOrder, ExactlyOnce> {
485 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
488 where
489 T: Ord,
490 {
491 let mut collected: C = self.0.collect().await;
492 collected.as_mut().sort();
493 collected
494 }
495
496 pub async fn assert_yields_unordered(&mut self, expected: impl IntoIterator<Item = T>)
499 where
500 T: std::fmt::Debug + PartialEq,
501 {
502 let mut expected: Vec<T> = expected.into_iter().collect();
503
504 while !expected.is_empty() {
505 if let Some(next) = self.0.next().await {
506 let prev_length = expected.len();
507 expected.retain(|e| e != &next);
508 if expected.len() == prev_length {
509 panic!("Stream yielded unexpected message: {:?}", next);
510 }
511 } else {
512 panic!("Stream ended early, still expected: {:?}", expected);
513 }
514 }
515 }
516
517 pub async fn assert_yields_only_unordered(mut self, expected: impl IntoIterator<Item = T>)
520 where
521 T: std::fmt::Debug + PartialEq,
522 {
523 self.assert_yields_unordered(expected).await;
524 self.assert_no_more().await;
525 }
526}
527
528impl<'a, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
529 ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeStream<T, O, R>
530{
531 type Output = SimReceiver<'a, T, O, R>;
532
533 async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
534 assert!(ctx.remaining_ports.remove(&self.port_id));
535 let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
536 ctx.output_ports.insert(self.port_id, sink);
537
538 SimReceiver(
539 Box::pin(source.map(|b| bincode::deserialize(&b).unwrap())),
540 PhantomData,
541 )
542 }
543}
544
545pub struct SimSender<T, O: Ordering, R: Retries>(
547 Box<dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>>,
548 PhantomData<(O, R)>,
549);
550impl<T> SimSender<T, TotalOrder, ExactlyOnce> {
551 pub fn send(&self, t: T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
554 (self.0)(t)
555 }
556
557 pub fn send_many<I: IntoIterator<Item = T>>(
560 &self,
561 iter: I,
562 ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
563 for t in iter {
564 (self.0)(t)?;
565 }
566 Ok(())
567 }
568}
569
570impl<T> SimSender<T, NoOrder, ExactlyOnce> {
571 pub fn send_many_unordered<I: IntoIterator<Item = T>>(
574 &self,
575 iter: I,
576 ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
577 for t in iter {
578 (self.0)(t)?;
579 }
580 Ok(())
581 }
582}
583
584impl<'a, T: Serialize + 'static, M, O: Ordering, R: Retries>
585 ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeSink<T, M, O, R>
586{
587 type Output = SimSender<T, O, R>;
588
589 async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
590 assert!(ctx.remaining_ports.remove(&self.port_id));
591 let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
592 ctx.input_ports.insert(self.port_id, source);
593 SimSender(
594 Box::new(move |t| sink.send(bincode::serialize(&t).unwrap().into())),
595 PhantomData,
596 )
597 }
598}
599
600enum LogKind<W: std::io::Write> {
601 Null,
602 Stderr,
603 Custom(W),
604}
605
606impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
608 fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
609 match self {
610 LogKind::Null => Ok(()),
611 LogKind::Stderr => {
612 eprint!("{}", s);
613 Ok(())
614 }
615 LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
616 }
617 }
618}
619struct LaunchedSim<W: std::io::Write> {
622 async_dfir: Dfir<'static>,
623 possibly_ready_ticks: Vec<(&'static str, Dfir<'static>)>,
624 not_ready_ticks: Vec<(&'static str, Dfir<'static>)>,
625 hooks: HashMap<&'static str, Vec<Box<dyn SimHook>>>,
626 log: LogKind<W>,
627}
628
629impl<W: std::io::Write> LaunchedSim<W> {
630 async fn scheduler(&mut self) {
631 loop {
632 tokio::task::yield_now().await;
633 if self.async_dfir.run_available().await {
634 self.possibly_ready_ticks.append(&mut self.not_ready_ticks);
635 continue;
636 } else {
637 use bolero::generator::*;
638
639 let (ready, mut not_ready): (Vec<_>, Vec<_>) =
640 self.possibly_ready_ticks.drain(..).partition(|(name, _)| {
641 self.hooks.get(name).unwrap().iter().any(|hook| {
642 hook.current_decision().unwrap_or(false)
643 || hook.can_make_nontrivial_decision()
644 })
645 });
646
647 self.possibly_ready_ticks = ready;
648 self.not_ready_ticks.append(&mut not_ready);
649
650 if self.possibly_ready_ticks.is_empty() {
651 break;
652 } else {
653 let next_tick = (0..self.possibly_ready_ticks.len()).any();
654 let mut removed: (&'static str, Dfir<'static>) =
655 self.possibly_ready_ticks.remove(next_tick);
656
657 match &mut self.log {
658 LogKind::Null => {}
659 LogKind::Stderr => {
660 eprintln!("\n{}", "Running Tick".color(colored::Color::Magenta).bold())
661 }
662 LogKind::Custom(writer) => {
663 writeln!(
664 writer,
665 "\n{}",
666 "Running Tick".color(colored::Color::Magenta).bold()
667 )
668 .unwrap();
669 }
670 }
671
672 let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
673 write.write_str(&"*".color(colored::Color::Magenta).bold())?;
674 write.write_str(" ")
675 };
676
677 let mut tick_decision_writer =
678 indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
679 inserter: &mut asterisk_indenter,
680 });
681
682 let hooks = self.hooks.get_mut(removed.0).unwrap();
683 let mut remaining_decision_count = hooks.len();
684 let mut made_nontrivial_decision = false;
685
686 bolero_generator::any::scope::borrow_with(|driver| {
687 hooks.iter_mut().for_each(|hook| {
689 if let Some(is_nontrivial) = hook.current_decision() {
690 made_nontrivial_decision |= is_nontrivial;
691 remaining_decision_count -= 1;
692 } else if !hook.can_make_nontrivial_decision() {
693 hook.autonomous_decision(driver, false);
697 remaining_decision_count -= 1;
698 }
699 });
700
701 hooks.iter_mut().for_each(|hook| {
702 if hook.current_decision().is_none() {
703 made_nontrivial_decision |= hook.autonomous_decision(
704 driver,
705 !made_nontrivial_decision && remaining_decision_count == 1,
706 );
707 remaining_decision_count -= 1;
708 }
709
710 hook.release_decision(&mut tick_decision_writer);
711 });
712 });
713
714 assert!(removed.1.run_tick().await);
715 self.possibly_ready_ticks.push(removed);
716 }
717 }
718 }
719 }
720}