1use std::marker::PhantomData;
2
3use dfir_rs::bytes::Bytes;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6
7use super::{Location, LocationId, NoTick};
8use crate::builder::FlowState;
9use crate::ir::{HydroNode, HydroSource};
10use crate::staging_util::Invariant;
11use crate::{Stream, Unbounded};
12
/// Handle to a raw-bytes port on an external process.
///
/// Returned by `ExternalProcess::source_external_bytes`, which fills in both
/// fields when it registers the port.
pub struct ExternalBytesPort {
    /// Id of the external process that owns this port.
    #[cfg_attr(not(feature = "build"), expect(unused, reason = "unused without feature"))]
    pub(crate) process_id: usize,
    /// Port number handed out from the flow state's external-port counter.
    #[cfg_attr(not(feature = "build"), expect(unused, reason = "unused without feature"))]
    pub(crate) port_id: usize,
}
25
26pub struct ExternalBincodeSink<T: Serialize> {
27 #[cfg_attr(
28 not(feature = "build"),
29 expect(unused, reason = "unused without feature")
30 )]
31 pub(crate) process_id: usize,
32 #[cfg_attr(
33 not(feature = "build"),
34 expect(unused, reason = "unused without feature")
35 )]
36 pub(crate) port_id: usize,
37 pub(crate) _phantom: PhantomData<T>,
38}
39
40pub struct ExternalBincodeStream<T: DeserializeOwned> {
41 #[cfg_attr(
42 not(feature = "build"),
43 expect(unused, reason = "unused without feature")
44 )]
45 pub(crate) process_id: usize,
46 #[cfg_attr(
47 not(feature = "build"),
48 expect(unused, reason = "unused without feature")
49 )]
50 pub(crate) port_id: usize,
51 pub(crate) _phantom: PhantomData<T>,
52}
53
54pub struct ExternalProcess<'a, P> {
55 pub(crate) id: usize,
56
57 pub(crate) flow_state: FlowState,
58
59 pub(crate) _phantom: Invariant<'a, P>,
60}
61
62impl<P> Clone for ExternalProcess<'_, P> {
63 fn clone(&self) -> Self {
64 ExternalProcess {
65 id: self.id,
66 flow_state: self.flow_state.clone(),
67 _phantom: PhantomData,
68 }
69 }
70}
71
72impl<'a, P> Location<'a> for ExternalProcess<'a, P> {
73 type Root = Self;
74
75 fn root(&self) -> Self::Root {
76 self.clone()
77 }
78
79 fn id(&self) -> LocationId {
80 LocationId::ExternalProcess(self.id)
81 }
82
83 fn flow_state(&self) -> &FlowState {
84 &self.flow_state
85 }
86
87 fn is_top_level() -> bool {
88 true
89 }
90}
91
92impl<'a, P> ExternalProcess<'a, P> {
93 pub fn source_external_bytes<L: Location<'a> + NoTick>(
94 &self,
95 to: &L,
96 ) -> (ExternalBytesPort, Stream<Bytes, L, Unbounded>) {
97 let next_external_port_id = {
98 let mut flow_state = self.flow_state.borrow_mut();
99 let id = flow_state.next_external_out;
100 flow_state.next_external_out += 1;
101 id
102 };
103
104 let deser_expr: syn::Expr = syn::parse_quote!(|b| b.unwrap().freeze());
105
106 (
107 ExternalBytesPort {
108 process_id: self.id,
109 port_id: next_external_port_id,
110 },
111 Stream::new(
112 to.clone(),
113 HydroNode::Persist {
114 inner: Box::new(HydroNode::Network {
115 from_key: Some(next_external_port_id),
116 to_location: to.id(),
117 to_key: None,
118 serialize_fn: None,
119 instantiate_fn: crate::ir::DebugInstantiate::Building,
120 deserialize_fn: Some(deser_expr.into()),
121 input: Box::new(HydroNode::Source {
122 source: HydroSource::ExternalNetwork(),
123 location_kind: LocationId::ExternalProcess(self.id),
124 metadata: self.new_node_metadata::<Bytes>(),
125 }),
126 metadata: to.new_node_metadata::<Bytes>(),
127 }),
128 metadata: to.new_node_metadata::<Bytes>(),
129 },
130 ),
131 )
132 }
133
134 pub fn source_external_bincode<L: Location<'a> + NoTick, T: Serialize + DeserializeOwned>(
135 &self,
136 to: &L,
137 ) -> (ExternalBincodeSink<T>, Stream<T, L, Unbounded>) {
138 let next_external_port_id = {
139 let mut flow_state = self.flow_state.borrow_mut();
140 let id = flow_state.next_external_out;
141 flow_state.next_external_out += 1;
142 id
143 };
144
145 (
146 ExternalBincodeSink {
147 process_id: self.id,
148 port_id: next_external_port_id,
149 _phantom: PhantomData,
150 },
151 Stream::new(
152 to.clone(),
153 HydroNode::Persist {
154 inner: Box::new(HydroNode::Network {
155 from_key: Some(next_external_port_id),
156 to_location: to.id(),
157 to_key: None,
158 serialize_fn: None,
159 instantiate_fn: crate::ir::DebugInstantiate::Building,
160 deserialize_fn: Some(crate::stream::deserialize_bincode::<T>(None).into()),
161 input: Box::new(HydroNode::Source {
162 source: HydroSource::ExternalNetwork(),
163 location_kind: LocationId::ExternalProcess(self.id),
164 metadata: self.new_node_metadata::<T>(),
165 }),
166 metadata: to.new_node_metadata::<T>(),
167 }),
168 metadata: to.new_node_metadata::<T>(),
169 },
170 ),
171 )
172 }
173}