Skip to main content

Partitioning

Partitioning splits the data on a single location in Hydro across multiple locations by rerouting data based on a partitioning function that we generate.

The challenge of partitioning is finding the right partitioning function on each input such that correctness is preserved.

note

Partitioning is always impossible if the recipient of the partitioned node requires a network channel with TotalOrder. The combined outputs of different partitions is necessarily out-of-order. We will assume that the recipient does not require total order below.

Precursor: Partitioning in Dedalus

The safety of partitioning was initially formalized over Dedalus, a relational algebra based on Datalog. Dedalus programs are a set of rules, where each rule defines the materialization of joins between relations in order to create new facts. Rules in Dedalus are analogous to operators in Hydro, relations are streams, and facts are data in those streams.

Partitioning is safe in Dedalus if any pair of facts that would have joined in the original program end up on the same machine in the partitioned program. This is guaranteed by partitioning the data based on the join key, also known as co-hashing.

info

Partitioning on the join key is actually unnecessarily conservative. Indeed, functional dependencies between keys were known (for example, if key A is the hash of key B), then we could partition according to that dependency instead. See the SIGMOD '24 paper Optimizing Distributed Protocols with Query Rewrites for more information.

Operator Analysis

Since Hydro supports operators beyond join, we must expand the criteria for partitioning to those new operators. An operator does not affect partitioning unless it involves simultaneously processing a pair of tuples, or requires comparison across pairs of tuples.

We describe each operator below, whether or not they can be partitioned, and why. If an operator is entirely independent of any input, i.e. it can be independently derived by each partition, then it is always partitionable (regardless of the table below).

OperatorPartitionable?Reason
PlaceholderShould not be in a Hydro program during analysis.
SourceHas no dependencies on inputs.
CycleSourceSingle input stream and no interactions between tuples of that stream.
TeeSingle input stream and no interactions between tuples of that stream.
PersistSingle input stream and no interactions between tuples of that stream.
UnpersistSingle input stream and no interactions between tuples of that stream.
DeltaEqui-joins on the entire tuple, will work with any partitioning scheme.
ChainNo interaction between tuples.
CrossProductCompares all pairs of tuple across both inputs.
CrossSingletonThe singleton cannot be partitioned.
Join⚠️Must partition on the join key (or some subset of the join key) of the two inputs.
Difference⚠️Similar to join but compares the entirety of the two inputs (as opposed to only the join key). Must partition on some attribute of the two inputs.
AntiJoin⚠️Similar to join but compares the join key of the 1st input against the entirety of the 2nd input.
ResolveFuturesSingle input stream and no interactions between tuples of that stream.
ResolveFuturesOrderedSingle input stream and no interactions between tuples of that stream.
MapSingle input stream and no interactions between tuples of that stream.
FlatMapSingle input stream and no interactions between tuples of that stream.
FilterSingle input stream and no interactions between tuples of that stream.
FilterMapSingle input stream and no interactions between tuples of that stream.
DeferTickSingle input stream and no interactions between tuples of that stream.
EnumerateRequires processing all elements of a stream on a single machine in order to assign a unique, incrementing index.
InspectSingle input stream and no interactions between tuples of that stream.
UniqueEqui-joins on the entire tuple, will work with any partitioning scheme.
SortOnly affects ordering, which we assume does not matter.
FoldComputing the output requires processing all elements of the stream.
ScanComputing the output requires processing all elements of the stream.
FoldKeyed⚠️Must partition on the fold key.
ReduceComputing the output requires processing all elements of the stream.
ReduceKeyed⚠️Must partition on the reduce key.
NetworkN/A
CounterShould not be in a Hydro program during analysis.
note

The implementation of the analysis above can be found in the partitioning_constraint_analysis_node function in partitioning_node_analysis.rs.

Implementation Details

During operator analysis, the following data structure is constructed:

let possible_partitionings: &mut BTreeMap<usize, BTreeSet<BTreeMap<usize, StructOrTupleIndex>>>;

This represents the partitioning constraints placed on each operator. possible_partitionings is a mapping from operator ID to a set of partitioning constraints on each input that taints the operator.

info

An parent taints an operator if the parent's values (or some transformation of those values) appear in the operator's stream. For example, both parents of Chain taint its output, but only the first parent of AntiJoin taints its output.

Each element of the set of partitioning constraints is a map, mapping from input ID to the index of the input that must be used to partition.

info

The index of a struct or tuple is the position of the field in the struct or tuple. For example, in the tuple (a, b), the index of a is 0; in the struct { x: 1, y: 2 }, the index of x is x.

Within each map, all inputs with matching values on the specific indices must be partitioned such that they are assigned to the same machine. Partitioning is only possible if for all operators, at least one such map in the set has its constraints met.

note

The code that determines whether such a partitioning exists, given these constraints, can be found in the partitioning_analysis function in partitioning_node_analysis.rs.

Partitioning on a specific key

Many of the operators above require partitioning on a specific key. But what does it mean to partition on a key?

