1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use dfir_rs::serde::de::DeserializeOwned;
use dfir_rs::serde::Serialize;
use hydro_lang::*;
use location::{CanSend, NoTick};
use stageleft::IntoQuotedMut;
use stream::MinOrder;

/// Extension trait for sending a stream of `(ClusterId<C2>, T)` pairs from cluster `C1` to
/// cluster `C2` after passing every pair through a caller-supplied distribution policy.
///
/// The implementation in this file targets
/// `Stream<(ClusterId<C2>, T), Cluster<'a, C1>, Unbounded, Order>`.
pub trait PartitionStream<'a, T, C1, C2, Order> {
    /// Applies `dist_policy` to each `(ClusterId<C2>, T)` element, then sends the mapped
    /// stream to `other`.
    ///
    /// # Parameters
    /// * `other` - the destination cluster.
    /// * `dist_policy` - quoted function from `(ClusterId<C2>, T)` to `(ClusterId<C2>, T)`;
    ///   because it returns a fresh pair it may change the id, the payload, or both.
    ///   Presumably the returned `ClusterId<C2>` selects the receiving member — confirm
    ///   against `send_bincode_interleaved`.
    ///
    /// # Returns
    /// A stream of bare `T` values located on `other`, typed `Unbounded` and `NoOrder`.
    ///
    /// # Bounds
    /// * `Cluster<'a, C1>` must be its own `Root` location and must be able to send to
    ///   `Cluster<'a, C2>` with input `(ClusterId<C2>, T)` and output `(Tag, T)`.
    /// * `T` must be `Clone`, serializable and deserializable.
    /// * Combining `Order` with the sender's strongest output order must yield `NoOrder`.
    fn send_partitioned<Tag, F: Fn((ClusterId<C2>, T)) -> (ClusterId<C2>, T) + 'a>(
        self,
        other: &Cluster<'a, C2>,
        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
    where
        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
        Cluster<'a, C1>:
            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
        T: Clone + Serialize + DeserializeOwned,
        Order: MinOrder<
            <Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>,
            Min = NoOrder,
        >;
}

impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order>
    for Stream<(ClusterId<C2>, T), Cluster<'a, C1>, Unbounded, Order>
{
    fn send_partitioned<Tag, F: Fn((ClusterId<C2>, T)) -> (ClusterId<C2>, T) + 'a>(
        self,
        other: &Cluster<'a, C2>,
        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
    where
        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
        Cluster<'a, C1>:
            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
        T: Clone + Serialize + DeserializeOwned,
        Order: MinOrder<
            <Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>,
            Min = NoOrder,
        >,
    {
        self.map(dist_policy).send_bincode_interleaved(other)
    }
}

/// Extension trait for moving a stream from cluster `C1` to a second cluster `C2` while
/// keeping the element type `T` and the ordering marker `Order` unchanged.
///
/// The implementation in this file targets `Stream<T, Cluster<'a, C1>, B, Order>`.
pub trait DecoupleClusterStream<'a, T, C1, B, Order> {
    /// Sends every element of this stream to `other`.
    ///
    /// The implementation in this file addresses each element to the `ClusterId<C2>` built
    /// from the raw value of `CLUSTER_SELF_ID`, and its safety comment states that clusters
    /// are mapped 1:1.
    /// NOTE(review): this presumably requires `other` to contain a member for every raw id
    /// used by the sending cluster — confirm with callers.
    ///
    /// # Parameters
    /// * `other` - the destination cluster.
    ///
    /// # Returns
    /// A stream of `T` located on `other`, typed `Unbounded` with the same `Order` as the
    /// input stream.
    ///
    /// # Bounds
    /// * `Cluster<'a, C1>` must be its own `Root` location and must be able to send to
    ///   `Cluster<'a, C2>` with input `(ClusterId<C2>, T)` and output `(Tag, T)`.
    /// * `T` must be `Clone`, serializable and deserializable.
    /// * `Order` must be combinable (via `MinOrder`) with the sender's strongest output
    ///   order.
    fn decouple_cluster<C2: 'a, Tag>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
    where
        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
        Cluster<'a, C1>:
            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
        T: Clone + Serialize + DeserializeOwned,
        Order:
            MinOrder<<Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>;
}

