hydro_lang/location/
external_process.rs

1use std::marker::PhantomData;
2
3use bytes::Bytes;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6
7use super::{Location, LocationId, NoTick};
8use crate::builder::FlowState;
9use crate::ir::{DebugInstantiate, HydroNode, HydroSource};
10use crate::staging_util::Invariant;
11use crate::{Stream, Unbounded};
12
/// Handle identifying a raw-bytes port on an external process.
///
/// The handle is a pair of ids: the external process that owns the port and the port
/// number allocated on it. It carries no payload type information.
pub struct ExternalBytesPort {
    /// Id of the external process that owns this port.
    #[cfg_attr(
        not(feature = "build"),
        expect(unused, reason = "unused without feature")
    )]
    pub(crate) process_id: usize,
    /// Id of the port within the flow's external port numbering.
    #[cfg_attr(
        not(feature = "build"),
        expect(unused, reason = "unused without feature")
    )]
    pub(crate) port_id: usize,
}
25
26pub struct ExternalBincodeSink<Type>
27where
28    Type: Serialize,
29{
30    #[cfg_attr(
31        not(feature = "build"),
32        expect(unused, reason = "unused without feature")
33    )]
34    pub(crate) process_id: usize,
35    #[cfg_attr(
36        not(feature = "build"),
37        expect(unused, reason = "unused without feature")
38    )]
39    pub(crate) port_id: usize,
40    pub(crate) _phantom: PhantomData<Type>,
41}
42
43pub struct ExternalBincodeStream<Type>
44where
45    Type: DeserializeOwned,
46{
47    #[cfg_attr(
48        not(feature = "build"),
49        expect(unused, reason = "unused without feature")
50    )]
51    pub(crate) process_id: usize,
52    #[cfg_attr(
53        not(feature = "build"),
54        expect(unused, reason = "unused without feature")
55    )]
56    pub(crate) port_id: usize,
57    pub(crate) _phantom: PhantomData<Type>,
58}
59
60pub struct ExternalProcess<'a, ProcessTag> {
61    pub(crate) id: usize,
62
63    pub(crate) flow_state: FlowState,
64
65    pub(crate) _phantom: Invariant<'a, ProcessTag>,
66}
67
68impl<P> Clone for ExternalProcess<'_, P> {
69    fn clone(&self) -> Self {
70        ExternalProcess {
71            id: self.id,
72            flow_state: self.flow_state.clone(),
73            _phantom: PhantomData,
74        }
75    }
76}
77
78impl<'a, P> Location<'a> for ExternalProcess<'a, P> {
79    type Root = Self;
80
81    fn root(&self) -> Self::Root {
82        self.clone()
83    }
84
85    fn id(&self) -> LocationId {
86        LocationId::ExternalProcess(self.id)
87    }
88
89    fn flow_state(&self) -> &FlowState {
90        &self.flow_state
91    }
92
93    fn is_top_level() -> bool {
94        true
95    }
96}
97
98impl<'a, P> ExternalProcess<'a, P> {
99    pub fn source_external_bytes<L>(
100        &self,
101        to: &L,
102    ) -> (ExternalBytesPort, Stream<Bytes, L, Unbounded>)
103    where
104        L: Location<'a> + NoTick,
105    {
106        let next_external_port_id = {
107            let mut flow_state = self.flow_state.borrow_mut();
108            let id = flow_state.next_external_out;
109            flow_state.next_external_out += 1;
110            id
111        };
112
113        let deser_expr: syn::Expr = syn::parse_quote!(|b| b.unwrap().freeze());
114
115        (
116            ExternalBytesPort {
117                process_id: self.id,
118                port_id: next_external_port_id,
119            },
120            Stream::new(
121                to.clone(),
122                HydroNode::Persist {
123                    inner: Box::new(HydroNode::Network {
124                        from_key: Some(next_external_port_id),
125                        to_location: to.id(),
126                        to_key: None,
127                        serialize_fn: None,
128                        instantiate_fn: DebugInstantiate::Building,
129                        deserialize_fn: Some(deser_expr.into()),
130                        input: Box::new(HydroNode::Source {
131                            source: HydroSource::ExternalNetwork(),
132                            location_kind: LocationId::ExternalProcess(self.id),
133                            metadata: self.new_node_metadata::<Bytes>(),
134                        }),
135                        metadata: to.new_node_metadata::<Bytes>(),
136                    }),
137                    metadata: to.new_node_metadata::<Bytes>(),
138                },
139            ),
140        )
141    }
142
143    pub fn source_external_bincode<L, T>(
144        &self,
145        to: &L,
146    ) -> (ExternalBincodeSink<T>, Stream<T, L, Unbounded>)
147    where
148        L: Location<'a> + NoTick,
149        T: Serialize + DeserializeOwned,
150    {
151        let next_external_port_id = {
152            let mut flow_state = self.flow_state.borrow_mut();
153            let id = flow_state.next_external_out;
154            flow_state.next_external_out += 1;
155            id
156        };
157
158        (
159            ExternalBincodeSink {
160                process_id: self.id,
161                port_id: next_external_port_id,
162                _phantom: PhantomData,
163            },
164            Stream::new(
165                to.clone(),
166                HydroNode::Persist {
167                    inner: Box::new(HydroNode::Network {
168                        from_key: Some(next_external_port_id),
169                        to_location: to.id(),
170                        to_key: None,
171                        serialize_fn: None,
172                        instantiate_fn: DebugInstantiate::Building,
173                        deserialize_fn: Some(crate::stream::deserialize_bincode::<T>(None).into()),
174                        input: Box::new(HydroNode::Source {
175                            source: HydroSource::ExternalNetwork(),
176                            location_kind: LocationId::ExternalProcess(self.id),
177                            metadata: self.new_node_metadata::<T>(),
178                        }),
179                        metadata: to.new_node_metadata::<T>(),
180                    }),
181                    metadata: to.new_node_metadata::<T>(),
182                },
183            ),
184        )
185    }
186}