It means that whatever partitioning scheme is used:

  1. For each single-stream constraint (FoldKeyed, ReduceKeyed), the input must be partitioned such that any two tuples in that stream with the same key is assigned to the same machine.
  2. For each two-stream constraint (Join, Difference, AntiJoin), the input must be partitioned such that tuples in stream 1 and stream 2 that share the same key are assigned to the same machine.

Critically, although the partitioning constraints are defined on operators, the partitioning scheme is defined on inputs.

Therefore, analysis is necessary in order to translate the constraints on each operator into constraints on inputs and determine if there is a partitioning scheme that simultaneously satisfies all operators' constraints. Specifically, we need a mapping from each operator's constraints to constraints on its inputs' partitioning.

Input Dependency Analysis

We first consider the task of input dependency analysis assuming, as in Dedalus, that each operator's keys were simply a reordering of its inputs' keys. Then, a 1:1 equality can be drawn between each operator's keys and its inputs, and the constraints on partitioning can be transferred directly. This is not the case with the introduction of the Map and FilterMap operators, which introduce UDFs (User-Defined Functions) that can manipulate data in arbitrary ways. We will discuss the implications of UDFs and their analysis below in UDF Analysis.

note

The implementation of the analysis below can be found in the input_dependency_analysis_node function in partitioning_node_analysis.rs.

Drawing an equality between an operator's keys and its inputs' keys raises an interesting question: what happens if an operator has multiple inputs?

Resolving Unions

Consider Chain, which combines two streams. If each parent of Chain has different dependencies on the same input, what are the dependencies of Chain? What if they have dependencies on different inputs?

Unions with Dependencies on the Same Inputs

Chain's dependencies are the intersection of the dependencies of its parents for a given input.

Imagine Chain then joins with another stream, as in the following program.

fn chain_parents_same_input<'a, L>(
flow: FlowBuilder<'a>,
input1: Stream<(usize, usize), Process<'a, L>, Unbounded>,
input2: Stream<(usize, usize), Process<'a, L>, Unbounded>,
) {
let process = flow.process::<L>();
let tick = process.tick();

let parent1 = input1.clone();
let parent2 = input1.map(q!(|(a, b)| (a, b+2)));
parent1.batch(&tick, nondet!(/** ... */)).chain(parent2.batch(&tick, nondet!(/** ... */)))
.join(input2.batch(&tick, nondet!(/** ... */)))
.all_ticks()
.for_each(q!(|(a, (input1b, input2b))| {
println!("Joined: {} {} {}", a, input1b, input2b);
}));
}

The program is only partitionable if both parents of Chain have a 1:1 equality to the input on the join key. In this example, the 0th position of both parent1 and parent2's tuples depend directly on the 0th position of input1; therefore, we can partition on the 0th position of input1.

Otherwise, partitioning would not be possible. If parent2 is instead initialized to let parent2 = input1.map(q!(|(a, b)| (b, b+2))), then partitioning on the 0th position of input1 would fail for the following values:

  • input1 = [(2, 1), (1, 2)]
  • input2 = [(2, 0)]

The two tuples of input1 both need to join with the single tuple of input2, but they are partitioned to different machines.

Unions with Dependencies on Different Inputs

Chain's dependencies are the union of the dependencies of its parents across inputs.

Imagine Chain then joins with another stream, as in the following program.

fn chain_parents_different_inputs<'a, L>(
flow: FlowBuilder<'a>,
input1: Stream<(usize, usize), Process<'a, L>, Unbounded>,
input2: Stream<(usize, usize), Process<'a, L>, Unbounded>,
) {
let process = flow.process::<L>();
let tick = process.tick();

let parent1 = input1.map(q!(|(a, b)| (a, b+2)));
let parent2 = input2.clone().map(q!(|(a, b)| (a, b+2)));
parent1.batch(&tick, nondet!(/** ...*/)).chain(parent2.batch(&tick, nondet!(/** ... */)))
.join(input2.batch(&tick, nondet!(/** ... */)))
.all_ticks()
.for_each(q!(|(a, (input1and2b, input2b))| {
println!("Joined: {} {} {}", a, input1and2b, input2b);
}));
}

The program is only partitionable if each parent of Chain has a 1:1 equality to its input on the join key. In this example, the 0th position of parent1 depends on the 0th position of input1, and the 0th position of parent2 depends directly on the 0th position of input2; therefore, we can partition on the 0th position of input1 and input2.

In summary, for union operators (Chain), its dependencies are the intersection of its parents' dependencies on the same input, and the union of its parents' dependencies across different inputs. Partitioning is only possible if it is possible over all inputs that Chain depends on.

Resolving Intersections

We now consider the other operators that have multiple parents.

The output of Difference and AntiJoin is a subset of their first parent, so they inherit input dependencies directly from that parent. This is independent of Operator Analysis above, where a constraint will be added based on how the parents join.

The output of CrossProduct and CrossSingleton contains elements of both parents combined into tuples. The lineage of each tuple element can be directly traced to each parent and its dependencies inherit directly.

The output of Join contains keys present in both parents and their respective values. As such, the outputted key of Join must simultaneously satisfy the dependencies of both parents; its dependency is the union of its parents' dependencies.

