Data Sources and Sinks in Rust
Any useful flow requires us to define sources of data, either generated computationally or received from an outside environment via I/O.
One-time Iterator Sources
A flow can receive data from a Rust collection object via the source_iter() operator, which takes the iterable collection as an argument and passes the items down the flow.
For example, here we iterate through a vector of usize items and push them down the flow:
source_iter(vec![0, 1]) -> ...
The Hello, World example above uses this construct.
The source_file() and source_json() operators are similar, but read from a specified file.
All of these operators output the contents of their collection only once, during the first tick.
To output every tick, consider feeding results into a persist() operator.
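The once-only behavior, and the effect of persisting, can be modeled in plain Rust. The sketch below is not DFIR code and makes no claim about how DFIR is implemented; OnceSource, Persist, and run_ticks are hypothetical names used only to illustrate the semantics described above. The one-time source emits its items on the first tick and nothing afterward, while the persisting stage remembers everything it has received and replays it on every tick.

```rust
/// Hypothetical stand-in for a one-time source: yields its items on the
/// first tick and an empty batch on every later tick.
struct OnceSource<T> {
    items: Option<Vec<T>>,
}

impl<T> OnceSource<T> {
    fn new(items: Vec<T>) -> Self {
        OnceSource { items: Some(items) }
    }

    /// Returns the whole collection the first time, then nothing.
    fn tick(&mut self) -> Vec<T> {
        self.items.take().unwrap_or_default()
    }
}

/// Hypothetical stand-in for a persisting stage: stores every item it has
/// ever received and replays the full history on each tick.
struct Persist<T> {
    stored: Vec<T>,
}

impl<T: Clone> Persist<T> {
    fn new() -> Self {
        Persist { stored: Vec::new() }
    }

    fn tick(&mut self, incoming: Vec<T>) -> Vec<T> {
        self.stored.extend(incoming);
        self.stored.clone()
    }
}

/// Runs `ticks` ticks and records the per-tick output of a bare one-time
/// source and of a one-time source followed by the persisting stage.
fn run_ticks(ticks: usize) -> (Vec<Vec<usize>>, Vec<Vec<usize>>) {
    let mut bare = OnceSource::new(vec![0, 1]);
    let mut source = OnceSource::new(vec![0, 1]);
    let mut persist = Persist::new();
    let mut bare_out = Vec::new();
    let mut persisted_out = Vec::new();
    for _ in 0..ticks {
        bare_out.push(bare.tick());
        persisted_out.push(persist.tick(source.tick()));
    }
    (bare_out, persisted_out)
}
```

Calling run_ticks(3) shows the difference: the bare source produces its two items only in the first batch, while the persisted variant produces them in all three.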
Streaming Sources
More commonly, a flow should handle external data coming in asynchronously from a Tokio runtime.
One way to do this is with channels that allow Rust code to send data into DFIR via the source_stream() operator.
The code below creates a channel for data of (Rust) type (usize, usize):
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(usize, usize)>();
Under the hood this uses Tokio unbounded channels. From Rust we can now push data into the channel. For testing, for example, we can do so explicitly:
input_send.send((0, 1)).unwrap();
And in our DFIR syntax we can receive the data from the channel using the source_stream() operator and pass it along a flow:
source_stream(input_recv) -> ...
To put this together, let's revisit our Hello, World example from above with data sent in from outside the flow:
use dfir_rs::dfir_syntax;

fn main() {
    let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<&str>();
    let mut flow = dfir_syntax! {
        source_stream(input_recv) -> map(|x| x.to_uppercase())
            -> for_each(|x| println!("{}", x));
    };
    input_send.send("Hello").unwrap();
    input_send.send("World").unwrap();
    flow.run_available();
}
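Note that the example sends both messages before calling run_available(), so they are already waiting in the channel when the flow runs. The "process whatever is ready now" pattern can be sketched with the standard library alone. This is only an analogy: it uses std::sync::mpsc rather than Tokio channels, and drain_available and demo_drain are hypothetical helpers, not DFIR functions.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Drains every message that is ready right now, uppercases it, and
/// returns the results. It never blocks waiting for future messages.
fn drain_available(input_recv: &Receiver<&'static str>) -> Vec<String> {
    input_recv
        .try_iter() // yields only messages already in the channel
        .map(|x| x.to_uppercase())
        .collect()
}

/// Sends two messages, drains twice, then sends one more and drains again.
fn demo_drain() -> (Vec<String>, Vec<String>, Vec<String>) {
    let (input_send, input_recv) = channel::<&'static str>();
    input_send.send("Hello").unwrap();
    input_send.send("World").unwrap();
    let first = drain_available(&input_recv);
    let second = drain_available(&input_recv); // nothing new has arrived
    input_send.send("Again").unwrap();
    let third = drain_available(&input_recv);
    (first, second, third)
}
```

In this model a message sent after a drain is picked up only by the next drain, which is why the order of sending and running matters in tests.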
Sometimes we want to trigger activity based on timing, not data. To achieve this, we can use the source_interval() operator, which takes a Duration d as an argument and outputs a unit () each time a period of length d elapses.
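To make the timing-driven idea concrete, here is a blocking, std-only stand-in. It is a sketch under assumptions rather than DFIR's operator: run_interval and demo_interval are hypothetical names, and the loop uses thread::sleep on the current thread instead of an async timer.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Blocking stand-in for an interval source: calls `on_tick` with a unit
/// value after each period `d`, `count` times in total.
fn run_interval<F: FnMut(())>(d: Duration, count: u32, mut on_tick: F) {
    for _ in 0..count {
        thread::sleep(d); // wait one full period
        on_tick(());
    }
}

/// Fires three ticks 10 ms apart; returns the tick count and elapsed time.
fn demo_interval() -> (u32, Duration) {
    let start = Instant::now();
    let mut fired = 0;
    run_interval(Duration::from_millis(10), 3, |()| fired += 1);
    (fired, start.elapsed())
}
```

The unit value carries no information of its own; downstream logic reacts to the fact that a tick happened, not to its contents.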
Destinations
As duals to our data source operators, we also have data destination operators. The dest operators you'll likely use most often are dest_sink() and dest_file().
dest_sink provides a way to output to async Sinks, while dest_file provides a way to append to a file given a particular path.
They are fairly straightforward, so the best source for further information is the documentation you can find by following the links on the operator names above.
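For intuition about what appending to a file at a given path involves, the sketch below does it with the standard library only. It is not the dest_file implementation: append_lines is a hypothetical helper, and writing one item per line is an assumption made for the example.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Appends each item as its own line, creating the file if it is missing.
/// Existing contents are left untouched.
fn append_lines(path: &Path, lines: &[&str]) -> io::Result<()> {
    let mut file = OpenOptions::new()
        .create(true) // create the file on first use
        .append(true) // always write at the end
        .open(path)?;
    for line in lines {
        writeln!(file, "{}", line)?;
    }
    Ok(())
}
```

Calling append_lines twice on the same path leaves the lines from both calls in the file, in order.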
Outputting data synchronously
Not all output destinations are asynchronous. For those that are not, we can use for_each instead.
The easiest way to output data from DFIR is using an unbounded channel.
Since the channel is unbounded, we can always synchronously write to it using a for_each operator:
use dfir_rs::dfir_syntax;

fn main() {
    let (output_send, mut output_recv) = dfir_rs::util::unbounded_channel::<char>();
    let mut flow = dfir_syntax! {
        source_iter("Hello World".chars()) -> map(|c| c.to_ascii_uppercase())
            -> for_each(|c| output_send.send(c).unwrap());
    };
    flow.run_available();
    let output = &*dfir_rs::util::collect_ready::<String, _>(&mut output_recv);
    assert_eq!(output, "HELLO WORLD");
}
Here we use collect_ready as a quick way to validate the output. In a fully functional program you should spin up a separate async task to consume the output from output_recv as it arrives.
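The shape of such a separate consumer can be sketched with a standard-library thread standing in for the async task. This is an analogy under stated assumptions: it uses std::sync::mpsc instead of the Tokio channel, and spawn_consumer and produce_and_collect are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, JoinHandle};

/// Spawns a background consumer that collects characters as they arrive
/// and finishes once every sender has been dropped.
fn spawn_consumer(output_recv: Receiver<char>) -> JoinHandle<String> {
    thread::spawn(move || output_recv.iter().collect::<String>())
}

/// Produces uppercase characters while the consumer runs concurrently.
fn produce_and_collect(text: &str) -> String {
    let (output_send, output_recv) = channel::<char>();
    let consumer = spawn_consumer(output_recv);
    for c in text.chars() {
        output_send.send(c.to_ascii_uppercase()).unwrap();
    }
    drop(output_send); // closing the channel lets the consumer finish
    consumer.join().unwrap()
}
```

The producer never waits for the consumer, which mirrors why a synchronous write into an unbounded channel is always possible.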
Serde: Network Serialization and Deserialization
One of the mechanical annoyances of networked systems is the need to convert data to wire format ("serialization") and convert it back from wire format to data ("deserialization"), also known as "Serde".
This can be done with map functions, but we provide a convenience source/sink pair that does serde and networking for you.
The source side, source_stream_serde(), generates tuples of the type (T, SocketAddr), where the first field is a deserialized item of type T, and the second field is the address of the sender of the item.
The dest side, dest_sink_serde(), takes in tuples of type (T, SocketAddr), where the first field is an item of type T to be serialized, and the second field is a destination address.
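To illustrate the tuple shapes on both sides, here is a hand-rolled, std-only sketch of the "map function" approach mentioned above. Everything in it is an assumption made for illustration: the Message type, the byte layout, and the helpers to_wire and from_wire are invented here and say nothing about the wire format DFIR's operators actually use.

```rust
use std::net::SocketAddr;

/// Hypothetical item type T for this example.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    id: u32,
    text: String,
}

/// Serializes a message as 4 big-endian bytes of `id`, then UTF-8 `text`.
fn serialize(msg: &Message) -> Vec<u8> {
    let mut bytes = msg.id.to_be_bytes().to_vec();
    bytes.extend_from_slice(msg.text.as_bytes());
    bytes
}

/// Reverses `serialize`; returns None for truncated or non-UTF-8 input.
fn deserialize(bytes: &[u8]) -> Option<Message> {
    if bytes.len() < 4 {
        return None;
    }
    let (head, tail) = bytes.split_at(4);
    let id = u32::from_be_bytes([head[0], head[1], head[2], head[3]]);
    let text = String::from_utf8(tail.to_vec()).ok()?;
    Some(Message { id, text })
}

/// Dest side: takes (item, destination address), yields (bytes, address).
fn to_wire((msg, dest): (Message, SocketAddr)) -> (Vec<u8>, SocketAddr) {
    (serialize(&msg), dest)
}

/// Source side: takes (bytes, sender address), yields (item, address).
fn from_wire((bytes, sender): (Vec<u8>, SocketAddr)) -> Option<(Message, SocketAddr)> {
    deserialize(&bytes).map(|msg| (msg, sender))
}
```

The address travels alongside the payload in both directions: on the way out it names the destination, and on the way in it identifies the sender, so a flow can reply by reusing it.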