# Partitioning
Partitioning splits the data at a single location in Hydro across multiple locations by rerouting data based on a partitioning function that we generate.
The challenge of partitioning is finding the right partitioning function for each input such that correctness is preserved.

Partitioning is always impossible if the recipient of the partitioned node requires a network channel with `TotalOrder`: the combined outputs of different partitions are necessarily out-of-order.
We will assume below that the recipient does not require total order.
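For intuition, a partitioning function is just a deterministic map from a tuple to one of the partitions. A minimal sketch of hash partitioning, outside of Hydro (the `partition` function and the partition count here are illustrative, not part of Hydro's API):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only: route a tuple to one of `num_partitions` machines
/// by hashing the field(s) chosen by the partitioning analysis.
fn partition<K: Hash>(key: &K, num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}

fn main() {
    // Tuples with equal keys always land on the same partition, but the
    // outputs of different partitions may interleave arbitrarily at the
    // recipient, which is why TotalOrder cannot be preserved.
    for (key, value) in [(1, "a"), (2, "b"), (1, "c")] {
        println!("({key}, {value}) -> partition {}", partition(&key, 4));
    }
}
```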
## Precursor: Partitioning in Dedalus
The safety of partitioning was initially formalized over Dedalus, a relational algebra based on Datalog. Dedalus programs are a set of rules, where each rule defines the materialization of joins between relations in order to create new facts. Rules in Dedalus are analogous to operators in Hydro, relations are streams, and facts are data in those streams.
Partitioning is safe in Dedalus if any pair of facts that would have joined in the original program end up on the same machine in the partitioned program. This is guaranteed by partitioning the data based on the join key, also known as co-hashing.
Partitioning on the join key is actually unnecessarily conservative. Indeed, if functional dependencies between keys are known (for example, if key `A` is the hash of key `B`), then we can partition according to that dependency instead. See the SIGMOD '24 paper *Optimizing Distributed Protocols with Query Rewrites* for more information.
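As a sketch of why the dependency-based scheme still co-locates joinable facts: partition one relation on its key `A` and the other on the hash of its key `B`. Since `A = hash(B)` for every matching pair, both sides compute the same partition. The relations and hash function below are illustrative, not from the paper:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative stand-in for the hash in the functional dependency A = hash(B).
fn h(x: &impl Hash) -> u64 {
    let mut hasher = DefaultHasher::new();
    x.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let num_partitions = 4;
    // Hypothetical relations: r joins s on r.0 == h(s.0), i.e. r's key A
    // is the hash of s's key B.
    let r = [(h(&10u64), "r1"), (h(&20u64), "r2")];
    let s = [(10u64, "s1"), (20u64, "s2")];

    // Partition r on its join key A, and s according to the dependency
    // (hash B first). Every joinable pair lands on the same partition.
    for (a, v) in r {
        println!("r fact {v} -> partition {}", a % num_partitions);
    }
    for (b, w) in s {
        println!("s fact {w} -> partition {}", h(&b) % num_partitions);
    }
}
```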
## Operator Analysis
Since Hydro supports operators beyond join, we must expand the partitioning criteria to those new operators. An operator does not affect partitioning unless it simultaneously processes a pair of tuples or requires comparisons across pairs of tuples.
The table below describes each operator, whether or not it can be partitioned, and why. If an operator is entirely independent of any input, i.e. its output can be independently derived by each partition, then it is always partitionable (regardless of the table below).
| Operator | Partitionable? | Reason |
|---|---|---|
| `Placeholder` | ❌ | Should not be in a Hydro program during analysis. |
| `Source` | ✅ | Has no dependencies on inputs. |
| `CycleSource` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `Tee` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `Persist` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `Unpersist` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `Delta` | ✅ | Equi-joins on the entire tuple; works with any partitioning scheme. |
| `Chain` | ✅ | No interaction between tuples. |
| `CrossProduct` | ❌ | Compares all pairs of tuples across both inputs. |
| `CrossSingleton` | ❌ | The singleton cannot be partitioned. |
| `Join` | ⚠️ | Must partition on the join key (or some subset of the join key) of the two inputs. |
| `Difference` | ⚠️ | Similar to join, but compares the entirety of the two inputs (as opposed to only the join key). Must partition on some attribute of the two inputs. |
| `AntiJoin` | ⚠️ | Similar to join, but compares the join key of the 1st input against the entirety of the 2nd input. |
| `ResolveFutures` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `ResolveFuturesOrdered` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `Map` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `FlatMap` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `Filter` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `FilterMap` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `DeferTick` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `Enumerate` | ❌ | Requires processing all elements of a stream on a single machine in order to assign a unique, incrementing index. |
| `Inspect` | ✅ | Single input stream and no interactions between tuples of that stream. |
| `Unique` | ✅ | Equi-joins on the entire tuple; works with any partitioning scheme. |
| `Sort` | ✅ | Only affects ordering, which we assume does not matter. |
| `Fold` | ❌ | Computing the output requires processing all elements of the stream. |
| `Scan` | ❌ | Computing the output requires processing all elements of the stream. |
| `FoldKeyed` | ⚠️ | Must partition on the fold key. |
| `Reduce` | ❌ | Computing the output requires processing all elements of the stream. |
| `ReduceKeyed` | ⚠️ | Must partition on the reduce key. |
| `Network` | ❌ | N/A |
| `Counter` | ❌ | Should not be in a Hydro program during analysis. |
The implementation of the analysis above can be found in the `partitioning_constraint_analysis_node` function in `partitioning_node_analysis.rs`.
### Implementation Details
During operator analysis, the following data structure is constructed:
```rust
let possible_partitionings: &mut BTreeMap<usize, BTreeSet<BTreeMap<usize, StructOrTupleIndex>>>;
```
This represents the partitioning constraints placed on each operator. `possible_partitionings` is a mapping from operator ID to a set of partitioning constraints on each input that taints the operator. A parent taints an operator if the parent's values (or some transformation of those values) appear in the operator's stream. For example, both parents of `Chain` taint its output, but only the first parent of `AntiJoin` taints its output.

Each element of the set of partitioning constraints is a map from input ID to the index of the input field that must be used to partition. The index of a field is its position in the struct or tuple. For example, in the tuple `(a, b)`, the index of `a` is `0`; in the struct `{ x: 1, y: 2 }`, the index of `x` is `"x"`.

Within each map, all inputs with matching values on the specified indices must be partitioned such that those values are assigned to the same machine. Partitioning is only possible if, for every operator, at least one map in its set has its constraints met.
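As a concrete (and hypothetical) illustration, a `Join` whose parents are tainted by inputs 1 and 2 and which joins on the 0th field of each might contribute an entry like this; all IDs below are made up:

```rust
use std::collections::{BTreeMap, BTreeSet};

type StructOrTupleIndex = Vec<String>;

fn main() {
    let mut possible_partitionings: BTreeMap<usize, BTreeSet<BTreeMap<usize, StructOrTupleIndex>>> =
        BTreeMap::new();

    // Hypothetical: operator 5 is a Join tainted by inputs 1 and 2. One way
    // to satisfy it is to partition both inputs on their tuples' 0th field.
    let constraint = BTreeMap::from([
        (1, vec!["0".to_string()]),
        (2, vec!["0".to_string()]),
    ]);
    possible_partitionings.entry(5).or_default().insert(constraint);

    println!("{possible_partitionings:?}");
}
```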
The code that determines whether such a partitioning exists, given these constraints, can be found in the `partitioning_analysis` function in `partitioning_node_analysis.rs`.
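At a high level, the check can be sketched as follows. This is a simplified illustration, not the actual `partitioning_analysis` implementation (which must also search for a satisfying scheme rather than being handed a candidate):

```rust
use std::collections::{BTreeMap, BTreeSet};

type StructOrTupleIndex = Vec<String>;
type Constraints = BTreeMap<usize, BTreeSet<BTreeMap<usize, StructOrTupleIndex>>>;

/// Illustrative only: does a candidate scheme (input ID -> partitioning
/// index) simultaneously satisfy every operator's constraints?
fn satisfies(constraints: &Constraints, candidate: &BTreeMap<usize, StructOrTupleIndex>) -> bool {
    constraints.values().all(|options| {
        // For each operator, at least one of its constraint maps must be
        // fully met by the candidate scheme.
        options.iter().any(|option| {
            option
                .iter()
                .all(|(input, index)| candidate.get(input) == Some(index))
        })
    })
}

fn main() {
    let idx = |s: &str| vec![s.to_string()];
    let mut constraints = Constraints::new();
    // Hypothetical Join (operator 5): partition inputs 1 and 2 on field 0.
    constraints
        .entry(5)
        .or_default()
        .insert(BTreeMap::from([(1, idx("0")), (2, idx("0"))]));

    let candidate = BTreeMap::from([(1, idx("0")), (2, idx("0"))]);
    assert!(satisfies(&constraints, &candidate));
}
```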
## Partitioning on a specific key
Many of the operators above require partitioning on a specific key. But what does it mean to partition on a key?
It means that whatever partitioning scheme is used:
- For each single-stream constraint (`FoldKeyed`, `ReduceKeyed`), the input must be partitioned such that any two tuples in that stream with the same key are assigned to the same machine.
- For each two-stream constraint (`Join`, `Difference`, `AntiJoin`), the inputs must be partitioned such that tuples in stream 1 and stream 2 that share the same key are assigned to the same machine.
Critically, although the partitioning constraints are defined on operators, the partitioning scheme is defined on inputs.
Therefore, analysis is necessary in order to translate the constraints on each operator into constraints on inputs and determine if there is a partitioning scheme that simultaneously satisfies all operators' constraints. Specifically, we need a mapping from each operator's constraints to constraints on its inputs' partitioning.
## Input Dependency Analysis
We first consider the task of input dependency analysis assuming, as in Dedalus, that each operator's keys are simply a reordering of its inputs' keys.
Then a 1:1 equality can be drawn between each operator's keys and its inputs' keys, and the constraints on partitioning can be transferred directly.
This is not the case with the introduction of the `Map` and `FilterMap` operators, which introduce UDFs (User-Defined Functions) that can manipulate data in arbitrary ways.
We will discuss the implications of UDFs and their analysis below in UDF Analysis.
The implementation of the analysis below can be found in the `input_dependency_analysis_node` function in `partitioning_node_analysis.rs`.
Drawing an equality between an operator's keys and its inputs' keys raises an interesting question: what happens if an operator has multiple inputs?
### Resolving Unions
Consider `Chain`, which combines two streams.
If each parent of `Chain` has different dependencies on the same input, what are the dependencies of `Chain`? What if they have dependencies on different inputs?
#### Unions with Dependencies on the Same Inputs
`Chain`'s dependencies are the intersection of the dependencies of its parents for a given input.
Imagine `Chain` then joins with another stream, as in the following program.
```rust
fn chain_parents_same_input<'a, L>(
    flow: FlowBuilder<'a>,
    input1: Stream<(usize, usize), Process<'a, L>, Unbounded>,
    input2: Stream<(usize, usize), Process<'a, L>, Unbounded>,
) {
    let process = flow.process::<L>();
    let tick = process.tick();
    let parent1 = input1.clone();
    let parent2 = input1.map(q!(|(a, b)| (a, b+2)));
    parent1
        .batch(&tick, nondet!(/** ... */))
        .chain(parent2.batch(&tick, nondet!(/** ... */)))
        .join(input2.batch(&tick, nondet!(/** ... */)))
        .all_ticks()
        .for_each(q!(|(a, (input1b, input2b))| {
            println!("Joined: {} {} {}", a, input1b, input2b);
        }));
}
```
The program is only partitionable if both parents of `Chain` have a 1:1 equality to the input on the join key.
In this example, the 0th position of both `parent1`'s and `parent2`'s tuples depends directly on the 0th position of `input1`; therefore, we can partition on the 0th position of `input1`.
Otherwise, partitioning would not be possible.
If `parent2` is instead initialized to `let parent2 = input1.map(q!(|(a, b)| (b, b+2)))`, then partitioning on the 0th position of `input1` would fail for the following values:

```
input1 = [(2, 1), (1, 2)]
input2 = [(2, 0)]
```
The two tuples of `input1` both need to join with the single tuple of `input2`, but they are partitioned to different machines.
#### Unions with Dependencies on Different Inputs
`Chain`'s dependencies are the union of the dependencies of its parents across inputs.
Imagine `Chain` then joins with another stream, as in the following program.
```rust
fn chain_parents_different_inputs<'a, L>(
    flow: FlowBuilder<'a>,
    input1: Stream<(usize, usize), Process<'a, L>, Unbounded>,
    input2: Stream<(usize, usize), Process<'a, L>, Unbounded>,
) {
    let process = flow.process::<L>();
    let tick = process.tick();
    let parent1 = input1.map(q!(|(a, b)| (a, b+2)));
    let parent2 = input2.clone().map(q!(|(a, b)| (a, b+2)));
    parent1
        .batch(&tick, nondet!(/** ... */))
        .chain(parent2.batch(&tick, nondet!(/** ... */)))
        .join(input2.batch(&tick, nondet!(/** ... */)))
        .all_ticks()
        .for_each(q!(|(a, (input1and2b, input2b))| {
            println!("Joined: {} {} {}", a, input1and2b, input2b);
        }));
}
```
The program is only partitionable if each parent of `Chain` has a 1:1 equality to its own input on the join key.
In this example, the 0th position of `parent1` depends directly on the 0th position of `input1`, and the 0th position of `parent2` depends directly on the 0th position of `input2`; therefore, we can partition on the 0th position of both `input1` and `input2`.
In summary, a union operator's (`Chain`'s) dependencies are the intersection of its parents' dependencies on the same input, and the union of its parents' dependencies across different inputs.
Partitioning is only possible if it is possible over all inputs that `Chain` depends on.
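A hedged sketch of this combination rule, using a simplified representation of dependencies (a map from input ID to the output fields with a 1:1 equality to that input; the real analysis tracks full `StructOrTuple` indices):

```rust
use std::collections::{BTreeMap, BTreeSet};

// Simplified: input ID -> output fields with a 1:1 equality to that input.
type Deps = BTreeMap<usize, BTreeSet<String>>;

/// Illustrative only: combine the dependencies of Chain's two parents.
fn chain_deps(p1: &Deps, p2: &Deps) -> Deps {
    let mut out = Deps::new();
    for input in p1.keys().chain(p2.keys()) {
        let deps = match (p1.get(input), p2.get(input)) {
            // Both parents are tainted by this input: intersect, since a
            // field is only usable if it holds for tuples from either parent.
            (Some(a), Some(b)) => a.intersection(b).cloned().collect(),
            // Only one parent is tainted by this input: carry it over; the
            // union across inputs records a constraint on every such input.
            (Some(a), None) => a.clone(),
            (None, Some(b)) => b.clone(),
            (None, None) => unreachable!(),
        };
        out.insert(*input, deps);
    }
    out
}

fn main() {
    let f = |s: &str| BTreeSet::from([s.to_string()]);
    // Same input (chain_parents_same_input): intersection keeps field 0.
    let same = chain_deps(&Deps::from([(1, f("0"))]), &Deps::from([(1, f("0"))]));
    println!("{same:?}"); // {1: {"0"}}

    // Different inputs (chain_parents_different_inputs): union keeps both.
    let diff = chain_deps(&Deps::from([(1, f("0"))]), &Deps::from([(2, f("0"))]));
    println!("{diff:?}"); // {1: {"0"}, 2: {"0"}}
}
```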
### Resolving Intersections
We now consider the other operators that have multiple parents.

The output of `Difference` and `AntiJoin` is a subset of their first parent, so they inherit input dependencies directly from that parent.
This is independent of Operator Analysis above, where a constraint will be added based on how the parents join.
The output of `CrossProduct` and `CrossSingleton` contains elements of both parents combined into tuples.
The lineage of each tuple element can be traced directly to one parent, so its dependencies are inherited directly.

The output of `Join` contains keys present in both parents and their respective values.
As such, the output key of `Join` must simultaneously satisfy the dependencies of both parents; its dependency is the union of its parents' dependencies.
### Resolving Cycles
Cycles complicate analysis: the input dependencies of an operator depend on those of its parents, which in a cycle simultaneously depend on their children. At some point in the analysis, we will need to derive an operator's dependencies without knowledge of its parents' dependencies.
We know, however, that there are no self-contained cycles without any external inputs; all cycles must stem from a source.
Therefore, we can start analysis from where the source enters the cycle.
We split analysis into an inflationary (optimistic) phase and a deflationary (pessimistic) phase.
In the inflationary phase, if only one parent's dependencies are known at a `Chain` operator, then we assume that the other parent has the same dependencies.
Once a fixpoint is reached, we enter the deflationary phase, where we delete any dependencies that are no longer valid once both parents' dependencies are known.
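The control flow can be sketched as follows; this is a simplified illustration in which `propagate_once` is a stand-in for one pass of the real graph traversal in `input_dependency_analysis_node`:

```rust
// Illustrative sketch only: `propagate_once` stands in for one pass over all
// operators, returning whether any dependency changed.
fn propagate_once(optimistic: bool) -> bool {
    // Stub: the real pass updates each operator's dependencies from its
    // parents', guessing at unknown Chain parents when `optimistic` is true.
    let _ = optimistic;
    false
}

fn main() {
    // Inflationary phase: optimistically assume unknown parents mirror the
    // known one, so analysis can enter cycles from their sources.
    while propagate_once(true) {}

    // Deflationary phase: every parent now has dependencies; delete any
    // optimistic dependency that both parents do not actually support.
    while propagate_once(false) {}
}
```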
### Implementation Details
The following data structure is maintained during input dependency analysis:
```rust
pub struct InputDependencyMetadata {
    // Const fields
    pub cluster_to_partition: LocationId,
    pub inputs: BTreeSet<usize>,
    // Variables
    pub optimistic_phase: bool,
    pub input_taint: BTreeMap<usize, BTreeSet<usize>>,
    pub input_dependencies: BTreeMap<usize, BTreeMap<usize, StructOrTuple>>,
    pub syn_analysis: BTreeMap<usize, StructOrTuple>,
}
```
- `optimistic_phase` is `true` during the inflationary phase and `false` during the deflationary phase.
- `input_taint` is a mapping from operator ID to the set of inputs that taint it.
- `input_dependencies` is a mapping from operator ID to the inputs that taint it and, for each field in the operator's output, which input fields it depends on. For a given operator, if it is tainted by an input in `input_taint` but has no dependencies on it in `input_dependencies`, then none of its fields have a 1:1 equality to that input. If the operator can only be conditionally partitioned (as seen in Operator Analysis), then it will be incapable of satisfying that condition.
- `syn_analysis` is a cached mapping from each `Map` or `FilterMap` operator ID to its UDF's dependency analysis results, derived from UDF Analysis below. These results map the dependencies of the UDF's output fields to the UDF's parameters, which can then be used to derive its dependencies on the program input.
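As a hypothetical illustration of `input_taint` (all operator IDs are made up), consider the `chain_parents_same_input` program above:

```rust
use std::collections::{BTreeMap, BTreeSet};

fn main() {
    // Hypothetical operator IDs: 3 = parent1, 4 = parent2, 5 = the Chain,
    // 6 = the Join; input1 and input2 are inputs 1 and 2.
    let input_taint: BTreeMap<usize, BTreeSet<usize>> = BTreeMap::from([
        (3, BTreeSet::from([1])),    // parent1 carries input1's values
        (4, BTreeSet::from([1])),    // parent2 is a Map over input1
        (5, BTreeSet::from([1])),    // Chain: union of its parents' taints
        (6, BTreeSet::from([1, 2])), // Join: tainted by both inputs
    ]);
    println!("{input_taint:?}");
}
```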
## UDF Analysis
As alluded to earlier, the `Map` and `FilterMap` operators allow arbitrary manipulation of data through UDFs.
This means that a deep symbolic analysis of each UDF is necessary in order to determine which parameters, if any, have a 1:1 equality to the output fields of the UDF.
This analysis can be found in `partition_syn_analysis.rs`.
A critical data structure in UDF analysis is `StructOrTuple`, which represents a named variable's dependencies on the parameters of the UDF.
```rust
pub type StructOrTupleIndex = Vec<String>;

pub struct StructOrTuple {
    dependencies: BTreeSet<StructOrTupleIndex>,
    fields: BTreeMap<String, Box<StructOrTuple>>,
    could_be_none: bool,
}
```
- `dependencies` contains any UDF parameter indices that this struct or tuple has a 1:1 equivalence to. If the set is empty, then there are no dependencies; if the set contains the empty vector, then this struct or tuple is equivalent to the entire UDF parameter.
- `fields` is a mapping from field names to their corresponding dependencies. Since tuples and structs may have multiple layers, this maps to a deeper `StructOrTuple`.
- `could_be_none` is a boolean that is true if the struct or tuple could be `None`, for example if we have `let x = if y { Some(z) } else { None }`. This is useful in `FilterMap`, where `None` values are removed, giving `x` a 1:1 equivalence to `z`; otherwise, `x` would have no dependencies.
As an example, for the following trivial function, `x` would be represented with a `StructOrTuple` with no dependencies and two fields, `0` and `1`. The field `0` would map to a `StructOrTuple` with dependencies on `1` (the index of `b` in the parameter), and the field `1` would map to a `StructOrTuple` with dependencies on `0` (the index of `a` in the parameter).
```rust
let _ = |(a, b): (usize, usize)| {
    let x = (b, a);
};
```
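Spelled out with a local mirror of `StructOrTuple` (the real struct's fields are not public; this is purely illustrative), the analysis result for `x` would look like:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Local, illustrative mirror of StructOrTuple.
#[derive(Debug, Default)]
struct StructOrTuple {
    dependencies: BTreeSet<Vec<String>>,
    fields: BTreeMap<String, Box<StructOrTuple>>,
    could_be_none: bool,
}

fn main() {
    // x has no top-level dependency; field "0" is 1:1 with parameter index
    // ["1"] (b), and field "1" is 1:1 with parameter index ["0"] (a).
    let x = StructOrTuple {
        dependencies: BTreeSet::new(),
        fields: BTreeMap::from([
            (
                "0".to_string(),
                Box::new(StructOrTuple {
                    dependencies: BTreeSet::from([vec!["1".to_string()]]),
                    ..Default::default()
                }),
            ),
            (
                "1".to_string(),
                Box::new(StructOrTuple {
                    dependencies: BTreeSet::from([vec!["0".to_string()]]),
                    ..Default::default()
                }),
            ),
        ]),
        could_be_none: false,
    };
    println!("{x:#?}");
}
```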
UDF analysis begins with `AnalyzeClosure`, which finds the UDF block in the given operator.
It records all named parameters with `TupleDeclareLhs`, and analyzes the block with `EqualityAnalysis`.
- `TupleDeclareLhs` recursively unwraps tuples and structs, recording a mapping from each named variable to its index in the input.
  - For example, given the parameters `(a, (b, c))`, it will map `a` to `[0]`, `b` to `[1, 0]`, and `c` to `[1, 1]`.
  - These indices are stored as `StructOrTupleIndex`, which is a vector of strings.
- `EqualityAnalysis` iterates through all statements in a given block (code surrounded by `{}`).
  - For declarations or assignments (code of the format `LHS = RHS` or `let LHS = RHS`), it extracts the newly named or shadowed variables on the LHS with `TupleDeclareLhs`, and analyzes the RHS with `StructOrTupleUseRhs`.
  - It maintains a running list of mappings from named variables to their dependencies, overwriting older values as they are shadowed or replaced.
  - The return value of the block is the last expression that is not followed by a semicolon.
- `StructOrTupleUseRhs` recursively analyzes the RHS of an assignment, which may contain code blocks and invoke `EqualityAnalysis` recursively.
  - It constructs a `StructOrTuple` representing the RHS's dependencies on the UDF parameters, which `EqualityAnalysis` then maps to the named variable on the LHS.
UDF analysis is conservative and does not support many Rust features.
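Within the supported subset, here is a hedged walkthrough of how these pieces interact on a closure with shadowing; the comments show what each analysis step would record:

```rust
fn main() {
    // Illustrative only: the comments describe the analysis, not runtime output.
    let _ = |(a, b): (usize, usize)| {
        // TupleDeclareLhs maps the parameters: a -> ["0"], b -> ["1"].
        let x = a; // StructOrTupleUseRhs: x depends on {["0"]}.
        let x = (x, b); // Shadowing: the new x maps "0" -> {["0"]}, "1" -> {["1"]}.
        x // The last expression without a semicolon is the block's return value.
    };
}
```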