1use hydro_lang::live_collections::boundedness::Boundedness;
2use hydro_lang::live_collections::stream::{NoOrder, Ordering};
3use hydro_lang::location::cluster::CLUSTER_SELF_ID;
4use hydro_lang::location::{Location, MemberId, NoTick};
5use hydro_lang::prelude::*;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8use stageleft::IntoQuotedMut;
9
10pub trait PartitionStream<'a, T, C1, C2, Order> {
11 fn send_partitioned<F: Fn((MemberId<C2>, T)) -> (MemberId<C2>, T) + 'a>(
12 self,
13 other: &Cluster<'a, C2>,
14 dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
15 ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
16 where
17 T: Clone + Serialize + DeserializeOwned;
18}
19
20impl<'a, T, C1, C2, Order: Ordering> PartitionStream<'a, T, C1, C2, Order>
21 for Stream<(MemberId<C2>, T), Cluster<'a, C1>, Unbounded, Order>
22{
23 fn send_partitioned<F: Fn((MemberId<C2>, T)) -> (MemberId<C2>, T) + 'a>(
24 self,
25 other: &Cluster<'a, C2>,
26 dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
27 ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
28 where
29 T: Clone + Serialize + DeserializeOwned,
30 {
31 self.map(dist_policy)
32 .demux(other, TCP.fail_stop().bincode())
33 .values()
34 }
35}
36
37pub trait DecoupleClusterStream<'a, T, C1, B, Order: Ordering> {
38 fn decouple_cluster<C2: 'a>(
39 self,
40 other: &Cluster<'a, C2>,
41 ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
42 where
43 T: Clone + Serialize + DeserializeOwned,
44 C1: 'a;
45}
46
47impl<'a, T, C1, B: Boundedness, Order: Ordering> DecoupleClusterStream<'a, T, C1, B, Order>
48 for Stream<T, Cluster<'a, C1>, B, Order>
49where
50 C1: 'a,
51{
52 fn decouple_cluster<C2: 'a>(
53 self,
54 other: &Cluster<'a, C2>,
55 ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
56 where
57 T: Clone + Serialize + DeserializeOwned,
58 C1: 'a,
59 {
60 let sent = self
61 .map(q!(move |b| (
62 MemberId::from_tagless(CLUSTER_SELF_ID.clone().into_tagless()), b
64 )))
65 .demux(other, TCP.fail_stop().bincode())
66 .values();
67
68 sent.assume_ordering(
69 nondet!(),
70 )
71 }
72}
73
74pub trait DecoupleProcessStream<'a, T, L: Location<'a> + NoTick, B, Order: Ordering> {
75 fn decouple_process<P2>(
76 self,
77 other: &Process<'a, P2>,
78 ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
79 where
80 T: Clone + Serialize + DeserializeOwned;
81}
82
83impl<'a, T, L, B: Boundedness, Order: Ordering>
84 DecoupleProcessStream<'a, T, Process<'a, L>, B, Order> for Stream<T, Process<'a, L>, B, Order>
85{
86 fn decouple_process<P2>(
87 self,
88 other: &Process<'a, P2>,
89 ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
90 where
91 T: Clone + Serialize + DeserializeOwned,
92 {
93 self.send(other, TCP.fail_stop().bincode())
94 }
95}