hydro_lang/live_collections/
sliced.rs

1//! Utilities for transforming live collections via slicing.
2
3use super::boundedness::{Bounded, Unbounded};
4use crate::live_collections::keyed_singleton::BoundedValue;
5use crate::live_collections::stream::{Ordering, Retries};
6use crate::location::{Location, NoTick, Tick};
7use crate::nondet::NonDet;
8
9#[doc(hidden)]
10pub fn __sliced_wrap_invoke<A, B, O: Unslicable>(
11    a: A,
12    b: B,
13    f: impl FnOnce(A, B) -> O,
14) -> O::Unsliced {
15    let o_slice = f(a, b);
16    o_slice.unslice()
17}
18
/// Implementation detail of `sliced!`: a token muncher that collects the leading
/// `let <name> = use[::<style>](<collection>, <nondet>);` statements and then
/// expands the remaining tokens as the body of the slice.
///
/// Note that only the non-determinism explanation of the *first* `use` statement is
/// forwarded to `Slicable::slice`; the remaining explanations are evaluated and
/// discarded.
#[doc(hidden)]
#[macro_export]
macro_rules! __sliced_parse_uses__ {
    // Accumulation step: peel one leading `use` statement off the input and append it
    // to the `@uses` list, then recurse on whatever follows.
    (
        @uses [$($uses:tt)*]
        let $name:ident = use $(::$style:ident)?($expr:expr, $nondet:expr); $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)* { $name, ($($style)?), $expr, $nondet }]
            $($rest)*
        )
    };

    // Expansion step: at least one `use` was collected and the next tokens are not
    // another `use` statement, so everything left is the body.
    (
        @uses [{ $first_name:ident, ($($first_style:ident)?), $first:expr, $nondet_first:expr } $({ $rest_name:ident, ($($rest_style:ident)?), $rest:expr, $nondet_expl:expr })*]
        $($body:tt)*
    ) => {
        {
            // Evaluate the caller's first explanation exactly once; it is reused for
            // the `slice` call below instead of expanding the expression a second time.
            let __nondet_first = $nondet_first;
            $(let _ = $nondet_expl;)*

            // Apply the optional style wrapper (e.g. `style::atomic`) to each collection;
            // without a style this is just a parenthesized expression.
            let __styled = (
                $($crate::live_collections::sliced::style::$first_style)?($first),
                $($($crate::live_collections::sliced::style::$rest_style)?($rest),)*
            );

            // Use the tick requested by the styled collections if there is one; otherwise
            // create a tick at the location of the first collection.
            let __tick = $crate::live_collections::sliced::Slicable::preferred_tick(&__styled)
                .unwrap_or_else(|| {
                    $crate::live_collections::sliced::Slicable::get_location(&__styled.0).tick()
                });
            // One backtrace per collection, each captured with the span of its expression.
            let __backtraces = {
                use $crate::compile::ir::backtrace::__macro_get_backtrace;
                (
                    $crate::macro_support::copy_span::copy_span!($first, {
                        __macro_get_backtrace(1)
                    }),
                    $($crate::macro_support::copy_span::copy_span!($rest, {
                        __macro_get_backtrace(1)
                    }),)*
                )
            };
            let __sliced = $crate::live_collections::sliced::Slicable::slice(
                __styled,
                &__tick,
                __backtraces,
                __nondet_first,
            );
            // Bind every slice to the name chosen in its `use` statement.
            let (
                $first_name,
                $($rest_name,)*
            ) = __sliced;

            $crate::live_collections::sliced::Unslicable::unslice({
                $($body)*
            })
        }
    };

    // No well-formed `use` statement was found at the start of the input: report a
    // readable error instead of the generic "no rules expected this token".
    (
        @uses []
        $($body:tt)*
    ) => {
        ::core::compile_error!(
            "`sliced!` must begin with at least one `let <name> = use(<collection>, <nondet>);` statement"
        )
    };
}
69
/// Runs a computation over slices of one or more live collections and turns the result
/// back into a live collection.
///
/// Typical use: read a snapshot of an asynchronously updated collection while processing
/// another one, for example joining a stream with the latest value of a singleton.
///
/// # Syntax
/// The macro body looks like the inside of a closure. It starts with one or more `use`
/// statements, each naming a live collection to slice together with a non-determinism
/// explanation, and optionally a style (such as `atomic`) that controls how that collection
/// is sliced. The statements after the `use` block form the body of the transformation.
/// Every `use` statement has to come before the body.
///
/// ```rust,ignore
/// let stream = sliced! {
///     let name1 = use(collection1, nondet!(/** explanation */));
///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
///
///     // arbitrary statements can follow
///     let intermediate = name1.map(...);
///     intermediate.cross_singleton(name2)
/// };
/// ```
///
/// # Example with two collections
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let singleton = process.singleton(q!(5));
/// let stream = process.source_iter(q!(vec![1, 2, 3]));
/// let out: Stream<(i32, i32), _> = sliced! {
///     let batch_of_req = use(stream, nondet!(/** test */));
///     let latest_singleton = use(singleton, nondet!(/** test */));
///
///     let mapped = batch_of_req.map(q!(|x| x * 2));
///     mapped.cross_singleton(latest_singleton)
/// };
/// # out
/// # }, |mut stream| async move {
/// # assert_eq!(stream.next().await.unwrap(), (2, 5));
/// # assert_eq!(stream.next().await.unwrap(), (4, 5));
/// # assert_eq!(stream.next().await.unwrap(), (6, 5));
/// # }));
/// # }
/// ```
#[macro_export]
macro_rules! __sliced__ {
    // Hand the whole input to the parser macro, starting with an empty `use` list.
    ($($input:tt)*) => {
        $crate::__sliced_parse_uses__!(
            @uses []
            $($input)*
        )
    };
}
123
124pub use crate::__sliced__ as sliced;
125
126/// Styles for use with the `sliced!` macro.
127pub mod style {
128    use super::Slicable;
129    use crate::live_collections::boundedness::{Bounded, Unbounded};
130    use crate::live_collections::keyed_singleton::BoundedValue;
131    use crate::live_collections::stream::{Ordering, Retries, Stream};
132    use crate::location::{Location, NoTick, Tick};
133    use crate::nondet::NonDet;
134
135    /// Marks a live collection to be treated atomically during slicing.
136    pub struct Atomic<T>(pub T);
137
138    /// Wraps a live collection to be treated atomically during slicing.
139    pub fn atomic<T>(t: T) -> Atomic<T> {
140        Atomic(t)
141    }
142
143    impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Slicable<'a, L>
144        for Atomic<Stream<T, crate::location::Atomic<L>, Unbounded, O, R>>
145    {
146        type Slice = Stream<T, Tick<L>, Bounded, O, R>;
147        type Backtrace = crate::compile::ir::backtrace::Backtrace;
148
149        fn preferred_tick(&self) -> Option<Tick<L>> {
150            Some(self.0.location().tick().as_regular_tick())
151        }
152
153        fn get_location(&self) -> &L {
154            panic!("Atomic location has no accessible inner location")
155        }
156
157        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
158            assert_eq!(
159                self.0.location().tick().as_regular_tick().id(),
160                tick.id(),
161                "Mismatched tick for atomic slicing"
162            );
163
164            let out = self.0.batch_atomic(nondet);
165            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
166            out
167        }
168    }
169
170    impl<'a, T, L: Location<'a> + NoTick> Slicable<'a, L>
171        for Atomic<crate::live_collections::Singleton<T, crate::location::Atomic<L>, Unbounded>>
172    {
173        type Slice = crate::live_collections::Singleton<T, Tick<L>, Bounded>;
174        type Backtrace = crate::compile::ir::backtrace::Backtrace;
175
176        fn preferred_tick(&self) -> Option<Tick<L>> {
177            Some(self.0.location().tick().as_regular_tick())
178        }
179
180        fn get_location(&self) -> &L {
181            panic!("Atomic location has no accessible inner location")
182        }
183
184        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
185            assert_eq!(
186                self.0.location().tick().as_regular_tick().id(),
187                tick.id(),
188                "Mismatched tick for atomic slicing"
189            );
190
191            let out = self.0.snapshot_atomic(nondet);
192            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
193            out
194        }
195    }
196
197    impl<'a, T, L: Location<'a> + NoTick> Slicable<'a, L>
198        for Atomic<crate::live_collections::Optional<T, crate::location::Atomic<L>, Unbounded>>
199    {
200        type Slice = crate::live_collections::Optional<T, Tick<L>, Bounded>;
201        type Backtrace = crate::compile::ir::backtrace::Backtrace;
202
203        fn preferred_tick(&self) -> Option<Tick<L>> {
204            Some(self.0.location().tick().as_regular_tick())
205        }
206
207        fn get_location(&self) -> &L {
208            panic!("Atomic location has no accessible inner location")
209        }
210
211        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
212            assert_eq!(
213                self.0.location().tick().as_regular_tick().id(),
214                tick.id(),
215                "Mismatched tick for atomic slicing"
216            );
217
218            let out = self.0.snapshot_atomic(nondet);
219            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
220            out
221        }
222    }
223
224    impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
225        for Atomic<
226            crate::live_collections::KeyedSingleton<K, V, crate::location::Atomic<L>, Unbounded>,
227        >
228    {
229        type Slice = crate::live_collections::KeyedSingleton<K, V, Tick<L>, Bounded>;
230        type Backtrace = crate::compile::ir::backtrace::Backtrace;
231
232        fn preferred_tick(&self) -> Option<Tick<L>> {
233            Some(self.0.location().tick().as_regular_tick())
234        }
235
236        fn get_location(&self) -> &L {
237            panic!("Atomic location has no accessible inner location")
238        }
239
240        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
241            assert_eq!(
242                self.0.location().tick().as_regular_tick().id(),
243                tick.id(),
244                "Mismatched tick for atomic slicing"
245            );
246
247            let out = self.0.snapshot_atomic(nondet);
248            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
249            out
250        }
251    }
252
253    impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
254        for Atomic<
255            crate::live_collections::KeyedSingleton<K, V, crate::location::Atomic<L>, BoundedValue>,
256        >
257    {
258        type Slice = crate::live_collections::KeyedSingleton<K, V, Tick<L>, Bounded>;
259        type Backtrace = crate::compile::ir::backtrace::Backtrace;
260
261        fn preferred_tick(&self) -> Option<Tick<L>> {
262            Some(self.0.location().tick().as_regular_tick())
263        }
264
265        fn get_location(&self) -> &L {
266            panic!("Atomic location has no accessible inner location")
267        }
268
269        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
270            assert_eq!(
271                self.0.location().tick().as_regular_tick().id(),
272                tick.id(),
273                "Mismatched tick for atomic slicing"
274            );
275
276            let out = self.0.batch_atomic(nondet);
277            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
278            out
279        }
280    }
281}
282
283/// A trait for live collections which can be sliced into bounded versions at a tick.
284pub trait Slicable<'a, L: Location<'a>> {
285    /// The sliced version of this live collection.
286    type Slice;
287
288    /// The type of backtrace associated with this slice.
289    type Backtrace;
290
291    /// Gets the preferred tick to slice at. Used for atomic slicing.
292    fn preferred_tick(&self) -> Option<Tick<L>>;
293
294    /// Gets the location associated with this live collection.
295    fn get_location(&self) -> &L;
296
297    /// Slices this live collection at the given tick.
298    ///
299    /// # Non-Determinism
300    /// Slicing a live collection may involve non-determinism, such as choosing which messages
301    /// to include in a batch. The provided `nondet` parameter should be used to explain the impact
302    /// of this non-determinism on the program's behavior.
303    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice;
304}
305
/// Implemented by sliced live collections that can be returned from a slice and converted
/// back into their original, unsliced form.
pub trait Unslicable {
    /// The type this collection has once it is no longer sliced.
    type Unsliced;

