dfir_lang/graph/ops/
dest_sink_serde.rs

1use quote::quote_spanned;
2
3use super::{
4    make_missing_runtime_msg, OperatorCategory, OperatorConstraints,
5    OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
6};
7
8/// > Arguments: A [serializing async `Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html).
9///
10/// Consumes (payload, addr) pairs by serializing the payload and sending the resulting pair to an [async `Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html)
11/// that delivers them to the `SocketAddr` specified by `addr`.
12///
13/// Note this operator must be used within a Tokio runtime.
14/// ```rustbook
15/// async fn serde_out() {
16///     let addr = dfir_rs::util::ipv4_resolve("localhost:9000".into()).unwrap();
17///     let (outbound, inbound, _) = dfir_rs::util::bind_udp_bytes(addr).await;
18///     let remote = dfir_rs::util::ipv4_resolve("localhost:9001".into()).unwrap();
19///     let mut flow = dfir_rs::dfir_syntax! {
20///         source_iter(vec![("hello".to_string(), 1), ("world".to_string(), 2)])
21///             -> map (|m| (m, remote)) -> dest_sink_serde(outbound);
22///     };
23///     flow.run_available();
24/// }
25/// ```
26pub const DEST_SINK_SERDE: OperatorConstraints = OperatorConstraints {
27    name: "dest_sink_serde",
28    categories: &[OperatorCategory::Sink],
29    hard_range_inn: RANGE_1,
30    soft_range_inn: RANGE_1,
31    hard_range_out: RANGE_0,
32    soft_range_out: RANGE_0,
33    num_args: 1,
34    persistence_args: RANGE_0,
35    type_args: RANGE_0,
36    is_external_input: false,
37    has_singleton_output: false,
38    flo_type: None,
39    ports_inn: None,
40    ports_out: None,
41    input_delaytype_fn: |_| None,
42    write_fn: |wc @ &WriteContextArgs {
43                   root,
44                   op_span,
45                   ident,
46                   op_name,
47                   ..
48               },
49               diagnostics| {
50        let missing_runtime_msg = make_missing_runtime_msg(op_name);
51
52        let OperatorWriteOutput {
53            write_prologue,
54            write_iterator,
55            write_iterator_after,
56        } = (super::dest_sink::DEST_SINK.write_fn)(wc, diagnostics)?;
57
58        let write_iterator = quote_spanned! {op_span=>
59            ::std::debug_assert!(#root::tokio::runtime::Handle::try_current().is_ok(), #missing_runtime_msg);
60            #write_iterator
61            let #ident = #root::pusherator::map::Map::new(
62                |(payload, addr)| (#root::util::serialize_to_bytes(payload), addr),
63                #ident,
64            );
65        };
66
67        Ok(OperatorWriteOutput {
68            write_prologue,
69            write_iterator,
70            write_iterator_after,
71        })
72    },
73};