hydro_lang/properties/
mod.rs1use std::marker::PhantomData;
4
5use stageleft::properties::Property;
6
7use crate::live_collections::stream::{ExactlyOnce, Ordering, Retries, TotalOrder};
8
9#[sealed::sealed]
11pub trait CommutativeProof {
12 fn register_proof(&self, expr: &syn::Expr);
16}
17
18#[sealed::sealed]
20pub trait IdempotentProof {
21 fn register_proof(&self, expr: &syn::Expr);
25}
26
27pub struct ManualProof();
32#[sealed::sealed]
33impl CommutativeProof for ManualProof {
34 fn register_proof(&self, _expr: &syn::Expr) {}
35}
36#[sealed::sealed]
37impl IdempotentProof for ManualProof {
38 fn register_proof(&self, _expr: &syn::Expr) {}
39}
40
41#[doc(inline)]
42pub use crate::__manual_proof__ as manual_proof;
43
44#[macro_export]
45macro_rules! __manual_proof__ {
63 ($(#[doc = $doc:expr])+) => {
64 $crate::properties::ManualProof()
65 };
66}
67
68pub enum NotProved {}
70
71pub enum Proved {}
73
74pub struct AggFuncAlgebra<Commutative = NotProved, Idempotent = NotProved>(
92 Option<Box<dyn CommutativeProof>>,
93 Option<Box<dyn IdempotentProof>>,
94 PhantomData<(Commutative, Idempotent)>,
95);
96
97impl<C, I> AggFuncAlgebra<C, I> {
98 pub fn commutative(self, proof: impl CommutativeProof + 'static) -> AggFuncAlgebra<Proved, I> {
100 AggFuncAlgebra(Some(Box::new(proof)), self.1, PhantomData)
101 }
102
103 pub fn idempotent(self, proof: impl IdempotentProof + 'static) -> AggFuncAlgebra<C, Proved> {
105 AggFuncAlgebra(self.0, Some(Box::new(proof)), PhantomData)
106 }
107
108 pub(crate) fn register_proof(self, expr: &syn::Expr) {
110 if let Some(comm_proof) = self.0 {
111 comm_proof.register_proof(expr);
112 }
113
114 if let Some(idem_proof) = self.1 {
115 idem_proof.register_proof(expr);
116 }
117 }
118}
119
120impl<C, I> Property for AggFuncAlgebra<C, I> {
121 type Root = AggFuncAlgebra;
122
123 fn make_root(_target: &mut Option<Self>) -> Self::Root {
124 AggFuncAlgebra(None, None, PhantomData)
125 }
126}
127
128#[diagnostic::on_unimplemented(
130 message = "Because the input stream has ordering `{O}`, the closure must demonstrate commutativity with a `commutative = ...` annotation.",
131 label = "required for this call",
132 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
133)]
134#[sealed::sealed]
135pub trait ValidCommutativityFor<O: Ordering> {}
136#[sealed::sealed]
137impl ValidCommutativityFor<TotalOrder> for NotProved {}
138#[sealed::sealed]
139impl<O: Ordering> ValidCommutativityFor<O> for Proved {}
140
141#[diagnostic::on_unimplemented(
143 message = "Because the input stream has retries `{R}`, the closure must demonstrate idempotence with an `idempotent = ...` annotation.",
144 label = "required for this call",
145 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
146)]
147#[sealed::sealed]
148pub trait ValidIdempotenceFor<R: Retries> {}
149#[sealed::sealed]
150impl ValidIdempotenceFor<ExactlyOnce> for NotProved {}
151#[sealed::sealed]
152impl<R: Retries> ValidIdempotenceFor<R> for Proved {}