    /// Consumes the sliced collection and returns its unsliced counterpart.
    fn unslice(self) -> Self::Unsliced;
}
314
315impl<'a, L: Location<'a>> Slicable<'a, L> for () {
316    type Slice = ();
317    type Backtrace = ();
318
319    fn get_location(&self) -> &L {
320        unreachable!()
321    }
322
323    fn preferred_tick(&self) -> Option<Tick<L>> {
324        None
325    }
326
327    fn slice(self, _tick: &Tick<L>, __backtrace: Self::Backtrace, _nondet: NonDet) -> Self::Slice {}
328}
329
330impl Unslicable for () {
331    type Unsliced = ();
332
333    fn unslice(self) -> Self::Unsliced {}
334}
335
// Generates `Slicable` for a tuple of `Slicable` elements. Each element is described by a
// triple: its type parameter, the identifier bound to its backtrace, and its tuple index.
macro_rules! impl_slicable_for_tuple {
    ($($T:ident, $T_bt:ident, $idx:tt),*) => {
        impl<'a, L: Location<'a>, $($T: Slicable<'a, L>),*> Slicable<'a, L> for ($($T,)*) {
            type Slice = ($($T::Slice,)*);
            type Backtrace = ($($T::Backtrace,)*);

            // The location of the tuple is the location of its first element.
            fn get_location(&self) -> &L {
                self.0.get_location()
            }

            // Returns the first preferred tick reported by any element, panicking if a
            // later element prefers a tick with a different id.
            fn preferred_tick(&self) -> Option<Tick<L>> {
                let mut chosen: Option<Tick<L>> = None;
                $(
                    if let Some(candidate) = self.$idx.preferred_tick() {
                        if let Some(existing) = &chosen {
                            assert!(
                                $crate::location::Location::id(existing)
                                    == $crate::location::Location::id(&candidate),
                                "Mismatched preferred ticks for sliced collections"
                            );
                        } else {
                            chosen = Some(candidate);
                        }
                    }
                )*
                chosen
            }

            // Slices every element at the same tick with the same `nondet`, pairing each
            // element with its own backtrace.
            #[expect(non_snake_case, reason = "macro codegen")]
            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
                let ($($T_bt,)*) = backtrace;
                let ($($T,)*) = self;
                ($(<$T as Slicable<'a, L>>::slice($T, tick, $T_bt, nondet),)*)
            }
        }
    };
}

// Tuple arities 1 through 5, each gated on `cfg(stageleft_runtime)`.
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0);

#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);

#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);

#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);

// 5 slices ought to be enough for anyone
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
);
387
388impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Slicable<'a, L>
389    for super::Stream<T, L, Unbounded, O, R>
390{
391    type Slice = super::Stream<T, Tick<L>, Bounded, O, R>;
392    type Backtrace = crate::compile::ir::backtrace::Backtrace;
393
394    fn get_location(&self) -> &L {
395        self.location()
396    }
397
398    fn preferred_tick(&self) -> Option<Tick<L>> {
399        None
400    }
401
402    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
403        let out = self.batch(tick, nondet);
404        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
405        out
406    }
407}
408
409impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
410    for super::Stream<T, Tick<L>, Bounded, O, R>
411{
412    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
413
414    fn unslice(self) -> Self::Unsliced {
415        self.all_ticks()
416    }
417}
418
419impl<'a, T, L: Location<'a>> Slicable<'a, L> for super::Singleton<T, L, Unbounded> {
420    type Slice = super::Singleton<T, Tick<L>, Bounded>;
421    type Backtrace = crate::compile::ir::backtrace::Backtrace;
422
423    fn get_location(&self) -> &L {
424        self.location()
425    }
426
427    fn preferred_tick(&self) -> Option<Tick<L>> {
428        None
429    }
430
431    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
432        let out = self.snapshot(tick, nondet);
433        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
434        out
435    }
436}
437
438impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
439    type Unsliced = super::Singleton<T, L, Unbounded>;
440
441    fn unslice(self) -> Self::Unsliced {
442        self.latest()
443    }
444}
445
446impl<'a, T, L: Location<'a>> Slicable<'a, L> for super::Optional<T, L, Unbounded> {
447    type Slice = super::Optional<T, Tick<L>, Bounded>;
448    type Backtrace = crate::compile::ir::backtrace::Backtrace;
449
450    fn get_location(&self) -> &L {
451        self.location()
452    }
453
454    fn preferred_tick(&self) -> Option<Tick<L>> {
455        None
456    }
457
458    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
459        let out = self.snapshot(tick, nondet);
460        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
461        out
462    }
463}
464
465impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
466    type Unsliced = super::Optional<T, L, Unbounded>;
467
468    fn unslice(self) -> Self::Unsliced {
469        self.latest()
470    }
471}
472
473impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Slicable<'a, L>
474    for super::KeyedStream<K, V, L, Unbounded, O, R>
475{
476    type Slice = super::KeyedStream<K, V, Tick<L>, Bounded, O, R>;
477    type Backtrace = crate::compile::ir::backtrace::Backtrace;
478
479    fn get_location(&self) -> &L {
480        self.location()
481    }
482
483    fn preferred_tick(&self) -> Option<Tick<L>> {
484        None
485    }
486
487    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
488        let out = self.batch(tick, nondet);
489        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
490        out
491    }
492}
493
494impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
495    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
496{
497    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
498
499    fn unslice(self) -> Self::Unsliced {
500        self.all_ticks()
501    }
502}
503
504impl<'a, K, V, L: Location<'a>> Slicable<'a, L> for super::KeyedSingleton<K, V, L, Unbounded> {
505    type Slice = super::KeyedSingleton<K, V, Tick<L>, Bounded>;
506    type Backtrace = crate::compile::ir::backtrace::Backtrace;
507
508    fn get_location(&self) -> &L {
509        self.location()
510    }
511
512    fn preferred_tick(&self) -> Option<Tick<L>> {
513        None
514    }
515
516    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
517        let out = self.snapshot(tick, nondet);
518        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
519        out
520    }
521}
522
523impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
524    for super::KeyedSingleton<K, V, L, BoundedValue>
525{
526    type Slice = super::KeyedSingleton<K, V, Tick<L>, Bounded>;
527    type Backtrace = crate::compile::ir::backtrace::Backtrace;
528
529    fn get_location(&self) -> &L {
530        self.location()
531    }
532
533    fn preferred_tick(&self) -> Option<Tick<L>> {
534        None
535    }
536
537    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
538        let out = self.batch(tick, nondet);
539        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
540        out
541    }
542}