hydro_lang/location/external_process.rs
1//! Types for representing external processes that communicate with a Hydro dataflow.
2//!
3//! An **external process** is a process that lives outside the Hydro dataflow graph but
4//! can send data to and receive data from locations within the graph. This is the primary
5//! mechanism for feeding input into a Hydro program and observing its output at runtime.
6//!
7//! The main type in this module is [`External`], which represents a handle to an external
8//! process. Port types such as [`ExternalBytesPort`], [`ExternalBincodeSink`],
9//! [`ExternalBincodeBidi`], and [`ExternalBincodeStream`] represent the different kinds of
10//! communication channels that can be established between an external process and the
11//! dataflow.
12
13use std::marker::PhantomData;
14
15use serde::Serialize;
16use serde::de::DeserializeOwned;
17
18use crate::compile::builder::{ExternalPortId, FlowState};
19use crate::live_collections::stream::{ExactlyOnce, Ordering, Retries, TotalOrder};
20use crate::location::LocationKey;
21use crate::staging_util::Invariant;
22
/// Marker type indicating that a port is connected to a single external client.
///
/// This enum has no variants, so it can never be instantiated; it only appears as a type
/// argument. The standard traits are derived so that `#[derive(...)]`-generated impls on
/// types generic over a client-count marker (which add `M: Trait` bounds) remain usable
/// when `M = NotMany`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NotMany {}
/// Marker type indicating that a port is connected to multiple external clients.
///
/// This enum has no variants, so it can never be instantiated; it only appears as a type
/// argument. The standard traits are derived so that `#[derive(...)]`-generated impls on
/// types generic over a client-count marker (which add `M: Trait` bounds) remain usable
/// when `M = Many`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Many {}
27
28/// A port handle for sending or receiving raw bytes with an external process.
29///
30/// The `M` type parameter is either [`NotMany`] (single client) or [`Many`] (multiple
31/// clients). When `M` is [`Many`], the port can be cloned to allow multiple external
32/// clients to connect.
33pub struct ExternalBytesPort<M = NotMany> {
34 pub(crate) process_key: LocationKey,
35 pub(crate) port_id: ExternalPortId,
36 pub(crate) _phantom: PhantomData<M>,
37}
38
39impl Clone for ExternalBytesPort<Many> {
40 fn clone(&self) -> Self {
41 Self {
42 process_key: self.process_key,
43 port_id: self.port_id,
44 _phantom: Default::default(),
45 }
46 }
47}
48
49/// A sink handle for sending bincode-serialized data from an external process into the
50/// dataflow.
51///
52/// The type parameters control the serialized type (`Type`), whether multiple clients
53/// are supported (`Many`), the ordering guarantee (`O`), and the retry semantics (`R`).
54pub struct ExternalBincodeSink<
55 Type,
56 Many = NotMany,
57 O: Ordering = TotalOrder,
58 R: Retries = ExactlyOnce,
59> where
60 Type: Serialize,
61{
62 pub(crate) process_key: LocationKey,
63 pub(crate) port_id: ExternalPortId,
64 pub(crate) _phantom: PhantomData<(Type, Many, O, R)>,
65}
66
67impl<T: Serialize, O: Ordering, R: Retries> Clone for ExternalBincodeSink<T, Many, O, R> {
68 fn clone(&self) -> Self {
69 Self {
70 process_key: self.process_key,
71 port_id: self.port_id,
72 _phantom: Default::default(),
73 }
74 }
75}
76
77/// A bidirectional port handle for exchanging bincode-serialized data with an external
78/// process.
79///
80/// `InType` is the type of messages received from the external process, and `OutType` is
81/// the type of messages sent back. The `M` type parameter controls whether multiple
82/// clients are supported ([`NotMany`] or [`Many`]).
83pub struct ExternalBincodeBidi<InType, OutType, M = NotMany> {
84 pub(crate) process_key: LocationKey,
85 pub(crate) port_id: ExternalPortId,
86 pub(crate) _phantom: PhantomData<(InType, OutType, M)>,
87}
88
89impl<InT, OutT> Clone for ExternalBincodeBidi<InT, OutT, Many> {
90 fn clone(&self) -> Self {
91 Self {
92 process_key: self.process_key,
93 port_id: self.port_id,
94 _phantom: Default::default(),
95 }
96 }
97}
98
99/// A stream handle for receiving bincode-serialized data from an external process.
100///
101/// The type parameters control the deserialized element type (`Type`), the ordering
102/// guarantee (`O`), and the retry semantics (`R`).
103pub struct ExternalBincodeStream<Type, O: Ordering = TotalOrder, R: Retries = ExactlyOnce>
104where
105 Type: DeserializeOwned,
106{
107 #[cfg_attr(
108 not(feature = "build"),
109 expect(unused, reason = "unused without feature")
110 )]
111 pub(crate) process_key: LocationKey,
112 #[cfg_attr(
113 not(feature = "build"),
114 expect(unused, reason = "unused without feature")
115 )]
116 pub(crate) port_id: ExternalPortId,
117 pub(crate) _phantom: PhantomData<(Type, O, R)>,
118}
119
120/// A handle representing an external process that can communicate with the Hydro dataflow.
121///
122/// External processes live outside the compiled dataflow graph and interact with it by
123/// sending and receiving data through ports. Use methods on [`Location`](super::Location)
124/// such as [`source_external_bytes`](super::Location::source_external_bytes),
125/// [`source_external_bincode`](super::Location::source_external_bincode), and
126/// [`bind_single_client`](super::Location::bind_single_client) to establish communication
127/// channels between a location and an external process.
128///
129/// The `Tag` type parameter is a user-defined marker type that distinguishes different
130/// external process roles at the type level.
131pub struct External<'a, Tag> {
132 pub(crate) key: LocationKey,
133
134 pub(crate) flow_state: FlowState,
135
136 pub(crate) _phantom: Invariant<'a, Tag>,
137}
138
139impl<P> Clone for External<'_, P> {
140 fn clone(&self) -> Self {
141 External {
142 key: self.key,
143 flow_state: self.flow_state.clone(),
144 _phantom: PhantomData,
145 }
146 }
147}