Skip to main content

hydro_std/
compartmentalize.rs

1use hydro_lang::live_collections::boundedness::Boundedness;
2use hydro_lang::live_collections::stream::{NoOrder, Ordering};
3use hydro_lang::location::cluster::CLUSTER_SELF_ID;
4use hydro_lang::location::{Location, MemberId, NoTick};
5use hydro_lang::prelude::*;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8use stageleft::IntoQuotedMut;
9
10pub trait PartitionStream<'a, T, C1, C2, Order> {
11    fn send_partitioned<F: Fn((MemberId<C2>, T)) -> (MemberId<C2>, T) + 'a>(
12        self,
13        other: &Cluster<'a, C2>,
14        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
15    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
16    where
17        T: Clone + Serialize + DeserializeOwned;
18}
19
20impl<'a, T, C1, C2, Order: Ordering> PartitionStream<'a, T, C1, C2, Order>
21    for Stream<(MemberId<C2>, T), Cluster<'a, C1>, Unbounded, Order>
22{
23    fn send_partitioned<F: Fn((MemberId<C2>, T)) -> (MemberId<C2>, T) + 'a>(
24        self,
25        other: &Cluster<'a, C2>,
26        dist_policy: impl IntoQuotedMut<'a, F, Cluster<'a, C1>>,
27    ) -> Stream<T, Cluster<'a, C2>, Unbounded, NoOrder>
28    where
29        T: Clone + Serialize + DeserializeOwned,
30    {
31        self.map(dist_policy)
32            .demux(other, TCP.fail_stop().bincode())
33            .values()
34    }
35}
36
37pub trait DecoupleClusterStream<'a, T, C1, B, Order: Ordering> {
38    fn decouple_cluster<C2: 'a>(
39        self,
40        other: &Cluster<'a, C2>,
41    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
42    where
43        T: Clone + Serialize + DeserializeOwned,
44        C1: 'a;
45}
46
47impl<'a, T, C1, B: Boundedness, Order: Ordering> DecoupleClusterStream<'a, T, C1, B, Order>
48    for Stream<T, Cluster<'a, C1>, B, Order>
49where
50    C1: 'a,
51{
52    fn decouple_cluster<C2: 'a>(
53        self,
54        other: &Cluster<'a, C2>,
55    ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order>
56    where
57        T: Clone + Serialize + DeserializeOwned,
58        C1: 'a,
59    {
60        let sent = self
61            .map(q!(move |b| (
62                MemberId::from_tagless(CLUSTER_SELF_ID.clone().into_tagless()), // this is a seemingly round about way to convert from one member id tag to another.
63                b
64            )))
65            .demux(other, TCP.fail_stop().bincode())
66            .values();
67
68        sent.assume_ordering(
69            nondet!(/** this is safe because we are only receiving from one sender */),
70        )
71    }
72}
73
74pub trait DecoupleProcessStream<'a, T, L: Location<'a> + NoTick, B, Order: Ordering> {
75    fn decouple_process<P2>(
76        self,
77        other: &Process<'a, P2>,
78    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
79    where
80        T: Clone + Serialize + DeserializeOwned;
81}
82
83impl<'a, T, L, B: Boundedness, Order: Ordering>
84    DecoupleProcessStream<'a, T, Process<'a, L>, B, Order> for Stream<T, Process<'a, L>, B, Order>
85{
86    fn decouple_process<P2>(
87        self,
88        other: &Process<'a, P2>,
89    ) -> Stream<T, Process<'a, P2>, Unbounded, Order>
90    where
91        T: Clone + Serialize + DeserializeOwned,
92    {
93        self.send(other, TCP.fail_stop().bincode())
94    }
95}