Skip to main content

hydro_lang/live_collections/sliced/
mod.rs

1//! Utilities for transforming live collections via slicing.
2
3pub mod style;
4
5use super::boundedness::{Bounded, Unbounded};
6use super::stream::{Ordering, Retries};
7use crate::location::{Location, NoTick, Tick};
8
// Internal token muncher behind `sliced!`.
//
// It consumes the leading `let ... = use...;` statements one at a time, accumulating them
// in two bracketed lists, and emits the final expansion once no further statement matches:
// - `@uses`:   one `{ name, invocation, span-expr, ... }` entry per immutable `use`.
// - `@states`: one `{ name, style, ((type?), (arg?)) }` entry per `let mut` state.
#[doc(hidden)]
#[macro_export]
macro_rules! __sliced_parse_uses__ {
    // Parse immutable use statements with style: let name = use::style(args...);
    // The whole `style(args...)` call is captured as a single expression and is recorded
    // twice: once as the invocation and once as the span source for the backtrace.
    (
        @uses [$($uses:tt)*]
        @states [$($states:tt)*]
        let $name:ident = use:: $invocation:expr; $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)* { $name, $invocation, $invocation }]
            @states [$($states)*]
            $($rest)*
        )
    };

    // Parse immutable use statements without style: let name = use(args...);
    // The invocation becomes a call to `default(args...)`, where the `default` identifier is
    // routed through `copy_span!` together with the argument expressions.
    (
        @uses [$($uses:tt)*]
        @states [$($states:tt)*]
        let $name:ident = use($($args:expr),* $(,)?); $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)* { $name, $crate::macro_support::copy_span::copy_span!($($args,)* default)($($args),*), $($args),* }]
            @states [$($states)*]
            $($rest)*
        )
    };

    // Parse mutable state statements: let mut name = use::style::<Type>(args);
    // Both the turbofish type and the single argument are optional.
    (
        @uses [$($uses:tt)*]
        @states [$($states:tt)*]
        let mut $name:ident = use:: $style:ident $(::<$ty:ty>)? ($($args:expr)?); $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)*]
            @states [$($states)* { $name, $style, (($($ty)?), ($($args)?)) }]
            $($rest)*
        )
    };

    // Terminal case: no immutable uses were collected (with or without states).
    // The tick is created from the first use, so there is nothing to derive it from.
    // Accepting any state list here (including an empty one) makes sure the dedicated
    // diagnostic below is reported instead of a generic macro-matcher failure.
    (
        @uses []
        @states [$($states:tt)*]
        $($body:tt)*
    ) => {
        {
            // We need at least one use to get a tick, so fail compilation if there are none
            compile_error!("sliced! requires at least one `let name = use(...)` statement to determine the tick")
        }
    };

    // Terminal case: uses with optional states
    (
        @uses [$({ $use_name:ident, $invocation:expr, $($invocation_spans:expr),* })+]
        @states [$({ $state_name:ident, $state_style:ident, (($($state_ty:ty)?), ($($state_arg:expr)?)) })*]
        $($body:tt)*
    ) => {
        {
            // Bring the style functions (e.g. `default`, `atomic`) into scope so the
            // recorded invocations resolve.
            use $crate::live_collections::sliced::style::*;
            let __styled = (
                $($invocation,)+
            );

            // The tick is derived from the first sliced collection.
            let __tick = $crate::live_collections::sliced::Slicable::create_tick(&__styled.0);
            // One backtrace per use, each produced through `copy_span!` fed with that
            // use's span expressions.
            let __backtraces = {
                use $crate::compile::ir::backtrace::__macro_get_backtrace;
                (
                    $($crate::macro_support::copy_span::copy_span!($($invocation_spans,)* {
                        __macro_get_backtrace(1)
                    }),)+
                )
            };
            let __sliced = $crate::live_collections::sliced::Slicable::slice(__styled, &__tick, __backtraces);
            let (
                $($use_name,)+
            ) = __sliced;

            // Create all cycles and pack handles/values into tuples
            let (__handles, __states) = $crate::live_collections::sliced::unzip_cycles((
                $($crate::live_collections::sliced::style::$state_style$(::<$state_ty, _>)?(& __tick, $($state_arg)?),)*
            ));

            // Unpack mutable state values
            let (
                $(mut $state_name,)*
            ) = __states;

            // Execute the body
            let __body_result = {
                $($body)*
            };

            // Re-pack the final state values and complete cycles
            let __final_states = (
                $($state_name,)*
            );
            $crate::live_collections::sliced::complete_cycles(__handles, __final_states);

            // Unslice the result
            $crate::live_collections::sliced::Unslicable::unslice(__body_result)
        }
    };
}
115
/// Runs a computation over slices of one or more live collections and yields the result
/// back as a live collection.
///
/// A typical use is reading a snapshot of an asynchronously updated collection while
/// processing another collection, for example joining a stream with the latest value of a
/// singleton.
///
/// # Syntax
/// The contents of `sliced!` look like a closure body. They start with `use` statements, one
/// per live collection to slice, each carrying a non-determinism explanation. A style may be
/// given after `use::` to control how that collection is sliced (e.g., atomically). Every
/// `use` statement has to come before the body of the transformation.
///
/// ```rust,ignore
/// let stream = sliced! {
///     let name1 = use(collection1, nondet!(/** explanation */));
///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
///
///     // arbitrary statements can follow
///     let intermediate = name1.map(...);
///     intermediate.cross_singleton(name2)
/// };
/// ```
///
/// # Stateful Computations
/// State can be carried across iterations by declaring `let mut` bindings with `use::state`
/// or `use::state_null`. Each such binding creates a cycle whose value persists between
/// iterations.
///
/// - `use::state(|l| initial)`: a cycle with an initial value. The closure is given the slice
///   location and returns the state seen by the first iteration.
/// - `use::state_null::<Type>()`: a cycle that is null/empty on the first iteration.
///
/// The body may reassign the mutable binding; whatever value it holds at the end of the body
/// is handed to the next iteration.
///
/// ```rust,ignore
/// let counter_stream = sliced! {
///     let batch = use(input_stream, nondet!(/** explanation */));
///     let mut counter = use::state(|l| l.singleton(q!(0)));
///
///     // Increment counter by the number of items in this batch
///     let new_count = counter.clone().zip(batch.count())
///         .map(q!(|(old, add)| old + add));
///     counter = new_count.clone();
///     new_count.into_stream()
/// };
/// ```
#[macro_export]
macro_rules! __sliced__ {
    // Hand every token to the parser, starting with empty `use` and state accumulators.
    ($($input:tt)*) => {
        $crate::__sliced_parse_uses__!(
            @uses []
            @states []
            $($input)*
        )
    };
}
170
171pub use crate::__sliced__ as sliced;
172
173/// Marks this live collection as atomically-yielded, which means that the output outside
174/// `sliced` will be at an atomic location that is synchronous with respect to the body
175/// of the slice.
176pub fn yield_atomic<T>(t: T) -> style::Atomic<T> {
177    style::Atomic {
178        collection: t,
179        // yield_atomic doesn't need a nondet since it's for output, not input
180        nondet: crate::nondet::NonDet,
181    }
182}
183
184/// A trait for live collections which can be sliced into bounded versions at a tick.
185pub trait Slicable<'a, L: Location<'a>> {
186    /// The sliced version of this live collection.
187    type Slice;
188
189    /// The type of backtrace associated with this slice.
190    type Backtrace;
191
192    /// Gets the location associated with this live collection.
193    fn get_location(&self) -> &L;
194
195    /// Creates a tick that is appropriate for the collection's location.
196    fn create_tick(&self) -> Tick<L> {
197        self.get_location().try_tick().unwrap()
198    }
199
200    /// Slices this live collection at the given tick.
201    ///
202    /// # Non-Determinism
203    /// Slicing a live collection may involve non-determinism, such as choosing which messages
204    /// to include in a batch.
205    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice;
206}
207
/// Implemented by live collections that can be yielded out of a slice, converting them back
/// into their original (unsliced) form.
pub trait Unslicable {
    /// The type this collection has once it is unsliced.
    type Unsliced;

