
Aggregations and Ticks

Hydroflow+ streams support aggregation operators such as fold(), which can be used to compute results that combine information from multiple elements along a stream. However, because Hydroflow+ streams are infinite, these operators have slightly different semantics than typical implementations.

In particular, Hydroflow (and Hydroflow+) adhere to a tick model of computation, where the stream is chunked into finite ticks of data. At the beginning of each tick on each process (ticks are not synchronized across processes), a batch of inputs is collected, the local dataflow graph is executed, and a batch of outputs is produced. This enables Hydroflow to support infinite streams while still being able to apply optimizations such as vectorization.
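The tick loop described above can be sketched in plain Rust. This is only a simulation of the idea and does not use the Hydroflow API: the function names `run_local_dataflow` and `run_ticks` are hypothetical, and the "dataflow graph" is an arbitrary map followed by a filter.

```rust
// Plain-Rust simulation of the tick model; nothing here is Hydroflow API.

/// Hypothetical stand-in for the local dataflow graph: doubles each
/// element, then keeps only the multiples of four.
fn run_local_dataflow(batch: &[i32]) -> Vec<i32> {
    batch.iter().map(|x| x * 2).filter(|x| x % 4 == 0).collect()
}

/// Runs one tick per input batch and returns the output batch of each tick.
fn run_ticks(input_batches: &[Vec<i32>]) -> Vec<Vec<i32>> {
    input_batches
        .iter()
        .map(|batch| run_local_dataflow(batch))
        .collect()
}

fn main() {
    // Batch boundaries are an assumption here; in practice they depend on
    // when data happens to arrive, and a tick may see no input at all.
    let batches = vec![vec![1, 2], vec![], vec![3, 4, 5]];
    for (tick, output) in run_ticks(&batches).iter().enumerate() {
        println!("tick {tick}: {output:?}");
    }
}
```

Because map and filter treat each element independently, concatenating the per-tick outputs gives the same elements regardless of where the batch boundaries fall.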

Specifying Windows

For most operators, such as map and filter, which operate on each element independently, the tick model is transparent. However, for operators that combine multiple elements, such as fold, the tick model is more visible. In particular, developers must explicitly specify whether they want aggregations such as fold or multi-stream operators such as join to operate over the latest batch of data or all data since the beginning of the stream.

Note

Hydroflow+ tracks windows using the W type parameter on Stream. For streams with an unspecified window, this type will be Async, but with a window it will be Windowed. This guards against accidental aggregations since the type system will prevent you from aggregating over a stream with an Async window.
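To illustrate how a type parameter can guard against accidental aggregation, here is a toy sketch in plain Rust. It borrows the marker names Async and Windowed from the text, but `ToyStream` and `with_window` are hypothetical and are not the real Hydroflow+ definitions.

```rust
use std::marker::PhantomData;

// Marker types named after the ones in the text. Everything else in this
// block is a hypothetical toy, not the real Hydroflow+ Stream.
struct Async;
struct Windowed;

struct ToyStream<T, W> {
    items: Vec<T>,
    _window: PhantomData<W>,
}

impl<T> ToyStream<T, Async> {
    fn new(items: Vec<T>) -> Self {
        ToyStream { items, _window: PhantomData }
    }

    // Choosing a window changes the type parameter from Async to Windowed.
    fn with_window(self) -> ToyStream<T, Windowed> {
        ToyStream { items: self.items, _window: PhantomData }
    }
}

// fold is defined only for windowed streams, so calling it on a
// ToyStream<T, Async> is rejected at compile time.
impl<T> ToyStream<T, Windowed> {
    fn fold<A>(self, init: A, mut f: impl FnMut(&mut A, T)) -> A {
        let mut acc = init;
        for item in self.items {
            f(&mut acc, item);
        }
        acc
    }
}

fn main() {
    let stream = ToyStream::new(vec![1, 2, 3]);
    // stream.fold(0, |acc, x| *acc += x); // would not compile: no window chosen
    let total = stream.with_window().fold(0, |acc, x| *acc += x);
    println!("{total}");
}
```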

To specify this window, Hydroflow+ offers two operators, tick_batch() and all_ticks(). The former specifies that the operator should operate over the latest batch of data, while the latter specifies that the operator should operate over all data since the beginning of the stream.
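The difference between the two windows can be simulated without Hydroflow+. In this sketch each inner vector stands for one tick's batch; the helper names `sum_latest_batch` and `sum_all_ticks` are hypothetical and only mimic the semantics described for tick_batch() and all_ticks().

```rust
/// Per-tick result when the sum covers only the latest batch
/// (the behaviour the text describes for tick_batch()).
fn sum_latest_batch(batches: &[Vec<i32>]) -> Vec<i32> {
    batches
        .iter()
        .map(|batch| batch.iter().sum::<i32>())
        .collect()
}

/// Per-tick result when the sum covers all data since the beginning of
/// the stream (the behaviour the text describes for all_ticks()).
fn sum_all_ticks(batches: &[Vec<i32>]) -> Vec<i32> {
    let mut seen: Vec<i32> = Vec::new();
    let mut results = Vec::new();
    for batch in batches {
        // Remember everything that has arrived so far, then re-aggregate.
        seen.extend(batch);
        results.push(seen.iter().sum::<i32>());
    }
    results
}

fn main() {
    // Assumed batching: three ticks with two, one, and two elements.
    let batches = vec![vec![1, 2], vec![3], vec![4, 5]];
    println!("latest batch: {:?}", sum_latest_batch(&batches)); // [3, 3, 9]
    println!("all ticks:    {:?}", sum_all_ticks(&batches)); // [3, 6, 15]
}
```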

For example, consider a pipelined aggregation across two processes. We can sum up elements on the first process in a batched manner using tick_batch(), then sum up the results on the second process in an unbounded manner using all_ticks():

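let root_stream = flow.source_stream(&process, q!(1..=10));
root_stream
    // On process 1, sum only the elements in the current tick's batch.
    .tick_batch()
    .fold(q!(|| 0), q!(|acc, x| *acc += x))
    // Send each per-batch sum to process 2.
    .send_bincode(&process2)
    // On process 2, sum every partial result received so far.
    .all_ticks()
    .fold(q!(|| 0), q!(|acc, x| *acc += x));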

Note that Hydroflow+ streams are still unbounded! The all_ticks() operator does not wait until all elements are received. Instead, on each tick it operates over all data received since the beginning of the stream, so the aggregate is recomputed with a larger input as more data arrives.

So if we were to pass in inputs 1, 2, 3, we might get the following results on process 2:

First Tick: 0
Second Tick: (1 + 2) = 3
Third Tick: (1 + 2) + (3) = 6
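The tick-by-tick results above can be reproduced with a small plain-Rust simulation. It does not use Hydroflow+; the function names are hypothetical, and both the batching on process 1 (1 and 2 in one tick, 3 in the next) and the arrival timing on process 2 are assumptions chosen to match the example.

```rust
/// Process 1: one partial sum per local batch (tick_batch() followed by fold()).
fn process1_partial_sums(batches: &[Vec<i32>]) -> Vec<i32> {
    batches
        .iter()
        .map(|batch| batch.iter().sum::<i32>())
        .collect()
}

/// Process 2: on every tick, sum all partial sums received so far
/// (all_ticks() followed by fold()).
fn process2_running_totals(arrivals_per_tick: &[Vec<i32>]) -> Vec<i32> {
    let mut received: Vec<i32> = Vec::new();
    let mut totals = Vec::new();
    for arrivals in arrivals_per_tick {
        received.extend(arrivals);
        totals.push(received.iter().sum::<i32>());
    }
    totals
}

fn main() {
    // Assumed batching on process 1: inputs 1 and 2 share a tick, 3 comes later.
    let partials = process1_partial_sums(&[vec![1, 2], vec![3]]);
    // Assumed network timing: nothing has arrived by process 2's first tick,
    // then one partial sum arrives in each of the next two ticks.
    let arrivals = vec![vec![], vec![partials[0]], vec![partials[1]]];
    println!("{:?}", process2_running_totals(&arrivals)); // [0, 3, 6]
}
```

With different batching or timing the intermediate values would differ, which is why the text says we "might" see these results. Once all three inputs have arrived, the total is 6 however they were batched.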