Processes and Streams

Hydroflow+ involves two main concepts:

  • Processes, which represent where elements of a dataflow program are processed
  • Streams, which define what is being computed

By combining the two, Hydroflow+ makes it possible to implement both low-level distributed protocols and high-level dataflow programs using the same API, all while supporting compile-time checks to guard against unexpected sources of nondeterminism.

Processes

Unlike most streaming systems, Hydroflow+ requires that all streams be associated with a particular process. A process is a logical unit of computation that can be deployed to a single machine. Processes are most closely related to actors in actor-based systems, but use streaming operators rather than an imperative API.

To create a process, our function must take a ProcessSpec as an argument. This trait abstracts over the target the dataflow graph is being built for: compilation to a Rust binary or deployment.

pub fn my_flow<'a, D: Deploy<'a>>(
    flow: &'a FlowBuilder<'a, D>,
    process_spec: &impl ProcessSpec<'a, D>
) {
    ...
}

Process specs represent a template for a process, which can be instantiated multiple times to create multiple processes. Multiple process specs can be useful to specify deployment characteristics for different sets of processes, such as deploying them to different cloud providers or regions.

Instantiating a process from a process spec is done using the process method on FlowBuilder:

let process = flow.process(process_spec);

Streams

Streams are infinite ordered sequences of elements. They can be transformed using functional operators such as map and filter and relational operators such as join, and they can be connected across processes using the send_* methods.
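For readers who have not used these operators before, the sketch below shows what map, filter, and join compute, using only the Rust standard library on finite slices. It is an analogy, not the Hydroflow+ API: the function names double_evens and join_by_key are hypothetical, and real streams are unbounded and tied to a process.

```rust
use std::collections::HashMap;

/// Keep the even numbers and double them: `filter` followed by `map`.
fn double_evens(input: &[i32]) -> Vec<i32> {
    input
        .iter()
        .copied()
        .filter(|x| x % 2 == 0)
        .map(|x| x * 2)
        .collect()
}

/// Inner equi-join of two keyed collections on their first tuple field.
/// Output order follows the order of `right`.
fn join_by_key<'a>(
    left: &[(u32, &'a str)],
    right: &[(u32, i32)],
) -> Vec<(u32, (&'a str, i32))> {
    // Index the left side by key so each right element is a hash lookup.
    let mut index: HashMap<u32, Vec<&'a str>> = HashMap::new();
    for (k, v) in left {
        index.entry(*k).or_default().push(*v);
    }

    let mut out = Vec::new();
    for (k, r) in right {
        if let Some(matches) = index.get(k) {
            for l in matches {
                out.push((*k, (*l, *r)));
            }
        }
    }
    out
}

fn main() {
    println!("{:?}", double_evens(&[1, 2, 3, 4]));

    let names = [(1, "alice"), (2, "bob")];
    let scores = [(2, 10), (1, 7), (3, 99)];
    // Key 3 has no match on the left, so it is dropped.
    println!("{:?}", join_by_key(&names, &scores));
}
```

In a streaming setting the same operators are applied incrementally as elements arrive rather than over a complete collection.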

Instantiating Streams

Root streams are created using methods available on an instantiated process.

source_iter

To create a stream from a Rust iterator, use source_iter. This is useful for loading static data into the graph. Each element of the iterator will be emitted exactly once in the first tick of execution (see Aggregations and Ticks).

let stream = process.source_iter(q!(vec![1, 2, 3]));

source_stream

To create a stream from an asynchronous source, use source_stream. This takes any type that implements futures::Stream and emits each element as it is received. This is useful for loading data from external sources such as Kafka or a database. Typically, you will take the stream as a RuntimeData parameter to your function and supply the actual stream from your runtime binary.

pub fn my_flow<'a, D: Deploy<'a>>(
    ...,
    my_stream: RuntimeData<impl Stream<Item = i32>>
) {
    let stream = process.source_stream(my_stream);
    ...
}

Sending Streams between Processes

To send a stream from one process to another, use one of the send_* methods on the source stream. Each takes the destination process as a parameter.

If sending a type that supports serialization using serde, use send_bincode, which uses the bincode crate to serialize the data.

let process0 = flow.process(process_spec);
let process1 = flow.process(process_spec);

let stream0 = process0.source_iter(...);
let stream1 = stream0.send_bincode(process1);
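As a rough mental model of what sending a stream between two processes means, the sketch below connects two standard-library threads with a channel: one produces elements, the other receives them in order and keeps transforming them. This is only an analogy under simplifying assumptions (threads instead of machines, no serialization, a finite input); the function send_between_workers is hypothetical and is not part of Hydroflow+.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends every element of `input` from a producer thread to a consumer
/// thread, which adds one to each element and collects the results.
fn send_between_workers(input: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = mpsc::channel::<i32>();

    // Plays the role of the source process: emits elements onto the channel.
    let sender = thread::spawn(move || {
        for x in input {
            tx.send(x).expect("receiver hung up");
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Plays the role of the destination process: consumes the received
    // elements as a sequence and applies a further operator.
    let receiver = thread::spawn(move || rx.iter().map(|x| x + 1).collect::<Vec<i32>>());

    sender.join().expect("sender panicked");
    receiver.join().expect("receiver panicked")
}

fn main() {
    println!("{:?}", send_between_workers(vec![1, 2, 3]));
}
```

With a single sender, the channel delivers elements in the order they were sent, so the receiving side sees the same ordered sequence.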

To use custom serializers, you can use the send_bytes method to send a stream of Bytes values.

let stream0 = process0.source_iter(...);
let stream1 = stream0.send_bytes(process1);
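A custom serializer ultimately has to turn each element into bytes on the sending side and back into a value on the receiving side. The sketch below shows one minimal way to do that with a fixed-width little-endian layout, using only the standard library. The Reading type and the encode and decode functions are hypothetical, and Vec<u8> stands in for the Bytes type (the bytes crate provides a conversion from Vec<u8>); how these functions are wired into the stream before and after send_bytes is not shown here.

```rust
/// Hypothetical message type used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
struct Reading {
    sensor_id: u32,
    value: i64,
}

/// Encodes a reading as 12 bytes: 4 bytes of id, then 8 bytes of value,
/// both little-endian.
fn encode(r: &Reading) -> Vec<u8> {
    let mut buf = Vec::with_capacity(12);
    buf.extend_from_slice(&r.sensor_id.to_le_bytes());
    buf.extend_from_slice(&r.value.to_le_bytes());
    buf
}

/// Decodes a reading, returning None if the payload has the wrong length.
fn decode(bytes: &[u8]) -> Option<Reading> {
    if bytes.len() != 12 {
        return None;
    }
    let mut id = [0u8; 4];
    id.copy_from_slice(&bytes[0..4]);
    let mut value = [0u8; 8];
    value.copy_from_slice(&bytes[4..12]);
    Some(Reading {
        sensor_id: u32::from_le_bytes(id),
        value: i64::from_le_bytes(value),
    })
}

fn main() {
    let original = Reading { sensor_id: 7, value: -42 };
    let wire = encode(&original);
    let decoded = decode(&wire);
    println!("{} bytes on the wire, decoded: {:?}", wire.len(), decoded);
}
```

Returning an Option from decode keeps malformed payloads from panicking on the receiving side; a real protocol would likely also carry a version or length prefix.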