    /// Consumes the sliced collection and returns its unsliced counterpart.
    fn unslice(self) -> Self::Unsliced;
}
216
/// Splits a tuple of (handle, state) pairs into a tuple of handles and a tuple of states.
#[doc(hidden)]
pub trait UnzipCycles {
    /// Tuple type holding every cycle handle.
    type Handles;
    /// Tuple type holding every state value.
    type States;

    /// Consumes the cycles and returns `(handles, states)`.
    fn unzip(self) -> (Self::Handles, Self::States);
}
228
229/// Unzips a tuple of cycles into handles and states.
230#[doc(hidden)]
231pub fn unzip_cycles<T: UnzipCycles>(cycles: T) -> (T::Handles, T::States) {
232    cycles.unzip()
233}
234
/// Implemented by tuples of cycle handles that can be completed with a matching tuple of
/// final state values.
#[doc(hidden)]
pub trait CompleteCycles<States> {
    /// Consumes the handles and completes every cycle with the given state values.
    fn complete(self, states: States);
}
241
242/// Completes a tuple of cycle handles with their final state values.
243#[doc(hidden)]
244pub fn complete_cycles<H: CompleteCycles<S>, S>(handles: H, states: S) {
245    handles.complete(states);
246}
247
248impl<'a, L: Location<'a>> Slicable<'a, L> for () {
249    type Slice = ();
250    type Backtrace = ();
251
252    fn get_location(&self) -> &L {
253        unreachable!()
254    }
255
256    fn slice(self, _tick: &Tick<L>, _backtrace: Self::Backtrace) -> Self::Slice {}
257}
258
259impl Unslicable for () {
260    type Unsliced = ();
261
262    fn unslice(self) -> Self::Unsliced {}
263}
264
// Generates the `Slicable` and `Unslicable` impls for one tuple arity.
//
// Every tuple element is described by a triple: the type parameter name, the identifier used
// to bind that element's backtrace, and a positional index. The index is accepted by the
// matcher but is not used in the expansion.
macro_rules! impl_slicable_for_tuple {
    ($($Elem:ident, $Bt:ident, $pos:tt),+) => {
        impl<'a, L, $($Elem),+> Slicable<'a, L> for ($($Elem,)+)
        where
            L: Location<'a>,
            $($Elem: Slicable<'a, L>,)+
        {
            type Slice = ($($Elem::Slice,)+);
            type Backtrace = ($($Elem::Backtrace,)+);

            // The tuple reports the location of its first element.
            fn get_location(&self) -> &L {
                self.0.get_location()
            }

            // The type parameter names double as local bindings, hence the lint expectation.
            #[expect(non_snake_case, reason = "macro codegen")]
            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice {
                let ($($Bt,)+) = backtrace;
                let ($($Elem,)+) = self;
                // Slice element by element, pairing each with its own backtrace.
                ($($Elem.slice(tick, $Bt),)+)
            }
        }

        impl<$($Elem),+> Unslicable for ($($Elem,)+)
        where
            $($Elem: Unslicable,)+
        {
            type Unsliced = ($($Elem::Unsliced,)+);

            #[expect(non_snake_case, reason = "macro codegen")]
            fn unslice(self) -> Self::Unsliced {
                let ($($Elem,)+) = self;
                // Unslice element by element, keeping the tuple order.
                ($($Elem.unslice(),)+)
            }
        }
    };
}
294
// Instantiate the tuple `Slicable` / `Unslicable` impls for arities 1 through 12.
// Each instantiation is only compiled when the `stageleft_runtime` cfg is set.
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5
);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
    6
);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
    6, S8, S8_bt, 7
);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
    6, S8, S8_bt, 7, S9, S9_bt, 8
);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9
);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9, S11, S11_bt, 10
);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9, S11, S11_bt, 10, S12, S12_bt, 11
);
341
// Generates the `UnzipCycles` and `CompleteCycles` impls for one tuple arity (including the
// empty tuple).
//
// Every element is described by a triple: the handle type parameter, the state type
// parameter, and a positional index. The index is accepted by the matcher but is not used in
// the expansion.
macro_rules! impl_cycles_for_tuple {
    ($($Handle:ident, $State:ident, $pos:tt),*) => {
        impl<$($Handle, $State),*> UnzipCycles for ($(($Handle, $State),)*) {
            type Handles = ($($Handle,)*);
            type States = ($($State,)*);

            // `allow` rather than `expect`: the zero-arity expansion binds no names.
            #[expect(clippy::allow_attributes, reason = "macro codegen")]
            #[allow(non_snake_case, reason = "macro codegen")]
            fn unzip(self) -> (Self::Handles, Self::States) {
                // Take each (handle, state) pair apart, then regroup by kind.
                let ($(($Handle, $State),)*) = self;
                (
                    ($($Handle,)*),
                    ($($State,)*),
                )
            }
        }

        impl<$($Handle: crate::forward_handle::CompleteCycle<$State>, $State),*> CompleteCycles<($($State,)*)> for ($($Handle,)*) {
            #[expect(clippy::allow_attributes, reason = "macro codegen")]
            #[allow(non_snake_case, reason = "macro codegen")]
            fn complete(self, states: ($($State,)*)) {
                let ($($State,)*) = states;
                let ($($Handle,)*) = self;
                // Feed every handle its matching state, in tuple order.
                $($Handle.complete_next_tick($State);)*
            }
        }
    };
}
370
// Instantiate the tuple `UnzipCycles` / `CompleteCycles` impls for arities 0 through 12.
// Each instantiation is only compiled when the `stageleft_runtime` cfg is set.
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!();
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(H1, S1, 0);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(
    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5
);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(
    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6
);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(
    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7
);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(
    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
    8
);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(
    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
    8, H10, S10, 9
);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(
    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
    8, H10, S10, 9, H11, S11, 10
);
#[cfg(stageleft_runtime)]
impl_cycles_for_tuple!(
    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
    8, H10, S10, 9, H11, S11, 10, H12, S12, 11
);
415
416// Unslicable implementations for plain collections (used when returning from sliced! body)
417impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
418    for super::Stream<T, Tick<L>, Bounded, O, R>
419{
420    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
421
422    fn unslice(self) -> Self::Unsliced {
423        self.all_ticks()
424    }
425}
426
427impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
428    type Unsliced = super::Singleton<T, L, Unbounded>;
429
430    fn unslice(self) -> Self::Unsliced {
431        self.latest()
432    }
433}
434
435impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
436    type Unsliced = super::Optional<T, L, Unbounded>;
437
438    fn unslice(self) -> Self::Unsliced {
439        self.latest()
440    }
441}
442
443impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
444    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
445{
446    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
447
448    fn unslice(self) -> Self::Unsliced {
449        self.all_ticks()
450    }
451}
452
453// Unslicable implementations for Atomic-wrapped bounded collections
454impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Unslicable
455    for style::Atomic<super::Stream<T, Tick<L>, Bounded, O, R>>
456{
457    type Unsliced = super::Stream<T, crate::location::Atomic<L>, Unbounded, O, R>;
458
459    fn unslice(self) -> Self::Unsliced {
460        self.collection.all_ticks_atomic()
461    }
462}
463
464impl<'a, T, L: Location<'a> + NoTick> Unslicable
465    for style::Atomic<super::Singleton<T, Tick<L>, Bounded>>
466{
467    type Unsliced = super::Singleton<T, crate::location::Atomic<L>, Unbounded>;
468
469    fn unslice(self) -> Self::Unsliced {
470        self.collection.latest_atomic()
471    }
472}
473
474impl<'a, T, L: Location<'a> + NoTick> Unslicable
475    for style::Atomic<super::Optional<T, Tick<L>, Bounded>>
476{
477    type Unsliced = super::Optional<T, crate::location::Atomic<L>, Unbounded>;
478
479    fn unslice(self) -> Self::Unsliced {
480        self.collection.latest_atomic()
481    }
482}
483
484impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries> Unslicable
485    for style::Atomic<super::KeyedStream<K, V, Tick<L>, Bounded, O, R>>
486{
487    type Unsliced = super::KeyedStream<K, V, crate::location::Atomic<L>, Unbounded, O, R>;
488
489    fn unslice(self) -> Self::Unsliced {
490        self.collection.all_ticks_atomic()
491    }
492}
493
// Simulation tests for `sliced!`; the whole module requires the `sim` feature.
#[cfg(feature = "sim")]
#[cfg(test)]
mod tests {
    use stageleft::q;