Resolving Cycles

Cycles complicate analysis, as the input dependencies of operators depend on that of their parents, which simultaneously has dependencies on their children. At some point in analysis, we will need to derive an operator's dependencies without knowledge of its parents' dependencies.

We know, however, that there are no self-contained cycles without any external inputs; all cycles must stem from a source. Therefore, we can start analysis from where the source enters the cycle. We split analysis into an inflationary (optimistic) and a deflationary (pessimistic) phase. In the inflationary phase, in a Chain operator, if only one parent's dependencies are known, then we assume that the other parent has the same dependencies. Once fixpoint is reached, we enter the deflationary phase, where we delete any dependencies that are no longer valid once both parents' dependencies are known.

Implementation Details

The following data structure is maintained during input dependency analysis:

pub struct InputDependencyMetadata {
// Const fields
pub cluster_to_partition: LocationId,
pub inputs: BTreeSet<usize>,
// Variables
pub optimistic_phase: bool,
pub input_taint: BTreeMap<usize, BTreeSet<usize>>,
pub input_dependencies: BTreeMap<usize, BTreeMap<usize, StructOrTuple>>,
pub syn_analysis: BTreeMap<usize, StructOrTuple>,
}
  • optimistic_phase is true during the inflationary phase and false during the deflationary phase.
  • input_taint is a mapping from operator ID to the set of inputs that taint it.
  • input_dependencies is a mapping from operator ID to the inputs that taint it, and for each field in the operator's output, which input fields it depends on. For a given operator, if it is tainted by an input in input_taint but has no dependencies on it in input_dependencies, then none of its fields have a 1:1 equality to that input. If the operator can only be conditionally partitioned (as seen in Operator Analysis), then it will incapable of satisfying that condition.
  • syn_analysis is a cached mapping from each Map or FilterMap operator ID to its UDF's dependency analysis results, derived from UDF Analysis below. These results map the dependencies of the UDF's output fields to the UDF's parameters, which can then be used to derive its dependencies on the program input.

UDF Analysis

As alluded to earlier, the introduction of the Map and FilterMap operators allow arbitrary manipulation of data. This means that a deep symbolic analysis of each UDF is necessary in order to determine which parameters, if any, have a 1:1 equality to the output fields of each UDF.

note

This analysis can be found in partition_syn_analysis.rs.

A critical data structure in UDF analysis is StructOrTuple, which represents a named variable's dependencies on the parameters of the UDF.

pub type StructOrTupleIndex = Vec<String>;
pub struct StructOrTuple {
dependencies: BTreeSet<StructOrTupleIndex>,
fields: BTreeMap<String, Box<StructOrTuple>>,
could_be_none: bool,
}
  • dependencies contains any UDF parameter indices that this struct or tuple has a 1:1 equivalence to. If the set is empty, then there are no dependencies; if the set contains the empty vector, then this struct or tuple is equivalent to the entire UDF parameter.
  • fields is a mapping from field names to their corresponding dependencies. Since tuples and structs may have multiple layers, this maps to a deeper StructOrTuple.
  • could_be_none is a boolean that is true if the struct or tuple could be None, for example if we have let x = if y { Some(z) } else { None }. This is useful in FilterMap, where None values are removed, giving x a 1:1 equivalence to z; otherwise, x would have no dependencies.

As an example, for the following trivial function, x would be represented with a StructOrTuple with no dependencies and two fields, 0 and 1. The field 0 would map to a StructOrTuple with dependencies on 1 (the index of b in the parameter), and the field 1 would map to a StructOrTuple with dependencies on 0 (the index of a in the parameter).

let _ = |(a, b): (usize, usize)| {
let x = (b, a);
};

UDF analysis begins with AnalyzeClosure, which finds the UDF block in the given operator. It records all named parameters with TupleDeclareLhs, and analyzes the block with EqualityAnalysis.

  • TupleDeclareLhs recursively unwraps tuples and structs, recording a mapping from each named variable to its index in the input.
    • For example, given the parameters (a, (b, c)), it will map a to [0], b to [1, 0], and c to [1, 1].
    • These indices are stored as StructOrTupleIndex, which is a vector of strings.
  • EqualityAnalysis iterates through all statements in a given block (code surrounded by {}).
    • For declarations or assignments (code of the format LHS = RHS or let LHS = RHS), it extracts the newly named or shadowed variables on the LHS with TupleDeclareLhs, and analyzes the RHS with StructOrTupleUseRhs.
    • It maintains a running list of mappings from named variables to their dependencies, overwriting older values as they are shadowed or replaced.
    • The return value of the block is the last expression that is not followed by a semicolon.
  • StructOrTupleUseRhs recursively analyzes the RHS of an assignment, which may contain code blocks and invoke EqualityAnalysis recursively.
    • It constructs a StructOrTuple representing the dependencies on the UDF parameters on the RHS, which is mapped to the named variable on the LHS by EqualityAnalysis.
warning

UDF analysis is conservative and does not support many Rust features.