1use hydro_lang::boundedness::Boundedness;
2use hydro_lang::*;
3use location::NoTick;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use stageleft::IntoQuotedMut;
7
8pub trait PartitionStream<'a, T, C1, C2, Order> {
9 fn send_partitioned<F: Fn((MemberId<C2>, T)) -> (MemberId<C2>, T) + 'a>(
10 self,
11 other: &Cluster<'a, C2>,
12 dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
13 ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
14 where
15 T: Clone + Serialize + DeserializeOwned;
16}
17
18impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order>
19 for Stream<(MemberId<C2>, T), Cluster<'a, C1>, Unbounded, Order>
20{
21 fn send_partitioned<F: Fn((MemberId<C2>, T)) -> (MemberId<C2>, T) + 'a>(
22 self,
23 other: &Cluster<'a, C2>,
24 dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
25 ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
26 where
27 T: Clone + Serialize + DeserializeOwned,
28 {
29 self.map(dist_policy).demux_bincode(other).values()
30 }
31}
32
33pub trait DecoupleClusterStream<'a, T, C1, B, Order> {
34 fn decouple_cluster<C2: 'a>(
35 self,
36 other: &Cluster<'a, C2>,
37 ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
38 where
39 T: Clone + Serialize + DeserializeOwned;
40}
41
42impl<'a, T, C1, B: Boundedness, Order> DecoupleClusterStream<'a, T, C1, B, Order>
43 for Stream<T, Cluster<'a, C1>, B, Order>
44{
45 fn decouple_cluster<C2: 'a>(
46 self,
47 other: &Cluster<'a, C2>,
48 ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
49 where
50 T: Clone + Serialize + DeserializeOwned,
51 {
52 let sent = self
53 .map(q!(move |b| (
54 MemberId::from_raw(CLUSTER_SELF_ID.raw_id),
55 b.clone()
56 )))
57 .demux_bincode(other)
58 .values();
59
60 sent.assume_ordering(
61 nondet!(),
62 )
63 }
64}
65
66pub trait DecoupleProcessStream<'a, T, L: Location<'a> + NoTick, B, Order> {
67 fn decouple_process<P2>(
68 self,
69 other: &Process<'a, P2>,
70 ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
71 where
72 T: Clone + Serialize + DeserializeOwned;
73}
74
75impl<'a, T, L, B: Boundedness, Order> DecoupleProcessStream<'a, T, Process<'a, L>, B, Order>
76 for Stream<T, Process<'a, L>, B, Order>
77{
78 fn decouple_process<P2>(
79 self,
80 other: &Process<'a, P2>,
81 ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
82 where
83 T: Clone + Serialize + DeserializeOwned,
84 {
85 self.send_bincode(other)
86 }
87}