Embedded Mode
Hydro's standard deployment model compiles each location into a standalone binary and manages networking automatically. But sometimes you need more control — maybe you're integrating Hydro into an existing Rust application, or you need to wire up custom I/O like DPDK, a game engine's event loop, or a hardware interface. Embedded mode is designed for these cases.
Instead of producing self-contained binaries, embedded mode generates a Rust source file containing one plain function per location. Each function returns a Dfir dataflow graph that you drive manually. You decide when to tick it, how to feed it data, and where the outputs go.
When to Use Embedded Mode
- Incremental adoption: You have an existing Rust codebase and want to introduce Hydro for a specific computation without restructuring your application around Hydro Deploy.
- Custom I/O: You need to connect Hydro to a transport or runtime that Hydro Deploy doesn't support (DPDK, shared memory, a game loop, etc.).
- Library use: You want to ship a crate that uses Hydro internally but exposes a normal Rust API to consumers.
Embedded mode supports only local, single-process computation and one-to-one (o2o) process-to-process networking. Cluster networking, external ports, and other multi-node patterns are not yet available.
When your Hydro program sends data between processes, the generated functions gain additional __network_out and __network_in parameters. You wire these up yourself, for example with in-memory channels, Unix sockets, or any transport you like. See Networking below for details.
How It Works
Embedded mode uses a build.rs script to compile your Hydro program at build time. The generated code is then include!-ed into your crate. The workflow has three parts:
- Define your Hydro logic using embedded_input and embedded_output to mark where data enters and leaves the dataflow.
- Generate code in build.rs using generate_embedded.
- Call the generated function from your application, passing in streams and output callbacks.
Let's walk through a complete example that capitalizes strings using Hydro in embedded mode.
1. Define the Hydro Logic
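Write your Hydro function as usual, taking the input as a Stream parameter and calling embedded_output to mark where results should be emitted. The input stream itself is created with embedded_input when the flow is wired up in build.rs (step 2):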
use hydro_lang::prelude::*;

pub fn capitalize<'a>(input: Stream<String, Process<'a, ()>>) {
    input
        .map(q!(|s| s.to_uppercase()))
        .embedded_output("output");
}
embedded_input creates a stream parameter on the generated function; the name you pass ("input", supplied in build.rs in the next step) becomes the parameter name. Similarly, embedded_output creates a field on a generated EmbeddedOutputs struct, and the name ("output") becomes the field name. The output field accepts an impl FnMut(T) closure that is called once for each emitted element.
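The callback-field pattern described above can be sketched in plain Rust. This is a hand-written illustration, not the generated code: OutputsSketch, drive, and collect_uppercase are hypothetical names, and a simple loop stands in for the dataflow.

```rust
// Hand-written stand-in for the generated outputs struct: one closure
// field per named output. The struct Hydro actually generates may differ.
pub struct OutputsSketch<F: FnMut(String)> {
    pub output: F,
}

// Hypothetical driver playing the role of the dataflow: it uppercases
// each input element and passes the result to the `output` callback.
pub fn drive<F: FnMut(String)>(
    input: impl IntoIterator<Item = String>,
    outputs: &mut OutputsSketch<F>,
) {
    for s in input {
        (outputs.output)(s.to_uppercase());
    }
}

// Collects everything the callback receives into a Vec.
pub fn collect_uppercase(words: &[&str]) -> Vec<String> {
    let mut results = Vec::new();
    {
        let mut outputs = OutputsSketch {
            output: |s: String| results.push(s),
        };
        drive(words.iter().map(|w| w.to_string()), &mut outputs);
    } // The closure's mutable borrow of `results` ends with this scope.
    results
}
```

Because the field is an ordinary closure, the caller decides where emitted elements go: a Vec as here, a channel, a log, or anything else.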
Your base crate also needs to declare an empty stageleft_macro_entrypoint feature. This is required by the Stageleft code generation machinery to set up the correct re-exports:
[features]
stageleft_macro_entrypoint = []
2. Generate Code in build.rs
Create a wrapper crate that depends on your Hydro crate.
Your wrapper crate needs the following dependencies:
[dependencies]
hydro_lang = { version = "...", features = ["runtime_support"] }
my_hydro_crate = { path = "../my_hydro_crate", features = ["stageleft_macro_entrypoint"] }
[build-dependencies]
hydro_lang = { version = "...", features = ["build"] }
my_hydro_crate = { path = "../my_hydro_crate" }
prettyplease = { version = "0.2.0", features = ["verbatim"] }
[dev-dependencies]
dfir_rs = { version = "..." }
tokio = { version = "1", features = ["full"] }
The runtime_support feature is needed at runtime to provide the DFIR runtime. The build feature is needed in build-dependencies for the code generation APIs.
In the wrapper crate's build.rs, construct a FlowBuilder, wire up the logic, and call generate_embedded:
use hydro_lang::location::Location;

fn main() {
    println!("cargo::rerun-if-changed=build.rs");

    let mut flow = hydro_lang::compile::builder::FlowBuilder::new();
    let process = flow.process::<()>();

    // Wire up the Hydro logic with an embedded input.
    my_hydro_crate::capitalize(process.embedded_input("input"));

    // Compile and generate the embedded code.
    let code = flow
        .with_process(&process, "capitalize")
        .generate_embedded("my_hydro_crate");

    let out_dir = std::env::var("OUT_DIR").unwrap();
    std::fs::write(
        format!("{out_dir}/embedded.rs"),
        prettyplease::unparse(&code),
    )
    .unwrap();
}
The string "capitalize" passed to with_process becomes the name of the generated function. The argument to generate_embedded is the name of the crate containing your Hydro logic (hyphens are automatically replaced with underscores).
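As a small illustration of the hyphen handling mentioned above, the equivalent string operation looks like the following. The helper name crate_name_to_ident is hypothetical and this is not Hydro's internal code; it only shows the mapping from a Cargo package name to the identifier form.

```rust
// Sketch of the normalization: Cargo package names may contain hyphens,
// but the corresponding Rust crate identifier uses underscores instead.
pub fn crate_name_to_ident(crate_name: &str) -> String {
    crate_name.replace('-', "_")
}
```

Under this mapping, passing "my-hydro-crate" or "my_hydro_crate" refers to the same crate.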
3. Include and Use the Generated Code
In your wrapper crate's lib.rs, include the generated file:
#[allow(unused_imports, missing_docs)]
pub mod embedded {
    include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
}
Now you can call the generated function from anywhere in your application:
use dfir_rs::futures;

async fn run() {
    // Create an input stream from any source you like.
    let input = futures::stream::iter(vec![
        "hello".to_owned(),
        "world".to_owned(),
    ]);

    // Collect outputs via a closure.
    let mut results = vec![];
    let mut outputs = embedded::capitalize::EmbeddedOutputs {
        output: |s: String| {
            results.push(s);
        },
    };

    // Build and run the dataflow.
    let mut flow = embedded::capitalize(input, &mut outputs);
    tokio::task::LocalSet::new()
        .run_until(flow.run_available())
        .await;

    // Drop the dataflow so its borrow of `outputs` ends before `results` is read.
    drop(flow);
    assert_eq!(results, vec!["HELLO", "WORLD"]);
}
The generated function accepts your input streams as impl Stream<Item = T> + Unpin parameters and a mutable reference to the EmbeddedOutputs struct. It returns a Dfir graph that you run with run_available() (or tick manually).
Networking
Embedded mode supports process-to-process (o2o) networking. When your Hydro program uses .send() between two processes, the generated functions get extra parameters so you can wire up the transport yourself.
Network channels in embedded mode must be named. Use .name() on the networking config:
input.send(receiver, TCP.fail_stop().bincode().name("messages"))
The name becomes a field name in the generated structs, so it must be a valid Rust identifier.
Consider a two-process program where Sender sends to Receiver over a channel named "messages". Sender gets an EmbeddedNetworkOut struct parameter with an FnMut(Bytes) field per outgoing channel:
pub mod echo_sender {
    pub struct EmbeddedNetworkOut<F: FnMut(Bytes)> {
        pub messages: F,
    }
}

pub fn echo_sender<'a, F: FnMut(Bytes) + 'a>(
    input: impl Stream<Item = String> + Unpin + 'a,
    __network_out: &'a mut echo_sender::EmbeddedNetworkOut<F>,
) -> Dfir<'a> { ... }
On the other side, Receiver gets an EmbeddedNetworkIn struct parameter with a Stream<Item = Result<BytesMut, io::Error>> field per incoming channel:
pub mod echo_receiver {
    pub struct EmbeddedNetworkIn<S: Stream<Item = Result<BytesMut, io::Error>> + Unpin> {
        pub messages: S,
    }
}

pub fn echo_receiver<'a, S: Stream<Item = Result<BytesMut, io::Error>> + Unpin + 'a>(
    __outputs: &'a mut echo_receiver::EmbeddedOutputs<...>,
    __network_in: echo_receiver::EmbeddedNetworkIn<S>,
) -> Dfir<'a> { ... }
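Since channel names such as messages end up as struct fields in both generated modules, it can help to reject unsuitable names early, for instance in build.rs. The check below is a simplified sketch and an assumption about what is convenient, not something Hydro provides: is_plain_identifier is a hypothetical helper that accepts only ASCII identifiers and rejects Rust's strict keywords, ignoring Unicode and raw identifiers.

```rust
// Strict keywords (2018 edition and later); these cannot be used as
// plain field names.
const KEYWORDS: &[&str] = &[
    "as", "async", "await", "break", "const", "continue", "crate", "dyn",
    "else", "enum", "extern", "false", "fn", "for", "if", "impl", "in",
    "let", "loop", "match", "mod", "move", "mut", "pub", "ref", "return",
    "self", "Self", "static", "struct", "super", "trait", "true", "type",
    "unsafe", "use", "where", "while",
];

// Conservative check: ASCII letters, digits, and underscores only, not
// starting with a digit, not a lone underscore, and not a keyword.
pub fn is_plain_identifier(name: &str) -> bool {
    let mut chars = name.chars();
    let first_ok =
        matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_');
    first_ok
        && name != "_"
        && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
        && !KEYWORDS.contains(&name)
}
```

The check is deliberately stricter than the language: a name it rejects might still be a legal identifier, but a name it accepts is safe to use as a field.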
Hydro automatically handles serialization — just shuttle the raw bytes between sender and receiver. You must ensure that you are preserving any guarantees of the network protocol specified in the Hydro program. For example, if the channel uses TCP, you must ensure that your networking mechanism preserves ordering and exactly-once delivery.
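One way to shuttle bytes while keeping order and exactly-once delivery is an in-memory FIFO channel. The sketch below uses only the standard library and makes several assumptions: NetworkOut and NetworkIn are hand-written stand-ins for the generated structs, Vec<u8> and a blocking Iterator replace Bytes/BytesMut and the async Stream, and fake_sender and fake_receiver are hypothetical placeholders for the generated functions (they use raw UTF-8 bytes rather than Hydro's serialization).

```rust
use std::io;
use std::sync::mpsc;

// Stand-ins for the generated network structs (see assumptions above).
pub struct NetworkOut<F: FnMut(Vec<u8>)> {
    pub messages: F,
}

pub struct NetworkIn<I: Iterator<Item = io::Result<Vec<u8>>>> {
    pub messages: I,
}

// Hypothetical sender: emits one payload per input string.
fn fake_sender<F: FnMut(Vec<u8>)>(inputs: &[&str], net: &mut NetworkOut<F>) {
    for s in inputs {
        (net.messages)(s.as_bytes().to_vec());
    }
}

// Hypothetical receiver: decodes each payload back into a String.
fn fake_receiver<I: Iterator<Item = io::Result<Vec<u8>>>>(
    net: NetworkIn<I>,
) -> io::Result<Vec<String>> {
    net.messages
        .map(|frame| frame.map(|bytes| String::from_utf8_lossy(&bytes).into_owned()))
        .collect()
}

// Connects the two sides with an mpsc channel, which delivers every
// payload once and in the order it was sent.
pub fn round_trip(inputs: &[&str]) -> io::Result<Vec<String>> {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    let mut net_out = NetworkOut {
        messages: move |payload: Vec<u8>| {
            // A send error only means the receiver is gone; ignored here.
            let _ = tx.send(payload);
        },
    };
    fake_sender(inputs, &mut net_out);

    // Dropping the sending side closes the channel, so the receiving
    // iterator ends after the queued payloads instead of blocking.
    drop(net_out);

    let net_in = NetworkIn {
        messages: rx.into_iter().map(Ok::<Vec<u8>, io::Error>),
    };
    fake_receiver(net_in)
}
```

A transport without these properties, such as raw UDP, would need sequencing and deduplication layered on top before it could stand in for a channel declared as TCP.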