hydro_std/
compartmentalize.rs

1use dfir_rs::serde::Serialize;
2use dfir_rs::serde::de::DeserializeOwned;
3use hydro_lang::*;
4use location::{CanSend, NoTick};
5use stageleft::IntoQuotedMut;
6use stream::MinOrder;
7
8pub trait PartitionStream<'a, T, C1, C2, Order> {
9    fn send_partitioned<Tag, F: Fn((ClusterId<C2>, T)) -> (ClusterId<C2>, T) + 'a>(
10        self,
11        other: &Cluster<'a, C2>,
12        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
13    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
14    where
15        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
16        Cluster<'a, C1>:
17            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
18        T: Clone + Serialize + DeserializeOwned,
19        Order: MinOrder<
20                <Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>,
21                Min = NoOrder,
22            >;
23}
24
25impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order>
26    for Stream<(ClusterId<C2>, T), Cluster<'a, C1>, Unbounded, Order>
27{
28    fn send_partitioned<Tag, F: Fn((ClusterId<C2>, T)) -> (ClusterId<C2>, T) + 'a>(
29        self,
30        other: &Cluster<'a, C2>,
31        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
32    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
33    where
34        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
35        Cluster<'a, C1>:
36            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
37        T: Clone + Serialize + DeserializeOwned,
38        Order: MinOrder<
39                <Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>,
40                Min = NoOrder,
41            >,
42    {
43        self.map(dist_policy).send_bincode_anonymous(other)
44    }
45}
46
47pub trait DecoupleClusterStream<'a, T, C1, B, Order> {
48    fn decouple_cluster<C2: 'a, Tag>(
49        self,
50        other: &Cluster<'a, C2>,
51    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
52    where
53        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
54        Cluster<'a, C1>:
55            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
56        T: Clone + Serialize + DeserializeOwned,
57        Order:
58            MinOrder<<Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>;
59}
60
61impl<'a, T, C1, B, Order> DecoupleClusterStream<'a, T, C1, B, Order>
62    for Stream<T, Cluster<'a, C1>, B, Order>
63{
64    fn decouple_cluster<C2: 'a, Tag>(
65        self,
66        other: &Cluster<'a, C2>,
67    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
68    where
69        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
70        Cluster<'a, C1>:
71            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
72        T: Clone + Serialize + DeserializeOwned,
73        Order:
74            MinOrder<<Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
75    {
76        let sent = self
77            .map(q!(move |b| (
78                ClusterId::from_raw(CLUSTER_SELF_ID.raw_id),
79                b.clone()
80            )))
81            .send_bincode_anonymous(other);
82
83        unsafe {
84            // SAFETY: this is safe because we are mapping clusters 1:1
85            sent.assume_ordering()
86        }
87    }
88}
89
90pub trait DecoupleProcessStream<'a, T, L: Location<'a> + NoTick, B, Order> {
91    fn decouple_process<P2>(
92        self,
93        other: &Process<'a, P2>,
94    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
95    where
96        L::Root: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>,
97        T: Clone + Serialize + DeserializeOwned,
98        Order: MinOrder<
99                <L::Root as CanSend<'a, Process<'a, P2>>>::OutStrongestOrder<Order>,
100                Min = Order,
101            >;
102}
103
104impl<'a, T, L: Location<'a> + NoTick, B, Order> DecoupleProcessStream<'a, T, L, B, Order>
105    for Stream<T, L, B, Order>
106{
107    fn decouple_process<P2>(
108        self,
109        other: &Process<'a, P2>,
110    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
111    where
112        L::Root: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>,
113        T: Clone + Serialize + DeserializeOwned,
114        Order: MinOrder<
115                <L::Root as CanSend<'a, Process<'a, P2>>>::OutStrongestOrder<Order>,
116                Min = Order,
117            >,
118    {
119        self.send_bincode::<Process<'a, P2>, T>(other)
120    }
121}