Skip to main content

hydro_lang/location/
external_process.rs

//! Types for representing external processes that communicate with a Hydro dataflow.
//!
//! An **external process** is a process that lives outside the Hydro dataflow graph but
//! can send data to and receive data from locations within the graph. This is the primary
//! mechanism for feeding input into a Hydro program and observing its output at runtime.
//!
//! The main type in this module is [`External`], which represents a handle to an external
//! process. Port types such as [`ExternalBytesPort`], [`ExternalBincodeSink`],
//! [`ExternalBincodeBidi`], and [`ExternalBincodeStream`] represent the different kinds of
//! communication channels that can be established between an external process and the
//! dataflow.
12
13use std::marker::PhantomData;
14
15use serde::Serialize;
16use serde::de::DeserializeOwned;
17
18use crate::compile::builder::{ExternalPortId, FlowState};
19use crate::live_collections::stream::{ExactlyOnce, Ordering, Retries, TotalOrder};
20use crate::location::LocationKey;
21use crate::staging_util::Invariant;
22
/// Type-level marker for a port that is connected to exactly one external client.
///
/// The enum has no variants, so no value of it can ever be constructed; it exists only
/// to be named as a type argument. It is the default client-multiplicity parameter of
/// the port types declared in this module.
pub enum NotMany {}
/// Type-level marker for a port that is connected to multiple external clients.
///
/// The enum has no variants, so no value of it can ever be constructed; it exists only
/// to be named as a type argument. Within this module, port handles parameterized with
/// this marker are the ones that implement [`Clone`].
pub enum Many {}
27
28/// A port handle for sending or receiving raw bytes with an external process.
29///
30/// The `M` type parameter is either [`NotMany`] (single client) or [`Many`] (multiple
31/// clients). When `M` is [`Many`], the port can be cloned to allow multiple external
32/// clients to connect.
33pub struct ExternalBytesPort<M = NotMany> {
34    pub(crate) process_key: LocationKey,
35    pub(crate) port_id: ExternalPortId,
36    pub(crate) _phantom: PhantomData<M>,
37}
38
39impl Clone for ExternalBytesPort<Many> {
40    fn clone(&self) -> Self {
41        Self {
42            process_key: self.process_key,
43            port_id: self.port_id,
44            _phantom: Default::default(),
45        }
46    }
47}
48
49/// A sink handle for sending bincode-serialized data from an external process into the
50/// dataflow.
51///
52/// The type parameters control the serialized type (`Type`), whether multiple clients
53/// are supported (`Many`), the ordering guarantee (`O`), and the retry semantics (`R`).
54pub struct ExternalBincodeSink<
55    Type,
56    Many = NotMany,
57    O: Ordering = TotalOrder,
58    R: Retries = ExactlyOnce,
59> where
60    Type: Serialize,
61{
62    pub(crate) process_key: LocationKey,
63    pub(crate) port_id: ExternalPortId,
64    pub(crate) _phantom: PhantomData<(Type, Many, O, R)>,
65}
66
67impl<T: Serialize, O: Ordering, R: Retries> Clone for ExternalBincodeSink<T, Many, O, R> {
68    fn clone(&self) -> Self {
69        Self {
70            process_key: self.process_key,
71            port_id: self.port_id,
72            _phantom: Default::default(),
73        }
74    }
75}
76
77/// A bidirectional port handle for exchanging bincode-serialized data with an external
78/// process.
79///
80/// `InType` is the type of messages received from the external process, and `OutType` is
81/// the type of messages sent back. The `M` type parameter controls whether multiple
82/// clients are supported ([`NotMany`] or [`Many`]).
83pub struct ExternalBincodeBidi<InType, OutType, M = NotMany> {
84    pub(crate) process_key: LocationKey,
85    pub(crate) port_id: ExternalPortId,
86    pub(crate) _phantom: PhantomData<(InType, OutType, M)>,
87}
88
89impl<InT, OutT> Clone for ExternalBincodeBidi<InT, OutT, Many> {
90    fn clone(&self) -> Self {
91        Self {
92            process_key: self.process_key,
93            port_id: self.port_id,
94            _phantom: Default::default(),
95        }
96    }
97}
98
99/// A stream handle for receiving bincode-serialized data from an external process.
100///
101/// The type parameters control the deserialized element type (`Type`), the ordering
102/// guarantee (`O`), and the retry semantics (`R`).
103pub struct ExternalBincodeStream<Type, O: Ordering = TotalOrder, R: Retries = ExactlyOnce>
104where
105    Type: DeserializeOwned,
106{
107    #[cfg_attr(
108        not(feature = "build"),
109        expect(unused, reason = "unused without feature")
110    )]
111    pub(crate) process_key: LocationKey,
112    #[cfg_attr(
113        not(feature = "build"),
114        expect(unused, reason = "unused without feature")
115    )]
116    pub(crate) port_id: ExternalPortId,
117    pub(crate) _phantom: PhantomData<(Type, O, R)>,
118}
119
120/// A handle representing an external process that can communicate with the Hydro dataflow.
121///
122/// External processes live outside the compiled dataflow graph and interact with it by
123/// sending and receiving data through ports. Use methods on [`Location`](super::Location)
124/// such as [`source_external_bytes`](super::Location::source_external_bytes),
125/// [`source_external_bincode`](super::Location::source_external_bincode), and
126/// [`bind_single_client`](super::Location::bind_single_client) to establish communication
127/// channels between a location and an external process.
128///
129/// The `Tag` type parameter is a user-defined marker type that distinguishes different
130/// external process roles at the type level.
131pub struct External<'a, Tag> {
132    pub(crate) key: LocationKey,
133
134    pub(crate) flow_state: FlowState,
135
136    pub(crate) _phantom: Invariant<'a, Tag>,
137}
138
139impl<P> Clone for External<'_, P> {
140    fn clone(&self) -> Self {
141        External {
142            key: self.key,
143            flow_state: self.flow_state.clone(),
144            _phantom: PhantomData,
145        }
146    }
147}