Struct dfir_rs::scheduled::graph::Dfir

pub struct Dfir<'a> { /* private fields */ }

A DFIR graph. Owns, schedules, and runs the compiled subgraphs.

Implementations§

source§

impl Dfir<'_>

Methods for TeeingHandoff teeing and dropping.

source

pub fn teeing_handoff_tee<T>( &mut self, tee_parent_port: &RecvPort<TeeingHandoff<T>>, ) -> RecvPort<TeeingHandoff<T>>
where T: Clone,

Tees a TeeingHandoff.

source

pub fn teeing_handoff_drop<T>(&mut self, tee_port: RecvPort<TeeingHandoff<T>>)
where T: Clone,

Marks an output of a TeeingHandoff as dropped so that no more data will be sent to it.

It is recommended not to use this method and instead simply avoid teeing a TeeingHandoff when it is not needed.
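As a rough illustration of what teeing and dropping mean, the following is a minimal std-only sketch, not the dfir_rs implementation. `ToyTee`, `tee`, `drop_output`, `give`, and `take` are hypothetical names invented for this example. It assumes each live output receives its own clone of every item (which is why `T: Clone` is needed) and that a dropped output simply stops receiving data.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a teeing handoff (hypothetical, not the dfir_rs type).
/// Every live output gets its own clone of each item.
struct ToyTee<T: Clone> {
    // One queue per output; `None` marks an output that was dropped.
    outputs: Vec<Option<VecDeque<T>>>,
}

impl<T: Clone> ToyTee<T> {
    /// Starts with a single output, like a handoff that has not been teed.
    fn new() -> Self {
        ToyTee { outputs: vec![Some(VecDeque::new())] }
    }

    /// Adds another output and returns its index (a hypothetical port id).
    fn tee(&mut self) -> usize {
        self.outputs.push(Some(VecDeque::new()));
        self.outputs.len() - 1
    }

    /// Marks an output as dropped so no more data is cloned into it.
    fn drop_output(&mut self, idx: usize) {
        self.outputs[idx] = None;
    }

    /// Clones `item` into every output that is still live.
    fn give(&mut self, item: T) {
        for queue in self.outputs.iter_mut().flatten() {
            queue.push_back(item.clone());
        }
    }

    /// Drains whatever is buffered for one output (empty if it was dropped).
    fn take(&mut self, idx: usize) -> Vec<T> {
        match &mut self.outputs[idx] {
            Some(queue) => queue.drain(..).collect(),
            None => Vec::new(),
        }
    }
}
```

In this sketch every extra output costs one clone per item, which is the motivation for not teeing when the extra output is not needed.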

source§

impl<'a> Dfir<'a>

source

pub fn new() -> Self

Creates a new empty DFIR graph.

source

pub fn meta_graph(&self) -> Option<&DfirGraph>

Returns a handle to the meta DfirGraph if set. The DfirGraph is a representation of all the operators, subgraphs, and handoffs in this Dfir instance. Will only be set if this graph was constructed using a surface syntax macro.

source

pub fn diagnostics(&self) -> Option<&[Diagnostic<SerdeSpan>]>

Returns any diagnostics generated by the surface syntax macro. Each diagnostic is a pair of (1) a Diagnostic with span info reset and (2) the ToString version of the diagnostic with original span info. Will only be set if this graph was constructed using a surface syntax macro.

source

pub fn reactor(&self) -> Reactor

Returns a reactor for externally scheduling subgraphs, possibly from another thread. Reactor events are considered to be external events.

source

pub fn current_tick(&self) -> TickInstant

Gets the current tick (local time) count.

source

pub fn current_stratum(&self) -> usize

Gets the current stratum number.

source

pub fn run_tick(&mut self) -> bool

Runs the dataflow until the next tick begins. Returns true if any work was done.

source

pub fn run_available(&mut self) -> bool

Runs the dataflow until no more (externally-triggered) work is immediately available. Runs at least one tick of dataflow, even if no external events have been received. If the dataflow contains loops this method may run forever. Returns true if any work was done.

source

pub async fn run_available_async(&mut self) -> bool

Runs the dataflow until no more (externally-triggered) work is immediately available. Runs at least one tick of dataflow, even if no external events have been received. If the dataflow contains loops this method may run forever. Returns true if any work was done. Yields repeatedly to allow external events to happen.

source

pub fn run_stratum(&mut self) -> bool

Runs the current stratum of the dataflow until no more local work is available (does not receive events). Returns true if any work was done.

source

pub fn next_stratum(&mut self, current_tick_only: bool) -> bool

Go to the next stratum which has work available, possibly the current stratum. Return true if more work is available, otherwise false if no work is immediately available on any strata.

This will receive external events when at the start of a tick.

If current_tick_only is set to true, will only return true if work is immediately available on the current tick.

If this returns false then the graph will be at the start of a tick (at stratum 0, can receive more external events).
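The stratum and tick rules above can be pictured with a small std-only model. This is a simplified sketch under stated assumptions, not the actual scheduler: `ToyScheduler` and its methods are hypothetical, subgraphs are reduced to integer ids, external events are left out, and work scheduled for a stratum earlier than the current one is assumed to wait for the next tick.

```rust
use std::collections::VecDeque;

/// Toy scheduler (hypothetical): one queue of pending subgraph ids per stratum.
struct ToyScheduler {
    tick: usize,
    stratum: usize,
    pending: Vec<VecDeque<usize>>,
}

impl ToyScheduler {
    fn new(num_strata: usize) -> Self {
        ToyScheduler { tick: 0, stratum: 0, pending: vec![VecDeque::new(); num_strata] }
    }

    /// Queues a subgraph id to run in the given stratum.
    fn schedule(&mut self, stratum: usize, sg_id: usize) {
        self.pending[stratum].push_back(sg_id);
    }

    /// Index of the first stratum at or after `from` that has pending work.
    fn first_with_work(&self, from: usize) -> Option<usize> {
        (from..self.pending.len()).find(|&s| !self.pending[s].is_empty())
    }

    /// Moves to the next stratum with work, possibly the current one.
    fn next_stratum(&mut self, current_tick_only: bool) -> bool {
        // Prefer work remaining in the current tick.
        if let Some(s) = self.first_with_work(self.stratum) {
            self.stratum = s;
            return true;
        }
        // Nothing left in this tick: wrap to the start of the next tick.
        self.tick += 1;
        self.stratum = 0;
        if current_tick_only {
            return false;
        }
        // Otherwise look for work anywhere in the new tick.
        match self.first_with_work(0) {
            Some(s) => {
                self.stratum = s;
                true
            }
            None => false,
        }
    }

    /// Runs everything queued in the current stratum, returning the ids run.
    fn run_stratum(&mut self) -> Vec<usize> {
        self.pending[self.stratum].drain(..).collect()
    }
}
```

Note that whenever `next_stratum` returns false in this model, the scheduler sits at stratum 0 of a fresh tick, mirroring the guarantee described above.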

source

pub fn run(&mut self) -> Option<Never>

Runs the dataflow graph forever.

TODO(mingwei): Currently blocks forever, no notion of “completion.”

source

pub async fn run_async(&mut self) -> Option<Never>

Runs the dataflow graph forever.

TODO(mingwei): Currently blocks forever, no notion of “completion.”

source

pub fn try_recv_events(&mut self) -> usize

Enqueues subgraphs triggered by events without blocking.

Returns the number of subgraphs enqueued.

source

pub fn recv_events(&mut self) -> Option<usize>

Enqueues subgraphs triggered by external events, blocking until at least one subgraph is scheduled from an external event.

source

pub async fn recv_events_async(&mut self) -> Option<usize>

Enqueues subgraphs triggered by external events asynchronously, waiting until at least one subgraph is scheduled from an external event. Returns the number of subgraphs enqueued, which may be zero if an external event scheduled an already-scheduled subgraph.

Returns None if the event queue is closed, but that should not happen normally.
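To see why the returned count can be zero even though an event arrived, here is a std-only sketch built on `std::sync::mpsc`. It is a toy model, not the dfir_rs event loop: `ToyEvents` and its fields are hypothetical, events are plain subgraph ids, and the "already scheduled" set is never cleared because this model never runs the subgraphs.

```rust
use std::collections::{HashSet, VecDeque};
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy event receiver (hypothetical): events carry the id of a subgraph to schedule.
struct ToyEvents {
    event_rx: Receiver<usize>,
    scheduled: HashSet<usize>,
    queue: VecDeque<usize>,
}

impl ToyEvents {
    /// Returns the sending half (a stand-in for an external event source) and the receiver.
    fn new() -> (Sender<usize>, Self) {
        let (tx, event_rx) = channel();
        (tx, ToyEvents { event_rx, scheduled: HashSet::new(), queue: VecDeque::new() })
    }

    /// Enqueues `sg_id` unless it is already scheduled; returns 1 if newly enqueued.
    fn enqueue(&mut self, sg_id: usize) -> usize {
        if self.scheduled.insert(sg_id) {
            self.queue.push_back(sg_id);
            1
        } else {
            0
        }
    }

    /// Non-blocking: drains the events that are ready right now.
    fn try_recv_events(&mut self) -> usize {
        let mut enqueued = 0;
        while let Ok(sg_id) = self.event_rx.try_recv() {
            enqueued += self.enqueue(sg_id);
        }
        enqueued
    }

    /// Blocking: waits for at least one event; `None` once every sender is gone.
    fn recv_events(&mut self) -> Option<usize> {
        let first = self.event_rx.recv().ok()?;
        let enqueued = self.enqueue(first);
        Some(enqueued + self.try_recv_events())
    }
}
```

In this model a second event for an already-scheduled subgraph wakes the receiver but adds nothing to the queue, so `recv_events` returns `Some(0)`.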

source

pub fn schedule_subgraph(&mut self, sg_id: SubgraphId) -> bool

Schedules a subgraph to be run. See also: Context::schedule_subgraph.

source

pub fn add_subgraph<Name, R, W, F>( &mut self, name: Name, recv_ports: R, send_ports: W, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, R: 'static + PortList<RECV>, W: 'static + PortList<SEND>, F: 'static + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>),

Adds a new compiled subgraph with the specified inputs and outputs in stratum 0.

source

pub fn add_subgraph_stratified<Name, R, W, F>( &mut self, name: Name, stratum: usize, recv_ports: R, send_ports: W, laziness: bool, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, R: 'static + PortList<RECV>, W: 'static + PortList<SEND>, F: 'a + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>),

Adds a new compiled subgraph with the specified inputs, outputs, and stratum number.

TODO(mingwei): add example in doc.

source

pub fn add_subgraph_n_m<Name, R, W, F>( &mut self, name: Name, recv_ports: Vec<RecvPort<R>>, send_ports: Vec<SendPort<W>>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, R: 'static + Handoff, W: 'static + Handoff, F: 'static + for<'ctx> FnMut(&'ctx mut Context, &'ctx [&'ctx RecvCtx<R>], &'ctx [&'ctx SendCtx<W>]),

Adds a new compiled subgraph with a variable number of inputs and outputs of the same respective handoff types.

source

pub fn add_subgraph_stratified_n_m<Name, R, W, F>( &mut self, name: Name, stratum: usize, recv_ports: Vec<RecvPort<R>>, send_ports: Vec<SendPort<W>>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, R: 'static + Handoff, W: 'static + Handoff, F: 'static + for<'ctx> FnMut(&'ctx mut Context, &'ctx [&'ctx RecvCtx<R>], &'ctx [&'ctx SendCtx<W>]),

Adds a new compiled subgraph with a variable number of inputs and outputs of the same respective handoff types.

source

pub fn make_edge<Name, H>(&mut self, name: Name) -> (SendPort<H>, RecvPort<H>)
where Name: Into<Cow<'static, str>>, H: 'static + Handoff,

Creates a handoff edge and returns the corresponding send and receive ports.
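Conceptually, an edge is a shared buffer with a write end and a read end. The sketch below models that idea with `Rc<RefCell<Vec<T>>>`. It is only an illustration: `make_toy_edge`, `ToySend`, `ToyRecv`, `give`, and `take_all` are hypothetical names, and the real ports are typed by a `Handoff` implementation rather than by a plain `Vec`.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Write end of a toy edge (hypothetical stand-in for a send port).
struct ToySend<T> {
    buf: Rc<RefCell<Vec<T>>>,
}

/// Read end of a toy edge (hypothetical stand-in for a receive port).
struct ToyRecv<T> {
    buf: Rc<RefCell<Vec<T>>>,
}

/// Creates both ends of an edge; they share a single buffer.
fn make_toy_edge<T>() -> (ToySend<T>, ToyRecv<T>) {
    let buf = Rc::new(RefCell::new(Vec::new()));
    (ToySend { buf: Rc::clone(&buf) }, ToyRecv { buf })
}

impl<T> ToySend<T> {
    /// Appends an item for the downstream side to pick up later.
    fn give(&self, item: T) {
        self.buf.borrow_mut().push(item);
    }
}

impl<T> ToyRecv<T> {
    /// Takes everything buffered so far, leaving the edge empty.
    fn take_all(&self) -> Vec<T> {
        std::mem::take(&mut *self.buf.borrow_mut())
    }
}
```

The two ends would then be handed to different subgraphs, one as an output and one as an input.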

source

pub fn add_state<T>(&mut self, state: T) -> StateHandle<T>
where T: Any,

Adds referenceable state into the Dfir instance. Returns a state handle which can be used externally or by operators to access the state.

This is part of the “state API”.

source

pub fn set_state_tick_hook<T>( &mut self, handle: StateHandle<T>, tick_hook_fn: impl 'static + FnMut(&mut T), )
where T: Any,

Sets a hook to modify the state at the end of each tick, using the supplied closure.

This is part of the “state API”.
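The handle-plus-hook pattern of the state API can be sketched with `std::any::Any`. This is a toy model under stated assumptions, not the dfir_rs internals: `ToyStates`, `ToyHandle`, `state_mut`, and `end_tick` are hypothetical names, and the model assumes at most one hook per state, run when the caller signals the end of a tick.

```rust
use std::any::Any;
use std::marker::PhantomData;

/// Typed index into the state table (hypothetical stand-in for a state handle).
struct ToyHandle<T> {
    index: usize,
    _marker: PhantomData<T>,
}

// Manual impls so the handle is copyable even when `T` is not.
impl<T> Clone for ToyHandle<T> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<T> Copy for ToyHandle<T> {}

/// Toy state table: type-erased values plus an optional end-of-tick hook each.
struct ToyStates {
    states: Vec<Box<dyn Any>>,
    hooks: Vec<Option<Box<dyn FnMut(&mut dyn Any)>>>,
}

impl ToyStates {
    fn new() -> Self {
        ToyStates { states: Vec::new(), hooks: Vec::new() }
    }

    /// Stores `state` and returns a typed handle to it.
    fn add_state<T: Any>(&mut self, state: T) -> ToyHandle<T> {
        self.states.push(Box::new(state));
        self.hooks.push(None);
        ToyHandle { index: self.states.len() - 1, _marker: PhantomData }
    }

    /// Registers a closure to run on the state at the end of every tick.
    fn set_tick_hook<T: Any>(
        &mut self,
        handle: ToyHandle<T>,
        mut hook: impl 'static + FnMut(&mut T),
    ) {
        self.hooks[handle.index] = Some(Box::new(move |any: &mut dyn Any| {
            hook(any.downcast_mut::<T>().expect("handle type matches state"));
        }));
    }

    /// Accesses the state behind a handle.
    fn state_mut<T: Any>(&mut self, handle: ToyHandle<T>) -> &mut T {
        self.states[handle.index]
            .downcast_mut::<T>()
            .expect("handle type matches state")
    }

    /// Runs every registered hook once, as a tick boundary would in this model.
    fn end_tick(&mut self) {
        for (state, hook) in self.states.iter_mut().zip(self.hooks.iter_mut()) {
            if let Some(hook) = hook {
                hook(&mut **state);
            }
        }
    }
}
```

A typical use of such a hook is resetting per-tick state, for example clearing a counter or buffer so that each tick starts fresh.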

source

pub fn context_mut(&mut self, sg_id: SubgraphId) -> &mut Context

Gets an exclusive (mut) ref to the internal context, setting the subgraph ID.

source§

impl Dfir<'_>

source

pub fn request_task<Fut>(&mut self, future: Fut)
where Fut: Future<Output = ()> + 'static,

source

pub fn abort_tasks(&mut self)

source

pub fn join_tasks(&mut self) -> impl '_ + Future

source§

impl Dfir<'_>

source

pub async fn inbound_tcp_vertex_port<T>( &mut self, port: u16, ) -> RecvPort<VecHandoff<T>>
where T: 'static + DeserializeOwned + Send,

source

pub async fn inbound_tcp_vertex<T>(&mut self) -> (u16, RecvPort<VecHandoff<T>>)
where T: 'static + DeserializeOwned + Send,

source

pub async fn outbound_tcp_vertex<T>( &mut self, ) -> SendPort<VecHandoff<(Address, T)>>
where T: 'static + Serialize + Send,

source§

impl Dfir<'_>

source

pub fn add_write_tcp_stream( &mut self, stream: TcpStream, ) -> SendPort<VecHandoff<Message>>

source

pub fn add_read_tcp_stream( &mut self, stream: TcpStream, ) -> RecvPort<VecHandoff<Message>>

source

pub fn add_tcp_stream( &mut self, stream: TcpStream, ) -> (SendPort<VecHandoff<Message>>, RecvPort<VecHandoff<Message>>)

Trait Implementations§

source§

impl<'a> Default for Dfir<'a>

source§

fn default() -> Dfir<'a>

Returns the “default value” for a type.
source§

impl Drop for Dfir<'_>

source§

fn drop(&mut self)

Executes the destructor for this type.
source§

impl GraphExt for Dfir<'_>

source§

fn add_subgraph_sink<Name, F, R>( &mut self, name: Name, recv_port: RecvPort<R>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, F: 'static + FnMut(&Context, &RecvCtx<R>), R: 'static + Handoff,

Adds a subgraph with specific topology: Read more
source§

fn add_subgraph_2sink<Name, F, R1, R2>( &mut self, name: Name, recv_port_1: RecvPort<R1>, recv_port_2: RecvPort<R2>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, F: 'static + FnMut(&Context, &RecvCtx<R1>, &RecvCtx<R2>), R1: 'static + Handoff, R2: 'static + Handoff,

Adds a subgraph with specific topology: Read more
source§

fn add_subgraph_source<Name, F, W>( &mut self, name: Name, send_port: SendPort<W>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, F: 'static + FnMut(&Context, &SendCtx<W>), W: 'static + Handoff,

Adds a subgraph with specific topology: Read more
source§

fn add_subgraph_in_out<Name, F, R, W>( &mut self, name: Name, recv_port: RecvPort<R>, send_port: SendPort<W>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, F: 'static + FnMut(&Context, &RecvCtx<R>, &SendCtx<W>), R: 'static + Handoff, W: 'static + Handoff,

Adds a subgraph with specific topology: Read more
source§

fn add_subgraph_in_2out<Name, F, R, W1, W2>( &mut self, name: Name, recv_port: RecvPort<R>, send_port_1: SendPort<W1>, send_port_2: SendPort<W2>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, F: 'static + FnMut(&Context, &RecvCtx<R>, &SendCtx<W1>, &SendCtx<W2>), R: 'static + Handoff, W1: 'static + Handoff, W2: 'static + Handoff,

Adds a subgraph with specific topology: Read more
source§

fn add_subgraph_2in_out<Name, F, R1, R2, W>( &mut self, name: Name, recv_port_1: RecvPort<R1>, recv_port_2: RecvPort<R2>, send_port: SendPort<W>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, F: 'static + FnMut(&Context, &RecvCtx<R1>, &RecvCtx<R2>, &SendCtx<W>), R1: 'static + Handoff, R2: 'static + Handoff, W: 'static + Handoff,

Adds a subgraph with specific topology: Read more
source§

fn add_subgraph_2in_2out<Name, F, R1, R2, W1, W2>( &mut self, name: Name, recv_port_1: RecvPort<R1>, recv_port_2: RecvPort<R2>, send_port_1: SendPort<W1>, send_port_2: SendPort<W2>, subgraph: F, ) -> SubgraphId
where Name: Into<Cow<'static, str>>, F: 'static + FnMut(&Context, &RecvCtx<R1>, &RecvCtx<R2>, &SendCtx<W1>, &SendCtx<W2>), R1: 'static + Handoff, R2: 'static + Handoff, W1: 'static + Handoff, W2: 'static + Handoff,

Adds a subgraph with specific topology: Read more
source§

fn add_channel_input<Name, T, W>( &mut self, name: Name, send_port: SendPort<W>, ) -> Input<T, SyncSender<T>>
where Name: Into<Cow<'static, str>>, T: 'static, W: 'static + Handoff + CanReceive<T>,

Adds a channel input which sends to the send_port.
source§

fn add_input<Name, T, W>( &mut self, name: Name, send_port: SendPort<W>, ) -> Input<T, Buffer<T>>
where Name: Into<Cow<'static, str>>, T: 'static, W: 'static + Handoff + CanReceive<T>,

Adds an “input” operator, returning a handle to insert data into it. TODO(justin): make this thing work better
source§

fn add_input_from_stream<Name, T, W, S>( &mut self, name: Name, send_port: SendPort<W>, stream: S, )
where Name: Into<Cow<'static, str>>, S: 'static + Stream<Item = T>, W: 'static + Handoff + CanReceive<T>,

Adds a subgraph which pulls from the async stream and sends to the send_port.

Auto Trait Implementations§

§

impl<'a> Freeze for Dfir<'a>

§

impl<'a> !RefUnwindSafe for Dfir<'a>

§

impl<'a> !Send for Dfir<'a>

§

impl<'a> !Sync for Dfir<'a>

§

impl<'a> Unpin for Dfir<'a>

§

impl<'a> !UnwindSafe for Dfir<'a>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoEither for T

source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

source§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.