hydro_lang/location/
external_process.rs

1use std::marker::PhantomData;
2
3use dfir_rs::bytes::Bytes;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6
7use super::{Location, LocationId, NoTick};
8use crate::builder::FlowState;
9use crate::ir::{HydroNode, HydroSource};
10use crate::staging_util::Invariant;
11use crate::{Stream, Unbounded};
12
13pub struct ExternalBytesPort {
14    #[cfg_attr(
15        not(feature = "build"),
16        expect(unused, reason = "unused without feature")
17    )]
18    pub(crate) process_id: usize,
19    #[cfg_attr(
20        not(feature = "build"),
21        expect(unused, reason = "unused without feature")
22    )]
23    pub(crate) port_id: usize,
24}
25
26pub struct ExternalBincodeSink<T: Serialize> {
27    #[cfg_attr(
28        not(feature = "build"),
29        expect(unused, reason = "unused without feature")
30    )]
31    pub(crate) process_id: usize,
32    #[cfg_attr(
33        not(feature = "build"),
34        expect(unused, reason = "unused without feature")
35    )]
36    pub(crate) port_id: usize,
37    pub(crate) _phantom: PhantomData<T>,
38}
39
40pub struct ExternalBincodeStream<T: DeserializeOwned> {
41    #[cfg_attr(
42        not(feature = "build"),
43        expect(unused, reason = "unused without feature")
44    )]
45    pub(crate) process_id: usize,
46    #[cfg_attr(
47        not(feature = "build"),
48        expect(unused, reason = "unused without feature")
49    )]
50    pub(crate) port_id: usize,
51    pub(crate) _phantom: PhantomData<T>,
52}
53
54pub struct ExternalProcess<'a, P> {
55    pub(crate) id: usize,
56
57    pub(crate) flow_state: FlowState,
58
59    pub(crate) _phantom: Invariant<'a, P>,
60}
61
62impl<P> Clone for ExternalProcess<'_, P> {
63    fn clone(&self) -> Self {
64        ExternalProcess {
65            id: self.id,
66            flow_state: self.flow_state.clone(),
67            _phantom: PhantomData,
68        }
69    }
70}
71
72impl<'a, P> Location<'a> for ExternalProcess<'a, P> {
73    type Root = Self;
74
75    fn root(&self) -> Self::Root {
76        self.clone()
77    }
78
79    fn id(&self) -> LocationId {
80        LocationId::ExternalProcess(self.id)
81    }
82
83    fn flow_state(&self) -> &FlowState {
84        &self.flow_state
85    }
86
87    fn is_top_level() -> bool {
88        true
89    }
90}
91
92impl<'a, P> ExternalProcess<'a, P> {
93    pub fn source_external_bytes<L: Location<'a> + NoTick>(
94        &self,
95        to: &L,
96    ) -> (ExternalBytesPort, Stream<Bytes, L, Unbounded>) {
97        let next_external_port_id = {
98            let mut flow_state = self.flow_state.borrow_mut();
99            let id = flow_state.next_external_out;
100            flow_state.next_external_out += 1;
101            id
102        };
103
104        let deser_expr: syn::Expr = syn::parse_quote!(|b| b.unwrap().freeze());
105
106        (
107            ExternalBytesPort {
108                process_id: self.id,
109                port_id: next_external_port_id,
110            },
111            Stream::new(
112                to.clone(),
113                HydroNode::Persist {
114                    inner: Box::new(HydroNode::Network {
115                        from_key: Some(next_external_port_id),
116                        to_location: to.id(),
117                        to_key: None,
118                        serialize_fn: None,
119                        instantiate_fn: crate::ir::DebugInstantiate::Building,
120                        deserialize_fn: Some(deser_expr.into()),
121                        input: Box::new(HydroNode::Source {
122                            source: HydroSource::ExternalNetwork(),
123                            location_kind: LocationId::ExternalProcess(self.id),
124                            metadata: self.new_node_metadata::<Bytes>(),
125                        }),
126                        metadata: to.new_node_metadata::<Bytes>(),
127                    }),
128                    metadata: to.new_node_metadata::<Bytes>(),
129                },
130            ),
131        )
132    }
133
134    pub fn source_external_bincode<L: Location<'a> + NoTick, T: Serialize + DeserializeOwned>(
135        &self,
136        to: &L,
137    ) -> (ExternalBincodeSink<T>, Stream<T, L, Unbounded>) {
138        let next_external_port_id = {
139            let mut flow_state = self.flow_state.borrow_mut();
140            let id = flow_state.next_external_out;
141            flow_state.next_external_out += 1;
142            id
143        };
144
145        (
146            ExternalBincodeSink {
147                process_id: self.id,
148                port_id: next_external_port_id,
149                _phantom: PhantomData,
150            },
151            Stream::new(
152                to.clone(),
153                HydroNode::Persist {
154                    inner: Box::new(HydroNode::Network {
155                        from_key: Some(next_external_port_id),
156                        to_location: to.id(),
157                        to_key: None,
158                        serialize_fn: None,
159                        instantiate_fn: crate::ir::DebugInstantiate::Building,
160                        deserialize_fn: Some(crate::stream::deserialize_bincode::<T>(None).into()),
161                        input: Box::new(HydroNode::Source {
162                            source: HydroSource::ExternalNetwork(),
163                            location_kind: LocationId::ExternalProcess(self.id),
164                            metadata: self.new_node_metadata::<T>(),
165                        }),
166                        metadata: to.new_node_metadata::<T>(),
167                    }),
168                    metadata: to.new_node_metadata::<T>(),
169                },
170            ),
171        )
172    }
173}