hydro_lang/location/process.rs
1//! Definition of the [`Process`] location type, representing a single-node
2//! compute location in a distributed Hydro program.
3//!
4//! A [`Process`] is the simplest kind of location: it corresponds to exactly one
5//! machine (or OS process) and all live collections placed on it are materialized
6//! on that single node. Use a process when the computation does not need to be
7//! replicated or partitioned across multiple nodes.
8//!
9//! Processes are created via [`FlowBuilder::process`](crate::compile::builder::FlowBuilder::process)
10//! and are parameterized by a **tag type** (`ProcessTag`) that lets the type
11//! system distinguish different processes at compile time.
12
13use std::fmt::{Debug, Formatter};
14use std::marker::PhantomData;
15
16use super::{Location, LocationId};
17use crate::compile::builder::FlowState;
18use crate::location::LocationKey;
19use crate::staging_util::Invariant;
20
21/// A single-node location in a distributed Hydro program.
22///
23/// `Process` represents exactly one machine (or OS process) and is one of the
24/// core location types that implements the [`Location`] trait. Live collections
25/// placed on a `Process` are materialized entirely on that single node.
26///
27/// The type parameter `ProcessTag` is a compile-time marker that differentiates
28/// distinct processes in the same dataflow graph (e.g. `Process<'a, Leader>` vs
29/// `Process<'a, Follower>`). It defaults to `()` when only one process is
30/// needed.
31///
32/// # Creating a Process
33/// ```rust,ignore
34/// let mut flow = FlowBuilder::new();
35/// let node = flow.process::<MyTag>();
36/// ```
37pub struct Process<'a, ProcessTag = ()> {
38 pub(crate) key: LocationKey,
39 pub(crate) flow_state: FlowState,
40 pub(crate) _phantom: Invariant<'a, ProcessTag>,
41}
42
43impl<P> Debug for Process<'_, P> {
44 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45 write!(f, "Process({})", self.key)
46 }
47}
48
49impl<P> Eq for Process<'_, P> {}
50impl<P> PartialEq for Process<'_, P> {
51 fn eq(&self, other: &Self) -> bool {
52 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
53 }
54}
55
56impl<P> Clone for Process<'_, P> {
57 fn clone(&self) -> Self {
58 Process {
59 key: self.key,
60 flow_state: self.flow_state.clone(),
61 _phantom: PhantomData,
62 }
63 }
64}
65
66impl<'a, P> super::dynamic::DynLocation for Process<'a, P> {
67 fn id(&self) -> LocationId {
68 LocationId::Process(self.key)
69 }
70
71 fn flow_state(&self) -> &FlowState {
72 &self.flow_state
73 }
74
75 fn is_top_level() -> bool {
76 true
77 }
78
79 fn multiversioned(&self) -> bool {
80 false // processes are always single-versioned
81 }
82}
83
84impl<'a, P> Location<'a> for Process<'a, P> {
85 type Root = Self;
86
87 fn root(&self) -> Self::Root {
88 self.clone()
89 }
90}