Skip to main content

Data Sources and Sinks in Rust

Any useful flow requires us to define sources of data, either generated computationally or received from an outside environment via I/O.

One-time Iterator Sources

A flow can receive data from a Rust collection object via the source_iter() operator, which takes the iterable collection as an argument and passes the items down the flow. For example, here we iterate through a vector of usize items and push them down the flow:

    source_iter(vec![0, 1]) -> ...

The Hello, World example above uses this construct.

The source_file() and source_json() operators are similar, but read from a specified file.

All of these operators output the contents of their collection only once, during the first tick. To output every tick, consider feeding results into a persist() operator.

Streaming Sources

More commonly, a flow should handle external data coming in asynchronously from a Tokio runtime.

One way to do this is with channels that allow Rust code to send data into Hydroflow via the source_stream() operator. The code below creates a channel for data of (Rust) type (usize, usize):

    let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();

Under the hood this uses Tokio unbounded channels. Now in Rust we can now push data into the channel. E.g. for testing we can do it explicitly as follows:

    input_send.send((0, 1)).unwrap()

And in our Hydroflow syntax we can receive the data from the channel using the source_stream() operator and pass it along a flow:

    source_stream(input_recv) -> ...

To put this together, let's revisit our Hello, World example from above with data sent in from outside the flow:

use hydroflow::hydroflow_syntax;

fn main() {
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<&str>();
let mut flow = hydroflow_syntax! {
source_stream(input_recv) -> map(|x| x.to_uppercase())
-> for_each(|x| println!("{}", x));
};
input_send.send("Hello").unwrap();
input_send.send("World").unwrap();
flow.run_available();
}

Sometimes we want to trigger activity based on timing, not data. To achieve this, we can use the source_interval() operator, which takes a Duration d as an argument, and outputs a unit () after every d units of time pass.

Destinations

As duals to our data source operators, we also have data destination operators. The dest operators you'll likely use most often are dest_sink() and dest_file(). dest_sink provides a way to output to async Sinks, while dest_file provides a way to append to a file given a particular path. They are fairly straightforward, so the best source for further information is the documentation you can find by following the links on the operator names above.

Outputing data synchronously

Not all output destinations are asynchronous, in which case we can use for_each instead. The easiest way to output data from Hydroflow is using an unbounded channel. Since the channel is unbounded, we can always synchronously write to it using a for_each operator:

use hydroflow::hydroflow_syntax;

fn main() {
let (output_send, mut output_recv) = hydroflow::util::unbounded_channel::<char>();
let mut flow = hydroflow_syntax! {
source_iter("Hello World".chars()) -> map(|c| c.to_ascii_uppercase())
-> for_each(|c| output_send.send(c).unwrap());
};
flow.run_available();

let output = &*hydroflow::util::collect_ready::<String, _>(&mut output_recv);
assert_eq!(output, "HELLO WORLD");
}

Here we use collect_ready as a quick way to validate the output. In a fully functional program you should spin up a separate async task to consume the output from output_recv as it arrives.

Serde: Network Serialization and Deserialization

One of the mechanical annoyances of networked systems is the need to convert data to wire format ("serialization") and convert it back from wire format to data ("deserialization"), also known as "Serde". This can be done with map functions, but we provide a convenience source/sink pair that does serde and networking for you. The source side, source_serde() generates tuples of the type (T, SocketAddr), where the first field is a deserialized item of type T, and the second field is the address of the sender of the item. The dest side, source_serde(), takes in tuples of type (T, SocketAddr), where the first field is an item of type T to be serialized, and the second field is a destination address.