An Example With Streaming Input
In this example we will cover:
- the input
channel
concept, which streams data in from outside the Hydroflow spec- the
source_stream
operator that brings channel input into Hydroflow- Rust syntax to programmatically send data to a (local) channel
In our previous examples, data came from within the Hydroflow spec, via Rust iterators and the source_iter
operator. In most cases, however, data comes from outside the Hydroflow spec. In this example, we'll see a simple version of this idea, with data being generated on the same thread and sent into the channel programmatically via Rust.
For discussion, we start with a skeleton much like before:
use hydroflow::hydroflow_syntax;
pub fn main() {
let mut hydroflow = hydroflow_syntax! {
// code will go here
};
hydroflow.run_available();
}
TODO: Make the following less intimidating to users who are not Tokio experts.
To add a new external input
channel, we can use the hydroflow::util::unbounded_channel()
function in Rust before we declare the Hydroflow spec:
// Create our channel input
let (input_example, example_recv) = hydroflow::util::unbounded_channel::<usize>();
Under the covers, this is a multiple-producer/single-consumer (mpsc
) channel
provided by the tokio library for Rust, which is usually the appropriate choice for an inbound Hydroflow stream.
Think of it as a high-performance "mailbox" that any sender can fill with well-typed data.
The Rust ::<usize>
syntax uses what is affectionately
called the "turbofish", which is how type parameters (generic arguments) are
supplied to generic types and functions. In this case it specifies that this tokio channel
transmits items of type usize
.
The returned example_recv
value can be used via a source_stream
to build a Hydroflow subgraph just like before.
Here is the same program as before, but using the
input channel. Back in the simple
project, replace the contents of src/main.rs
with the following:
use hydroflow::hydroflow_syntax;
pub fn main() {
// Create our channel input
let (input_example, example_recv) = hydroflow::util::unbounded_channel::<usize>();
let mut flow = hydroflow_syntax! {
source_stream(example_recv)
-> filter_map(|n: usize| {
let n2 = n * n;
if n2 > 10 {
Some(n2)
}
else {
None
}
})
-> flat_map(|n| (n..=n+1))
-> for_each(|n| println!("Ahoy, {}", n));
};
println!("A");
input_example.send(1).unwrap();
input_example.send(0).unwrap();
input_example.send(2).unwrap();
input_example.send(3).unwrap();
input_example.send(4).unwrap();
input_example.send(5).unwrap();
flow.run_available();
println!("B");
input_example.send(6).unwrap();
input_example.send(7).unwrap();
input_example.send(8).unwrap();
input_example.send(9).unwrap();
flow.run_available();
}
cargo run
<build output>
A
Ahoy, 16
Ahoy, 17
Ahoy, 25
Ahoy, 26
B
Ahoy, 36
Ahoy, 37
Ahoy, 49
Ahoy, 50
Ahoy, 64
Ahoy, 65
Ahoy, 81
Ahoy, 82
At the bottom of main.rs
we can see how to programatically supply usize
-typed inputs with the tokio
.send()
method.
We call Rust's .unwrap()
method
to ignore the error messages from .send()
in this simple case. In later examples we'll see how to
allow for data coming in over a network.