DFIR Operators
In our previous examples we made use of some of DFIR's operators. Here we document each operator in more detail. Many of these operators are based on the Rust equivalents for iterators; see the Rust documentation.
Maps: `enumerate`, `identity`, `inspect`, `map`, `py_udf`
Simple one-in-one-out operators.
Filters: `filter`, `filter_map`
One-in zero-or-one-out operators.
Flattens: `flat_map`, `flatten`
One-in multiple-out operators.
Folds: `all_once`, `batch`, `fold`, `reduce`
Operators which accumulate elements together.
Keyed Folds: `fold_keyed`, `reduce_keyed`
Keyed fold operators.
Lattice Folds: `lattice_fold`, `lattice_reduce`
Folds based on lattice-merge.
Persistent Operators: `multiset_delta`, `defer_signal`, `persist`, `persist_mut`, `persist_mut_keyed`, `sort`, `sort_by_key`, `state`, `state_by`, `unique`
Persistent (stateful) operators.
Multi-Input Operators: `anti_join`, `anti_join_multiset`, `chain`, `cross_join`, `cross_join_multiset`, `cross_singleton`, `difference`, `difference_multiset`, `join`, `join_fused`, `join_fused_lhs`, `join_fused_rhs`, `join_multiset`, `lattice_bimorphism`, `union`, `zip`, `zip_longest`
Operators with multiple inputs.
Multi-Output Operators: `demux`, `demux_enum`, `partition`, `tee`, `unzip`
Operators with multiple outputs.
Sources: `initialize`, `null`, `spin`, `source_file`, `source_interval`, `source_iter`, `source_json`, `source_stdin`, `source_stream`, `source_stream_serde`
Operators which produce output elements (and consume no inputs).
Sinks: `dest_file`, `dest_sink`, `dest_sink_serde`, `for_each`, `null`
Operators which consume input elements (and produce no outputs).
Control Flow Operators: `assert`, `assert_eq`, `next_stratum`, `defer_tick`, `defer_tick_lazy`
Operators which affect control flow/scheduling.
Compiler Fusion Operators: `_lattice_fold_batch`, `_lattice_join_fused_join`
Operators which are necessary to implement certain optimizations and rewrite rules.
Windowing Operators: `all_once`, `batch`
Operators for windowing loop inputs.
all_once
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> all_once() | at least 0 and at most 1 | Blocking |
TODO(mingwei): docs
anti_join
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]anti_join() -> | exactly 1 | Blocking |
Input port names: `pos` (streaming), `neg` (blocking)
2 input streams, the first of type (K, T) and the second of type K, with output type (K, T)
For a given tick, computes the anti-join of the items in the input streams, returning unique items in the `pos` input that do not have matching keys in the `neg` input. Note this is set semantics only for the `neg` element. Order is preserved for new elements in a given tick, but not for elements processed in a previous tick with `'static`.
source_iter(vec![("dog", 1), ("cat", 2), ("elephant", 3)]) -> [pos]diff;
source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
diff = anti_join() -> assert_eq([("elephant", 3)]);
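The per-tick behavior described above can be sketched in plain Rust. This is only an analogue built on the standard library, not DFIR code: `anti_join_tick` is a hypothetical helper, and it assumes that "unique items" means a repeated `pos` pair is emitted once.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Plain-Rust analogue of a single tick of `anti_join` (hypothetical helper, not a DFIR API).
/// Keeps each distinct `(key, value)` pair from `pos` whose key is absent from `neg`.
fn anti_join_tick<K, T>(pos: Vec<(K, T)>, neg: Vec<K>) -> Vec<(K, T)>
where
    K: Eq + Hash + Clone,
    T: Eq + Hash + Clone,
{
    // The whole `neg` side is collected first, mirroring its blocking port.
    let neg_keys: HashSet<K> = neg.into_iter().collect();
    let mut seen: HashSet<(K, T)> = HashSet::new();
    pos.into_iter()
        .filter(|(k, _)| !neg_keys.contains(k))
        // `insert` returns false for a pair that was already emitted.
        .filter(|pair| seen.insert(pair.clone()))
        .collect()
}

fn main() {
    let out = anti_join_tick(
        vec![("dog", 1), ("cat", 2), ("elephant", 3), ("elephant", 3)],
        vec!["dog", "cat", "gorilla"],
    );
    assert_eq!(out, vec![("elephant", 3)]);
    println!("{:?}", out);
}
```

The sketch keeps first-seen order for the surviving `pos` items, which corresponds to the within-tick ordering note above.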
anti_join_multiset
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]anti_join_multiset() -> | exactly 1 | Blocking |
Input port names: `pos` (streaming), `neg` (blocking)
2 input streams, the first of type (K, T) and the second of type K, with output type (K, T)
For a given tick, computes the anti-join of the items in the input streams, returning items in the `pos` input that do not have matching keys in the `neg` input. Note that this uses multiset semantics on the positive side, so duplicated positive inputs will appear in the output either 0 times (if matched in `neg`) or as many times as they appear in the input (if not matched in `neg`).
source_iter(vec![("cat", 2), ("cat", 2), ("elephant", 3), ("elephant", 3)]) -> [pos]diff;
source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
diff = anti_join_multiset() -> assert_eq([("elephant", 3), ("elephant", 3)]);
assert
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> assert(A) | at least 0 and at most 1 | Streaming |
1 input stream, 1 optional output stream
Arguments: a predicate function that will be applied to each item in the stream
If the predicate returns false for any input item then the operator will panic at runtime.
source_iter([1, 2, 3])
-> assert(|x| *x > 0)
-> assert_eq([1, 2, 3]);
assert_eq
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> assert_eq(A) | at least 0 and at most 1 | Streaming |
1 input stream, 1 optional output stream
Arguments: A vector, slice, or array containing objects that will be compared to the input stream.
The input stream will be compared with the provided argument, element by element. If any elements do not match, `assert_eq` will panic.
If the input stream produces more elements than are in the provided argument, `assert_eq` will panic.
The input stream is passed through `assert_eq` unchanged to the output stream.
`assert_eq` is mainly useful for testing and documenting the behavior of DFIR code inline.
`assert_eq` will remember the stream position across ticks; see the example.
unioned = union();
source_iter([1]) -> assert_eq([1]) -> unioned;
source_iter([2]) -> defer_tick() -> assert_eq([2]) -> unioned;
unioned -> assert_eq([1, 2]);
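The way `assert_eq` carries its position from one tick to the next can be pictured with a small plain-Rust model. `StreamChecker` and `check_tick` are hypothetical names used only for illustration; the real operator is generated by the DFIR macro and may be implemented differently.

```rust
/// Hypothetical checker that mimics how `assert_eq` keeps its position across ticks.
struct StreamChecker<T> {
    expected: Vec<T>,
    position: usize, // index of the next expected element; survives between ticks
}

impl<T: PartialEq + std::fmt::Debug> StreamChecker<T> {
    fn new(expected: Vec<T>) -> Self {
        Self { expected, position: 0 }
    }

    /// Checks one tick's worth of items and passes them through unchanged.
    /// Panics on a mismatch or when more items arrive than were expected.
    fn check_tick(&mut self, items: Vec<T>) -> Vec<T> {
        for item in &items {
            let want = self
                .expected
                .get(self.position)
                .expect("stream produced more elements than expected");
            assert_eq!(item, want, "mismatch at position {}", self.position);
            self.position += 1;
        }
        items
    }
}

fn main() {
    let mut checker = StreamChecker::new(vec![1, 2]);
    assert_eq!(checker.check_tick(vec![1]), vec![1]); // first tick compares against expected[0]
    assert_eq!(checker.check_tick(vec![2]), vec![2]); // next tick continues at expected[1]
    println!("position after two ticks: {}", checker.position);
}
```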
batch
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> batch() | at least 0 and at most 1 | Streaming |
TODO(mingwei): docs
chain
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]chain() -> | exactly 1 | Streaming |
2 input streams of the same type, 1 output stream of the same type
Chains together a pair of streams, with all the elements of the first emitted before the second.
Since `chain` has multiple input streams, it needs to be assigned to a variable to reference its multiple input ports across statements.
source_iter(vec!["hello", "world"]) -> [0]my_chain;
source_iter(vec!["stay", "gold"]) -> [1]my_chain;
my_chain = chain()
-> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD", "STAY", "GOLD"]);
cross_join
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]cross_join() -> | exactly 1 | Streaming |
Input port names: `0` (streaming), `1` (streaming)
2 input streams of type S and T, 1 output stream of type (S, T)
Forms the cross-join (Cartesian product) of the items in the input streams, returning all tupled pairs.
source_iter(vec!["happy", "sad"]) -> [0]my_join;
source_iter(vec!["dog", "cat"]) -> [1]my_join;
my_join = cross_join() -> assert_eq([("happy", "dog"), ("sad", "dog"), ("happy", "cat"), ("sad", "cat")]);
`cross_join` can be provided with one or two generic lifetime persistence arguments in the same way as `join`; see `join`'s documentation for more info.
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<&str>();
let mut flow = dfir_rs::dfir_syntax! {
    my_join = cross_join::<'tick>();
    source_iter(["hello", "bye"]) -> [0]my_join;
    source_stream(input_recv) -> [1]my_join;
    my_join -> for_each(|(s, t)| println!("({}, {})", s, t));
};
input_send.send("oakland").unwrap();
flow.run_tick();
input_send.send("san francisco").unwrap();
flow.run_tick();
Prints only `"(hello, oakland)"` and `"(bye, oakland)"`. The `source_iter` is only included in the first tick, then forgotten, so when `"san francisco"` arrives on input `[1]` in the second tick, there is nothing for it to match with from input `[0]`, and therefore it does not appear in the output.
cross_join_multiset
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]cross_join_multiset() -> | exactly 1 | Streaming |
Input port names: `0` (streaming), `1` (streaming)
2 input streams of type S and T, 1 output stream of type (S, T)
Forms the multiset cross-join (Cartesian product) of the (possibly duplicated) items in the input streams, returning all tupled pairs regardless of duplicates.
source_iter(vec!["happy", "happy", "sad"]) -> [0]my_join;
source_iter(vec!["dog", "cat", "cat"]) -> [1]my_join;
my_join = cross_join_multiset() -> sort() -> assert_eq([
    ("happy", "cat"),
    ("happy", "cat"),
    ("happy", "cat"),
    ("happy", "cat"),
    ("happy", "dog"),
    ("happy", "dog"),
    ("sad", "cat"),
    ("sad", "cat"),
    ("sad", "dog"),
]);
cross_singleton
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]cross_singleton() -> | exactly 1 | Blocking |
Input port names: `input` (streaming), `single` (blocking)
2 input streams, 1 output stream, no arguments.
Operates like cross-join, but treats one of the inputs as a "singleton"-like stream, ignoring everything after the first element. This operator blocks on the singleton input, and then joins it with all the elements in the other stream if an element is present. This operator is useful when a singleton input must be used to transform elements of a stream, since unlike cross-product it avoids cloning the stream of inputs. It is also useful for creating conditional branches, since the operator short circuits if the singleton input produces no values.
There are two inputs to `cross_singleton`: `input` and `single`. `input` is the input data flow, and `single` is the singleton input.
join = cross_singleton();
source_iter([1, 2, 3]) -> [input]join;
source_iter([0]) -> [single]join;
join -> assert_eq([(1, 0), (2, 0), (3, 0)]);
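As a rough model of the behavior described above, the following plain-Rust function pairs every input item with the first singleton value and emits nothing when the singleton side is empty. `cross_singleton_tick` is a hypothetical helper for one tick, not a DFIR API.

```rust
/// Plain-Rust analogue of one tick of `cross_singleton` (hypothetical helper).
fn cross_singleton_tick<T, S: Clone>(input: Vec<T>, single: Vec<S>) -> Vec<(T, S)> {
    // Only the first element of `single` is used; anything after it is ignored.
    match single.into_iter().next() {
        // Pair every input item with a clone of the singleton value.
        Some(s) => input.into_iter().map(|item| (item, s.clone())).collect(),
        // No singleton value: short-circuit and emit nothing.
        None => Vec::new(),
    }
}

fn main() {
    assert_eq!(
        cross_singleton_tick(vec![1, 2, 3], vec![0]),
        vec![(1, 0), (2, 0), (3, 0)]
    );
    // An empty singleton side acts as a closed conditional branch.
    assert!(cross_singleton_tick(vec![1, 2, 3], Vec::<i32>::new()).is_empty());
    println!("ok");
}
```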
defer_signal
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]defer_signal() -> | exactly 1 | Blocking |
Input port names: `input` (blocking), `signal` (blocking)
2 input streams, 1 output stream, no arguments.
Defers streaming input and releases it downstream when a signal is delivered. The order of input is preserved. This allows for buffering data and delivering it at a later, chosen, tick.
There are two inputs to `defer_signal`: `input` and `signal`.
`input` is the input data flow. Data that is delivered on this input is collected in order inside of the `defer_signal` operator. When anything is sent to `signal` the collected data is released downstream. The entire `signal` input is consumed each tick, so sending 5 things on `signal` will not release inputs on the next 5 consecutive ticks.
gate = defer_signal();
source_iter([1, 2, 3]) -> [input]gate;
source_iter([()]) -> [signal]gate;
gate -> assert_eq([1, 2, 3]);
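The buffering rule can be modeled in a few lines of plain Rust. `DeferSignal` here is a hypothetical stand-in written for illustration, with the number of signals received in a tick passed as a plain count.

```rust
/// Hypothetical stand-in for `defer_signal`: buffers input until any signal arrives.
struct DeferSignal<T> {
    buffer: Vec<T>,
}

impl<T> DeferSignal<T> {
    fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    /// Runs one tick: buffers `input` in order, then releases everything if at
    /// least one signal arrived. All signals are consumed within this tick.
    fn run_tick(&mut self, input: Vec<T>, signals: usize) -> Vec<T> {
        self.buffer.extend(input);
        if signals > 0 {
            std::mem::take(&mut self.buffer)
        } else {
            Vec::new()
        }
    }
}

fn main() {
    let mut gate = DeferSignal::new();
    assert!(gate.run_tick(vec![1, 2], 0).is_empty()); // no signal: data is held back
    assert_eq!(gate.run_tick(vec![3], 5), vec![1, 2, 3]); // five signals, a single release
    assert!(gate.run_tick(vec![4], 0).is_empty()); // earlier signals do not carry over
    println!("ok");
}
```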
defer_tick
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> defer_tick() -> | exactly 1 | Blocking |
Buffers all input items and releases them in the next tick.
See the book discussion of Hydroflow time for details on ticks.
A tick may be divided into multiple strata; see the `next_stratum()` operator.
`defer_tick` is sometimes needed to separate conflicting data across time, in order to preserve invariants. Consider the following example, which implements a flip-flop: the invariant is that it emits exactly one of true or false in a given tick (but never both!).
pub fn main() {
    let mut df = dfir_rs::dfir_syntax! {
        source_iter(vec![true])
            -> state;
        state = union()
            -> assert(|x| if context.current_tick().0 % 2 == 0 { *x == true } else { *x == false })
            -> map(|x| !x)
            -> defer_tick()
            -> state;
    };
    for i in 1..100 {
        println!("tick {}", i);
        df.run_tick();
    }
}
`defer_tick` can also be handy for comparing stream content across ticks. In the example below `defer_tick()` is used alongside `difference()` to filter out any items that arrive from `inp` in the current tick which match an item from `inp` in the previous tick.
// Outputs 1 2 3 4 5 6 (on separate lines).
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut flow = dfir_rs::dfir_syntax! {
    inp = source_stream(input_recv) -> tee();
    inp -> [pos]diff;
    inp -> defer_tick() -> [neg]diff;
    diff = difference() -> for_each(|x| println!("{}", x));
};
for x in [1, 2, 3, 4] {
    input_send.send(x).unwrap();
}
flow.run_tick();
for x in [3, 4, 5, 6] {
    input_send.send(x).unwrap();
}
flow.run_tick();
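To see why the flow above prints 1 through 6, the two ticks can be traced by hand in plain Rust. `simulate` is a hypothetical helper that models `defer_tick` as a one-tick buffer feeding the `neg` side; it is a sketch of the dataflow, not DFIR code.

```rust
use std::collections::HashSet;

/// Simulates `inp -> [pos]diff; inp -> defer_tick() -> [neg]diff` over a list of ticks.
/// Returns what the difference would emit in each tick under this simplified model.
fn simulate(ticks: &[Vec<usize>]) -> Vec<Vec<usize>> {
    let mut deferred: Vec<usize> = Vec::new(); // items held by the stand-in for defer_tick
    let mut outputs = Vec::new();
    for items in ticks {
        // `neg` sees the items that arrived one tick earlier.
        let neg: HashSet<usize> = deferred.iter().copied().collect();
        let out: Vec<usize> = items.iter().copied().filter(|x| !neg.contains(x)).collect();
        outputs.push(out);
        // Buffer this tick's items for release in the next tick.
        deferred = items.clone();
    }
    outputs
}

fn main() {
    let outputs = simulate(&[vec![1, 2, 3, 4], vec![3, 4, 5, 6]]);
    assert_eq!(outputs, vec![vec![1, 2, 3, 4], vec![5, 6]]);
    for x in outputs.into_iter().flatten() {
        println!("{}", x);
    }
}
```

In the first tick nothing has been deferred yet, so all four items pass; in the second tick 3 and 4 are removed because they were seen one tick earlier.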
You can also supply a type parameter `defer_tick::<MyType>()` to specify what items flow through the pipeline. This can be useful for helping the compiler infer types.
defer_tick_lazy
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> defer_tick_lazy() -> | exactly 1 | Blocking |
See `defer_tick`.
This operator is identical to `defer_tick` except that it does not eagerly cause a new tick to be scheduled.
demux
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> demux(A)[<output_port>] -> | at least 2 | Streaming |
Output port names: Variadic, as specified in arguments.
Arguments: A Rust closure, the first argument is a received item and the second argument is a variadic `var_args!` tuple list where each item name is an output port.
Takes the input stream and allows the user to determine which items to deliver to any number of output streams.
Note: Downstream operators may need explicit type annotations.
Note: The `Pusherator` trait is automatically imported to enable the `.give(...)` method.
Note: The closure has access to the `context` object.
my_demux = source_iter(1..=100) -> demux(|v, var_args!(fzbz, fizz, buzz, rest)|
match (v % 3, v % 5) {
(0, 0) => fzbz.give(v),
(0, _) => fizz.give(v),
(_, 0) => buzz.give(v),
(_, _) => rest.give(v),
}
);
my_demux[fzbz] -> for_each(|v| println!("{}: fizzbuzz", v));
my_demux[fizz] -> for_each(|v| println!("{}: fizz", v));
my_demux[buzz] -> for_each(|v| println!("{}: buzz", v));
my_demux[rest] -> for_each(|v| println!("{}", v));
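The routing decision made by the closure above can be checked in isolation with ordinary Rust. The `Buckets` struct and `route` function are hypothetical names that stand in for the four output ports; they are not part of DFIR.

```rust
/// One `Vec` per output port of the demux example (hypothetical stand-in for the ports).
#[derive(Default, Debug)]
struct Buckets {
    fzbz: Vec<u32>,
    fizz: Vec<u32>,
    buzz: Vec<u32>,
    rest: Vec<u32>,
}

/// Applies the same match as the `demux` closure, pushing into a bucket
/// where the closure would call `.give(v)` on a port.
fn route(values: impl IntoIterator<Item = u32>) -> Buckets {
    let mut out = Buckets::default();
    for v in values {
        match (v % 3, v % 5) {
            (0, 0) => out.fzbz.push(v),
            (0, _) => out.fizz.push(v),
            (_, 0) => out.buzz.push(v),
            (_, _) => out.rest.push(v),
        }
    }
    out
}

fn main() {
    let buckets = route(1..=15);
    assert_eq!(buckets.fzbz, vec![15]);
    assert_eq!(buckets.fizz, vec![3, 6, 9, 12]);
    assert_eq!(buckets.buzz, vec![5, 10]);
    println!("{:?}", buckets);
}
```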
demux_enum
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> demux_enum() | any number of | Streaming |
Output port names: Variadic, as specified in arguments.
Generic Argument: An enum type which has `#[derive(DemuxEnum)]`. Must match the items in the input stream.
Takes an input stream of enum instances and splits them into their variants.
#[derive(DemuxEnum)]
enum Shape {
Square(f64),
Rectangle(f64, f64),
Circle { r: f64 },
Triangle { w: f64, h: f64 }
}
let mut df = dfir_syntax! {
    my_demux = source_iter([
        Shape::Square(9.0),
        Shape::Rectangle(10.0, 8.0),
        Shape::Circle { r: 5.0 },
        Shape::Triangle { w: 12.0, h: 13.0 },
    ]) -> demux_enum::<Shape>();
    my_demux[Square] -> map(|s| s * s) -> out;
    my_demux[Circle] -> map(|(r,)| std::f64::consts::PI * r * r) -> out;
    my_demux[Rectangle] -> map(|(w, h)| w * h) -> out;
    my_demux[Triangle] -> map(|(w, h)| 0.5 * w * h) -> out;
    out = union() -> for_each(|area| println!("Area: {}", area));
};
df.run_available();
dest_file
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> dest_file(A, B) | exactly 0 | Streaming |
1 input stream, 0 output streams
Arguments: (1) An `AsRef<Path>` for a file to write to, and (2) a bool `append`.
Consumes `String`s by writing them as lines to a file. The file will be created if it doesn't exist. Lines will be appended to the file if `append` is true, otherwise the file will be truncated before lines are written.
Note this operator must be used within a Tokio runtime.
source_iter(1..=10) -> map(|n| format!("Line {}", n)) -> dest_file("dest.txt", false);
dest_sink
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> dest_sink(A) | exactly 0 | Streaming |
Arguments: An async `Sink`.
Consumes items by sending them to an async `Sink`.
A `Sink` is a thing into which values can be sent, asynchronously. For example, sending items into a bounded channel.
Note this operator must be used within a Tokio runtime, and the Hydroflow program must be launched with `run_async`.
// In this example we use a _bounded_ channel for our `Sink`. This is for demonstration only,
// instead you should use [`dfir_rs::util::unbounded_channel`]. A bounded channel results in
// `Hydroflow` buffering items internally instead of within the channel. (We can't use
// unbounded here since unbounded channels are synchronous to write to and therefore not
// `Sink`s.)
let (send, recv) = tokio::sync::mpsc::channel::<usize>(5);
// `PollSender` adapts the send half of the bounded channel into a `Sink`.
let send = tokio_util::sync::PollSender::new(send);
let mut flow = dfir_rs::dfir_syntax! {
    source_iter(0..10) -> dest_sink(send);
};
// Call `run_async()` to allow async events to propagate, run for one second.
tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
    .await
    .expect_err("Expected time out");
let mut recv = tokio_stream::wrappers::ReceiverStream::new(recv);
// Only 5 elements received due to buffer size.
// (Note that if we were using a multi-threaded executor instead of `current_thread` it would
// be possible for more items to be added as they're removed, resulting in >5 collected.)
let out: Vec<_> = dfir_rs::util::ready_iter(&mut recv).collect();
assert_eq!(&[0, 1, 2, 3, 4], &*out);
`Sink` is different from `AsyncWrite`. Instead of discrete values we send arbitrary streams of bytes into an `AsyncWrite` value. For example, writing a stream of bytes to a file, a socket, or stdout.
To handle those situations we can use a codec from `tokio_util::codec`. These specify ways in which the byte stream is broken into individual items, such as with newlines or with length delineation.
If we only want to write a stream of bytes without delineation we can use the `BytesCodec`.
In this example we use a `duplex` as our `AsyncWrite` with a `BytesCodec`.
use bytes::Bytes;
use tokio::io::AsyncReadExt;
// Like a channel, but for a stream of bytes instead of discrete objects.
let (asyncwrite, mut asyncread) = tokio::io::duplex(256);
// Wrap the `AsyncWrite` in a `FramedWrite`; `BytesCodec` writes each byte string without delineation.
let sink = tokio_util::codec::FramedWrite::new(asyncwrite, tokio_util::codec::BytesCodec::new());
let mut flow = dfir_rs::dfir_syntax! {
    source_iter([
        Bytes::from_static(b"hello"),
        Bytes::from_static(b"world"),
    ]) -> dest_sink(sink);
};
tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
    .await
    .expect_err("Expected time out");
let mut buf = Vec::<u8>::new();
asyncread.read_buf(&mut buf).await.unwrap();
assert_eq!(b"helloworld", &*buf);
dest_sink_serde
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> dest_sink_serde(A) | exactly 0 | Streaming |
Arguments: A serializing async `Sink`.
Consumes (payload, addr) pairs by serializing the payload and sending the resulting pair to an async `Sink` that delivers them to the `SocketAddr` specified by `addr`.
Note this operator must be used within a Tokio runtime.
async fn serde_out() {
    let addr = dfir_rs::util::ipv4_resolve("localhost:9000".into()).unwrap();
    let (outbound, inbound, _) = dfir_rs::util::bind_udp_bytes(addr).await;
    let remote = dfir_rs::util::ipv4_resolve("localhost:9001".into()).unwrap();
    let mut flow = dfir_rs::dfir_syntax! {
        source_iter(vec![("hello".to_string(), 1), ("world".to_string(), 2)])
            -> map(|m| (m, remote)) -> dest_sink_serde(outbound);
    };
    flow.run_available();
}
difference
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]difference() -> | exactly 1 | Blocking |
Input port names: `pos` (streaming), `neg` (blocking)
2 input streams of the same type T, 1 output stream of type T
Forms the set difference of the items in the input streams, returning items in the `pos` input that are not found in the `neg` input.
`difference` can be provided with one or two generic lifetime persistence arguments in the same way as `join`; see `join`'s documentation for more info.
Note set semantics only for the `neg` input.
source_iter(vec!["dog", "cat", "elephant"]) -> [pos]diff;
source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
diff = difference() -> assert_eq(["elephant"]);
difference_multiset
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]difference_multiset() -> | exactly 1 | Blocking |
Input port names: `pos` (streaming), `neg` (blocking)
2 input streams of the same type T, 1 output stream of type T
Forms the set difference of the items in the input streams, returning items in the `pos` input that are not found in the `neg` input.
`difference_multiset` can be provided with one or two generic lifetime persistence arguments in the same way as `join`; see `join`'s documentation for more info.
Note multiset semantics here: each (possibly duplicated) item in the `pos` input that has no match in `neg` is sent to the output.
source_iter(vec!["cat", "cat", "elephant", "elephant"]) -> [pos]diff;
source_iter(vec!["cat", "gorilla"]) -> [neg]diff;
diff = difference_multiset() -> assert_eq(["elephant", "elephant"]);
enumerate
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> enumerate() -> | exactly 1 | Streaming |
1 input stream of type `T`, 1 output stream of type `(usize, T)`
For each item passed in, enumerate it with its index: `(0, x_0)`, `(1, x_1)`, etc.
`enumerate` can also be provided with one generic lifetime persistence argument, either `'tick` or `'static`, to specify if indexing resets. If `'tick` (the default) is specified, indexing will restart at zero at the start of each tick. Otherwise, with `'static`, indexing will never reset and will count monotonically upwards.
source_iter(vec!["hello", "world"])
-> enumerate()
-> assert_eq([(0, "hello"), (1, "world")]);
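The difference between the two persistence settings only shows up across ticks, which the single-tick example above cannot demonstrate. The following plain-Rust model makes it visible; `Enumerator` and its boolean flag are hypothetical stand-ins for `enumerate::<'tick>()` and `enumerate::<'static>()`.

```rust
/// Hypothetical stand-in for `enumerate` with a persistence flag.
struct Enumerator {
    persist_across_ticks: bool, // true models 'static, false models 'tick
    next_index: usize,
}

impl Enumerator {
    fn new(persist_across_ticks: bool) -> Self {
        Self { persist_across_ticks, next_index: 0 }
    }

    /// Numbers the items of one tick.
    fn run_tick<T>(&mut self, items: Vec<T>) -> Vec<(usize, T)> {
        if !self.persist_across_ticks {
            self.next_index = 0; // 'tick: indexing restarts at the start of each tick
        }
        items
            .into_iter()
            .map(|item| {
                let index = self.next_index;
                self.next_index += 1;
                (index, item)
            })
            .collect()
    }
}

fn main() {
    let mut per_tick = Enumerator::new(false);
    assert_eq!(per_tick.run_tick(vec!["a", "b"]), vec![(0, "a"), (1, "b")]);
    assert_eq!(per_tick.run_tick(vec!["c"]), vec![(0, "c")]);

    let mut persistent = Enumerator::new(true);
    assert_eq!(persistent.run_tick(vec!["a", "b"]), vec![(0, "a"), (1, "b")]);
    assert_eq!(persistent.run_tick(vec!["c"]), vec![(2, "c")]);
    println!("ok");
}
```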
filter
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> filter(A) -> | exactly 1 | Streaming |
Filter outputs a subsequence of the items it receives at its input, according to a Rust boolean closure passed in as an argument.
The closure receives a reference `&T` rather than an owned value `T` because filtering does not modify or take ownership of the values. If you need to modify the values while filtering use `filter_map` instead.
Note: The closure has access to the `context` object.
source_iter(vec!["hello", "world"]) -> filter(|x| x.starts_with('w'))
-> assert_eq(["world"]);
filter_map
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> filter_map(A) -> | exactly 1 | Streaming |
1 input stream, 1 output stream
An operator that both filters and maps. It yields only the items for which the supplied closure returns `Some(value)`.
Note: The closure has access to the `context` object.
source_iter(vec!["1", "hello", "world", "2"])
    -> filter_map(|s| s.parse::<usize>().ok())
    -> assert_eq([1, 2]);
flat_map
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> flat_map(A) -> | exactly 1 | Streaming |
1 input stream, 1 output stream
Arguments: A Rust closure that returns an iterator
For each item `i` passed in, apply the closure to `i` to obtain an iterator, then emit that iterator's items one by one. The value returned by the closure must be iterable.
Note: The closure has access to the `context` object.
// emits each character of each word as a separate item
source_iter(vec!["hello", "world"])
    -> flat_map(|x| x.chars())
    -> assert_eq(['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']);
flatten
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> flatten() -> | exactly 1 | Streaming |
1 input stream, 1 output stream
For each item `i` passed in, treat `i` as an iterator and produce its items one by one. The type of the input items must be iterable.
source_iter(vec![[1, 2], [3, 4], [5, 6]])
-> flatten()
-> assert_eq([1, 2, 3, 4, 5, 6]);
fold
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> fold(A, B) | at least 0 and at most 1 | Blocking |
1 input stream, 1 output stream
Arguments: two arguments, both closures. The first closure is used to create the initial value for the accumulator, and the second is used to combine new items with the existing accumulator value. The second closure takes two arguments: an `&mut Accum` accumulated value, and an `Item`.
Akin to Rust's built-in `fold` operator, except that it takes the accumulator by `&mut` instead of by value. Folds every item into an accumulator by applying a closure, returning the final result.
Note: The closures have access to the `context` object.
`fold` can also be provided with one generic lifetime persistence argument, either `'tick` or `'static`, to specify how data persists. With `'tick`, items will only be collected within the same tick. With `'static`, the accumulated value will be remembered across ticks and will be aggregated with items arriving in later ticks. When not explicitly specified persistence defaults to `'tick`.
// the output stream should contain the single reassembled vector `[1, 2, 3, 4, 5]`
source_iter([1,2,3,4,5])
    -> fold::<'tick>(Vec::new, |accum: &mut Vec<_>, elem| {
        accum.push(elem);
    })
    -> assert_eq([vec![1, 2, 3, 4, 5]]);
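The contrast with the standard library's `Iterator::fold` can be shown side by side in plain Rust. `fold_in_place` is a hypothetical helper that mirrors the two-closure shape described above (an initializer plus an in-place update); it is not the DFIR operator itself.

```rust
/// Fold in the style described above: the accumulator is created by `init`
/// and updated in place through `&mut`, instead of being passed by value.
fn fold_in_place<T, A>(
    items: impl IntoIterator<Item = T>,
    init: impl FnOnce() -> A,
    mut step: impl FnMut(&mut A, T),
) -> A {
    let mut accum = init();
    for item in items {
        step(&mut accum, item);
    }
    accum
}

fn main() {
    let in_place = fold_in_place(vec![1, 2, 3, 4, 5], Vec::new, |accum: &mut Vec<i32>, elem| {
        accum.push(elem);
    });
    // `Iterator::fold` threads the accumulator through by value instead.
    let by_value = vec![1, 2, 3, 4, 5]
        .into_iter()
        .fold(Vec::new(), |mut accum, elem| {
            accum.push(elem);
            accum
        });
    assert_eq!(in_place, by_value);
    println!("{:?}", in_place);
}
```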
fold_keyed
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> fold_keyed(A, B) -> | exactly 1 | Blocking |
1 input stream of type `(K, V1)`, 1 output stream of type `(K, V2)`. The output will have one tuple for each distinct `K`, with an accumulated value of type `V2`.
If the input and output value types are the same and do not require initialization then use `reduce_keyed`.
Arguments: two Rust closures. The first generates an initial value per group. The second takes two arguments: a mutable reference to the group's accumulator and an element, and updates the accumulator in place, as the example below does.
A special case of `fold`, in the spirit of SQL's GROUP BY and aggregation constructs. The input is partitioned into groups by the first field ("keys"), and for each group the values in the second field are accumulated via the closures in the arguments.
Note: The closures have access to the `context` object.
source_iter([("toy", 1), ("toy", 2), ("shoe", 11), ("shoe", 35), ("haberdashery", 7)])
-> fold_keyed(|| 0, |old: &mut u32, val: u32| *old += val)
-> assert_eq([("toy", 3), ("shoe", 46), ("haberdashery", 7)]);
`fold_keyed` can be provided with one generic lifetime persistence argument, either `'tick` or `'static`, to specify how data persists. With `'tick`, values will only be collected within the same tick. With `'static`, values will be remembered across ticks and will be aggregated with pairs arriving in later ticks. When not explicitly specified persistence defaults to `'tick`.
`fold_keyed` can also be provided with two type arguments, the key type `K` and aggregated output value type `V2`. This is required when using `'static` persistence if the compiler cannot infer the types.
Example using `'tick` persistence:
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
    source_stream(input_recv)
        -> fold_keyed::<'tick, &str, String>(String::new, |old: &mut _, val| {
            *old += val;
            *old += ", ";
        })
        -> for_each(|(k, v)| println!("({:?}, {:?})", k, v));
};
input_send.send(("hello", "oakland")).unwrap();
input_send.send(("hello", "berkeley")).unwrap();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_available();
// ("hello", "oakland, berkeley, san francisco, ")
input_send.send(("hello", "palo alto")).unwrap();
flow.run_available();
// ("hello", "palo alto, ")
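The grouping step behind `fold_keyed` resembles a GROUP BY over a map. Below is a minimal plain-Rust sketch of a single tick, assuming a hypothetical helper `fold_keyed_tick`; it uses a `BTreeMap` so the result order is deterministic, which is a choice of this sketch and says nothing about the order DFIR emits.

```rust
use std::collections::BTreeMap;

/// Plain-Rust analogue of one tick of `fold_keyed` (hypothetical helper).
fn fold_keyed_tick<K: Ord, V1, V2>(
    items: Vec<(K, V1)>,
    init: impl Fn() -> V2,
    step: impl Fn(&mut V2, V1),
) -> BTreeMap<K, V2> {
    let mut groups = BTreeMap::new();
    for (key, value) in items {
        // Create the accumulator the first time a key is seen, then update it in place.
        step(groups.entry(key).or_insert_with(&init), value);
    }
    groups
}

fn main() {
    let totals = fold_keyed_tick(
        vec![("toy", 1), ("toy", 2), ("shoe", 11), ("shoe", 35), ("haberdashery", 7)],
        || 0,
        |old: &mut u32, val: u32| *old += val,
    );
    // Keys come back sorted because of the `BTreeMap`.
    let as_vec: Vec<(&str, u32)> = totals.into_iter().collect();
    assert_eq!(as_vec, vec![("haberdashery", 7), ("shoe", 46), ("toy", 3)]);
    println!("{:?}", as_vec);
}
```

Calling the helper once per tick with a fresh map corresponds to `'tick` persistence; keeping one map alive across calls would correspond to `'static`.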
for_each
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> for_each(A) | exactly 0 | Streaming |
1 input stream, 0 output streams
Arguments: a Rust closure
Iterates through a stream passing each element to the closure in the argument.
Note: The closure has access to the `context` object.
source_iter(vec!["Hello", "World"])
-> for_each(|x| println!("{}", x));
identity
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> identity() -> | exactly 1 | Streaming |
1 input stream of type T, 1 output stream of type T
For each item passed in, pass it out without any change.
source_iter(vec!["hello", "world"])
-> identity()
-> assert_eq(["hello", "world"]);
You can also supply a type parameter `identity::<MyType>()` to specify what items flow through the pipeline. This can be useful for helping the compiler infer types.
// Use type parameter to ensure items are `i32`s.
source_iter(0..10)
    -> identity::<i32>()
    -> assert_eq([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
initialize
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | initialize() -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: None.
Emits a single unit `()` at the start of the first tick.
initialize()
-> assert_eq([()]);
inspect
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> inspect(A) | at least 0 and at most 1 | Streaming |
Arguments: A single closure `FnMut(&Item)`.
An operator which allows you to "inspect" each element of a stream without modifying it. The closure is called on a reference to each item. This is mainly useful for debugging as in the example below, and it is generally an anti-pattern to provide a closure with side effects.
Note: The closure has access to the `context` object.
source_iter([1, 2, 3, 4])
-> inspect(|x| println!("{}", x))
-> assert_eq([1, 2, 3, 4]);
join
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]join() -> | exactly 1 | Streaming |
Input port names: `0` (streaming), `1` (streaming)
2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
Forms the equijoin of the tuples in the input streams by their first (key) attribute. Note that the result nests the 2nd input field (values) into a tuple in the 2nd output field.
source_iter(vec![("hello", "world"), ("stay", "gold"), ("hello", "world")]) -> [0]my_join;
source_iter(vec![("hello", "cleveland")]) -> [1]my_join;
my_join = join()
-> assert_eq([("hello", ("world", "cleveland"))]);
`join` can also be provided with one or two generic lifetime persistence arguments, either `'tick` or `'static`, to specify how join data persists. With `'tick`, pairs will only be joined with corresponding pairs within the same tick. With `'static`, pairs will be remembered across ticks and will be joined with pairs arriving in later ticks. When not explicitly specified persistence defaults to `'tick`.
When two persistence arguments are supplied the first maps to port `0` and the second maps to port `1`.
When a single persistence argument is supplied, it is applied to both input ports.
When no persistence arguments are applied it defaults to `'tick` for both.
The syntax is as follows:
join(); // Or
join::<'static>();
join::<'tick>();
join::<'static, 'tick>();
join::<'tick, 'static>();
// etc.
`join` is defined to treat its inputs as sets, meaning that it eliminates duplicated values in its inputs. If you do not want duplicates eliminated, use the `join_multiset` operator.
Examples
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
    source_iter([("hello", "world")]) -> [0]my_join;
    source_stream(input_recv) -> [1]my_join;
    my_join = join::<'tick>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
};
input_send.send(("hello", "oakland")).unwrap();
flow.run_tick();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_tick();
Prints out `"(hello, (world, oakland))"` since `source_iter([("hello", "world")])` is only included in the first tick, then forgotten.
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
    source_iter([("hello", "world")]) -> [0]my_join;
    source_stream(input_recv) -> [1]my_join;
    my_join = join::<'static>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
};
input_send.send(("hello", "oakland")).unwrap();
flow.run_tick();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_tick();
Prints out `"(hello, (world, oakland))"` and `"(hello, (world, san francisco))"` since the inputs are persisted across ticks.
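The two runs above differ only in whether join state survives a tick boundary. The plain-Rust model below reproduces both outcomes under simplifying assumptions: `JoinSketch` is a hypothetical type, one flag covers both ports, matching uses a nested loop for clarity, and each tick emits only matches that involve at least one newly arrived pair.

```rust
use std::collections::BTreeSet;

type Pair = (&'static str, &'static str);

/// Hypothetical stand-in for `join` with one persistence setting shared by both ports.
struct JoinSketch {
    persist: bool, // true models 'static, false models 'tick
    lhs: BTreeSet<Pair>,
    rhs: BTreeSet<Pair>,
}

impl JoinSketch {
    fn new(persist: bool) -> Self {
        Self { persist, lhs: BTreeSet::new(), rhs: BTreeSet::new() }
    }

    /// Runs one tick and returns the matches that involve at least one new pair.
    fn run_tick(&mut self, lhs: &[Pair], rhs: &[Pair]) -> Vec<(&'static str, Pair)> {
        if !self.persist {
            // 'tick: state from earlier ticks is forgotten.
            self.lhs.clear();
            self.rhs.clear();
        }
        // `insert` returns false for pairs already stored, which gives set semantics.
        let new_lhs: Vec<Pair> = lhs.iter().copied().filter(|p| self.lhs.insert(*p)).collect();
        let new_rhs: Vec<Pair> = rhs.iter().copied().filter(|p| self.rhs.insert(*p)).collect();
        let mut out = Vec::new();
        for &(k1, v1) in &self.lhs {
            for &(k2, v2) in &self.rhs {
                let involves_new = new_lhs.contains(&(k1, v1)) || new_rhs.contains(&(k2, v2));
                if k1 == k2 && involves_new {
                    out.push((k1, (v1, v2)));
                }
            }
        }
        out
    }
}

fn main() {
    let mut per_tick = JoinSketch::new(false);
    println!("{:?}", per_tick.run_tick(&[("hello", "world")], &[("hello", "oakland")]));
    println!("{:?}", per_tick.run_tick(&[], &[("hello", "san francisco")]));

    let mut persistent = JoinSketch::new(true);
    println!("{:?}", persistent.run_tick(&[("hello", "world")], &[("hello", "oakland")]));
    println!("{:?}", persistent.run_tick(&[], &[("hello", "san francisco")]));
}
```

With the flag off, the second tick finds nothing on the left side and emits nothing; with it on, the stored `("hello", "world")` pair still matches.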
join_fused
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]join_fused(A, B) -> | exactly 1 | Blocking |
Input port names: `0` (blocking), `1` (blocking)
2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
`join_fused` takes two arguments, which are the configuration options for the left hand side and right hand side inputs respectively. There are three available configuration options:
`Reduce`: if the input type is the same as the accumulator type,
`Fold`: if the input type is different from the accumulator type, and the accumulator type has a sensible default value, and
`FoldFrom`: if the input type is different from the accumulator type, and the accumulator needs to be derived from the first input value.
Examples of all three configuration options are below:
// Left hand side input will use fold, right hand side input will use reduce,
join_fused(Fold(|| "default value", |x, y| *x += y), Reduce(|x, y| *x -= y))
// Left hand side input will use FoldFrom, and the right hand side input will use Reduce again
join_fused(FoldFrom(|x| "conversion function", |x, y| *x += y), Reduce(|x, y| *x *= y))
The three currently supported fused operator types are Fold(Fn() -> A, Fn(A, T) -> A), Reduce(Fn(A, A) -> A), and FoldFrom(Fn(T) -> A, Fn(A, T) -> A).
join_fused first performs a fold_keyed/reduce_keyed operation on each input stream before joining. See join(). There is currently no equivalent for FoldFrom in dfir operators.
For example, the following two dfir programs are equivalent; the former would optimize into the latter:
source_iter(vec![("key", 0), ("key", 1), ("key", 2)])
-> reduce_keyed(|x: &mut _, y| *x += y)
-> [0]my_join;
source_iter(vec![("key", 2), ("key", 3)])
-> fold_keyed(|| 1, |x: &mut _, y| *x *= y)
-> [1]my_join;
my_join = join_multiset()
-> assert_eq([("key", (3, 6))]);
source_iter(vec![("key", 0), ("key", 1), ("key", 2)])
-> [0]my_join;
source_iter(vec![("key", 2), ("key", 3)])
-> [1]my_join;
my_join = join_fused(Reduce(|x, y| *x += y), Fold(|| 1, |x, y| *x *= y))
-> assert_eq([("key", (3, 6))]);
Here is an example of using FoldFrom to derive the accumulator from the first value:
source_iter(vec![("key", 0), ("key", 1), ("key", 2)])
-> [0]my_join;
source_iter(vec![("key", 2), ("key", 3)])
-> [1]my_join;
my_join = join_fused(FoldFrom(|x| x + 3, |x, y| *x += y), Fold(|| 1, |x, y| *x *= y))
-> assert_eq([("key", (6, 6))]);
The benefit of this is that the state between the reducing/folding operator and the join is merged together.
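As a rough illustration of what the fused form computes, the following std-only sketch performs the per-key reduce and fold and then joins on the key. It is a model of the semantics under stated assumptions, not the operator's implementation; `fused_join_model` is a hypothetical helper that hard-codes `Reduce(|x, y| *x += y)` on the left and `Fold(|| 1, |x, y| *x *= y)` on the right.

```rust
use std::collections::HashMap;

/// Hypothetical std-only model of
/// `join_fused(Reduce(|x, y| *x += y), Fold(|| 1, |x, y| *x *= y))`.
pub fn fused_join_model(lhs: &[(&str, i32)], rhs: &[(&str, i32)]) -> Vec<(String, (i32, i32))> {
    // Left side, Reduce: the first value seen for a key seeds the accumulator.
    let mut left: HashMap<&str, i32> = HashMap::new();
    for &(k, v) in lhs {
        left.entry(k).and_modify(|acc| *acc += v).or_insert(v);
    }

    // Right side, Fold: the accumulator starts from the default value 1.
    let mut right: HashMap<&str, i32> = HashMap::new();
    for &(k, v) in rhs {
        *right.entry(k).or_insert(1) *= v;
    }

    // Join the two accumulated maps on their keys.
    let mut out: Vec<(String, (i32, i32))> = left
        .iter()
        .filter_map(|(k, l)| right.get(*k).map(|r| (k.to_string(), (*l, *r))))
        .collect();
    // Sort only to make the model's output order deterministic.
    out.sort();
    out
}
```

With the inputs used in the example programs, the left side accumulates 0 + 1 + 2 = 3 and the right side 1 * 2 * 3 = 6, so the model yields a single `("key", (3, 6))` entry. Both accumulations and the join read from the same keyed tables, which is the state sharing the fused operator is meant to provide.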
join_fused follows the same persistence rules as join and all other operators. By default, both the left-hand side and right-hand side use 'tick persistence. They can be set to 'static persistence by specifying 'static in the type arguments of the operator.
For join_fused::<'static>, the operator will replay all keys that the join has ever seen each tick, and not only the new matches from that specific tick.
This means that it behaves as if persist::<'static>() were placed before the inputs and the join itself kept its default 'tick persistence. For example, the following two examples have identical behavior:
source_iter(vec![("key", 0), ("key", 1), ("key", 2)]) -> persist::<'static>() -> [0]my_join;
source_iter(vec![("key", 2)]) -> my_union;
source_iter(vec![("key", 3)]) -> defer_tick() -> my_union;
my_union = union() -> persist::<'static>() -> [1]my_join;
my_join = join_fused(Reduce(|x, y| *x += y), Fold(|| 1, |x, y| *x *= y))
-> assert_eq([("key", (3, 2)), ("key", (3, 6))]);
source_iter(vec![("key", 0), ("key", 1), ("key", 2)]) -> [0]my_join;
source_iter(vec![("key", 2)]) -> my_union;
source_iter(vec![("key", 3)]) -> defer_tick() -> my_union;
my_union = union() -> [1]my_join;
my_join = join_fused::<'static>(Reduce(|x, y| *x += y), Fold(|| 1, |x, y| *x *= y))
-> assert_eq([("key", (3, 2)), ("key", (3, 6))]);
join_fused_lhs
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]join_fused_lhs(A) -> | exactly 1 | Blocking |
Input port names:
0
(blocking),1
(streaming)
See join_fused
This operator is identical to join_fused
except that the right hand side input 1
is a regular join_multiset
input.
This means that join_fused_lhs takes only one argument, which is the reducing/folding operation for the left-hand side.
For example:
source_iter(vec![("key", 0), ("key", 1), ("key", 2)]) -> [0]my_join;
source_iter(vec![("key", 2), ("key", 3)]) -> [1]my_join;
my_join = join_fused_lhs(Reduce(|x, y| *x += y))
-> assert_eq([("key", (3, 2)), ("key", (3, 3))]);
join_fused_rhs
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]join_fused_rhs(A) -> | exactly 1 | Blocking |
Input port names:
0
(streaming),1
(blocking)
See join_fused_lhs
This operator is identical to join_fused_lhs
except that it is the right hand side that is fused instead of the left hand side.
join_multiset
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]join_multiset() -> | exactly 1 | Streaming |
Input port names:
0
(streaming),1
(streaming)
2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
This operator is equivalent to join
except that the LHS and RHS are collected into multisets rather than sets before joining.
If you want
duplicates eliminated from the inputs, use the join
operator.
For example:
lhs = source_iter([("a", 0), ("a", 0)]) -> tee();
rhs = source_iter([("a", "hydro")]) -> tee();
lhs -> [0]multiset_join;
rhs -> [1]multiset_join;
multiset_join = join_multiset() -> assert_eq([("a", (0, "hydro")), ("a", (0, "hydro"))]);
lhs -> [0]set_join;
rhs -> [1]set_join;
set_join = join() -> assert_eq([("a", (0, "hydro"))]);
lattice_bimorphism
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]lattice_bimorphism(A, B, C) -> | exactly 1 | Streaming |
Input port names:
0
(streaming),1
(streaming)
An operator representing a lattice bimorphism.
2 input streams, of type LhsItem and RhsItem.
Three arguments: one LatticeBimorphism function Func, an LhsState singleton reference, and an RhsState singleton reference.
1 output stream of the output type of the LatticeBimorphism function.
The function must be a lattice bimorphism for both (LhsState, RhsItem)
and (RhsState, LhsItem)
.
use std::collections::HashSet;
use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};
lhs = source_iter(0..3)
-> map(SetUnionSingletonSet::new_from)
-> state::<'static, SetUnionHashSet<u32>>();
rhs = source_iter(3..5)
-> map(SetUnionSingletonSet::new_from)
-> state::<'static, SetUnionHashSet<u32>>();
lhs -> [0]my_join;
rhs -> [1]my_join;
my_join = lattice_bimorphism(CartesianProductBimorphism::<HashSet<_>>::default(), #lhs, #rhs)
-> assert_eq([SetUnionHashSet::new(HashSet::from_iter([
(0, 3),
(0, 4),
(1, 3),
(1, 4),
(2, 3),
(2, 4),
]))]);
lattice_fold
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> lattice_fold(A) -> | exactly 1 | Blocking |
1 input stream, 1 output stream
A specialized operator for merging lattices together into an accumulated value. Like fold()
but specialized for lattice types. lattice_fold(MyLattice::default)
is equivalent to fold(MyLattice::default, dfir_rs::lattices::Merge::merge)
.
lattice_fold
can also be provided with one generic lifetime persistence argument, either
'tick
or 'static
, to specify how data persists. With 'tick
, values will only be collected
within the same tick. With 'static
, values will be remembered across ticks and will be
aggregated with values arriving in later ticks. When not explicitly specified, persistence
defaults to 'tick
.
lattice_fold
is differentiated from lattice_reduce
in that lattice_fold
can accumulate into a different type from its input.
This also means that the accumulating type must have a sensible default value.
use dfir_rs::lattices::set_union::SetUnionSingletonSet;
use dfir_rs::lattices::set_union::SetUnionHashSet;
source_iter([SetUnionSingletonSet::new_from(7)])
-> lattice_fold(SetUnionHashSet::<usize>::default)
-> assert_eq([SetUnionHashSet::new_from([7])]);
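The stated equivalence between lattice_fold and a fold that starts from the default value and merges each item can be sketched with plain std types. Everything here is a hypothetical stand-in chosen for illustration: the `Merge` trait, `SetUnion`, `Singleton`, and `lattice_fold_model` are not the types from the lattices crate, only a minimal model of the same idea.

```rust
use std::collections::BTreeSet;

/// Stand-in for a lattice merge: folds `other` into `self` and
/// reports whether `self` changed.
pub trait Merge<Other> {
    fn merge(&mut self, other: Other) -> bool;
}

/// Stand-in for a set-union lattice (accumulator type).
#[derive(Debug, Default, PartialEq)]
pub struct SetUnion(pub BTreeSet<usize>);

/// Stand-in for a single-element set (input type, different from the accumulator).
pub struct Singleton(pub usize);

impl Merge<Singleton> for SetUnion {
    fn merge(&mut self, other: Singleton) -> bool {
        // `insert` returns true only when the element was not present yet.
        self.0.insert(other.0)
    }
}

/// Model of `lattice_fold(A::default)`: start from the default
/// accumulator and merge every input item into it.
pub fn lattice_fold_model<A, T>(items: impl IntoIterator<Item = T>) -> A
where
    A: Default + Merge<T>,
{
    let mut acc = A::default();
    for item in items {
        acc.merge(item);
    }
    acc
}
```

Because the accumulator comes from `Default`, the input type (`Singleton`) can differ from the accumulated type (`SetUnion`), which is the property that distinguishes lattice_fold from lattice_reduce.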
lattice_reduce
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> lattice_reduce() -> | exactly 1 | Blocking |
1 input stream, 1 output stream
A specialized operator for merging lattices together into an accumulated value. Like reduce()
but specialized for lattice types. lattice_reduce()
is equivalent to reduce(dfir_rs::lattices::Merge::merge)
.
lattice_reduce
can also be provided with one generic lifetime persistence argument, either
'tick
or 'static
, to specify how data persists. With 'tick
, values will only be collected
within the same tick. With 'static
, values will be remembered across ticks and will be
aggregated with values arriving in later ticks. When not explicitly specified, persistence
defaults to 'tick
.
lattice_reduce
is differentiated from lattice_fold
in that lattice_reduce
does not require the accumulating type to have a sensible default value.
But it also means that the accumulating function inputs and the accumulating type must be the same.
use dfir_rs::lattices::Max;
source_iter([1, 2, 3, 4, 5])
-> map(Max::new)
-> lattice_reduce()
-> assert_eq([Max::new(5)]);
map
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> map(A) -> | exactly 1 | Streaming |
1 input stream, 1 output stream
Arguments: A Rust closure
For each item passed in, apply the closure to generate an item to emit.
If you only want to view each item without modifying the item stream, use the inspect operator instead.
Note: The closure has access to the
context
object.
source_iter(vec!["hello", "world"]) -> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD"]);
multiset_delta
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> multiset_delta() -> | exactly 1 | Streaming |
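Emits the multiset delta relative to the previous tick: an item is emitted only for occurrences beyond the number of times it appeared in the previous tick.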
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv)
-> multiset_delta()
-> for_each(|n| println!("{}", n));
};
input_send.send(3).unwrap();
input_send.send(4).unwrap();
input_send.send(3).unwrap();
flow.run_tick();
// 3, 4, 3
input_send.send(3).unwrap();
input_send.send(5).unwrap();
input_send.send(3).unwrap();
input_send.send(3).unwrap();
flow.run_tick();
// 5, 3
// First two "3"s are removed due to previous tick.
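The counting rule behind this example can be written out as a small std-only model. This is a sketch of the semantics as described above, not the operator's source; `MultisetDeltaModel` and `run_tick` are hypothetical names, and the model assumes that the first tick has an empty previous multiset, so every item in it is emitted.

```rust
use std::collections::HashMap;

/// Hypothetical std-only model of a per-tick multiset delta.
#[derive(Default)]
pub struct MultisetDeltaModel {
    prev: HashMap<u32, usize>,
    curr: HashMap<u32, usize>,
}

impl MultisetDeltaModel {
    /// Processes one tick of input and returns the items emitted in that tick.
    pub fn run_tick(&mut self, items: &[u32]) -> Vec<u32> {
        // The counts of the tick that just ended become the baseline,
        // and the current counts start again from empty.
        self.prev = std::mem::take(&mut self.curr);
        let mut out = Vec::new();
        for &item in items {
            let seen = self.curr.entry(item).or_insert(0);
            *seen += 1;
            // Emit only occurrences beyond the previous tick's count.
            if *seen > self.prev.get(&item).copied().unwrap_or(0) {
                out.push(item);
            }
        }
        out
    }
}
```

Under this model, a first tick of `[3, 4, 3]` passes through unchanged, and a second tick of `[3, 5, 3, 3]` yields `[5, 3]`: the first two 3s are cancelled by the two 3s of the previous tick.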
next_stratum
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> next_stratum() -> | exactly 1 | Blocking |
Delays all elements which pass through to the next stratum (in the same tick).
You can also supply a type parameter next_stratum::<MyType>() to specify what items flow through the pipeline. This can be useful for helping the compiler infer types.
null
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
at least 0 and at most 1 | null() | at least 0 and at most 1 | Streaming |
unbounded number of input streams of any type, unbounded number of output streams of any type.
As a source, generates nothing. As a sink, absorbs anything with no effect.
// should print `1, 2, 3, 4, 5, 6` across 6 lines
null() -> for_each(|_: ()| panic!());
source_iter([1,2,3]) -> map(|i| println!("{}", i)) -> null();
null_src = null();
null_sink = null();
null_src[0] -> for_each(|_: ()| panic!());
// note: use `for_each()` (or `inspect()`) instead of this:
source_iter([4,5,6]) -> map(|i| println!("{}", i)) -> [0]null_sink;
partition
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> partition(A)[<output_port>] -> | at least 2 | Streaming |
Output port names: Variadic, as specified in arguments.
This operator takes the input pipeline and allows the user to determine which singular output pipeline each item should be delivered to.
Arguments: A Rust closure. Its first argument is a reference to the item, and its second argument corresponds to one of two modes, either named or indexed.
Note: The closure has access to the
context
object.
Named mode
With named ports, the closure's second argument must be a Rust 'slice pattern' of names, such as
[port_a, port_b, port_c]
, where each name is an output port. The closure should return the
name of the desired output port.
my_partition = source_iter(1..=100) -> partition(|val: &usize, [fzbz, fizz, buzz, rest]|
match (val % 3, val % 5) {
(0, 0) => fzbz,
(0, _) => fizz,
(_, 0) => buzz,
(_, _) => rest,
}
);
my_partition[fzbz] -> for_each(|v| println!("{}: fizzbuzz", v));
my_partition[fizz] -> for_each(|v| println!("{}: fizz", v));
my_partition[buzz] -> for_each(|v| println!("{}: buzz", v));
my_partition[rest] -> for_each(|v| println!("{}", v));
Indexed mode
With indexed mode, the closure's second argument is the number of output ports. This is a single usize value, useful for e.g. round-robin partitioning. Each output pipeline port must be numbered with an index, starting from zero and with no gaps. The closure returns the index of the desired output port.
my_partition = source_iter(1..=100) -> partition(|val, num_outputs| val % num_outputs);
my_partition[0] -> for_each(|v| println!("0: {}", v));
my_partition[1] -> for_each(|v| println!("1: {}", v));
my_partition[2] -> for_each(|v| println!("2: {}", v));
persist
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> persist() -> | exactly 1 | Streaming |
Stores each item as it passes through, and replays all items every tick.
// Normally `source_iter(...)` only emits once, but `persist::<'static>()` will replay the `"hello"`
// on every tick.
source_iter(["hello"])
-> persist::<'static>()
-> assert_eq(["hello"]);
persist() can be used to introduce statefulness into stateless pipelines. In the example below, the join only stores data for a single tick. The persist::<'static>() operator introduces statefulness across ticks. This can be useful for optimization transformations within the dfir compiler. Equivalently, we could specify that the join has 'static persistence (my_join = join::<'static>()).
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
source_iter([("hello", "world")]) -> persist::<'static>() -> [0]my_join;
source_stream(input_recv) -> persist::<'static>() -> [1]my_join;
my_join = join::<'tick>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
};
input_send.send(("hello", "oakland")).unwrap();
flow.run_tick();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_tick();
// (hello, (world, oakland))
// (hello, (world, oakland))
// (hello, (world, san francisco))
persist_mut
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> persist_mut() -> | exactly 1 | Blocking |
persist_mut()
is similar to persist()
except that it also enables deletions.
persist_mut() expects an input of type Persistence<T>, and it is this enumeration that enables the user to communicate deletion.
Deletions/persists happen in the order they are received in the stream.
For example, [Persist(1), Delete(1), Persist(1)] will result in a single 1 value being stored.
use dfir_rs::util::Persistence;
source_iter([
Persistence::Persist(1),
Persistence::Persist(2),
Persistence::Delete(1),
])
-> persist_mut::<'static>()
-> assert_eq([2]);
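The order-sensitivity described above can be checked with a std-only model. This is only a sketch of the stated semantics: `PersistenceOp` is a hypothetical stand-in for the Persistence enum, `apply_in_order` is an invented helper, and the model assumes a delete removes every stored copy of the value.

```rust
/// Hypothetical stand-in for the `Persistence<T>` input type.
pub enum PersistenceOp<T> {
    Persist(T),
    Delete(T),
}

/// Applies persists and deletes strictly in stream order and
/// returns the values that remain stored afterwards.
pub fn apply_in_order<T: PartialEq>(ops: impl IntoIterator<Item = PersistenceOp<T>>) -> Vec<T> {
    let mut stored = Vec::new();
    for op in ops {
        match op {
            PersistenceOp::Persist(value) => stored.push(value),
            // Assumption of this model: a delete removes all stored copies.
            PersistenceOp::Delete(value) => stored.retain(|kept| *kept != value),
        }
    }
    stored
}
```

Applying `[Persist(1), Delete(1), Persist(1)]` leaves a single `1`, because the delete only affects what was stored before it arrived. Swapping the last two operations would leave nothing stored.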
persist_mut_keyed
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> persist_mut_keyed() -> | exactly 1 | Blocking |
persist_mut_keyed() is similar to persist_mut() except that it also enables key-based deletions.
persist_mut_keyed() expects an input of type PersistenceKeyed<K, V>, and it is this enumeration that enables the user to communicate deletion.
Deletions/persists happen in the order they are received in the stream.
For example, [Persist(1, 1), Delete(1), Persist(1, 1)] will result in a single (1, 1) pair being stored.
use dfir_rs::util::PersistenceKeyed;
source_iter([
PersistenceKeyed::Persist(0, 1),
PersistenceKeyed::Persist(1, 1),
PersistenceKeyed::Delete(1),
])
-> persist_mut_keyed::<'static>()
-> assert_eq([(0, 1)]);
py_udf
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> py_udf(A, B) -> | exactly 1 | Streaming |
Arguments: First, the source code for a python module, second, the name of a unary function defined within the module source code.
Requires the "python" feature to be enabled.
An operator which allows you to run a Python UDF. Input arguments must be a stream of tuples whose items implement IntoPy. See the relevant pyo3 docs here.
Output items are of type PyResult<Py<PyAny>>. Rust native types can be extracted using .extract(); see the relevant pyo3 docs here or the examples below.
use pyo3::prelude::*;
source_iter(0..10)
-> map(|x| (x,))
-> py_udf("
def fib(n):
    if n < 2:
        return n
    else:
        return fib(n - 2) + fib(n - 1)
", "fib")
-> map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
-> assert_eq([0, 1, 1, 2, 3, 5, 8, 13, 21, 34]);
use pyo3::prelude::*;
source_iter([(5,1)])
-> py_udf("
def add(a, b):
    return a + b
", "add")
-> map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
-> assert_eq([6]);
reduce
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> reduce(A) | at least 0 and at most 1 | Blocking |
1 input stream, 1 output stream
Arguments: a closure which itself takes two arguments: an &mut Accum accumulator mutable reference, and an Item. The closure should merge the item into the accumulator.
Akin to Rust's built-in reduce
operator, except that it takes the accumulator by &mut
instead of by value. Reduces every
item into an accumulator by applying a closure, returning the final result.
Note: The closure has access to the
context
object.
reduce
can also be provided with one generic lifetime persistence argument, either
'tick
or 'static
, to specify how data persists. With 'tick
, values will only be collected
within the same tick. With 'static
, the accumulated value will be remembered across ticks and
items are aggregated with items arriving in later ticks. When not explicitly specified
persistence defaults to 'tick
.
source_iter([1,2,3,4,5])
-> reduce::<'tick>(|accum: &mut _, elem| {
*accum *= elem;
})
-> assert_eq([120]);
reduce_keyed
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> reduce_keyed(A) -> | exactly 1 | Blocking |
1 input stream of type (K, V), 1 output stream of type (K, V). The output will have one tuple for each distinct K, with an accumulated (reduced) value of type V.
If you need the accumulated value to have a different type than the input, use fold_keyed
.
Arguments: one Rust closure. The closure takes two arguments: an &mut 'accumulator', and an element. The accumulator should be updated based on the element.
A special case of reduce, in the spirit of SQL's GROUP BY and aggregation constructs. The input is partitioned into groups by the first field, and for each group the values in the second field are accumulated via the closure given as the argument.
Note: The closure has access to the context object.
reduce_keyed
can also be provided with one generic lifetime persistence argument, either
'tick
or 'static
, to specify how data persists. With 'tick
, values will only be collected
within the same tick. With 'static
, values will be remembered across ticks and will be
aggregated with pairs arriving in later ticks. When not explicitly specified persistence
defaults to 'tick
.
reduce_keyed
can also be provided with two type arguments, the key and value type. This is
required when using 'static
persistence if the compiler cannot infer the types.
source_iter([("toy", 1), ("toy", 2), ("shoe", 11), ("shoe", 35), ("haberdashery", 7)])
-> reduce_keyed(|old: &mut u32, val: u32| *old += val)
-> assert_eq([("toy", 3), ("shoe", 46), ("haberdashery", 7)]);
Example using 'tick
persistence and type arguments:
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv)
-> reduce_keyed::<'tick, &str>(|old: &mut _, val| *old = std::cmp::max(*old, val))
-> for_each(|(k, v)| println!("({:?}, {:?})", k, v));
};
input_send.send(("hello", "oakland")).unwrap();
input_send.send(("hello", "berkeley")).unwrap();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_available();
// ("hello", "san francisco")
input_send.send(("hello", "palo alto")).unwrap();
flow.run_available();
// ("hello", "palo alto")
sort
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> sort() -> | exactly 1 | Blocking |
Takes a stream as input and produces a sorted version of the stream as output.
source_iter(vec![2, 3, 1])
-> sort()
-> assert_eq([1, 2, 3]);
sort
is blocking. Only the values collected within a single tick will be sorted and
emitted.
sort_by_key
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> sort_by_key(A) -> | exactly 1 | Blocking |
Like sort, takes a stream as input and produces a sorted version of the stream as output. This operator sorts according to the key extracted by the closure.
Note: The closure has access to the
context
object.
source_iter(vec![(2, 'y'), (3, 'x'), (1, 'z')])
-> sort_by_key(|(k, _v)| k)
-> assert_eq([(1, 'z'), (2, 'y'), (3, 'x')]);
source_file
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | source_file(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Reads the referenced file one line at a time. The line will NOT include the line ending.
Will panic if the file could not be read, or if the file contains bytes that are not valid UTF-8.
source_file("Cargo.toml") -> for_each(|line| println!("{}", line));
source_interval
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | source_interval(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: A
Duration
for this interval.
Emits units ()
on a repeated interval. The first tick completes immediately. Missed ticks will
be scheduled as soon as possible.
Note that this requires the dfir instance be run within a Tokio Runtime
.
The easiest way to do this is with a #[dfir_rs::main]
annotation on async fn main() { ... }
as in the example below.
use std::time::Duration;
use std::time::Instant;
use dfir_rs::dfir_syntax;
#[dfir_rs::main]
async fn main() {
let mut hf = dfir_syntax! {
source_interval(Duration::from_secs(1))
-> map(|_| { Instant::now() } )
-> for_each(|time| println!("This runs every second: {:?}", time));
};
// Will print 4 times (fencepost counting).
tokio::time::timeout(Duration::from_secs_f32(3.5), hf.run_async())
.await
.expect_err("Expected time out");
// Example output:
// This runs every second: Instant { t: 27471.704813s }
// This runs every second: Instant { t: 27472.704813s }
// This runs every second: Instant { t: 27473.704813s }
// This runs every second: Instant { t: 27474.704813s }
}
source_iter
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | source_iter(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: An iterable Rust object.
Takes the iterable object and delivers its elements downstream one by one.
Note that all elements are emitted during the first tick.
source_iter(vec!["Hello", "World"])
-> for_each(|x| println!("{}", x));
source_json
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | source_json(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: An AsRef<Path> for a file to read as JSON. This will emit the parsed value one time.
source_json
may take one generic type argument, the type of the value to be parsed, which must implement Deserialize
.
source_json("example.json") -> for_each(|json: dfir_rs::serde_json::Value| println!("{:#?}", json));
source_stdin
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | source_stdin() -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: none.
source_stdin
receives a Stream of lines from stdin
and emits each of the elements it receives downstream.
source_stdin()
-> map(|x| x.unwrap().to_uppercase())
-> for_each(|x| println!("{}", x));
source_stream
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | source_stream(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments: The receive end of a tokio channel
Given a Stream
created in Rust code, source_stream
is passed the receive endpoint of the channel and emits each of the
elements it receives downstream.
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<&str>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv) -> map(|x| x.to_uppercase())
-> for_each(|x| println!("{}", x));
};
input_send.send("Hello").unwrap();
input_send.send("World").unwrap();
flow.run_available();
source_stream_serde
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | source_stream_serde(A) -> | exactly 1 | Streaming |
0 input streams, 1 output stream
Arguments:
Stream
Given a Stream
of (serialized payload, addr)
pairs, deserializes the payload and emits each of the
elements it receives downstream.
async fn serde_in() {
let addr = dfir_rs::util::ipv4_resolve("localhost:9000".into()).unwrap();
let (outbound, inbound, _) = dfir_rs::util::bind_udp_bytes(addr).await;
let mut flow = dfir_rs::dfir_syntax! {
source_stream_serde(inbound) -> map(Result::unwrap) -> map(|(x, a): (String, std::net::SocketAddr)| x.to_uppercase())
-> for_each(|x| println!("{}", x));
};
flow.run_available();
}
spin
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 0 | spin() -> | exactly 1 | Streaming |
This operator emits Unit, and triggers the start of a new tick at the end of each tick,
which will cause spinning-like behavior. Note that run_available
will run forever,
so in the example below we illustrate running manually for 100 ticks.
let mut flow = dfir_rs::dfir_syntax! {
spin() -> for_each(|x| println!("tick {}: {:?}", context.current_tick(), x));
};
for _ in 0..100 {
flow.run_tick();
}
state
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> state() | at least 0 and at most 1 | Streaming |
A lattice-based state operator, used for accumulating lattice state.
Emits both a referenceable singleton and (optionally) a pass-through stream. In the future the pass-through stream may be deduplicated.
use std::collections::HashSet;
use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};
my_state = source_iter(0..3)
-> map(SetUnionSingletonSet::new_from)
-> state::<SetUnionHashSet<usize>>();
state_by
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> state_by(A) | at least 0 and at most 1 | Streaming |
Like the state operator, but with a closure to map the input to the state lattice.
The emitted outputs (both the referenceable singleton and the optional pass-through stream) are of the same type as the inputs to the state_by operator and are not required to be a lattice type. This is useful for receiving pass-through context information on the output side.
use std::collections::HashSet;
use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};
my_state = source_iter(0..3)
-> state_by::<SetUnionHashSet<usize>>(SetUnionSingletonSet::new_from);
tee
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> tee()[<output_port>] -> | at least 2 | Streaming |
1 input stream, n output streams
Takes the input stream and delivers a copy of each item to each output.
Note: Downstream operators may need explicit type annotations.
my_tee = source_iter(vec!["Hello", "World"]) -> tee();
my_tee -> map(|x: &str| x.to_uppercase()) -> assert_eq(["HELLO", "WORLD"]);
my_tee -> map(|x: &str| x.to_lowercase()) -> assert_eq(["hello", "world"]);
my_tee -> assert_eq(["Hello", "World"]);
union
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
at least 2 | -> [<input_port>]union() -> | exactly 1 | Streaming |
n input streams of the same type, 1 output stream of the same type
Unions an arbitrary number of input streams into a single stream. Each input sequence is a subsequence of the output, but no guarantee is given on how the inputs are interleaved.
Since union
has multiple input streams, it needs to be assigned to
a variable to reference its multiple input ports across statements.
source_iter(vec!["hello", "world"]) -> my_union;
source_iter(vec!["stay", "gold"]) -> my_union;
source_iter(vec!["don't", "give", "up"]) -> my_union;
my_union = union()
-> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD", "STAY", "GOLD", "DON'T", "GIVE", "UP"]);
unique
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> unique() -> | exactly 1 | Streaming |
Takes one stream as input and filters out any duplicate occurrences. The output contains all unique values from the input.
source_iter(vec![1, 1, 2, 3, 2, 1, 3])
-> unique()
-> assert_eq([1, 2, 3]);
unique
can also be provided with one generic lifetime persistence argument, either
'tick
or 'static
, to specify how data persists. The default is 'tick
.
With 'tick
, uniqueness is only considered within the current tick, so across multiple ticks
duplicate values may be emitted.
With 'static
, values will be remembered across ticks and no duplicates will ever be emitted.
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv)
-> unique::<'tick>()
-> for_each(|n| println!("{}", n));
};
input_send.send(3).unwrap();
input_send.send(3).unwrap();
input_send.send(4).unwrap();
input_send.send(3).unwrap();
flow.run_available();
// 3, 4
input_send.send(3).unwrap();
input_send.send(5).unwrap();
flow.run_available();
// 3, 5
// Note: 3 is emitted again.
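To contrast the two persistence settings side by side, here is a std-only model of the deduplication rule. It is a sketch under the semantics stated above rather than the operator's implementation; `UniqueModel`, `is_static`, and `run_tick` are hypothetical names.

```rust
use std::collections::HashSet;

/// Hypothetical std-only model of `unique` with either persistence.
pub struct UniqueModel {
    /// `true` models `'static`, `false` models `'tick`.
    is_static: bool,
    seen: HashSet<usize>,
}

impl UniqueModel {
    pub fn new(is_static: bool) -> Self {
        Self { is_static, seen: HashSet::new() }
    }

    /// Processes one tick of input and returns the items emitted in that tick.
    pub fn run_tick(&mut self, items: &[usize]) -> Vec<usize> {
        // With tick persistence, the set of seen values is forgotten each tick.
        if !self.is_static {
            self.seen.clear();
        }
        items
            .iter()
            .copied()
            // `insert` returns true only for values not seen before.
            .filter(|item| self.seen.insert(*item))
            .collect()
    }
}
```

For the inputs in the example, the tick model emits `[3, 4]` and then `[3, 5]`, while the static model emits `[3, 4]` and then only `[5]`, since 3 is still remembered from the first tick.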
unzip
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 1 | -> unzip()[<output_port>] -> | exactly 2 | Streaming |
Output port names:
0
,1
1 input stream of pair tuples
(A, B)
, 2 output streams
Takes the input stream of pairs and unzips each one, delivering each item to its corresponding side.
my_unzip = source_iter(vec![("Hello", "Foo"), ("World", "Bar")]) -> unzip();
my_unzip[0] -> assert_eq(["Hello", "World"]);
my_unzip[1] -> assert_eq(["Foo", "Bar"]);
zip
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]zip() -> | exactly 1 | Streaming |
Input port names:
0
(streaming),1
(streaming)
2 input streams of type V1 and V2, 1 output stream of type (V1, V2)
Zips the streams together, forming paired tuples of the inputs. Note that zipping is done
per-tick. Excess items from one input or the other will be discarded. If you do not want to
discard the excess, use zip_longest
instead.
source_iter(0..3) -> [0]my_zip;
source_iter(0..5) -> [1]my_zip;
my_zip = zip() -> assert_eq([(0, 0), (1, 1), (2, 2)]);
zip_longest
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]zip_longest() -> | exactly 1 | Blocking |
Input port names:
0
(blocking),1
(blocking)
2 input streams of type V1 and V2, 1 output stream of type itertools::EitherOrBoth<V1, V2>
Zips the streams together, forming paired tuples of the inputs. Note that zipping is done
per-tick. Excess items are returned as EitherOrBoth::Left(V1)
or EitherOrBoth::Right(V2)
.
If you want to discard the excess, use zip instead.
source_iter(0..2) -> [0]my_zip_longest;
source_iter(0..3) -> [1]my_zip_longest;
my_zip_longest = zip_longest()
-> assert_eq([
itertools::EitherOrBoth::Both(0, 0),
itertools::EitherOrBoth::Both(1, 1),
itertools::EitherOrBoth::Right(2)]);
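The pairing rule can also be spelled out in std-only Rust. This sketch is a model of the per-tick behavior described above, not the operator's implementation: the local `EitherOrBoth` enum is a stand-in for `itertools::EitherOrBoth`, and `zip_longest_model` is a hypothetical helper that assumes well-behaved iterators which keep returning `None` once exhausted (true for ranges and `Vec`).

```rust
/// Local stand-in for `itertools::EitherOrBoth`.
#[derive(Debug, PartialEq)]
pub enum EitherOrBoth<L, R> {
    Both(L, R),
    Left(L),
    Right(R),
}

/// Pairs items positionally and keeps the excess of the longer input.
pub fn zip_longest_model<L, R>(
    lhs: impl IntoIterator<Item = L>,
    rhs: impl IntoIterator<Item = R>,
) -> Vec<EitherOrBoth<L, R>> {
    let (mut lhs, mut rhs) = (lhs.into_iter(), rhs.into_iter());
    let mut out = Vec::new();
    loop {
        match (lhs.next(), rhs.next()) {
            (Some(l), Some(r)) => out.push(EitherOrBoth::Both(l, r)),
            // One side ran out: the remaining items are kept, not discarded.
            (Some(l), None) => out.push(EitherOrBoth::Left(l)),
            (None, Some(r)) => out.push(EitherOrBoth::Right(r)),
            (None, None) => break,
        }
    }
    out
}
```

Calling `zip_longest_model(0..2, 0..3)` gives `Both(0, 0)`, `Both(1, 1)`, `Right(2)`. A plain zip over the same inputs would stop after the two `Both` pairs and drop the trailing 2.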
_lattice_fold_batch
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]_lattice_fold_batch() -> | exactly 1 | Blocking |
Input port names:
input
(blocking),signal
(blocking)
2 input streams, 1 output stream, no arguments.
Batches streaming input and releases it downstream when a signal is delivered. This allows for buffering data and delivering it later while also folding it into a single lattice data structure.
This operator is similar to defer_signal
in that it batches input and releases it when a signal is given. It is also similar to lattice_fold
in that it folds the input into a single lattice.
So, _lattice_fold_batch
is a combination of both defer_signal
and lattice_fold
. This operator is useful when trying to combine a sequence of defer_signal
and lattice_fold
operators without unnecessary memory consumption.
_lattice_fold_batch has two inputs: input and signal.
input
is the input data flow. Data that is delivered on this input is collected in order inside of the _lattice_fold_batch
operator.
When anything is sent to signal
the collected data is released downstream. The entire signal
input is consumed each tick, so sending 5 things on signal
will not release inputs on the next 5 consecutive ticks.
use lattices::set_union::SetUnionHashSet;
use lattices::set_union::SetUnionSingletonSet;
source_iter([1, 2, 3])
    -> map(SetUnionSingletonSet::new_from)
    -> [input]batcher;
source_iter([()])
    -> [signal]batcher;
batcher = _lattice_fold_batch::<SetUnionHashSet<usize>>()
    -> assert_eq([SetUnionHashSet::new_from([1, 2, 3])]);
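To make the signal behavior concrete, here is a minimal plain-Rust model of the buffering logic described above. It is a sketch under assumptions, not the operator's actual code: `FoldBatch` and its `tick` method are hypothetical names, a `BTreeSet` stands in for a set-union lattice, and releasing an empty set when a signal arrives with nothing buffered is a choice made for this sketch only.

```rust
use std::collections::BTreeSet;

/// Hypothetical model of the operator's state: a set-union accumulator.
#[derive(Default)]
pub struct FoldBatch {
    acc: BTreeSet<u32>,
}

impl FoldBatch {
    /// Runs one tick. All `input` items are merged into the accumulator
    /// first. If at least one signal arrived, the accumulated set is
    /// released and the accumulator is reset. The number of signals does
    /// not matter beyond "zero or more than zero": they are all drained
    /// within this tick.
    pub fn tick(&mut self, input: &[u32], signals: usize) -> Option<BTreeSet<u32>> {
        self.acc.extend(input.iter().copied());
        if signals > 0 {
            Some(std::mem::take(&mut self.acc))
        } else {
            None
        }
    }
}
```

In this model, a tick that receives 5 signals releases the buffer once, and a following tick with no signal releases nothing, which matches the description of how signal is consumed.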
_lattice_join_fused_join
Inputs | Syntax | Outputs | Flow |
---|---|---|---|
exactly 2 | -> [<input_port>]_lattice_join_fused_join() -> | exactly 1 | Blocking |
Input port names: 0 (blocking), 1 (blocking)
2 input streams of type (K, V1) and (K, V2), 1 output stream of type (K, (V1', V2')) where V1, V2, V1', V2' are lattice types
Performs a fold_keyed with a lattice-merge aggregate function on each input and then forms the equijoin of the resulting key/value pairs by their first (key) attribute.
Unlike join, the result is not a stream of tuples but a stream of MapUnionSingletonMap lattices. You can (non-monotonically) "reveal" these as tuples if desired via map; see the examples below.
You must specify the accumulating lattice types; they cannot be inferred. The first type argument corresponds to the [0] input of the join, and the second to the [1] input.
Type arguments are specified in DFIR using the Rust turbofish syntax ::<>, for example _lattice_join_fused_join::<Min<_>, Max<_>>().
The accumulating lattice type is not necessarily the same type as the input; see the example below involving SetUnion for such a case.
Like join, _lattice_join_fused_join can also be provided with one or two generic lifetime persistence arguments, either 'tick or 'static, to specify how join data persists. With 'tick, pairs will only be joined with corresponding pairs within the same tick. With 'static, pairs will be remembered across ticks and will be joined with pairs arriving in later ticks. When not explicitly specified, persistence defaults to 'tick.
Like join, when two persistence arguments are supplied the first maps to port 0 and the second maps to port 1.
When a single persistence argument is supplied, it is applied to both input ports.
When no persistence arguments are supplied, persistence defaults to 'tick for both.
It is important to specify all persistence arguments before any type arguments; otherwise the persistence arguments will be ignored.
The syntax is as follows:
_lattice_join_fused_join::<MaxRepr<usize>, MaxRepr<usize>>(); // Or
_lattice_join_fused_join::<'static, MaxRepr<usize>, MaxRepr<usize>>();
_lattice_join_fused_join::<'tick, MaxRepr<usize>, MaxRepr<usize>>();
_lattice_join_fused_join::<'static, 'tick, MaxRepr<usize>, MaxRepr<usize>>();
_lattice_join_fused_join::<'tick, 'static, MaxRepr<usize>, MaxRepr<usize>>();
// etc.
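The difference between 'tick and 'static persistence can be illustrated with a small plain-Rust model. This is a sketch under stated assumptions rather than DFIR's implementation: `Persistence` and `MaxJoin` are hypothetical names, both sides accumulate with a max merge, and the model emits the full join of its current state on every tick (whether the real operator re-emits earlier results on later ticks is not covered here).

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the 'tick / 'static lifetime arguments.
#[derive(Clone, Copy, PartialEq)]
pub enum Persistence {
    Tick,
    Static,
}

/// Models a keyed join where each side keeps the max value seen per key.
pub struct MaxJoin {
    persistence: [Persistence; 2],
    state: [BTreeMap<&'static str, u32>; 2],
}

impl MaxJoin {
    /// `p0` applies to port 0 and `p1` to port 1.
    pub fn new(p0: Persistence, p1: Persistence) -> Self {
        MaxJoin {
            persistence: [p0, p1],
            state: [BTreeMap::new(), BTreeMap::new()],
        }
    }

    /// Runs one tick with the items that arrived on each port.
    pub fn tick(
        &mut self,
        in0: &[(&'static str, u32)],
        in1: &[(&'static str, u32)],
    ) -> Vec<(&'static str, (u32, u32))> {
        for (side, input) in [in0, in1].iter().enumerate() {
            // A 'tick side forgets everything from earlier ticks.
            if self.persistence[side] == Persistence::Tick {
                self.state[side].clear();
            }
            // Merge this tick's items per key, keeping the max.
            for &(k, v) in input.iter() {
                let slot = self.state[side].entry(k).or_insert(v);
                *slot = (*slot).max(v);
            }
        }
        // Equijoin the two folded maps on their keys.
        self.state[0]
            .iter()
            .filter_map(|(&k, &a)| self.state[1].get(k).map(|&b| (k, (a, b))))
            .collect()
    }
}
```

With `MaxJoin::new(Persistence::Static, Persistence::Tick)`, a value sent to port 0 in one tick still joins with a matching key that arrives on port 1 in the next tick. With both sides set to `Persistence::Tick`, the same sequence produces no output.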
Examples
use dfir_rs::lattices::Min;
use dfir_rs::lattices::Max;
source_iter([("key", Min::new(1)), ("key", Min::new(2))]) -> [0]my_join;
source_iter([("key", Max::new(1)), ("key", Max::new(2))]) -> [1]my_join;
my_join = _lattice_join_fused_join::<'tick, Min<usize>, Max<usize>>()
    -> map(|singleton_map| {
        let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
        (k, (v.into_reveal()))
    })
    -> assert_eq([("key", (Min::new(1), Max::new(2)))]);
use dfir_rs::lattices::set_union::SetUnionSingletonSet;
use dfir_rs::lattices::set_union::SetUnionHashSet;
source_iter([("key", SetUnionSingletonSet::new_from(0)), ("key", SetUnionSingletonSet::new_from(1))]) -> [0]my_join;
source_iter([("key", SetUnionHashSet::new_from([0])), ("key", SetUnionHashSet::new_from([1]))]) -> [1]my_join;
my_join = _lattice_join_fused_join::<'tick, SetUnionHashSet<usize>, SetUnionHashSet<usize>>()
    -> map(|singleton_map| {
        let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
        (k, (v.into_reveal()))
    })
    -> assert_eq([("key", (SetUnionHashSet::new_from([0, 1]), SetUnionHashSet::new_from([0, 1])))]);
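As a rough mental model of the "fold each side per key, then equijoin" behavior, the following plain-Rust sketch mirrors the Min/Max example using ordinary integers. It is illustrative only: `lattice_join_min_max` is a hypothetical helper, `u32::min` and `u32::max` stand in for the Min and Max lattice merges, and the output is plain tuples rather than MapUnionSingletonMap lattices.

```rust
use std::collections::BTreeMap;

/// Folds the left input per key with min and the right input per key with
/// max, then joins the two folded maps on their keys.
pub fn lattice_join_min_max(
    lhs: &[(&'static str, u32)],
    rhs: &[(&'static str, u32)],
) -> Vec<(&'static str, (u32, u32))> {
    // Keyed fold of the left side: keep the smallest value per key.
    let mut left: BTreeMap<&'static str, u32> = BTreeMap::new();
    for &(k, v) in lhs.iter() {
        let slot = left.entry(k).or_insert(v);
        *slot = (*slot).min(v);
    }
    // Keyed fold of the right side: keep the largest value per key.
    let mut right: BTreeMap<&'static str, u32> = BTreeMap::new();
    for &(k, v) in rhs.iter() {
        let slot = right.entry(k).or_insert(v);
        *slot = (*slot).max(v);
    }
    // Equijoin: emit one pair for each key present on both sides.
    left.into_iter()
        .filter_map(|(k, min)| right.get(k).map(|&max| (k, (min, max))))
        .collect()
}
```

Because each side is folded before joining, two inputs per side under the same key produce a single output pair per key, not the four combinations a plain join would emit.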