1use std::marker::PhantomData;
2
3use bytes::Bytes;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6
7use super::{Location, LocationId, NoTick};
8use crate::builder::FlowState;
9use crate::ir::{DebugInstantiate, HydroNode, HydroSource};
10use crate::staging_util::Invariant;
11use crate::{Stream, Unbounded};
12
13pub struct ExternalBytesPort {
14 #[cfg_attr(
15 not(feature = "build"),
16 expect(unused, reason = "unused without feature")
17 )]
18 pub(crate) process_id: usize,
19 #[cfg_attr(
20 not(feature = "build"),
21 expect(unused, reason = "unused without feature")
22 )]
23 pub(crate) port_id: usize,
24}
25
26pub struct ExternalBincodeSink<Type>
27where
28 Type: Serialize,
29{
30 #[cfg_attr(
31 not(feature = "build"),
32 expect(unused, reason = "unused without feature")
33 )]
34 pub(crate) process_id: usize,
35 #[cfg_attr(
36 not(feature = "build"),
37 expect(unused, reason = "unused without feature")
38 )]
39 pub(crate) port_id: usize,
40 pub(crate) _phantom: PhantomData<Type>,
41}
42
43pub struct ExternalBincodeStream<Type>
44where
45 Type: DeserializeOwned,
46{
47 #[cfg_attr(
48 not(feature = "build"),
49 expect(unused, reason = "unused without feature")
50 )]
51 pub(crate) process_id: usize,
52 #[cfg_attr(
53 not(feature = "build"),
54 expect(unused, reason = "unused without feature")
55 )]
56 pub(crate) port_id: usize,
57 pub(crate) _phantom: PhantomData<Type>,
58}
59
60pub struct ExternalProcess<'a, ProcessTag> {
61 pub(crate) id: usize,
62
63 pub(crate) flow_state: FlowState,
64
65 pub(crate) _phantom: Invariant<'a, ProcessTag>,
66}
67
68impl<P> Clone for ExternalProcess<'_, P> {
69 fn clone(&self) -> Self {
70 ExternalProcess {
71 id: self.id,
72 flow_state: self.flow_state.clone(),
73 _phantom: PhantomData,
74 }
75 }
76}
77
78impl<'a, P> Location<'a> for ExternalProcess<'a, P> {
79 type Root = Self;
80
81 fn root(&self) -> Self::Root {
82 self.clone()
83 }
84
85 fn id(&self) -> LocationId {
86 LocationId::ExternalProcess(self.id)
87 }
88
89 fn flow_state(&self) -> &FlowState {
90 &self.flow_state
91 }
92
93 fn is_top_level() -> bool {
94 true
95 }
96}
97
98impl<'a, P> ExternalProcess<'a, P> {
99 pub fn source_external_bytes<L>(
100 &self,
101 to: &L,
102 ) -> (ExternalBytesPort, Stream<Bytes, L, Unbounded>)
103 where
104 L: Location<'a> + NoTick,
105 {
106 let next_external_port_id = {
107 let mut flow_state = self.flow_state.borrow_mut();
108 let id = flow_state.next_external_out;
109 flow_state.next_external_out += 1;
110 id
111 };
112
113 let deser_expr: syn::Expr = syn::parse_quote!(|b| b.unwrap().freeze());
114
115 (
116 ExternalBytesPort {
117 process_id: self.id,
118 port_id: next_external_port_id,
119 },
120 Stream::new(
121 to.clone(),
122 HydroNode::Persist {
123 inner: Box::new(HydroNode::Network {
124 from_key: Some(next_external_port_id),
125 to_location: to.id(),
126 to_key: None,
127 serialize_fn: None,
128 instantiate_fn: DebugInstantiate::Building,
129 deserialize_fn: Some(deser_expr.into()),
130 input: Box::new(HydroNode::Source {
131 source: HydroSource::ExternalNetwork(),
132 location_kind: LocationId::ExternalProcess(self.id),
133 metadata: self.new_node_metadata::<Bytes>(),
134 }),
135 metadata: to.new_node_metadata::<Bytes>(),
136 }),
137 metadata: to.new_node_metadata::<Bytes>(),
138 },
139 ),
140 )
141 }
142
143 pub fn source_external_bincode<L, T>(
144 &self,
145 to: &L,
146 ) -> (ExternalBincodeSink<T>, Stream<T, L, Unbounded>)
147 where
148 L: Location<'a> + NoTick,
149 T: Serialize + DeserializeOwned,
150 {
151 let next_external_port_id = {
152 let mut flow_state = self.flow_state.borrow_mut();
153 let id = flow_state.next_external_out;
154 flow_state.next_external_out += 1;
155 id
156 };
157
158 (
159 ExternalBincodeSink {
160 process_id: self.id,
161 port_id: next_external_port_id,
162 _phantom: PhantomData,
163 },
164 Stream::new(
165 to.clone(),
166 HydroNode::Persist {
167 inner: Box::new(HydroNode::Network {
168 from_key: Some(next_external_port_id),
169 to_location: to.id(),
170 to_key: None,
171 serialize_fn: None,
172 instantiate_fn: DebugInstantiate::Building,
173 deserialize_fn: Some(crate::stream::deserialize_bincode::<T>(None).into()),
174 input: Box::new(HydroNode::Source {
175 source: HydroSource::ExternalNetwork(),
176 location_kind: LocationId::ExternalProcess(self.id),
177 metadata: self.new_node_metadata::<T>(),
178 }),
179 metadata: to.new_node_metadata::<T>(),
180 }),
181 metadata: to.new_node_metadata::<T>(),
182 },
183 ),
184 )
185 }
186}