    use super::sliced;
    use crate::location::Location;
    use crate::nondet::nondet;
    use crate::prelude::FlowBuilder;

    /// Test a counter using `use::state` with an initial singleton value.
    /// Each input increments the counter, and we verify the output after each tick.
    #[test]
    fn sim_state_counter() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_send, input) = node.sim_input::<i32, _, _>();

        let out_recv = sliced! {
            let batch = use(input, nondet!(/** test */));
            let mut counter = use::state(|l| l.singleton(q!(0)));

            let new_count = counter.clone().zip(batch.count())
                .map(q!(|(old, add)| old + add));
            counter = new_count.clone();
            new_count.into_stream()
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            input_send.send(1);
            assert_eq!(out_recv.next().await.unwrap(), 1);

            input_send.send(1);
            assert_eq!(out_recv.next().await.unwrap(), 2);

            input_send.send(1);
            assert_eq!(out_recv.next().await.unwrap(), 3);
        });
    }

    /// Test `use::state_null` with an Optional that starts as None.
    #[test]
    fn sim_state_null_optional() {
        // `Location` is already imported at module level; only the extra names are needed here.
        use crate::live_collections::Optional;
        use crate::live_collections::boundedness::Bounded;
        use crate::location::Tick;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_send, input) = node.sim_input::<i32, _, _>();

        let out_recv = sliced! {
            let batch = use(input, nondet!(/** test */));
            let mut prev = use::state_null::<Optional<i32, Tick<_>, Bounded>>();

            // Output the previous value (or -1 if none)
            let output = prev.clone().unwrap_or(prev.location().singleton(q!(-1)));
            // Store the current batch's first value for next tick
            prev = batch.first();
            output.into_stream()
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            input_send.send(10);
            // First tick: prev is None, so output is -1
            assert_eq!(out_recv.next().await.unwrap(), -1);

            input_send.send(20);
            // Second tick: prev is Some(10), so output is 10
            assert_eq!(out_recv.next().await.unwrap(), 10);

            input_send.send(30);
            // Third tick: prev is Some(20), so output is 20
            assert_eq!(out_recv.next().await.unwrap(), 20);
        });
    }

    /// Test `use::state` with `source_iter` to initialize a stream state.
    /// On the first tick, the state is the initial `[10, 20]` from `source_iter`.
    /// On subsequent ticks, the state is the batch from the previous tick.
    #[test]
    fn sim_state_source_iter() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_send, input) = node.sim_input::<i32, _, _>();

        let out_recv = sliced! {
            let batch = use(input, nondet!(/** test */));
            let mut items = use::state(|l| l.source_iter(q!([10, 20])));

            // Output the current state, then replace it with the batch
            let output = items.clone();
            items = batch;
            output
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            input_send.send(3);
            // First tick: items = initial [10, 20], output = [10, 20]
            // The two outputs are sorted before comparing, so their arrival order is not asserted.
            let mut results = vec![];
            results.push(out_recv.next().await.unwrap());
            results.push(out_recv.next().await.unwrap());
            results.sort();
            assert_eq!(results, vec![10, 20]);

            input_send.send(4);
            // Second tick: items = [3] (from previous batch), output = [3]
            assert_eq!(out_recv.next().await.unwrap(), 3);

            input_send.send(5);
            // Third tick: items = [4] (from previous batch), output = [4]
            assert_eq!(out_recv.next().await.unwrap(), 4);
        });
    }

    /// Test atomic slicing with keyed streams.
    #[test]
    fn sim_sliced_atomic_keyed_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_send, input) = node.sim_input::<(i32, i32), _, _>();
        let atomic_keyed_input = input.into_keyed().atomic();
        // Per-key running sum of the inputs.
        let accumulated_inputs = atomic_keyed_input
            .clone()
            .assume_ordering(nondet!(/** Test */))
            .fold(
                q!(|| 0),
                q!(|curr, new| {
                    *curr += new;
                }),
            );

        let out_recv = sliced! {
            let atomic_keyed_input = use::atomic(atomic_keyed_input, nondet!(/** test */));
            let accumulated_inputs = use::atomic(accumulated_inputs, nondet!(/** test */));
            accumulated_inputs.join_keyed_stream(atomic_keyed_input)
                .map(q!(|(sum, _input)| sum))
                .entries()
        }
        .assume_ordering_trusted(nondet!(/** test */))
        .sim_output();

        flow.sim().exhaustive(async || {
            input_send.send((1, 1));
            assert_eq!(out_recv.next().await.unwrap(), (1, 1));

            input_send.send((1, 2));
            assert_eq!(out_recv.next().await.unwrap(), (1, 3));

            input_send.send((2, 1));
            assert_eq!(out_recv.next().await.unwrap(), (2, 1));

            input_send.send((1, 3));
            assert_eq!(out_recv.next().await.unwrap(), (1, 6));
        });
    }
}
653
654            input_send.send((1, 3));
655            assert_eq!(out_recv.next().await.unwrap(), (1, 6));
656        });
657    }
658}