1use dfir_rs::serde::Serialize;
2use dfir_rs::serde::de::DeserializeOwned;
3use hydro_lang::*;
4use location::{CanSend, NoTick};
5use stageleft::IntoQuotedMut;
6use stream::MinOrder;
7
/// Extension trait for streams of `(ClusterId<C2>, T)` pairs located on cluster
/// `C1` whose elements should be forwarded to cluster `C2`.
pub trait PartitionStream<'a, T, C1, C2, Order> {
    /// Passes every `(ClusterId<C2>, T)` pair through `dist_policy` and sends the
    /// result to `other`.
    ///
    /// * `other` - the cluster that receives the elements.
    /// * `dist_policy` - anything convertible (via `IntoQuotedMut`) into a
    ///   function from a `(ClusterId<C2>, T)` pair to a pair of the same type.
    ///   NOTE(review): presumably the id in the returned pair selects the
    ///   receiving member of `other` — confirm against `CanSend`.
    ///
    /// Returns an `Unbounded`, `NoOrder` stream of `T` located on `other`.
    ///
    /// `Tag` is the type that `CanSend::Out` pairs with each payload. The
    /// `MinOrder<…, Min = NoOrder>` bound limits this method to `Order`s whose
    /// `Min` with the channel's `OutStrongestOrder` is `NoOrder`.
    fn send_partitioned<Tag, F: Fn((ClusterId<C2>, T)) -> (ClusterId<C2>, T) + 'a>(
        self,
        other: &Cluster<'a, C2>,
        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
    where
        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
        Cluster<'a, C1>:
            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
        T: Clone + Serialize + DeserializeOwned,
        Order: MinOrder<
            <Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>,
            Min = NoOrder,
        >;
}
24
25impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order>
26 for Stream<(ClusterId<C2>, T), Cluster<'a, C1>, Unbounded, Order>
27{
28 fn send_partitioned<Tag, F: Fn((ClusterId<C2>, T)) -> (ClusterId<C2>, T) + 'a>(
29 self,
30 other: &Cluster<'a, C2>,
31 dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
32 ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
33 where
34 Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
35 Cluster<'a, C1>:
36 CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
37 T: Clone + Serialize + DeserializeOwned,
38 Order: MinOrder<
39 <Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>,
40 Min = NoOrder,
41 >,
42 {
43 self.map(dist_policy).send_bincode_anonymous(other)
44 }
45}
46
/// Extension trait for moving a stream located on cluster `C1` onto another
/// cluster while keeping its `Order` type parameter.
pub trait DecoupleClusterStream<'a, T, C1, B, Order> {
    /// Sends the elements of this stream to `other` and returns them as an
    /// `Unbounded` stream of `T` located on `other`, typed with the same `Order`
    /// as the input.
    ///
    /// * `other` - the cluster that receives the elements.
    ///
    /// `Tag` is the type that `CanSend::Out` pairs with each payload. Unlike the
    /// sibling traits in this file, the `MinOrder` bound here does not pin `Min`
    /// to a particular ordering.
    fn decouple_cluster<C2: 'a, Tag>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
    where
        Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
        Cluster<'a, C1>:
            CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
        T: Clone + Serialize + DeserializeOwned,
        Order:
            MinOrder<<Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>;
}
60
61impl<'a, T, C1, B, Order> DecoupleClusterStream<'a, T, C1, B, Order>
62 for Stream<T, Cluster<'a, C1>, B, Order>
63{
64 fn decouple_cluster<C2: 'a, Tag>(
65 self,
66 other: &Cluster<'a, C2>,
67 ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
68 where
69 Cluster<'a, C1>: Location<'a, Root = Cluster<'a, C1>>,
70 Cluster<'a, C1>:
71 CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
72 T: Clone + Serialize + DeserializeOwned,
73 Order:
74 MinOrder<<Cluster<'a, C1> as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
75 {
76 let sent = self
77 .map(q!(move |b| (
78 ClusterId::from_raw(CLUSTER_SELF_ID.raw_id),
79 b.clone()
80 )))
81 .send_bincode_anonymous(other);
82
83 unsafe {
84 sent.assume_ordering()
86 }
87 }
88}
89
/// Extension trait for forwarding a stream from a non-tick location `L` to a
/// `Process`.
pub trait DecoupleProcessStream<'a, T, L: Location<'a> + NoTick, B, Order> {
    /// Sends the elements of this stream to `other` and returns them as an
    /// `Unbounded` stream of `T` located on `other`, with the same `Order`.
    ///
    /// * `other` - the process that receives the elements.
    ///
    /// The `CanSend` bound requires that elements go in and come out as plain
    /// `T` (`In<T> = T`, `Out<T> = T`), and the `MinOrder<…, Min = Order>` bound
    /// limits this method to `Order`s whose `Min` with the channel's
    /// `OutStrongestOrder` is `Order` itself.
    fn decouple_process<P2>(
        self,
        other: &Process<'a, P2>,
    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
    where
        L::Root: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>,
        T: Clone + Serialize + DeserializeOwned,
        Order: MinOrder<
            <L::Root as CanSend<'a, Process<'a, P2>>>::OutStrongestOrder<Order>,
            Min = Order,
        >;
}
103
104impl<'a, T, L: Location<'a> + NoTick, B, Order> DecoupleProcessStream<'a, T, L, B, Order>
105 for Stream<T, L, B, Order>
106{
107 fn decouple_process<P2>(
108 self,
109 other: &Process<'a, P2>,
110 ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
111 where
112 L::Root: CanSend<'a, Process<'a, P2>, In<T> = T, Out<T> = T>,
113 T: Clone + Serialize + DeserializeOwned,
114 Order: MinOrder<
115 <L::Root as CanSend<'a, Process<'a, P2>>>::OutStrongestOrder<Order>,
116 Min = Order,
117 >,
118 {
119 self.send_bincode::<Process<'a, P2>, T>(other)
120 }
121}