/// Implements [`DecoupleClusterStream`] for any stream located on cluster `C1`.
impl<'a, T, C1, B, Order> DecoupleClusterStream<'a, T, C1, B, Order>
    for Stream<T, Cluster<'a, C1>, B, Order>
{
    /// Sends every element to `other`, addressed to the `ClusterId<C2>` that has the same
    /// raw id as `CLUSTER_SELF_ID`, and returns the received stream with the original
    /// `Order` marker.
    ///
    /// # Parameters
    /// * `other` - the destination cluster.
    ///   NOTE(review): presumably it must contain a member for every raw id used by the
    ///   sending cluster — confirm with callers.
    ///
    /// # Returns
    /// A stream of `T` located on `other`, typed `Unbounded` with ordering `Order`.
    fn decouple_cluster<C2: 'a, Tag>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
    where
        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
        Cluster<'a, C1>:
            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
        T: Clone + Serialize + DeserializeOwned,
        Order:
            MinOrder<<Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
    {
        // The closure receives each element by value, so it is forwarded as-is; cloning it
        // here would only create a copy and immediately drop the original.
        let sent = self
            .map(q!(move |b| (
                ClusterId::from_raw(CLUSTER_SELF_ID.raw_id),
                b
            )))
            .send_bincode_interleaved(other);

        unsafe {
            // SAFETY: this is safe because we are mapping clusters 1:1
            sent.assume_ordering()
        }
    }
}

/// Extension trait for moving a stream from a non-tick location `L` to a process `P2` while
/// keeping the element type `T` and the ordering marker `Order` unchanged.
///
/// The implementation in this file targets `Stream<T, L, B, Order>`.
pub trait DecoupleProcessStream<'a, T, L: Location<'a> + NoTick, B, Order> {
    /// Sends every element of this stream to the process `other`.
    ///
    /// # Parameters
    /// * `other` - the destination process.
    ///
    /// # Returns
    /// A stream of `T` located on `other`, typed `Unbounded` with the same `Order` as the
    /// input stream.
    ///
    /// # Bounds
    /// * `L::Root` must be able to send to `Process<'a, P2>` with both input and output
    ///   equal to `T`.
    /// * `T` must be `Clone`, serializable and deserializable.
    /// * Combining `Order` with the sender's strongest output order must yield `Order`
    ///   itself (`Min = Order`), which is what lets the return type keep `Order`.
    fn decouple_process<P2>(
        self,
        other: &Process<'a, P2>,
    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
    where
        L::Root: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>,
        T: Clone + Serialize + DeserializeOwned,
        Order: MinOrder<
            <L::Root as CanSend<'a, Process<'a, P2>>>::OutStrongestOrder<Order>,
            Min = Order,
        >;
}

/// Implements [`DecoupleProcessStream`] for any stream on a non-tick location `L`.
impl<'a, T, L: Location<'a> + NoTick, B, Order> DecoupleProcessStream<'a, T, L, B, Order>
    for Stream<T, L, B, Order>
{
    /// Forwards the stream to `other` by delegating to `send_bincode`.
    ///
    /// # Parameters
    /// * `other` - the destination process.
    ///
    /// # Returns
    /// A stream of `T` located on `other`, typed `Unbounded` with ordering `Order`.
    fn decouple_process<P2>(
        self,
        other: &Process<'a, P2>,
    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
    where
        L::Root: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>,
        T: Clone + Serialize + DeserializeOwned,
        Order: MinOrder<
            <L::Root as CanSend<'a, Process<'a, P2>>>::OutStrongestOrder<Order>,
            Min = Order,
        >,
    {
        // The turbofish pins the destination location and the element type explicitly.
        self.send_bincode::<Process<'a, P2>, T>(other)
    }
}