Hydroflow's Operators

In our previous examples we made use of some of Hydroflow's operators. Here we document each operator in more detail. Most of these operators are based on the Rust equivalents for iterators; see the Rust documentation.

Maps: enumerate, identity, inspect, map, py_udf
Simple one-in-one-out operators.

Filters: filter, filter_map
One-in zero-or-one-out operators.

Flattens: flat_map, flatten
One-in multiple-out operators.

Folds: all_once, batch, fold, reduce
Operators which accumulate elements together.

Keyed Folds: fold_keyed, reduce_keyed
Keyed fold operators.

Lattice Folds: lattice_fold, lattice_reduce
Folds based on lattice-merge.

Persistent Operators: multiset_delta, defer_signal, persist, persist_mut, persist_mut_keyed, sort, sort_by_key, state, state_by, unique
Persistent (stateful) operators.

Multi-Input Operators: anti_join, anti_join_multiset, chain, cross_join, cross_join_multiset, cross_singleton, difference, difference_multiset, join, join_fused, join_fused_lhs, join_fused_rhs, join_multiset, lattice_bimorphism, union, zip, zip_longest
Operators with multiple inputs.

Multi-Output Operators: demux, demux_enum, partition, tee, unzip
Operators with multiple outputs.

Sources: initialize, null, spin, source_file, source_interval, source_iter, source_json, source_stdin, source_stream, source_stream_serde
Operators which produce output elements (and consume no inputs).

Sinks: dest_file, dest_sink, dest_sink_serde, for_each, null
Operators which consume input elements (and produce no outputs).

Control Flow Operators: assert, assert_eq, next_stratum, defer_tick, defer_tick_lazy
Operators which affect control flow/scheduling.

Compiler Fusion Operators: _lattice_fold_batch, _lattice_join_fused_join
Operators which are necessary to implement certain optimizations and rewrite rules.

Windowing Operator: all_once, batch
Operators for windowing loop inputs.


all_once

Inputs | Syntax | Outputs | Flow
exactly 1 | -> all_once() | at least 0 and at most 1 | Blocking

TODO(mingwei): docs

anti_join

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]anti_join() -> | exactly 1 | Blocking

Input port names: pos (streaming), neg (blocking)

2 input streams, the first of type (K, T) and the second of type K; 1 output stream of type (K, T)

For a given tick, computes the anti-join of the items in the input streams, returning unique items in the pos input that do not have matching keys in the neg input. Note this is set semantics only for the neg element. Order is preserved for new elements in a given tick, but not for elements processed in a previous tick with 'static.

source_iter(vec![("dog", 1), ("cat", 2), ("elephant", 3)]) -> [pos]diff;
source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
diff = anti_join() -> assert_eq([("elephant", 3)]);

anti_join_multiset

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]anti_join_multiset() -> | exactly 1 | Blocking

Input port names: pos (streaming), neg (blocking)

2 input streams, the first of type (K, T) and the second of type K; 1 output stream of type (K, T)

For a given tick, computes the anti-join of the items in the input streams, returning items in the pos input that do not have matching keys in the neg input. Note that this uses multiset semantics on the positive side, so duplicated positive inputs will appear in the output either 0 times (if matched in neg) or as many times as they appear in the input (if not matched in neg).

source_iter(vec![("cat", 2), ("cat", 2), ("elephant", 3), ("elephant", 3)]) -> [pos]diff;
source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
diff = anti_join_multiset() -> assert_eq([("elephant", 3), ("elephant", 3)]);
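
The difference between the two anti-join operators can be seen outside of a dataflow graph. The following is a minimal plain-Rust sketch of the per-tick semantics described above, not Hydroflow's implementation. The function names are hypothetical, and the set variant assumes, as the anti_join text states, that each distinct unmatched pos item is emitted once.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Hypothetical model of `anti_join_multiset` for one tick:
/// every `pos` item whose key is absent from `neg` is kept, duplicates included.
fn anti_join_multiset_tick<K: Eq + Hash, T>(pos: Vec<(K, T)>, neg: Vec<K>) -> Vec<(K, T)> {
    // The neg side is collected in full before any pos item is tested,
    // which mirrors why the `neg` port is blocking.
    let neg_keys: HashSet<K> = neg.into_iter().collect();
    pos.into_iter()
        .filter(|(k, _)| !neg_keys.contains(k))
        .collect()
}

/// Hypothetical model of `anti_join` for one tick:
/// as above, but each distinct unmatched item is emitted only once.
fn anti_join_tick<K, T>(pos: Vec<(K, T)>, neg: Vec<K>) -> Vec<(K, T)>
where
    K: Eq + Hash + Clone,
    T: Eq + Hash + Clone,
{
    let mut seen = HashSet::new();
    anti_join_multiset_tick(pos, neg)
        .into_iter()
        // `insert` returns false when the item was already emitted.
        .filter(|item| seen.insert(item.clone()))
        .collect()
}
```

Both helpers preserve the arrival order of the pos items within the tick.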

assert

Inputs | Syntax | Outputs | Flow
exactly 1 | -> assert(A) | at least 0 and at most 1 | Streaming

1 input stream, 1 optional output stream

Arguments: a predicate function that will be applied to each item in the stream

If the predicate returns false for any input item then the operator will panic at runtime.

source_iter([1, 2, 3])
-> assert(|x| *x > 0)
-> assert_eq([1, 2, 3]);

assert_eq

Inputs | Syntax | Outputs | Flow
exactly 1 | -> assert_eq(A) | at least 0 and at most 1 | Streaming

1 input stream, 1 optional output stream

Arguments: A Vector, Slice, or Array containing objects that will be compared to the input stream.

The input stream will be compared with the provided argument, element by element. If any elements do not match, assert_eq will panic. If the input stream produces more elements than are in the provided argument, assert_eq will panic.

The input stream is passed through assert_eq unchanged to the output stream.

assert_eq is mainly useful for testing and documenting the behavior of dfir code inline.

assert_eq remembers the stream position across ticks, as the following example shows.

unioned = union();

source_iter([1]) -> assert_eq([1]) -> unioned;
source_iter([2]) -> defer_tick() -> assert_eq([2]) -> unioned;

unioned -> assert_eq([1, 2]);

batch

Inputs | Syntax | Outputs | Flow
exactly 1 | -> batch() | at least 0 and at most 1 | Streaming

TODO(mingwei): docs

chain

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]chain() -> | exactly 1 | Streaming

2 input streams of the same type, 1 output stream of the same type

Chains together a pair of streams, with all the elements of the first emitted before the second.

Since chain has multiple input streams, it needs to be assigned to a variable to reference its multiple input ports across statements.

source_iter(vec!["hello", "world"]) -> [0]my_chain;
source_iter(vec!["stay", "gold"]) -> [1]my_chain;
my_chain = chain()
-> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD", "STAY", "GOLD"]);

cross_join

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]cross_join() -> | exactly 1 | Streaming

Input port names: 0 (streaming), 1 (streaming)

2 input streams of type S and T, 1 output stream of type (S, T)

Forms the cross-join (Cartesian product) of the items in the input streams, returning all tupled pairs.

source_iter(vec!["happy", "sad"]) -> [0]my_join;
source_iter(vec!["dog", "cat"]) -> [1]my_join;
my_join = cross_join() -> assert_eq([("happy", "dog"), ("sad", "dog"), ("happy", "cat"), ("sad", "cat")]);

cross_join can be provided with one or two generic lifetime persistence arguments in the same way as join, see join's documentation for more info.

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<&str>();
let mut flow = dfir_rs::dfir_syntax! {
my_join = cross_join::<'tick>();
source_iter(["hello", "bye"]) -> [0]my_join;
source_stream(input_recv) -> [1]my_join;
my_join -> for_each(|(s, t)| println!("({}, {})", s, t));
};
input_send.send("oakland").unwrap();
flow.run_tick();
input_send.send("san francisco").unwrap();
flow.run_tick();

Prints only "(hello, oakland)" and "(bye, oakland)". The source_iter is only included in the first tick, then forgotten, so when "san francisco" arrives on input [1] in the second tick, there is nothing for it to match with from input [0], and therefore it does not appear in the output.

cross_join_multiset

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]cross_join_multiset() -> | exactly 1 | Streaming

Input port names: 0 (streaming), 1 (streaming)

2 input streams of type S and T, 1 output stream of type (S, T)

Forms the multiset cross-join (Cartesian product) of the (possibly duplicated) items in the input streams, returning all tupled pairs regardless of duplicates.

source_iter(vec!["happy", "happy", "sad"]) -> [0]my_join;
source_iter(vec!["dog", "cat", "cat"]) -> [1]my_join;
my_join = cross_join_multiset() -> sort() -> assert_eq([
("happy", "cat"),
("happy", "cat"),
("happy", "cat"),
("happy", "cat"),
("happy", "dog"),
("happy", "dog"),
("sad", "cat"),
("sad", "cat"),
("sad", "dog"), ]);

cross_singleton

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]cross_singleton() -> | exactly 1 | Blocking

Input port names: input (streaming), single (blocking)

2 input streams, 1 output stream, no arguments.

Operates like cross-join, but treats one of the inputs as a "singleton"-like stream, ignoring everything after the first element. This operator blocks on the singleton input, and then joins it with all the elements in the other stream if an element is present. This operator is useful when a singleton input must be used to transform elements of a stream, since unlike cross-product it avoids cloning the stream of inputs. It is also useful for creating conditional branches, since the operator short circuits if the singleton input produces no values.

cross_singleton has two inputs: input and single. input is the input data flow, and single is the singleton input.

join = cross_singleton();

source_iter([1, 2, 3]) -> [input]join;
source_iter([0]) -> [single]join;

join -> assert_eq([(1, 0), (2, 0), (3, 0)]);
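
The short-circuit behavior described above can be sketched in plain Rust. This is only an illustration of the stated semantics under the assumption that a single tick's inputs are available as vectors; the function name cross_singleton_tick is hypothetical and this is not how the operator is implemented.

```rust
/// Hypothetical model of `cross_singleton` for one tick.
/// Pairs every `input` item with the first `single` item and ignores the rest.
/// If `single` is empty, nothing is emitted (the short-circuit case).
fn cross_singleton_tick<T, S: Clone>(input: Vec<T>, single: Vec<S>) -> Vec<(T, S)> {
    match single.into_iter().next() {
        // Only the singleton value is cloned, never the input stream.
        Some(s) => input.into_iter().map(|x| (x, s.clone())).collect(),
        None => Vec::new(),
    }
}
```

Passing an empty vector for single yields an empty output, which is what makes the operator usable as a conditional branch.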

defer_signal

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]defer_signal() -> | exactly 1 | Blocking

Input port names: input (blocking), signal (blocking)

2 input streams, 1 output stream, no arguments.

Defers streaming input and releases it downstream when a signal is delivered. The order of input is preserved. This allows for buffering data and delivering it at a later, chosen, tick.

There are two inputs to defer_signal, they are input and signal. input is the input data flow. Data that is delivered on this input is collected in order inside of the defer_signal operator. When anything is sent to signal the collected data is released downstream. The entire signal input is consumed each tick, so sending 5 things on signal will not release inputs on the next 5 consecutive ticks.

gate = defer_signal();

source_iter([1, 2, 3]) -> [input]gate;
source_iter([()]) -> [signal]gate;

gate -> assert_eq([1, 2, 3]);
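
The buffering rule, including the point that several signals in one tick release the buffer only once, can be modeled with a small state machine. This is a sketch under the assumption that each tick's input and signal count are known up front; the Gate type is hypothetical and is not part of dfir_rs.

```rust
/// Hypothetical model of `defer_signal`: buffers input until a signal arrives.
struct Gate<T> {
    buffer: Vec<T>,
}

impl<T> Gate<T> {
    fn new() -> Self {
        Gate { buffer: Vec::new() }
    }

    /// Runs one tick: buffers `input` in order, then releases the whole
    /// buffer if at least one signal arrived. The entire signal input is
    /// consumed, so the number of signals beyond the first is irrelevant.
    fn run_tick(&mut self, input: Vec<T>, signals: usize) -> Vec<T> {
        self.buffer.extend(input);
        if signals > 0 {
            std::mem::take(&mut self.buffer)
        } else {
            Vec::new()
        }
    }
}
```

In this model, sending five signals in one tick releases the buffer once and leaves nothing pending for later ticks.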

defer_tick

Inputs | Syntax | Outputs | Flow
exactly 1 | -> defer_tick() -> | exactly 1 | Blocking

Buffers all input items and releases them in the next tick. See the book discussion of Hydroflow time for details on ticks. A tick may be divided into multiple strata; see the next_stratum() operator.

defer_tick is sometimes needed to separate conflicting data across time, in order to preserve invariants. Consider the following example, which implements a flip-flop: the invariant is that it emits exactly one of true or false in a given tick (but never both!).

pub fn main() {
let mut df = dfir_rs::dfir_syntax! {
source_iter(vec!(true))
-> state;
state = union()
-> assert(|x| if context.current_tick().0 % 2 == 0 { *x == true } else { *x == false })
-> map(|x| !x)
-> defer_tick()
-> state;
};
for i in 1..100 {
println!("tick {}", i);
df.run_tick();
}
}

defer_tick can also be handy for comparing stream content across ticks. In the example below defer_tick() is used alongside difference() to filter out any items that arrive from inp in the current tick which match an item from inp in the previous tick.

// Outputs 1 2 3 4 5 6 (on separate lines).
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut flow = dfir_rs::dfir_syntax! {
inp = source_stream(input_recv) -> tee();
inp -> [pos]diff;
inp -> defer_tick() -> [neg]diff;
diff = difference() -> for_each(|x| println!("{}", x));
};

for x in [1, 2, 3, 4] {
input_send.send(x).unwrap();
}
flow.run_tick();

for x in [3, 4, 5, 6] {
input_send.send(x).unwrap();
}
flow.run_tick();
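
To make the cross-tick comparison concrete, here is a plain-Rust sketch of what the graph above computes, assuming each tick's items are handed over as a vector. The NewSinceLastTick type is hypothetical; it only models the combination of defer_tick() and difference() and is not Hydroflow code.

```rust
use std::collections::HashSet;

/// Hypothetical model of `inp -> [pos]diff; inp -> defer_tick() -> [neg]diff`.
struct NewSinceLastTick {
    /// Items from the previous tick, i.e. what `defer_tick` delivers to `neg`.
    previous: HashSet<usize>,
}

impl NewSinceLastTick {
    fn new() -> Self {
        NewSinceLastTick { previous: HashSet::new() }
    }

    /// Emits the items of this tick that did not appear in the previous tick.
    fn run_tick(&mut self, items: Vec<usize>) -> Vec<usize> {
        let out: Vec<usize> = items
            .iter()
            .copied()
            .filter(|x| !self.previous.contains(x))
            .collect();
        // This tick's items become the next tick's `neg` input.
        self.previous = items.into_iter().collect();
        out
    }
}
```

Feeding it [1, 2, 3, 4] and then [3, 4, 5, 6] yields [1, 2, 3, 4] followed by [5, 6], matching the output listed in the example.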

You can also supply a type parameter defer_tick::<MyType>() to specify what items flow through the pipeline. This can be useful for helping the compiler infer types.

defer_tick_lazy

Inputs | Syntax | Outputs | Flow
exactly 1 | -> defer_tick_lazy() -> | exactly 1 | Blocking

See defer_tick. This operator is identical to defer_tick except that it does not eagerly cause a new tick to be scheduled.

demux

Inputs | Syntax | Outputs | Flow
exactly 1 | -> demux(A)[<output_port>] -> | at least 2 | Streaming

Output port names: Variadic, as specified in arguments.

Arguments: A Rust closure, the first argument is a received item and the second argument is a variadic var_args! tuple list where each item name is an output port.

Takes the input stream and allows the user to determine which items to deliver to any number of output streams.

Note: Downstream operators may need explicit type annotations.

Note: The Pusherator trait is automatically imported to enable the .give(...) method.

Note: The closure has access to the context object.

my_demux = source_iter(1..=100) -> demux(|v, var_args!(fzbz, fizz, buzz, rest)|
match (v % 3, v % 5) {
(0, 0) => fzbz.give(v),
(0, _) => fizz.give(v),
(_, 0) => buzz.give(v),
(_, _) => rest.give(v),
}
);
my_demux[fzbz] -> for_each(|v| println!("{}: fizzbuzz", v));
my_demux[fizz] -> for_each(|v| println!("{}: fizz", v));
my_demux[buzz] -> for_each(|v| println!("{}: buzz", v));
my_demux[rest] -> for_each(|v| println!("{}", v));

demux_enum

Inputs | Syntax | Outputs | Flow
exactly 1 | -> demux_enum() | any number of | Streaming

Output port names: Variadic, as specified in arguments.

Generic Argument: An enum type which has #[derive(DemuxEnum)]. Must match the items in the input stream.

Takes an input stream of enum instances and splits them into their variants.

#[derive(DemuxEnum)]
enum Shape {
Square(f64),
Rectangle(f64, f64),
Circle { r: f64 },
Triangle { w: f64, h: f64 }
}

let mut df = dfir_syntax! {
my_demux = source_iter([
Shape::Square(9.0),
Shape::Rectangle(10.0, 8.0),
Shape::Circle { r: 5.0 },
Shape::Triangle { w: 12.0, h: 13.0 },
]) -> demux_enum::<Shape>();

my_demux[Square] -> map(|s| s * s) -> out;
my_demux[Circle] -> map(|(r,)| std::f64::consts::PI * r * r) -> out;
my_demux[Rectangle] -> map(|(w, h)| w * h) -> out;
my_demux[Triangle] -> map(|(w, h)| 0.5 * w * h) -> out;

out = union() -> for_each(|area| println!("Area: {}", area));
};
df.run_available();

dest_file

Inputs | Syntax | Outputs | Flow
exactly 1 | -> dest_file(A, B) | exactly 0 | Streaming

1 input stream, 0 output streams

Arguments: (1) An AsRef<Path> for a file to write to, and (2) a bool append.

Consumes Strings by writing them as lines to a file. The file will be created if it doesn't exist. Lines will be appended to the file if append is true, otherwise the file will be truncated before lines are written.

Note this operator must be used within a Tokio runtime.

source_iter(1..=10) -> map(|n| format!("Line {}", n)) -> dest_file("dest.txt", false);

dest_sink

Inputs | Syntax | Outputs | Flow
exactly 1 | -> dest_sink(A) | exactly 0 | Streaming

Arguments: An async Sink.

Consumes items by sending them to an async Sink. A Sink is a thing into which values can be sent, asynchronously. For example, sending items into a bounded channel.

Note this operator must be used within a Tokio runtime, and the Hydroflow program must be launched with run_async.

// In this example we use a _bounded_ channel for our `Sink`. This is for demonstration only,
// instead you should use [`dfir_rs::util::unbounded_channel`]. A bounded channel results in
// `Hydroflow` buffering items internally instead of within the channel. (We can't use
// unbounded here since unbounded channels are synchronous to write to and therefore not
// `Sink`s.)
let (send, recv) = tokio::sync::mpsc::channel::<usize>(5);
// `PollSender` adapts the send half of the bounded channel into a `Sink`.
let send = tokio_util::sync::PollSender::new(send);

let mut flow = dfir_rs::dfir_syntax! {
source_iter(0..10) -> dest_sink(send);
};
// Call `run_async()` to allow async events to propagate, run for one second.
tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
.await
.expect_err("Expected time out");

let mut recv = tokio_stream::wrappers::ReceiverStream::new(recv);
// Only 5 elements received due to buffer size.
// (Note that if we were using a multi-threaded executor instead of `current_thread` it would
// be possible for more items to be added as they're removed, resulting in >5 collected.)
let out: Vec<_> = dfir_rs::util::ready_iter(&mut recv).collect();
assert_eq!(&[0, 1, 2, 3, 4], &*out);

Sink is different from AsyncWrite. Instead of discrete values, we send arbitrary streams of bytes into an AsyncWrite value, for example when writing a stream of bytes to a file, a socket, or stdout.

To handle those situations we can use a codec from tokio_util::codec. These specify ways in which the byte stream is broken into individual items, such as with newlines or with length delineation.

If we only want to write a stream of bytes without delineation we can use the BytesCodec.

In this example we use a duplex as our AsyncWrite with a BytesCodec.

use bytes::Bytes;
use tokio::io::AsyncReadExt;

// Like a channel, but for a stream of bytes instead of discrete objects.
let (asyncwrite, mut asyncread) = tokio::io::duplex(256);
// Wrap the `AsyncWrite` in a `FramedWrite`; `BytesCodec` writes each byte string as-is, without delineation.
let sink = tokio_util::codec::FramedWrite::new(asyncwrite, tokio_util::codec::BytesCodec::new());

let mut flow = dfir_rs::dfir_syntax! {
source_iter([
Bytes::from_static(b"hello"),
Bytes::from_static(b"world"),
]) -> dest_sink(sink);
};
tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
.await
.expect_err("Expected time out");

let mut buf = Vec::<u8>::new();
asyncread.read_buf(&mut buf).await.unwrap();
assert_eq!(b"helloworld", &*buf);

dest_sink_serde

Inputs | Syntax | Outputs | Flow
exactly 1 | -> dest_sink_serde(A) | exactly 0 | Streaming

Arguments: A serializing async Sink.

Consumes (payload, addr) pairs by serializing the payload and sending the resulting pair to an async Sink that delivers them to the SocketAddr specified by addr.

Note this operator must be used within a Tokio runtime.

async fn serde_out() {
let addr = dfir_rs::util::ipv4_resolve("localhost:9000".into()).unwrap();
let (outbound, inbound, _) = dfir_rs::util::bind_udp_bytes(addr).await;
let remote = dfir_rs::util::ipv4_resolve("localhost:9001".into()).unwrap();
let mut flow = dfir_rs::dfir_syntax! {
source_iter(vec![("hello".to_string(), 1), ("world".to_string(), 2)])
-> map (|m| (m, remote)) -> dest_sink_serde(outbound);
};
flow.run_available();
}

difference

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]difference() -> | exactly 1 | Blocking

Input port names: pos (streaming), neg (blocking)

2 input streams of the same type T, 1 output stream of type T

Forms the set difference of the items in the input streams, returning items in the pos input that are not found in the neg input.

difference can be provided with one or two generic lifetime persistence arguments in the same way as join, see join's documentation for more info.

Note set semantics only for the neg input.

source_iter(vec!["dog", "cat", "elephant"]) -> [pos]diff;
source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
diff = difference() -> assert_eq(["elephant"]);

difference_multiset

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]difference_multiset() -> | exactly 1 | Blocking

Input port names: pos (streaming), neg (blocking)

2 input streams of the same type T, 1 output stream of type T

Forms the multiset difference of the items in the input streams, returning items in the pos input that are not found in the neg input.

difference_multiset can be provided with one or two generic lifetime persistence arguments in the same way as join; see join's documentation for more info.

Note multiset semantics here: each (possibly duplicated) item in the pos input that has no match in neg is sent to the output.

source_iter(vec!["cat", "cat", "elephant", "elephant"]) -> [pos]diff;
source_iter(vec!["cat", "gorilla"]) -> [neg]diff;
diff = difference_multiset() -> assert_eq(["elephant", "elephant"]);

enumerate

Inputs | Syntax | Outputs | Flow
exactly 1 | -> enumerate() -> | exactly 1 | Streaming

1 input stream of type T, 1 output stream of type (usize, T)

For each item passed in, enumerate it with its index: (0, x_0), (1, x_1), etc.

enumerate can also be provided with one generic lifetime persistence argument, either 'tick or 'static, to specify if indexing resets. If 'tick (the default) is specified, indexing will restart at zero at the start of each tick. Otherwise 'static will never reset and count monotonically upwards.

source_iter(vec!["hello", "world"])
-> enumerate()
-> assert_eq([(0, "hello"), (1, "world")]);
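
The effect of the persistence argument on indexing can be sketched in plain Rust. The Enumerate struct below is a hypothetical model, assuming each tick's items arrive as a vector; its persist flag stands in for the choice between 'static (true) and 'tick (false).

```rust
/// Hypothetical model of `enumerate` with a persistence flag.
struct Enumerate {
    /// true models 'static, false models 'tick.
    persist: bool,
    /// Index to assign to the next item.
    next: usize,
}

impl Enumerate {
    fn run_tick<T>(&mut self, items: Vec<T>) -> Vec<(usize, T)> {
        if !self.persist {
            // 'tick: indexing restarts at zero at the start of each tick.
            self.next = 0;
        }
        let start = self.next;
        self.next += items.len();
        items
            .into_iter()
            .enumerate()
            .map(|(i, x)| (start + i, x))
            .collect()
    }
}
```

With persist set to true, a second tick continues counting from where the first tick stopped instead of restarting at zero.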

filter

Inputs | Syntax | Outputs | Flow
exactly 1 | -> filter(A) -> | exactly 1 | Streaming

Filter outputs a subsequence of the items it receives at its input, according to a Rust boolean closure passed in as an argument.

The closure receives a reference &T rather than an owned value T because filtering does not modify or take ownership of the values. If you need to modify the values while filtering use filter_map instead.

Note: The closure has access to the context object.

source_iter(vec!["hello", "world"]) -> filter(|x| x.starts_with('w'))
-> assert_eq(["world"]);

filter_map

Inputs | Syntax | Outputs | Flow
exactly 1 | -> filter_map(A) -> | exactly 1 | Streaming

1 input stream, 1 output stream

An operator that both filters and maps. It yields only the items for which the supplied closure returns Some(value).

Note: The closure has access to the context object.

source_iter(vec!["1", "hello", "world", "2"])
-> filter_map(|s| s.parse::<usize>().ok())
-> assert_eq([1, 2]);

flat_map

Inputs | Syntax | Outputs | Flow
exactly 1 | -> flat_map(A) -> | exactly 1 | Streaming

1 input stream, 1 output stream

Arguments: A Rust closure that returns an iterator

For each item i passed in, apply the closure to i and emit the items of the resulting iterator one by one. The closure's return type must be iterable.

Note: The closure has access to the context object.

// emits each character of each word as a separate item
source_iter(vec!["hello", "world"])
-> flat_map(|x| x.chars())
-> assert_eq(['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']);

flatten

Inputs | Syntax | Outputs | Flow
exactly 1 | -> flatten() -> | exactly 1 | Streaming

1 input stream, 1 output stream

For each item i passed in, treat i as an iterator and produce its items one by one. The type of the input items must be iterable.

source_iter(vec![[1, 2], [3, 4], [5, 6]])
-> flatten()
-> assert_eq([1, 2, 3, 4, 5, 6]);
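
Since filter_map, flat_map, and flatten are modeled on Rust's iterator adapters of the same names, the three examples above can be reproduced with the standard library alone. The sketch below uses only std::iter; the wrapper function std_equivalents is a hypothetical name used to group the three pipelines.

```rust
/// Runs the standard-library counterparts of the `filter_map`, `flat_map`,
/// and `flatten` examples and returns their collected outputs.
fn std_equivalents() -> (Vec<usize>, Vec<char>, Vec<i32>) {
    // filter_map: keep only the strings that parse as numbers.
    let parsed = ["1", "hello", "world", "2"]
        .iter()
        .filter_map(|s| s.parse::<usize>().ok())
        .collect();
    // flat_map: the closure turns each word into an iterator of characters.
    let chars = ["hello", "world"].iter().flat_map(|x| x.chars()).collect();
    // flatten: each item is itself iterable and is emitted element by element.
    let flat = vec![[1, 2], [3, 4], [5, 6]].into_iter().flatten().collect();
    (parsed, chars, flat)
}
```

The main difference in a dataflow graph is that the adapters run once per tick over whatever items arrive, rather than over a fixed collection.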

fold

Inputs | Syntax | Outputs | Flow
exactly 1 | -> fold(A, B) | at least 0 and at most 1 | Blocking

1 input stream, 1 output stream

Arguments: two arguments, both closures. The first closure is used to create the initial value for the accumulator, and the second is used to combine new items with the existing accumulator value. The second closure takes two arguments: an &mut Accum accumulated value, and an Item.

Akin to Rust's built-in fold operator, except that it takes the accumulator by &mut instead of by value. Folds every item into an accumulator by applying a closure, returning the final result.

Note: The closures have access to the context object.

fold can also be provided with one generic lifetime persistence argument, either 'tick or 'static, to specify how data persists. With 'tick, Items will only be collected within the same tick. With 'static, the accumulated value will be remembered across ticks and will be aggregated with items arriving in later ticks. When not explicitly specified persistence defaults to 'tick.

// folds the items back into a single vector `[1, 2, 3, 4, 5]`
source_iter([1,2,3,4,5])
-> fold::<'tick>(Vec::new, |accum: &mut Vec<_>, elem| {
accum.push(elem);
})
-> assert_eq([vec![1, 2, 3, 4, 5]]);
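
The contrast with Rust's built-in fold is easiest to see side by side. The following plain-Rust sketch is not Hydroflow code; fold_by_ref is a hypothetical helper that mimics the &mut accumulator style for a single tick, and fold_by_value shows the by-value style of Iterator::fold.

```rust
/// Hypothetical single-tick model of the operator's style:
/// `init` creates the accumulator and `step` mutates it in place.
fn fold_by_ref<A, T>(
    items: Vec<T>,
    init: impl FnOnce() -> A,
    mut step: impl FnMut(&mut A, T),
) -> A {
    let mut accum = init();
    for item in items {
        step(&mut accum, item);
    }
    accum
}

/// The same computation with `Iterator::fold`, which passes the accumulator
/// by value and expects the closure to return it.
fn fold_by_value(items: Vec<i32>) -> Vec<i32> {
    items.into_iter().fold(Vec::new(), |mut accum, elem| {
        accum.push(elem);
        accum
    })
}
```

Taking the accumulator by &mut means the closure does not have to move a possibly large accumulator in and out on every item.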

fold_keyed

Inputs | Syntax | Outputs | Flow
exactly 1 | -> fold_keyed(A, B) -> | exactly 1 | Blocking

1 input stream of type (K, V1), 1 output stream of type (K, V2). The output will have one tuple for each distinct K, with an accumulated value of type V2.

If the input and output value types are the same and do not require initialization then use reduce_keyed.

Arguments: two Rust closures. The first generates an initial value per group. The second takes two arguments: an &mut reference to the group's accumulator, and an element, and updates the accumulator in place.

A special case of fold, in the spirit of SQL's GROUP BY and aggregation constructs. The input is partitioned into groups by the first field ("keys"), and for each group the values in the second field are accumulated via the closures in the arguments.

Note: The closures have access to the context object.

source_iter([("toy", 1), ("toy", 2), ("shoe", 11), ("shoe", 35), ("haberdashery", 7)])
-> fold_keyed(|| 0, |old: &mut u32, val: u32| *old += val)
-> assert_eq([("toy", 3), ("shoe", 46), ("haberdashery", 7)]);

fold_keyed can be provided with one generic lifetime persistence argument, either 'tick or 'static, to specify how data persists. With 'tick, values will only be collected within the same tick. With 'static, values will be remembered across ticks and will be aggregated with pairs arriving in later ticks. When not explicitly specified persistence defaults to 'tick.

fold_keyed can also be provided with two type arguments, the key type K and aggregated output value type V2. This is required when using 'static persistence if the compiler cannot infer the types.

Example using 'tick persistence:

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv)
-> fold_keyed::<'tick, &str, String>(String::new, |old: &mut _, val| {
*old += val;
*old += ", ";
})
-> for_each(|(k, v)| println!("({:?}, {:?})", k, v));
};

input_send.send(("hello", "oakland")).unwrap();
input_send.send(("hello", "berkeley")).unwrap();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_available();
// ("hello", "oakland, berkeley, san francisco, ")

input_send.send(("hello", "palo alto")).unwrap();
flow.run_available();
// ("hello", "palo alto, ")
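
The GROUP BY analogy can be sketched with a HashMap in plain Rust. This is a minimal single-tick model, not the operator's implementation; the function name fold_keyed_tick is hypothetical, and returning a HashMap (rather than a stream of pairs) is a simplification made for the sketch.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model of `fold_keyed` for a single tick.
/// Groups `(key, value)` pairs by key and folds each group's values
/// into an accumulator created by `init`.
fn fold_keyed_tick<K, V, A>(
    items: Vec<(K, V)>,
    init: impl Fn() -> A,
    mut step: impl FnMut(&mut A, V),
) -> HashMap<K, A>
where
    K: Eq + Hash,
{
    let mut groups: HashMap<K, A> = HashMap::new();
    for (k, v) in items {
        // The first sighting of a key creates its accumulator via `init`.
        step(groups.entry(k).or_insert_with(&init), v);
    }
    groups
}
```

Under this model, 'tick persistence corresponds to starting from an empty map every tick, while 'static would correspond to keeping the map between calls.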

for_each

Inputs | Syntax | Outputs | Flow
exactly 1 | -> for_each(A) | exactly 0 | Streaming

1 input stream, 0 output streams

Arguments: a Rust closure

Iterates through a stream passing each element to the closure in the argument.

Note: The closure has access to the context object.

source_iter(vec!["Hello", "World"])
-> for_each(|x| println!("{}", x));

identity

Inputs | Syntax | Outputs | Flow
exactly 1 | -> identity() -> | exactly 1 | Streaming

1 input stream of type T, 1 output stream of type T

For each item passed in, pass it out without any change.

source_iter(vec!["hello", "world"])
-> identity()
-> assert_eq(["hello", "world"]);

You can also supply a type parameter identity::<MyType>() to specify what items flow through the pipeline. This can be useful for helping the compiler infer types.

// Use type parameter to ensure items are `i32`s.
source_iter(0..10)
-> identity::<i32>()
-> assert_eq([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

initialize

Inputs | Syntax | Outputs | Flow
exactly 0 | initialize() -> | exactly 1 | Streaming

0 input streams, 1 output stream

Arguments: None.

Emits a single unit () at the start of the first tick.

initialize()
-> assert_eq([()]);

inspect

Inputs | Syntax | Outputs | Flow
exactly 1 | -> inspect(A) | at least 0 and at most 1 | Streaming

Arguments: A single closure FnMut(&Item).

An operator which allows you to "inspect" each element of a stream without modifying it. The closure is called on a reference to each item. This is mainly useful for debugging as in the example below, and it is generally an anti-pattern to provide a closure with side effects.

Note: The closure has access to the context object.

source_iter([1, 2, 3, 4])
-> inspect(|x| println!("{}", x))
-> assert_eq([1, 2, 3, 4]);

join

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]join() -> | exactly 1 | Streaming

Input port names: 0 (streaming), 1 (streaming)

2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>

Forms the equijoin of the tuples in the input streams by their first (key) attribute. Note that the result nests the 2nd input field (values) into a tuple in the 2nd output field.

source_iter(vec![("hello", "world"), ("stay", "gold"), ("hello", "world")]) -> [0]my_join;
source_iter(vec![("hello", "cleveland")]) -> [1]my_join;
my_join = join()
-> assert_eq([("hello", ("world", "cleveland"))]);

join can also be provided with one or two generic lifetime persistence arguments, either 'tick or 'static, to specify how join data persists. With 'tick, pairs will only be joined with corresponding pairs within the same tick. With 'static, pairs will be remembered across ticks and will be joined with pairs arriving in later ticks. When not explicitly specified persistence defaults to 'tick.

When two persistence arguments are supplied the first maps to port 0 and the second maps to port 1. When a single persistence argument is supplied, it is applied to both input ports. When no persistence arguments are applied it defaults to 'tick for both.

The syntax is as follows:

join(); // Or
join::<'static>();

join::<'tick>();

join::<'static, 'tick>();

join::<'tick, 'static>();
// etc.

join is defined to treat its inputs as sets, meaning that it eliminates duplicated values in its inputs. If you do not want duplicates eliminated, use the join_multiset operator.

Examples

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
source_iter([("hello", "world")]) -> [0]my_join;
source_stream(input_recv) -> [1]my_join;
my_join = join::<'tick>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
};
input_send.send(("hello", "oakland")).unwrap();
flow.run_tick();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_tick();

Prints out "(hello, (world, oakland))" since source_iter([("hello", "world")]) is only included in the first tick, then forgotten.


let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
source_iter([("hello", "world")]) -> [0]my_join;
source_stream(input_recv) -> [1]my_join;
my_join = join::<'static>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
};
input_send.send(("hello", "oakland")).unwrap();
flow.run_tick();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_tick();

Prints out "(hello, (world, oakland))" and "(hello, (world, san francisco))" since the inputs are persisted across ticks.
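
The two runs above differ only in whether join state survives between ticks. The plain-Rust sketch below models that difference under two assumptions: inputs are string pairs, and, consistent with the printed output described above, only matches that are new in a tick are emitted. JoinModel and its methods are hypothetical names, not part of dfir_rs.

```rust
use std::collections::HashSet;

type Pair = (&'static str, &'static str);
type Joined = (&'static str, (&'static str, &'static str));

/// Hypothetical model of a two-input equijoin with set semantics.
struct JoinModel {
    /// true models 'static on both ports, false models 'tick.
    persist: bool,
    lhs: HashSet<Pair>,
    rhs: HashSet<Pair>,
}

impl JoinModel {
    fn new(persist: bool) -> Self {
        JoinModel { persist, lhs: HashSet::new(), rhs: HashSet::new() }
    }

    /// Runs one tick and returns the matches that are new in this tick.
    fn run_tick(&mut self, new_lhs: &[Pair], new_rhs: &[Pair]) -> Vec<Joined> {
        if !self.persist {
            // 'tick: forget everything seen in earlier ticks.
            self.lhs.clear();
            self.rhs.clear();
        }
        let mut out = Vec::new();
        for &(k, v1) in new_lhs {
            // `insert` returns false for duplicates, giving set semantics.
            if self.lhs.insert((k, v1)) {
                for &(k2, v2) in &self.rhs {
                    if k2 == k {
                        out.push((k, (v1, v2)));
                    }
                }
            }
        }
        for &(k, v2) in new_rhs {
            if self.rhs.insert((k, v2)) {
                for &(k1, v1) in &self.lhs {
                    if k1 == k {
                        out.push((k, (v1, v2)));
                    }
                }
            }
        }
        out
    }
}
```

With persist set to false the second tick finds no left-hand pair to match, while with persist set to true the remembered ("hello", "world") pair matches the newly arrived right-hand pair.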

join_fused

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]join_fused(A, B) -> | exactly 1 | Blocking

Input port names: 0 (blocking), 1 (blocking)

2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>

join_fused takes two arguments: the configuration options for the left-hand-side and right-hand-side inputs, respectively. Three configuration options are available: Reduce, if the input type is the same as the accumulator type; Fold, if the input type differs from the accumulator type and the accumulator type has a sensible default value; and FoldFrom, if the input type differs from the accumulator type and the accumulator must be derived from the first input value. Examples of all three configuration options are below:

// Left hand side input will use fold, right hand side input will use reduce,
join_fused(Fold(|| "default value", |x, y| *x += y), Reduce(|x, y| *x -= y))

// Left hand side input will use FoldFrom, and the right hand side input will use Reduce again
join_fused(FoldFrom(|x| "conversion function", |x, y| *x += y), Reduce(|x, y| *x *= y))

The three currently supported fused operator types are Fold(Fn() -> A, Fn(A, T) -> A), Reduce(Fn(A, A) -> A), and FoldFrom(Fn(T) -> A, Fn(A, T) -> A).

join_fused first performs a fold_keyed/reduce_keyed operation on each input stream before joining. See join(). There is currently no equivalent for FoldFrom in dfir operators.

For example, the following two dfir programs are equivalent, the former would optimize into the latter:

source_iter(vec![("key", 0), ("key", 1), ("key", 2)])
-> reduce_keyed(|x: &mut _, y| *x += y)
-> [0]my_join;
source_iter(vec![("key", 2), ("key", 3)])
-> fold_keyed(|| 1, |x: &mut _, y| *x *= y)
-> [1]my_join;
my_join = join_multiset()
-> assert_eq([("key", (3, 6))]);

source_iter(vec![("key", 0), ("key", 1), ("key", 2)])
-> [0]my_join;
source_iter(vec![("key", 2), ("key", 3)])
-> [1]my_join;
my_join = join_fused(Reduce(|x, y| *x += y), Fold(|| 1, |x, y| *x *= y))
-> assert_eq([("key", (3, 6))]);

Here is an example of using FoldFrom to derive the accumulator from the first value:

source_iter(vec![("key", 0), ("key", 1), ("key", 2)])
-> [0]my_join;
source_iter(vec![("key", 2), ("key", 3)])
-> [1]my_join;
my_join = join_fused(FoldFrom(|x| x + 3, |x, y| *x += y), Fold(|| 1, |x, y| *x *= y))
-> assert_eq([("key", (6, 6))]);

The benefit of this is that the state between the reducing/folding operator and the join is merged together.
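To make the fold-then-join behavior concrete, here is a minimal plain-Rust sketch of what join_fused(Reduce(|x, y| *x += y), Fold(|| 1, |x, y| *x *= y)) computes for the inputs used above. It only models the semantics with standard-library maps; the function name fused_join_model is hypothetical and this is not how dfir implements the operator.

```rust
use std::collections::HashMap;

/// Plain-Rust model (not dfir's implementation): aggregate each side per key,
/// then join the two accumulator maps on their keys.
fn fused_join_model(
    lhs: &[(&'static str, i32)],
    rhs: &[(&'static str, i32)],
) -> Vec<(&'static str, (i32, i32))> {
    // Left side acts like Reduce(|x, y| *x += y): the first value seeds the accumulator.
    let mut lhs_acc: HashMap<&'static str, i32> = HashMap::new();
    for &(k, v) in lhs {
        lhs_acc.entry(k).and_modify(|acc| *acc += v).or_insert(v);
    }
    // Right side acts like Fold(|| 1, |x, y| *x *= y): the accumulator starts at 1.
    let mut rhs_acc: HashMap<&'static str, i32> = HashMap::new();
    for &(k, v) in rhs {
        *rhs_acc.entry(k).or_insert(1) *= v;
    }
    // Join the per-key accumulators; sort so the result order is deterministic.
    let mut out: Vec<_> = lhs_acc
        .iter()
        .filter_map(|(k, l)| rhs_acc.get(k).map(|r| (*k, (*l, *r))))
        .collect();
    out.sort();
    out
}
```

In this model each side holds exactly one accumulator per key, which is the state that the fused operator shares between aggregation and joining.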

join_fused follows the same persistence rules as join and all other operators. By default, both the left hand side and right hand side are 'tick persistence. They can be set to 'static persistence by specifying 'static in the type arguments of the operator.

For join_fused::<'static>, the operator replays all keys that the join has ever seen on each tick, not only the new matches from that specific tick. This means that it behaves as if persist::<'static>() were placed before each input. For example, the following two programs have identical behavior:

source_iter(vec![("key", 0), ("key", 1), ("key", 2)]) -> persist::<'static>() -> [0]my_join;
source_iter(vec![("key", 2)]) -> my_union;
source_iter(vec![("key", 3)]) -> defer_tick() -> my_union;
my_union = union() -> persist::<'static>() -> [1]my_join;

my_join = join_fused(Reduce(|x, y| *x += y), Fold(|| 1, |x, y| *x *= y))
-> assert_eq([("key", (3, 2)), ("key", (3, 6))]);

source_iter(vec![("key", 0), ("key", 1), ("key", 2)]) -> [0]my_join;
source_iter(vec![("key", 2)]) -> my_union;
source_iter(vec![("key", 3)]) -> defer_tick() -> my_union;
my_union = union() -> [1]my_join;

my_join = join_fused::<'static>(Reduce(|x, y| *x += y), Fold(|| 1, |x, y| *x *= y))
-> assert_eq([("key", (3, 2)), ("key", (3, 6))]);

join_fused_lhs

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]join_fused_lhs(A) -> | exactly 1 | Blocking

Input port names: 0 (blocking), 1 (streaming)

See join_fused

This operator is identical to join_fused except that the right hand side (input 1) is a regular join_multiset input.

This means that join_fused_lhs takes only one argument: the reducing/folding operation for the left hand side.

For example:

source_iter(vec![("key", 0), ("key", 1), ("key", 2)]) -> [0]my_join;
source_iter(vec![("key", 2), ("key", 3)]) -> [1]my_join;
my_join = join_fused_lhs(Reduce(|x, y| *x += y))
-> assert_eq([("key", (3, 2)), ("key", (3, 3))]);

join_fused_rhs

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]join_fused_rhs(A) -> | exactly 1 | Blocking

Input port names: 0 (streaming), 1 (blocking)

See join_fused_lhs

This operator is identical to join_fused_lhs except that it is the right hand side that is fused instead of the left hand side.

join_multiset

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]join_multiset() -> | exactly 1 | Streaming

Input port names: 0 (streaming), 1 (streaming)

2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>

This operator is equivalent to join except that the LHS and RHS are collected into multisets rather than sets before joining.

If you want duplicates eliminated from the inputs, use the join operator.

For example:

lhs = source_iter([("a", 0), ("a", 0)]) -> tee();
rhs = source_iter([("a", "hydro")]) -> tee();

lhs -> [0]multiset_join;
rhs -> [1]multiset_join;
multiset_join = join_multiset() -> assert_eq([("a", (0, "hydro")), ("a", (0, "hydro"))]);

lhs -> [0]set_join;
rhs -> [1]set_join;
set_join = join() -> assert_eq([("a", (0, "hydro"))]);
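The difference between the two joins can also be sketched in plain Rust. The functions below are hypothetical stand-ins built on the standard library, meant only to illustrate set versus multiset semantics on the same inputs; they are not dfir's implementation.

```rust
use std::collections::HashSet;

/// Multiset join: every matching (lhs, rhs) pair is emitted, duplicates included.
fn multiset_join<'a>(
    lhs: &[(&'a str, i32)],
    rhs: &[(&'a str, &'a str)],
) -> Vec<(&'a str, (i32, &'a str))> {
    let mut out = Vec::new();
    for &(lk, lv) in lhs {
        for &(rk, rv) in rhs {
            if lk == rk {
                out.push((lk, (lv, rv)));
            }
        }
    }
    out
}

/// Set join: both inputs are deduplicated first, so each distinct match appears once.
fn set_join<'a>(
    lhs: &[(&'a str, i32)],
    rhs: &[(&'a str, &'a str)],
) -> Vec<(&'a str, (i32, &'a str))> {
    let lhs_set: HashSet<_> = lhs.iter().copied().collect();
    let rhs_set: HashSet<_> = rhs.iter().copied().collect();
    let lhs_dedup: Vec<_> = lhs_set.into_iter().collect();
    let rhs_dedup: Vec<_> = rhs_set.into_iter().collect();
    let mut out = multiset_join(&lhs_dedup, &rhs_dedup);
    out.sort(); // HashSet iteration order is unspecified, so sort for determinism.
    out
}
```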

lattice_bimorphism

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]lattice_bimorphism(A, B, C) -> | exactly 1 | Streaming

Input port names: 0 (streaming), 1 (streaming)

An operator representing a lattice bimorphism.

2 input streams, of type LhsItem and RhsItem.

Three arguments: a LatticeBimorphism function Func, an LhsState singleton reference, and an RhsState singleton reference.

1 output stream of the output type of the LatticeBimorphism function.

The function must be a lattice bimorphism for both (LhsState, RhsItem) and (RhsState, LhsItem).

use std::collections::HashSet;
use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};

lhs = source_iter(0..3)
-> map(SetUnionSingletonSet::new_from)
-> state::<'static, SetUnionHashSet<u32>>();
rhs = source_iter(3..5)
-> map(SetUnionSingletonSet::new_from)
-> state::<'static, SetUnionHashSet<u32>>();

lhs -> [0]my_join;
rhs -> [1]my_join;

my_join = lattice_bimorphism(CartesianProductBimorphism::<HashSet<_>>::default(), #lhs, #rhs)
-> assert_eq([SetUnionHashSet::new(HashSet::from_iter([
(0, 3),
(0, 4),
(1, 3),
(1, 4),
(2, 3),
(2, 4),
]))]);

lattice_fold

Inputs | Syntax | Outputs | Flow
exactly 1 | -> lattice_fold(A) -> | exactly 1 | Blocking

1 input stream, 1 output stream

A specialized operator for merging lattices together into an accumulated value. Like fold() but specialized for lattice types. lattice_fold(MyLattice::default) is equivalent to fold(MyLattice::default, dfir_rs::lattices::Merge::merge).

lattice_fold can also be provided with one generic lifetime persistence argument, either 'tick or 'static, to specify how data persists. With 'tick, values will only be collected within the same tick. With 'static, values will be remembered across ticks and will be aggregated with values arriving in later ticks. When not explicitly specified, persistence defaults to 'tick.

lattice_fold is differentiated from lattice_reduce in that lattice_fold can accumulate into a different type from its input. But it also means that the accumulating type must have a sensible default value.

use dfir_rs::lattices::set_union::SetUnionSingletonSet;
use dfir_rs::lattices::set_union::SetUnionHashSet;

source_iter([SetUnionSingletonSet::new_from(7)])
-> lattice_fold(SetUnionHashSet::<usize>::default)
-> assert_eq([SetUnionHashSet::new_from([7])]);
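As a rough illustration of "fold with a default plus lattice merge", the following plain-Rust sketch folds single elements into a set-union accumulator. The Merge trait and SetUnion type here are simplified, hypothetical stand-ins written for this example; they are not the definitions from the lattices crate.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a lattice merge: fold `other` into `self`.
trait Merge<Other> {
    fn merge(&mut self, other: Other);
}

/// Simplified set-union lattice over `usize` that accepts single elements as input.
#[derive(Default, Debug, PartialEq)]
struct SetUnion(HashSet<usize>);

impl Merge<usize> for SetUnion {
    fn merge(&mut self, other: usize) {
        // Set union with a singleton is just an insert; duplicates are absorbed.
        self.0.insert(other);
    }
}

/// Model of `lattice_fold(SetUnion::default)`: start from the default value
/// and merge every input item into it.
fn lattice_fold_model(items: impl IntoIterator<Item = usize>) -> SetUnion {
    let mut acc = SetUnion::default();
    for item in items {
        acc.merge(item);
    }
    acc
}
```

Note how the accumulator type (a set) differs from the input type (a single element), which is the case the default value exists to support.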

lattice_reduce

Inputs | Syntax | Outputs | Flow
exactly 1 | -> lattice_reduce() -> | exactly 1 | Blocking

1 input stream, 1 output stream

A specialized operator for merging lattices together into an accumulated value. Like reduce() but specialized for lattice types. lattice_reduce() is equivalent to reduce(dfir_rs::lattices::Merge::merge).

lattice_reduce can also be provided with one generic lifetime persistence argument, either 'tick or 'static, to specify how data persists. With 'tick, values will only be collected within the same tick. With 'static, values will be remembered across ticks and will be aggregated with values arriving in later ticks. When not explicitly specified, persistence defaults to 'tick.

lattice_reduce is differentiated from lattice_fold in that lattice_reduce does not require the accumulating type to have a sensible default value. But it also means that the accumulating function inputs and the accumulating type must be the same.

use dfir_rs::lattices::Max;

source_iter([1, 2, 3, 4, 5])
-> map(Max::new)
-> lattice_reduce()
-> assert_eq([Max::new(5)]);

map

Inputs | Syntax | Outputs | Flow
exactly 1 | -> map(A) -> | exactly 1 | Streaming

1 input stream, 1 output stream

Arguments: A Rust closure

For each item passed in, apply the closure to generate an item to emit.

If you do not want to modify the item stream and only want to view each item, use the inspect operator instead.

Note: The closure has access to the context object.

source_iter(vec!["hello", "world"]) -> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD"]);

multiset_delta

Inputs | Syntax | Outputs | Flow
exactly 1 | -> multiset_delta() -> | exactly 1 | Streaming

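Emits the multiset delta relative to the previous tick: an item is emitted only for occurrences beyond the number of times it appeared in the previous tick.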

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv)
-> multiset_delta()
-> for_each(|n| println!("{}", n));
};

input_send.send(3).unwrap();
input_send.send(4).unwrap();
input_send.send(3).unwrap();
flow.run_tick();
// 3, 4, 3
// Nothing was seen in a previous tick, so every item is emitted.

input_send.send(3).unwrap();
input_send.send(5).unwrap();
input_send.send(3).unwrap();
input_send.send(3).unwrap();
flow.run_tick();
// 5, 3
// First two "3"s are removed due to previous tick.
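The counting rule can be sketched in plain Rust as follows. This is a model of the behavior described above, assuming the operator compares per-item counts between the current and previous tick; the MultisetDelta struct is a hypothetical stand-in, not dfir's implementation.

```rust
use std::collections::HashMap;

/// Plain-Rust model of `multiset_delta`: remembers the previous tick's counts.
#[derive(Default)]
struct MultisetDelta {
    prev_counts: HashMap<u32, usize>,
}

impl MultisetDelta {
    /// Processes one tick of input and returns the items that would be emitted.
    fn run_tick(&mut self, input: &[u32]) -> Vec<u32> {
        let mut curr_counts: HashMap<u32, usize> = HashMap::new();
        let mut out = Vec::new();
        for &item in input {
            let count = curr_counts.entry(item).or_insert(0);
            *count += 1;
            // Emit only occurrences beyond what the previous tick contained.
            if *count > self.prev_counts.get(&item).copied().unwrap_or(0) {
                out.push(item);
            }
        }
        // The current counts become the baseline for the next tick.
        self.prev_counts = curr_counts;
        out
    }
}
```

Feeding the model the two ticks from the example yields `[5, 3]` on the second tick: the first two 3s are cancelled by the two 3s of the previous tick, and only the third is emitted.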

next_stratum

Inputs | Syntax | Outputs | Flow
exactly 1 | -> next_stratum() -> | exactly 1 | Blocking

Delays all elements which pass through to the next stratum (in the same tick).

You can also supply a type parameter next_stratum::<MyType>() to specify what items flow through the pipeline. This can be useful for helping the compiler infer types.

null

Inputs | Syntax | Outputs | Flow
at least 0 and at most 1 | null() | at least 0 and at most 1 | Streaming

unbounded number of input streams of any type, unbounded number of output streams of any type.

As a source, generates nothing. As a sink, absorbs anything with no effect.

// should print `1, 2, 3, 4, 5, 6` across 6 lines
null() -> for_each(|_: ()| panic!());
source_iter([1,2,3]) -> map(|i| println!("{}", i)) -> null();
null_src = null();
null_sink = null();
null_src[0] -> for_each(|_: ()| panic!());
// note: use `for_each()` (or `inspect()`) instead of this:
source_iter([4,5,6]) -> map(|i| println!("{}", i)) -> [0]null_sink;

partition

Inputs | Syntax | Outputs | Flow
exactly 1 | -> partition(A)[<output_port>] -> | at least 2 | Streaming

Output port names: Variadic, as specified in arguments.

This operator takes the input pipeline and allows the user to determine which singular output pipeline each item should be delivered to.

Arguments: A Rust closure. Its first argument is a reference to the item; the form of its second argument selects one of two modes, named or indexed.

Note: The closure has access to the context object.

Named mode

With named ports, the closure's second argument must be a Rust 'slice pattern' of names, such as [port_a, port_b, port_c], where each name is an output port. The closure should return the name of the desired output port.

my_partition = source_iter(1..=100) -> partition(|val: &usize, [fzbz, fizz, buzz, rest]|
match (val % 3, val % 5) {
(0, 0) => fzbz,
(0, _) => fizz,
(_, 0) => buzz,
(_, _) => rest,
}
);
my_partition[fzbz] -> for_each(|v| println!("{}: fizzbuzz", v));
my_partition[fizz] -> for_each(|v| println!("{}: fizz", v));
my_partition[buzz] -> for_each(|v| println!("{}: buzz", v));
my_partition[rest] -> for_each(|v| println!("{}", v));

Indexed mode

With indexed mode, the closure's second argument is the number of output ports. This is a single usize value, useful for e.g. round robin partitioning. Each output pipeline port must be numbered with an index, starting from zero and with no gaps. The closure returns the index of the desired output port.

my_partition = source_iter(1..=100) -> partition(|val, num_outputs| val % num_outputs);
my_partition[0] -> for_each(|v| println!("0: {}", v));
my_partition[1] -> for_each(|v| println!("1: {}", v));
my_partition[2] -> for_each(|v| println!("2: {}", v));
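Indexed mode can be pictured with a small plain-Rust sketch in which the output ports are modeled as a vector of buckets. The helper partition_indexed is hypothetical and only mirrors the closure contract described above (a reference to the item plus the number of outputs, returning an index).

```rust
/// Model of indexed `partition`: the closure sees the item by reference and the
/// number of outputs, and returns the index of the output that receives the item.
fn partition_indexed<T>(
    items: impl IntoIterator<Item = T>,
    num_outputs: usize,
    mut pick: impl FnMut(&T, usize) -> usize,
) -> Vec<Vec<T>> {
    // One bucket per output port, numbered from zero with no gaps.
    let mut outputs: Vec<Vec<T>> = (0..num_outputs).map(|_| Vec::new()).collect();
    for item in items {
        let idx = pick(&item, num_outputs);
        outputs[idx].push(item);
    }
    outputs
}
```

For instance, `partition_indexed(1..=7usize, 3, |val, n| *val % n)` places 3 and 6 in bucket 0, 1, 4 and 7 in bucket 1, and 2 and 5 in bucket 2.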

persist

Inputs | Syntax | Outputs | Flow
exactly 1 | -> persist() -> | exactly 1 | Streaming

Stores each item as it passes through, and replays all items every tick.

// Normally `source_iter(...)` only emits once, but `persist::<'static>()` will replay the `"hello"`
// on every tick.
source_iter(["hello"])
-> persist::<'static>()
-> assert_eq(["hello"]);

persist() can be used to introduce statefulness into stateless pipelines. In the example below, the join only stores data for a single tick. The persist::<'static>() operator introduces statefulness across ticks. This can be useful for optimization transformations within the dfir compiler. Equivalently, we could specify that the join has static persistence (my_join = join::<'static>()).

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
source_iter([("hello", "world")]) -> persist::<'static>() -> [0]my_join;
source_stream(input_recv) -> persist::<'static>() -> [1]my_join;
my_join = join::<'tick>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2));
};
input_send.send(("hello", "oakland")).unwrap();
flow.run_tick();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_tick();
// (hello, (world, oakland))
// (hello, (world, oakland))
// (hello, (world, san francisco))
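The replay behavior that produces the output above can be modeled in a few lines of plain Rust. The Persist struct below is a hypothetical stand-in for the operator's state, assuming it simply appends new items and re-emits the full history each tick.

```rust
/// Model of `persist::<'static>()`: stores every item and replays the whole
/// history on each tick.
struct Persist<T> {
    stored: Vec<T>,
}

impl<T: Clone> Persist<T> {
    fn new() -> Self {
        Persist { stored: Vec::new() }
    }

    /// Accepts the items arriving this tick and returns everything emitted this tick.
    fn run_tick(&mut self, new_items: &[T]) -> Vec<T> {
        self.stored.extend_from_slice(new_items);
        self.stored.clone()
    }
}
```

In the second tick the model emits both "oakland" and "san francisco", which is why a 'tick join placed downstream produces the "oakland" match again.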

persist_mut

Inputs | Syntax | Outputs | Flow
exactly 1 | -> persist_mut() -> | exactly 1 | Blocking

persist_mut() is similar to persist() except that it also enables deletions. persist_mut() expects an input of type Persistence<T>, and it is this enumeration that enables the user to communicate deletion. Deletions/persists happen in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result in a single 1 value being stored.

use dfir_rs::util::Persistence;

source_iter([
Persistence::Persist(1),
Persistence::Persist(2),
Persistence::Delete(1),
])
-> persist_mut::<'static>()
-> assert_eq([2]);
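The in-order handling of persists and deletes can be sketched in plain Rust. The enum below is a local stand-in with the same variant names as Persistence, and the sketch assumes that a delete removes every stored copy of the value; that detail is an assumption of this example rather than something stated above.

```rust
/// Local stand-in for the `Persistence` enum described above.
enum Persistence<T> {
    Persist(T),
    Delete(T),
}

/// Applies persists and deletes strictly in stream order and returns what remains stored.
fn apply_in_order<T: PartialEq>(ops: impl IntoIterator<Item = Persistence<T>>) -> Vec<T> {
    let mut stored = Vec::new();
    for op in ops {
        match op {
            Persistence::Persist(v) => stored.push(v),
            // Assumption of this sketch: a delete removes every stored copy of the value.
            Persistence::Delete(v) => stored.retain(|x| *x != v),
        }
    }
    stored
}
```

Because operations are applied in order, a value persisted after its deletion is stored again, while a value persisted before its deletion is gone.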

persist_mut_keyed

Inputs | Syntax | Outputs | Flow
exactly 1 | -> persist_mut_keyed() -> | exactly 1 | Blocking

persist_mut_keyed() is similar to persist_mut() except that it also enables key-based deletions. persist_mut_keyed() expects an input of type PersistenceKeyed<K, V>, and it is this enumeration that enables the user to communicate deletion. Deletions/persists happen in the order they are received in the stream. For example, [Persist(1, 1), Delete(1), Persist(1, 1)] will result in a single (1, 1) entry being stored.

use dfir_rs::util::PersistenceKeyed;

source_iter([
PersistenceKeyed::Persist(0, 1),
PersistenceKeyed::Persist(1, 1),
PersistenceKeyed::Delete(1),
])
-> persist_mut_keyed::<'static>()
-> assert_eq([(0, 1)]);

py_udf

Inputs | Syntax | Outputs | Flow
exactly 1 | -> py_udf(A, B) -> | exactly 1 | Streaming

Arguments: First, the source code for a python module, second, the name of a unary function defined within the module source code.

Requires the "python" feature to be enabled.

An operator which allows you to run a python udf. Input arguments must be a stream of tuples whose items implement IntoPy. See the pyo3 documentation for IntoPy.

Output items are of type PyResult<Py<PyAny>>. Rust native types can be extracted using .extract(); see the pyo3 documentation or the examples below.

use pyo3::prelude::*;

source_iter(0..10)
-> map(|x| (x,))
-> py_udf("
def fib(n):
    if n < 2:
        return n
    else:
        return fib(n - 2) + fib(n - 1)
", "fib")
-> map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
-> assert_eq([0, 1, 1, 2, 3, 5, 8, 13, 21, 34]);

use pyo3::prelude::*;

source_iter([(5,1)])
-> py_udf("
def add(a, b):
    return a + b
", "add")
-> map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
-> assert_eq([6]);

reduce

Inputs | Syntax | Outputs | Flow
exactly 1 | -> reduce(A) | at least 0 and at most 1 | Blocking

1 input stream, 1 output stream

Arguments: a closure which itself takes two arguments: an &mut Accum accumulator mutable reference, and an Item. The closure should merge the item into the accumulator.

Akin to Rust's built-in reduce operator, except that it takes the accumulator by &mut instead of by value. Reduces every item into an accumulator by applying a closure, returning the final result.

Note: The closure has access to the context object.

reduce can also be provided with one generic lifetime persistence argument, either 'tick or 'static, to specify how data persists. With 'tick, values will only be collected within the same tick. With 'static, the accumulated value will be remembered across ticks and items are aggregated with items arriving in later ticks. When not explicitly specified persistence defaults to 'tick.

source_iter([1,2,3,4,5])
-> reduce::<'tick>(|accum: &mut _, elem| {
*accum *= elem;
})
-> assert_eq([120]);
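The by-reference accumulator can be contrasted with Rust's by-value Iterator::reduce using a small plain-Rust sketch. The helper reduce_in_place is hypothetical; it models the closure contract described above and shows why the operator emits at most one item (nothing when the input is empty).

```rust
/// Model of `reduce`: the first item seeds the accumulator and later items are
/// merged into it in place. Returns `None` when there is no input at all.
fn reduce_in_place<T>(
    items: impl IntoIterator<Item = T>,
    mut merge: impl FnMut(&mut T, T),
) -> Option<T> {
    let mut iter = items.into_iter();
    let mut accum = iter.next()?;
    for item in iter {
        merge(&mut accum, item);
    }
    Some(accum)
}
```

Calling `reduce_in_place([1, 2, 3, 4, 5], |accum, elem| *accum *= elem)` gives `Some(120)`, matching the example above, while an empty input gives `None`.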

reduce_keyed

Inputs | Syntax | Outputs | Flow
exactly 1 | -> reduce_keyed(A) -> | exactly 1 | Blocking

1 input stream of type (K, V), 1 output stream of type (K, V). The output will have one tuple for each distinct K, with an accumulated (reduced) value of type V.

If you need the accumulated value to have a different type than the input, use fold_keyed.

Arguments: one Rust closure. The closure takes two arguments: an &mut 'accumulator', and an element. The accumulator should be updated based on the element.

A special case of reduce, in the spirit of SQL's GROUP BY and aggregation constructs. The input is partitioned into groups by the first field, and for each group the values in the second field are accumulated via the closure in the arguments.

Note: The closure has access to the context object.

reduce_keyed can also be provided with one generic lifetime persistence argument, either 'tick or 'static, to specify how data persists. With 'tick, values will only be collected within the same tick. With 'static, values will be remembered across ticks and will be aggregated with pairs arriving in later ticks. When not explicitly specified persistence defaults to 'tick.

reduce_keyed can also be provided with two type arguments, the key and value type. This is required when using 'static persistence if the compiler cannot infer the types.

source_iter([("toy", 1), ("toy", 2), ("shoe", 11), ("shoe", 35), ("haberdashery", 7)])
-> reduce_keyed(|old: &mut u32, val: u32| *old += val)
-> assert_eq([("toy", 3), ("shoe", 46), ("haberdashery", 7)]);
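The GROUP BY analogy can be sketched in plain Rust with a hash map keyed by the first field. The helper reduce_keyed_model is hypothetical and only models the grouping semantics within a single tick; it is not dfir's implementation.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::hash::Hash;

/// Model of `reduce_keyed`: group by key; the first value for a key seeds that
/// key's accumulator and later values are merged into it in place.
fn reduce_keyed_model<K: Eq + Hash, V>(
    items: impl IntoIterator<Item = (K, V)>,
    mut merge: impl FnMut(&mut V, V),
) -> HashMap<K, V> {
    let mut groups = HashMap::new();
    for (key, value) in items {
        match groups.entry(key) {
            Entry::Occupied(mut slot) => merge(slot.get_mut(), value),
            Entry::Vacant(slot) => {
                slot.insert(value);
            }
        }
    }
    groups
}
```

The result holds one entry per distinct key, and the value type is the same as the input value type, which is the restriction that distinguishes reduce_keyed from fold_keyed.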

Example using 'tick persistence and type arguments:

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<(&str, &str)>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv)
-> reduce_keyed::<'tick, &str>(|old: &mut _, val| *old = std::cmp::max(*old, val))
-> for_each(|(k, v)| println!("({:?}, {:?})", k, v));
};

input_send.send(("hello", "oakland")).unwrap();
input_send.send(("hello", "berkeley")).unwrap();
input_send.send(("hello", "san francisco")).unwrap();
flow.run_available();
// ("hello", "san francisco")
// The closure keeps the maximum (lexicographically greatest) value seen for each key.

input_send.send(("hello", "palo alto")).unwrap();
flow.run_available();
// ("hello", "palo alto")

sort

Inputs | Syntax | Outputs | Flow
exactly 1 | -> sort() -> | exactly 1 | Blocking

Takes a stream as input and produces a sorted version of the stream as output.

source_iter(vec![2, 3, 1])
-> sort()
-> assert_eq([1, 2, 3]);

sort is blocking. Only the values collected within a single tick will be sorted and emitted.

sort_by_key

Inputs | Syntax | Outputs | Flow
exactly 1 | -> sort_by_key(A) -> | exactly 1 | Blocking

Like sort, takes a stream as input and produces a sorted version of the stream as output. This operator sorts according to the key extracted by the closure.

Note: The closure has access to the context object.

source_iter(vec![(2, 'y'), (3, 'x'), (1, 'z')])
-> sort_by_key(|(k, _v)| k)
-> assert_eq([(1, 'z'), (2, 'y'), (3, 'x')]);

source_file

Inputs | Syntax | Outputs | Flow
exactly 0 | source_file(A) -> | exactly 1 | Streaming

0 input streams, 1 output stream

Arguments: An AsRef<Path> for a file to read.

Reads the referenced file one line at a time. The line will NOT include the line ending.

Will panic if the file could not be read, or if the file contains bytes that are not valid UTF-8.

source_file("Cargo.toml") -> for_each(|line| println!("{}", line));

source_interval

Inputs | Syntax | Outputs | Flow
exactly 0 | source_interval(A) -> | exactly 1 | Streaming

0 input streams, 1 output stream

Arguments: A Duration for this interval.

Emits units () on a repeated interval. The first tick completes immediately. Missed ticks will be scheduled as soon as possible.

Note that this requires the dfir instance be run within a Tokio Runtime. The easiest way to do this is with a #[dfir_rs::main] annotation on async fn main() { ... } as in the example below.

use std::time::Duration;
use std::time::Instant;
use dfir_rs::dfir_syntax;

#[dfir_rs::main]
async fn main() {
let mut hf = dfir_syntax! {
source_interval(Duration::from_secs(1))
-> map(|_| { Instant::now() } )
-> for_each(|time| println!("This runs every second: {:?}", time));
};

// Will print 4 times (fencepost counting).
tokio::time::timeout(Duration::from_secs_f32(3.5), hf.run_async())
.await
.expect_err("Expected time out");

// Example output:
// This runs every second: Instant { t: 27471.704813s }
// This runs every second: Instant { t: 27472.704813s }
// This runs every second: Instant { t: 27473.704813s }
// This runs every second: Instant { t: 27474.704813s }
}

source_iter

Inputs | Syntax | Outputs | Flow
exactly 0 | source_iter(A) -> | exactly 1 | Streaming

0 input streams, 1 output stream

Arguments: An iterable Rust object.

Takes the iterable object and delivers its elements downstream one by one.

Note that all elements are emitted during the first tick.

source_iter(vec!["Hello", "World"])
-> for_each(|x| println!("{}", x));

source_json

Inputs | Syntax | Outputs | Flow
exactly 0 | source_json(A) -> | exactly 1 | Streaming

0 input streams, 1 output stream

Arguments: An AsRef<Path> for a file to read as json. This will emit the parsed value one time.

source_json may take one generic type argument, the type of the value to be parsed, which must implement Deserialize.

source_json("example.json") -> for_each(|json: dfir_rs::serde_json::Value| println!("{:#?}", json));

source_stdin

Inputs | Syntax | Outputs | Flow
exactly 0 | source_stdin() -> | exactly 1 | Streaming

0 input streams, 1 output stream

Arguments: none.

source_stdin receives a Stream of lines from stdin and emits each of the elements it receives downstream.

source_stdin()
-> map(|x| x.unwrap().to_uppercase())
-> for_each(|x| println!("{}", x));

source_stream

Inputs | Syntax | Outputs | Flow
exactly 0 | source_stream(A) -> | exactly 1 | Streaming

0 input streams, 1 output stream

Arguments: The receive end of a tokio channel

Given a Stream created in Rust code, source_stream is passed the receive endpoint of the channel and emits each of the elements it receives downstream.

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<&str>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv) -> map(|x| x.to_uppercase())
-> for_each(|x| println!("{}", x));
};
input_send.send("Hello").unwrap();
input_send.send("World").unwrap();
flow.run_available();

source_stream_serde

Inputs | Syntax | Outputs | Flow
exactly 0 | source_stream_serde(A) -> | exactly 1 | Streaming

0 input streams, 1 output stream

Arguments: Stream

Given a Stream of (serialized payload, addr) pairs, deserializes the payload and emits each of the elements it receives downstream.

async fn serde_in() {
let addr = dfir_rs::util::ipv4_resolve("localhost:9000".into()).unwrap();
let (outbound, inbound, _) = dfir_rs::util::bind_udp_bytes(addr).await;
let mut flow = dfir_rs::dfir_syntax! {
source_stream_serde(inbound) -> map(Result::unwrap) -> map(|(x, a): (String, std::net::SocketAddr)| x.to_uppercase())
-> for_each(|x| println!("{}", x));
};
flow.run_available();
}

spin

Inputs | Syntax | Outputs | Flow
exactly 0 | spin() -> | exactly 1 | Streaming

This operator emits Unit, and triggers the start of a new tick at the end of each tick, which will cause spinning-like behavior. Note that run_available will run forever, so in the example below we illustrate running manually for 100 ticks.

let mut flow = dfir_rs::dfir_syntax! {
spin() -> for_each(|x| println!("tick {}: {:?}", context.current_tick(), x));
};
for _ in 0..100 {
flow.run_tick();
}

state

Inputs | Syntax | Outputs | Flow
exactly 1 | -> state() | at least 0 and at most 1 | Streaming

A lattice-based state operator, used for accumulating lattice state.

Emits both a referenceable singleton and (optionally) a pass-through stream. In the future the pass-through stream may be deduplicated.

use std::collections::HashSet;

use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};

my_state = source_iter(0..3)
-> map(SetUnionSingletonSet::new_from)
-> state::<SetUnionHashSet<usize>>();

state_by

Inputs | Syntax | Outputs | Flow
exactly 1 | -> state_by(A) | at least 0 and at most 1 | Streaming

Like the state operator, but with a closure to map the input to the state lattice.

The emitted outputs (both the referenceable singleton and the optional pass-through stream) are of the same type as the inputs to the state_by operator and are not required to be a lattice type. This is useful for receiving pass-through context information on the output side.

use std::collections::HashSet;

use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};

my_state = source_iter(0..3)
-> state_by::<SetUnionHashSet<usize>>(SetUnionSingletonSet::new_from);

tee

Inputs | Syntax | Outputs | Flow
exactly 1 | -> tee()[<output_port>] -> | at least 2 | Streaming

1 input stream, n output streams

Takes the input stream and delivers a copy of each item to each output.

Note: Downstream operators may need explicit type annotations.

my_tee = source_iter(vec!["Hello", "World"]) -> tee();
my_tee -> map(|x: &str| x.to_uppercase()) -> assert_eq(["HELLO", "WORLD"]);
my_tee -> map(|x: &str| x.to_lowercase()) -> assert_eq(["hello", "world"]);
my_tee -> assert_eq(["Hello", "World"]);

union

Inputs | Syntax | Outputs | Flow
at least 2 | -> [<input_port>]union() -> | exactly 1 | Streaming

n input streams of the same type, 1 output stream of the same type

Unions an arbitrary number of input streams into a single stream. Each input sequence is a subsequence of the output, but no guarantee is given on how the inputs are interleaved.

Since union has multiple input streams, it needs to be assigned to a variable to reference its multiple input ports across statements.

source_iter(vec!["hello", "world"]) -> my_union;
source_iter(vec!["stay", "gold"]) -> my_union;
source_iter(vec!["don't", "give", "up"]) -> my_union;
my_union = union()
-> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD", "STAY", "GOLD", "DON'T", "GIVE", "UP"]);

unique

Inputs | Syntax | Outputs | Flow
exactly 1 | -> unique() -> | exactly 1 | Streaming

Takes one stream as input and filters out any duplicate occurrences. The output contains all unique values from the input.

source_iter(vec![1, 1, 2, 3, 2, 1, 3])
-> unique()
-> assert_eq([1, 2, 3]);

unique can also be provided with one generic lifetime persistence argument, either 'tick or 'static, to specify how data persists. The default is 'tick. With 'tick, uniqueness is only considered within the current tick, so across multiple ticks duplicate values may be emitted. With 'static, values will be remembered across ticks and no duplicates will ever be emitted.

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut flow = dfir_rs::dfir_syntax! {
source_stream(input_recv)
-> unique::<'tick>()
-> for_each(|n| println!("{}", n));
};

input_send.send(3).unwrap();
input_send.send(3).unwrap();
input_send.send(4).unwrap();
input_send.send(3).unwrap();
flow.run_available();
// 3, 4

input_send.send(3).unwrap();
input_send.send(5).unwrap();
flow.run_available();
// 3, 5
// Note: 3 is emitted again.
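The difference between the two persistence settings can be modeled in plain Rust with a set of already-seen values that is either cleared at the start of each tick or kept forever. The Unique struct and its persist_across_ticks flag are hypothetical names used only for this sketch.

```rust
use std::collections::HashSet;

/// Model of `unique` with a configurable persistence.
struct Unique {
    seen: HashSet<usize>,
    /// `false` models 'tick persistence, `true` models 'static persistence.
    persist_across_ticks: bool,
}

impl Unique {
    fn new(persist_across_ticks: bool) -> Self {
        Unique { seen: HashSet::new(), persist_across_ticks }
    }

    /// Processes one tick of input and returns the items that would be emitted.
    fn run_tick(&mut self, input: &[usize]) -> Vec<usize> {
        if !self.persist_across_ticks {
            // With 'tick persistence, uniqueness is only tracked within the tick.
            self.seen.clear();
        }
        // `insert` returns true only the first time a value is seen.
        input.iter().copied().filter(|n| self.seen.insert(*n)).collect()
    }
}
```

With the inputs from the example, the 'tick model emits 3 again in the second tick, whereas the 'static model emits only 5.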

unzip

Inputs | Syntax | Outputs | Flow
exactly 1 | -> unzip()[<output_port>] -> | exactly 2 | Streaming

Output port names: 0, 1

1 input stream of pair tuples (A, B), 2 output streams

Takes the input stream of pairs and unzips each one, delivering each item to its corresponding side.

my_unzip = source_iter(vec![("Hello", "Foo"), ("World", "Bar")]) -> unzip();
my_unzip[0] -> assert_eq(["Hello", "World"]);
my_unzip[1] -> assert_eq(["Foo", "Bar"]);

zip

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]zip() -> | exactly 1 | Streaming

Input port names: 0 (streaming), 1 (streaming)

2 input streams of type V1 and V2, 1 output stream of type (V1, V2)

Zips the streams together, forming paired tuples of the inputs. Note that zipping is done per-tick. Excess items from one input or the other will be discarded. If you do not want to discard the excess, use zip_longest instead.

source_iter(0..3) -> [0]my_zip;
source_iter(0..5) -> [1]my_zip;
my_zip = zip() -> assert_eq([(0, 0), (1, 1), (2, 2)]);

zip_longest

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]zip_longest() -> | exactly 1 | Blocking

Input port names: 0 (blocking), 1 (blocking)

2 input streams of type V1 and V2, 1 output stream of type itertools::EitherOrBoth<V1, V2>

Zips the streams together, forming paired tuples of the inputs. Note that zipping is done per-tick. Excess items are returned as EitherOrBoth::Left(V1) or EitherOrBoth::Right(V2). If you want to discard the excess instead, use zip.

source_iter(0..2) -> [0]my_zip_longest;
source_iter(0..3) -> [1]my_zip_longest;
my_zip_longest = zip_longest()
-> assert_eq([
itertools::EitherOrBoth::Both(0, 0),
itertools::EitherOrBoth::Both(1, 1),
itertools::EitherOrBoth::Right(2)]);
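How excess items are handled can be sketched in plain Rust without any dependencies. The enum below is a local stand-in that mirrors the shape of itertools::EitherOrBoth, and zip_longest_model is a hypothetical helper that models the behavior within a single tick.

```rust
/// Local stand-in mirroring the shape of `itertools::EitherOrBoth`.
#[derive(Debug, PartialEq)]
enum EitherOrBoth<A, B> {
    Both(A, B),
    Left(A),
    Right(B),
}

/// Model of `zip_longest`: pair items up while both sides have them, then emit
/// the leftovers of the longer side as `Left` or `Right`.
fn zip_longest_model<A, B>(
    lhs: impl IntoIterator<Item = A>,
    rhs: impl IntoIterator<Item = B>,
) -> Vec<EitherOrBoth<A, B>> {
    let mut lhs = lhs.into_iter();
    let mut rhs = rhs.into_iter();
    let mut out = Vec::new();
    loop {
        match (lhs.next(), rhs.next()) {
            (Some(a), Some(b)) => out.push(EitherOrBoth::Both(a, b)),
            (Some(a), None) => out.push(EitherOrBoth::Left(a)),
            (None, Some(b)) => out.push(EitherOrBoth::Right(b)),
            (None, None) => break,
        }
    }
    out
}
```

A plain zip would stop at the first exhausted side; here the extra item from the longer input survives as a `Right` (or `Left`) value.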

_lattice_fold_batch

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]_lattice_fold_batch() -> | exactly 1 | Blocking

Input port names: input (blocking), signal (blocking)

2 input streams, 1 output stream, no arguments.

Batches streaming input and releases it downstream when a signal is delivered. This allows for buffering data and delivering it later while also folding it into a single lattice data structure. This operator is similar to defer_signal in that it batches input and releases it when a signal is given. It is also similar to lattice_fold in that it folds the input into a single lattice. So, _lattice_fold_batch is a combination of both defer_signal and lattice_fold. This operator is useful when trying to combine a sequence of defer_signal and lattice_fold operators without unnecessary memory consumption.

_lattice_fold_batch has two inputs: input and signal. input is the input data flow. Data that is delivered on this input is collected in order inside of the _lattice_fold_batch operator. When anything is sent to signal, the collected data is released downstream. The entire signal input is consumed each tick, so sending 5 things on signal will not release inputs on the next 5 consecutive ticks.

use lattices::set_union::SetUnionHashSet;
use lattices::set_union::SetUnionSingletonSet;

source_iter([1, 2, 3])
-> map(SetUnionSingletonSet::new_from)
-> [input]batcher;

source_iter([()])
-> [signal]batcher;

batcher = _lattice_fold_batch::<SetUnionHashSet<usize>>()
-> assert_eq([SetUnionHashSet::new_from([1, 2, 3])]);

_lattice_join_fused_join

Inputs | Syntax | Outputs | Flow
exactly 2 | -> [<input_port>]_lattice_join_fused_join() -> | exactly 1 | Blocking

Input port names: 0 (blocking), 1 (blocking)

2 input streams of type (K, V1) and (K, V2), 1 output stream of type (K, (V1', V2')) where V1, V2, V1', V2' are lattice types

Performs a fold_keyed with lattice-merge aggregate function on each input and then forms the equijoin of the resulting key/value pairs in the input streams by their first (key) attribute. Unlike join, the result is not a stream of tuples, it's a stream of MapUnionSingletonMap lattices. You can (non-monotonically) "reveal" these as tuples if desired via map; see the examples below.

You must specify the accumulating lattice types; they cannot be inferred. The first type argument corresponds to the [0] input of the join, and the second to the [1] input. Type arguments are specified in dfir using the rust turbofish syntax ::<>, for example _lattice_join_fused_join::<Min<_>, Max<_>>(). The accumulating lattice type is not necessarily the same type as the input; see the example below involving SetUnion for such a case.

Like join, _lattice_join_fused_join can also be provided with one or two generic lifetime persistence arguments, either 'tick or 'static, to specify how join data persists. With 'tick, pairs will only be joined with corresponding pairs within the same tick. With 'static, pairs will be remembered across ticks and will be joined with pairs arriving in later ticks. When not explicitly specified, persistence defaults to 'tick.

Like join, when two persistence arguments are supplied the first maps to port 0 and the second maps to port 1. When a single persistence argument is supplied, it is applied to both input ports. When no persistence arguments are applied it defaults to 'tick for both. It is important to specify all persistence arguments before any type arguments, otherwise the persistence arguments will be ignored.

The syntax is as follows:

_lattice_join_fused_join::<MaxRepr<usize>, MaxRepr<usize>>(); // Or
_lattice_join_fused_join::<'static, MaxRepr<usize>, MaxRepr<usize>>();

_lattice_join_fused_join::<'tick, MaxRepr<usize>, MaxRepr<usize>>();

_lattice_join_fused_join::<'static, 'tick, MaxRepr<usize>, MaxRepr<usize>>();

_lattice_join_fused_join::<'tick, 'static, MaxRepr<usize>, MaxRepr<usize>>();
// etc.

Examples

use dfir_rs::lattices::Min;
use dfir_rs::lattices::Max;

source_iter([("key", Min::new(1)), ("key", Min::new(2))]) -> [0]my_join;
source_iter([("key", Max::new(1)), ("key", Max::new(2))]) -> [1]my_join;

my_join = _lattice_join_fused_join::<'tick, Min<usize>, Max<usize>>()
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> assert_eq([("key", (Min::new(1), Max::new(2)))]);

use dfir_rs::lattices::set_union::SetUnionSingletonSet;
use dfir_rs::lattices::set_union::SetUnionHashSet;

source_iter([("key", SetUnionSingletonSet::new_from(0)), ("key", SetUnionSingletonSet::new_from(1))]) -> [0]my_join;
source_iter([("key", SetUnionHashSet::new_from([0])), ("key", SetUnionHashSet::new_from([1]))]) -> [1]my_join;

my_join = _lattice_join_fused_join::<'tick, SetUnionHashSet<usize>, SetUnionHashSet<usize>>()
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> assert_eq([("key", (SetUnionHashSet::new_from([0, 1]), SetUnionHashSet::new_from([0, 1])))]);