pub struct Dfir<'a> { /* private fields */ }
A DFIR graph. Owns, schedules, and runs the compiled subgraphs.
Implementations
impl Dfir<'_>
Methods for TeeingHandoff teeing and dropping.
pub fn teeing_handoff_tee<T>(
    &mut self,
    tee_parent_port: &RecvPort<TeeingHandoff<T>>,
) -> RecvPort<TeeingHandoff<T>>
where
    T: Clone,
Tees a TeeingHandoff.
pub fn teeing_handoff_drop<T>(&mut self, tee_port: RecvPort<TeeingHandoff<T>>)
where
    T: Clone,
Marks an output of a TeeingHandoff as dropped so that no more data will be sent to it.
It is recommended not to use this method and instead simply avoid teeing a TeeingHandoff when it is not needed.
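The tee and drop behavior described above can be pictured with a small standalone model. This is only a sketch using the standard library: `ToyTee`, `tee`, `drop_output`, `give`, and `take` are hypothetical names invented for illustration, and the code does not reflect how TeeingHandoff is actually implemented. It shows the one property the docs state: each live output receives its own clone of every item (hence the `T: Clone` bound), and a dropped output receives nothing further.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a teeing handoff (hypothetical, not the DFIR type).
/// Every live output gets its own clone of each item.
struct ToyTee<T: Clone> {
    /// One queue per tee output; `None` marks an output that was dropped.
    outputs: Vec<Option<VecDeque<T>>>,
}

impl<T: Clone> ToyTee<T> {
    /// Starts with a single output, index 0.
    fn new() -> Self {
        ToyTee { outputs: vec![Some(VecDeque::new())] }
    }

    /// Adds another output and returns its index (compare `teeing_handoff_tee`).
    fn tee(&mut self) -> usize {
        self.outputs.push(Some(VecDeque::new()));
        self.outputs.len() - 1
    }

    /// Marks an output as dropped so no more data is sent to it
    /// (compare `teeing_handoff_drop`). Anything still buffered is discarded.
    fn drop_output(&mut self, idx: usize) {
        self.outputs[idx] = None;
    }

    /// Clones `item` into every live output.
    fn give(&mut self, item: T) {
        for queue in self.outputs.iter_mut().flatten() {
            queue.push_back(item.clone());
        }
    }

    /// Drains everything currently buffered for one output.
    fn take(&mut self, idx: usize) -> Vec<T> {
        match &mut self.outputs[idx] {
            Some(queue) => queue.drain(..).collect(),
            None => Vec::new(),
        }
    }
}
```

In this model a dropped output still costs a slot in the table, which is one way to read the recommendation above: not creating the extra tee in the first place is simpler than creating it and dropping it later.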
impl<'a> Dfir<'a>
pub fn meta_graph(&self) -> Option<&DfirGraph>
Returns a handle to the meta DfirGraph, if set. The DfirGraph is a representation of all the operators, subgraphs, and handoffs in this Dfir instance.
Will only be set if this graph was constructed using a surface syntax macro.
pub fn diagnostics(&self) -> Option<&[Diagnostic<SerdeSpan>]>
Returns any diagnostics generated by the surface syntax macro. Each diagnostic is a pair of (1) a Diagnostic with span info reset and (2) the ToString version of the diagnostic with original span info.
Will only be set if this graph was constructed using a surface syntax macro.
pub fn reactor(&self) -> Reactor
Returns a reactor for externally scheduling subgraphs, possibly from another thread. Reactor events are considered to be external events.
pub fn current_tick(&self) -> TickInstant
Gets the current tick (local time) count.
pub fn current_stratum(&self) -> usize
Gets the current stratum number.
pub fn run_tick(&mut self) -> bool
Runs the dataflow until the next tick begins. Returns true if any work was done.
pub fn run_available(&mut self) -> bool
Runs the dataflow until no more (externally-triggered) work is immediately available. Runs at least one tick of dataflow, even if no external events have been received. If the dataflow contains loops this method may run forever. Returns true if any work was done.
pub async fn run_available_async(&mut self) -> bool
Runs the dataflow until no more (externally-triggered) work is immediately available. Runs at least one tick of dataflow, even if no external events have been received. If the dataflow contains loops this method may run forever. Returns true if any work was done. Yields repeatedly to allow external events to happen.
pub fn run_stratum(&mut self) -> bool
Runs the current stratum of the dataflow until no more local work is available (does not receive events). Returns true if any work was done.
pub fn next_stratum(&mut self, current_tick_only: bool) -> bool
Goes to the next stratum which has work available, possibly the current stratum. Returns true if more work is available, otherwise false if no work is immediately available on any stratum.
This will receive external events when at the start of a tick.
If current_tick_only is set to true, will only return true if work is immediately available on the current tick.
If this returns false then the graph will be at the start of a tick (at stratum 0, can receive more external events).
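The relationship between ticks, strata, run_stratum, and next_stratum can be illustrated with a simplified standalone model. This is a sketch under stated assumptions, not the DFIR scheduler: `ToyScheduler` and its fields are hypothetical, work is reduced to a per-stratum counter, and external events are not modeled. It only mirrors the documented contract: next_stratum may stay on the current stratum, wrapping past the last stratum begins a new tick at stratum 0, and a false return leaves the scheduler at the start of a tick.

```rust
/// Toy model of tick/stratum scheduling (hypothetical, not the DFIR scheduler).
struct ToyScheduler {
    /// Number of queued work items per stratum.
    pending: Vec<usize>,
    current_tick: usize,
    current_stratum: usize,
}

impl ToyScheduler {
    fn new(num_strata: usize) -> Self {
        ToyScheduler { pending: vec![0; num_strata], current_tick: 0, current_stratum: 0 }
    }

    /// Queues one unit of work in the given stratum.
    fn schedule(&mut self, stratum: usize) {
        self.pending[stratum] += 1;
    }

    /// Runs all queued work in the current stratum. Returns true if any work was done.
    fn run_stratum(&mut self) -> bool {
        let did_work = self.pending[self.current_stratum] > 0;
        self.pending[self.current_stratum] = 0;
        did_work
    }

    /// Moves to the next stratum with work, possibly the current one.
    /// Running past the last stratum starts a new tick at stratum 0.
    fn next_stratum(&mut self, current_tick_only: bool) -> bool {
        let num_strata = self.pending.len();
        // Look for work from the current stratum to the end of this tick.
        if let Some(s) = (self.current_stratum..num_strata).find(|&s| self.pending[s] > 0) {
            self.current_stratum = s;
            return true;
        }
        // Nothing left in this tick: the next tick begins at stratum 0.
        self.current_tick += 1;
        self.current_stratum = 0;
        if current_tick_only {
            return false;
        }
        // Otherwise, look for work already queued for the new tick.
        if let Some(s) = (0..num_strata).find(|&s| self.pending[s] > 0) {
            self.current_stratum = s;
            return true;
        }
        false
    }

    /// Runs until the next tick begins. Returns true if any work was done.
    fn run_tick(&mut self) -> bool {
        let mut did_work = false;
        while self.next_stratum(true) {
            did_work |= self.run_stratum();
        }
        did_work
    }
}
```

In this model, `run_tick` is just the loop "call `next_stratum(true)`, then `run_stratum`, until `next_stratum` returns false", which is one way to read how the methods above compose.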
pub fn run(&mut self) -> Option<Never>
Runs the dataflow graph forever.
TODO(mingwei): Currently blocks forever, no notion of “completion.”
pub async fn run_async(&mut self) -> Option<Never>
Runs the dataflow graph forever.
TODO(mingwei): Currently blocks forever, no notion of “completion.”
pub fn try_recv_events(&mut self) -> usize
Enqueues subgraphs triggered by events without blocking.
Returns the number of subgraphs enqueued.
pub fn recv_events(&mut self) -> Option<usize>
Enqueues subgraphs triggered by external events, blocking until at least one subgraph is scheduled from an external event.
pub async fn recv_events_async(&mut self) -> Option<usize>
Enqueues subgraphs triggered by external events asynchronously, waiting until at least one subgraph is scheduled from an external event. Returns the number of subgraphs enqueued, which may be zero if an external event scheduled an already-scheduled subgraph.
Returns None if the event queue is closed, but that should not happen normally.
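The return values described above (a count that may be zero, and None when the queue is closed) can be illustrated with a small model built on `std::sync::mpsc`. This is an assumption-laden sketch, not DFIR's event queue: `ToyEventQueue` is hypothetical, and subgraph IDs are plain `usize` values.

```rust
use std::collections::BTreeSet;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy event queue (hypothetical): external events name a subgraph to schedule.
struct ToyEventQueue {
    events: Receiver<usize>,
    /// Subgraph IDs that are already scheduled and waiting to run.
    scheduled: BTreeSet<usize>,
}

impl ToyEventQueue {
    /// Returns the sender used by "external" code along with the queue itself.
    fn new() -> (Sender<usize>, Self) {
        let (tx, rx) = channel();
        (tx, ToyEventQueue { events: rx, scheduled: BTreeSet::new() })
    }

    /// Blocks until at least one event arrives, then drains whatever else is ready.
    /// Returns the number of newly scheduled subgraphs, or `None` if every
    /// sender is gone and no buffered events remain (the queue is closed).
    fn recv_events(&mut self) -> Option<usize> {
        let first = self.events.recv().ok()?;
        let mut newly_scheduled = 0;
        // `insert` returns false when the subgraph was already scheduled.
        if self.scheduled.insert(first) {
            newly_scheduled += 1;
        }
        while let Ok(sg_id) = self.events.try_recv() {
            if self.scheduled.insert(sg_id) {
                newly_scheduled += 1;
            }
        }
        Some(newly_scheduled)
    }
}
```

In this model, an event for a subgraph that is already in `scheduled` still wakes the receiver but adds nothing, which is how a successful call can report zero newly enqueued subgraphs.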
pub fn schedule_subgraph(&mut self, sg_id: SubgraphId) -> bool
Schedules a subgraph to be run. See also: Context::schedule_subgraph.
pub fn add_subgraph<Name, R, W, F>(
    &mut self,
    name: Name,
    recv_ports: R,
    send_ports: W,
    subgraph: F,
) -> SubgraphId
Adds a new compiled subgraph with the specified inputs and outputs in stratum 0.
pub fn add_subgraph_stratified<Name, R, W, F>(
    &mut self,
    name: Name,
    stratum: usize,
    recv_ports: R,
    send_ports: W,
    laziness: bool,
    subgraph: F,
) -> SubgraphId
Adds a new compiled subgraph with the specified inputs, outputs, and stratum number.
TODO(mingwei): add example in doc.
pub fn add_subgraph_n_m<Name, R, W, F>(
    &mut self,
    name: Name,
    recv_ports: Vec<RecvPort<R>>,
    send_ports: Vec<SendPort<W>>,
    subgraph: F,
) -> SubgraphId
Adds a new compiled subgraph with a variable number of inputs and outputs of the same respective handoff types.
pub fn add_subgraph_stratified_n_m<Name, R, W, F>(
    &mut self,
    name: Name,
    stratum: usize,
    recv_ports: Vec<RecvPort<R>>,
    send_ports: Vec<SendPort<W>>,
    subgraph: F,
) -> SubgraphId
Adds a new compiled subgraph with a variable number of inputs and outputs of the same respective handoff types, in the specified stratum.
pub fn make_edge<Name, H>(&mut self, name: Name) -> (SendPort<H>, RecvPort<H>)
Creates a handoff edge and returns the corresponding send and receive ports.
pub fn add_state<T>(&mut self, state: T) -> StateHandle<T>
where
    T: Any,
Adds referenceable state into the Dfir instance. Returns a state handle which can be used externally or by operators to access the state.
This is part of the “state API”.
pub fn set_state_tick_hook<T>(
    &mut self,
    handle: StateHandle<T>,
    tick_hook_fn: impl 'static + FnMut(&mut T),
)
where
    T: Any,
Sets a hook to modify the state at the end of each tick, using the supplied closure.
This is part of the “state API”.
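The shape of the state API (a typed handle into type-erased storage, plus an end-of-tick hook) can be sketched with `std::any::Any`. This is a minimal model under stated assumptions, not the Dfir implementation: `ToyStates`, `ToyStateHandle`, `state_mut`, and `end_tick` are hypothetical names, and only the `T: Any` bound and the hook signature `impl 'static + FnMut(&mut T)` are taken from the signatures above.

```rust
use std::any::Any;
use std::marker::PhantomData;

/// Typed index into the state table (hypothetical stand-in for a state handle).
struct ToyStateHandle<T> {
    index: usize,
    _marker: PhantomData<T>,
}

// Manual impls so the handle is `Copy` even when `T` is not.
impl<T> Clone for ToyStateHandle<T> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<T> Copy for ToyStateHandle<T> {}

/// One type-erased state value and its optional end-of-tick hook.
struct ToyStateEntry {
    state: Box<dyn Any>,
    tick_hook: Option<Box<dyn FnMut(&mut Box<dyn Any>)>>,
}

struct ToyStates {
    entries: Vec<ToyStateEntry>,
}

impl ToyStates {
    fn new() -> Self {
        ToyStates { entries: Vec::new() }
    }

    /// Stores `state` and returns a handle that remembers its type.
    fn add_state<T: Any>(&mut self, state: T) -> ToyStateHandle<T> {
        self.entries.push(ToyStateEntry { state: Box::new(state), tick_hook: None });
        ToyStateHandle { index: self.entries.len() - 1, _marker: PhantomData }
    }

    /// Registers a closure to run on the state at the end of each tick.
    fn set_state_tick_hook<T: Any>(
        &mut self,
        handle: ToyStateHandle<T>,
        mut tick_hook_fn: impl 'static + FnMut(&mut T),
    ) {
        // Wrap the typed hook so it can be stored next to the type-erased state.
        let erased: Box<dyn FnMut(&mut Box<dyn Any>)> =
            Box::new(move |state: &mut Box<dyn Any>| {
                let typed = state.downcast_mut::<T>().expect("handle type matches state");
                tick_hook_fn(typed);
            });
        self.entries[handle.index].tick_hook = Some(erased);
    }

    /// Borrows the state mutably through its handle.
    fn state_mut<T: Any>(&mut self, handle: ToyStateHandle<T>) -> &mut T {
        self.entries[handle.index]
            .state
            .downcast_mut::<T>()
            .expect("handle type matches state")
    }

    /// Runs every registered hook, as a scheduler would at the end of a tick.
    fn end_tick(&mut self) {
        for entry in &mut self.entries {
            if let Some(hook) = entry.tick_hook.as_mut() {
                hook(&mut entry.state);
            }
        }
    }
}
```

A typical use of such a hook is resetting per-tick state, for example registering `|v: &mut Vec<i32>| v.clear()` so a buffer starts empty on every tick.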
pub fn context_mut(&mut self, sg_id: SubgraphId) -> &mut Context
Gets an exclusive (mut) ref to the internal context, setting the subgraph ID.
impl Dfir<'_>
pub fn request_task<Fut>(&mut self, future: Fut)
Alias for Context::request_task.
pub fn abort_tasks(&mut self)
Alias for Context::abort_tasks.
pub fn join_tasks(&mut self) -> impl '_ + Future
Alias for Context::join_tasks.
impl Dfir<'_>
pub async fn inbound_tcp_vertex_port<T>(
    &mut self,
    port: u16,
) -> RecvPort<VecHandoff<T>>
where
    T: 'static + DeserializeOwned + Send,
pub async fn inbound_tcp_vertex<T>(&mut self) -> (u16, RecvPort<VecHandoff<T>>)
where
    T: 'static + DeserializeOwned + Send,
pub async fn outbound_tcp_vertex<T>(&mut self) -> SendPort<VecHandoff<(Address, T)>>
impl Dfir<'_>
pub fn add_write_tcp_stream(&mut self, stream: TcpStream) -> SendPort<VecHandoff<Message>>
pub fn add_read_tcp_stream(&mut self, stream: TcpStream) -> RecvPort<VecHandoff<Message>>
pub fn add_tcp_stream(&mut self, stream: TcpStream) -> (SendPort<VecHandoff<Message>>, RecvPort<VecHandoff<Message>>)
Trait Implementations
impl GraphExt for Dfir<'_>
fn add_subgraph_sink<Name, F, R>(
    &mut self,
    name: Name,
    recv_port: RecvPort<R>,
    subgraph: F,
) -> SubgraphId
fn add_subgraph_2sink<Name, F, R1, R2>(
    &mut self,
    name: Name,
    recv_port_1: RecvPort<R1>,
    recv_port_2: RecvPort<R2>,
    subgraph: F,
) -> SubgraphId
fn add_subgraph_source<Name, F, W>(
    &mut self,
    name: Name,
    send_port: SendPort<W>,
    subgraph: F,
) -> SubgraphId
fn add_subgraph_in_out<Name, F, R, W>(
    &mut self,
    name: Name,
    recv_port: RecvPort<R>,
    send_port: SendPort<W>,
    subgraph: F,
) -> SubgraphId
fn add_subgraph_in_2out<Name, F, R, W1, W2>(
    &mut self,
    name: Name,
    recv_port: RecvPort<R>,
    send_port_1: SendPort<W1>,
    send_port_2: SendPort<W2>,
    subgraph: F,
) -> SubgraphId
fn add_subgraph_2in_out<Name, F, R1, R2, W>(
    &mut self,
    name: Name,
    recv_port_1: RecvPort<R1>,
    recv_port_2: RecvPort<R2>,
    send_port: SendPort<W>,
    subgraph: F,
) -> SubgraphId
fn add_subgraph_2in_2out<Name, F, R1, R2, W1, W2>(
    &mut self,
    name: Name,
    recv_port_1: RecvPort<R1>,
    recv_port_2: RecvPort<R2>,
    send_port_1: SendPort<W1>,
    send_port_2: SendPort<W2>,
    subgraph: F,
) -> SubgraphId
fn add_channel_input<Name, T, W>(
    &mut self,
    name: Name,
    send_port: SendPort<W>,
) -> Input<T, SyncSender<T>>
Adds a channel input which sends to the send_port.
fn add_input<Name, T, W>(
    &mut self,
    name: Name,
    send_port: SendPort<W>,
) -> Input<T, Buffer<T>>
fn add_input_from_stream<Name, T, W, S>(
    &mut self,
    name: Name,
    send_port: SendPort<W>,
    stream: S,
)
where
    Name: Into<Cow<'static, str>>,
    S: 'static + Stream<Item = T>,
    W: 'static + Handoff + CanReceive<T>,
Adds a subgraph which pulls from the async stream and sends to the send_port.
Auto Trait Implementations
impl<'a> Freeze for Dfir<'a>
impl<'a> !RefUnwindSafe for Dfir<'a>
impl<'a> !Send for Dfir<'a>
impl<'a> !Sync for Dfir<'a>
impl<'a> Unpin for Dfir<'a>
impl<'a> !UnwindSafe for Dfir<'a>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.