hydro_std/
compartmentalize.rs

1use hydro_lang::boundedness::Boundedness;
2use hydro_lang::*;
3use location::NoTick;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use stageleft::IntoQuotedMut;
7
8pub trait PartitionStream<'a, T, C1, C2, Order> {
9    fn send_partitioned<F: Fn((MemberId<C2>, T)) -> (MemberId<C2>, T) + 'a>(
10        self,
11        other: &Cluster<'a, C2>,
12        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
13    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
14    where
15        T: Clone + Serialize + DeserializeOwned;
16}
17
18impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order>
19    for Stream<(MemberId<C2>, T), Cluster<'a, C1>, Unbounded, Order>
20{
21    fn send_partitioned<F: Fn((MemberId<C2>, T)) -> (MemberId<C2>, T) + 'a>(
22        self,
23        other: &Cluster<'a, C2>,
24        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
25    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
26    where
27        T: Clone + Serialize + DeserializeOwned,
28    {
29        self.map(dist_policy).demux_bincode(other).values()
30    }
31}
32
33pub trait DecoupleClusterStream<'a, T, C1, B, Order> {
34    fn decouple_cluster<C2: 'a>(
35        self,
36        other: &Cluster<'a, C2>,
37    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
38    where
39        T: Clone + Serialize + DeserializeOwned;
40}
41
42impl<'a, T, C1, B: Boundedness, Order> DecoupleClusterStream<'a, T, C1, B, Order>
43    for Stream<T, Cluster<'a, C1>, B, Order>
44{
45    fn decouple_cluster<C2: 'a>(
46        self,
47        other: &Cluster<'a, C2>,
48    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
49    where
50        T: Clone + Serialize + DeserializeOwned,
51    {
52        let sent = self
53            .map(q!(move |b| (
54                MemberId::from_raw(CLUSTER_SELF_ID.raw_id),
55                b.clone()
56            )))
57            .demux_bincode(other)
58            .values();
59
60        sent.assume_ordering(
61            nondet!(/** this is safe because we are only receiving from one sender */),
62        )
63    }
64}
65
66pub trait DecoupleProcessStream<'a, T, L: Location<'a> + NoTick, B, Order> {
67    fn decouple_process<P2>(
68        self,
69        other: &Process<'a, P2>,
70    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
71    where
72        T: Clone + Serialize + DeserializeOwned;
73}
74
75impl<'a, T, L, B: Boundedness, Order> DecoupleProcessStream<'a, T, Process<'a, L>, B, Order>
76    for Stream<T, Process<'a, L>, B, Order>
77{
78    fn decouple_process<P2>(
79        self,
80        other: &Process<'a, P2>,
81    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
82    where
83        T: Clone + Serialize + DeserializeOwned,
84    {
85        self.send_bincode(other)
86    }
87}