dfir_lang/graph/ops/
dest_sink.rs

1use quote::quote_spanned;
2
3use super::{
4    OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
5};
6
7/// > Arguments: An [async `Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html).
8///
9/// Consumes items by sending them to an [async `Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html).
10/// A `Sink` is a thing into which values can be sent, asynchronously. For example, sending items
11/// into a bounded channel.
12///
13/// Note this operator must be used within a Tokio runtime, and the DFIR program must be launched with `run_async`.
14///
15/// ```rustbook
16/// # #[dfir_rs::main]
17/// # async fn main() {
18/// // In this example we use a _bounded_ channel for our `Sink`. This is for demonstration only,
19/// // instead you should use [`dfir_rs::util::unbounded_channel`]. A bounded channel results in
20/// // buffering items internally instead of within the channel. (We can't use
21/// // unbounded here since unbounded channels are synchonous to write to and therefore not
22/// // `Sink`s.)
23/// let (send, recv) = tokio::sync::mpsc::channel::<usize>(5);
24/// // `PollSender` adapts the send half of the bounded channel into a `Sink`.
25/// let send = tokio_util::sync::PollSender::new(send);
26///
27/// let mut flow = dfir_rs::dfir_syntax! {
28///     source_iter(0..10) -> dest_sink(send);
29/// };
30/// // Call `run_async()` to allow async events to propagate, run for one second.
31/// tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
32///     .await
33///     .expect_err("Expected time out");
34///
35/// let mut recv = tokio_stream::wrappers::ReceiverStream::new(recv);
36/// // Only 5 elements received due to buffer size.
37/// // (Note that if we were using a multi-threaded executor instead of `current_thread` it would
38/// // be possible for more items to be added as they're removed, resulting in >5 collected.)
39/// let out: Vec<_> = dfir_rs::util::ready_iter(&mut recv).collect();
40/// assert_eq!(&[0, 1, 2, 3, 4], &*out);
41/// # }
42/// ```
43///
44/// `Sink` is different from [`AsyncWrite`](https://docs.rs/futures/latest/futures/io/trait.AsyncWrite.html).
45/// Instead of discrete values we send arbitrary streams of bytes into an `AsyncWrite` value. For
46/// example, writings a stream of bytes to a file, a socket, or stdout.
47///
48/// To handle those situations we can use a codec from [`tokio_util::codec`](https://docs.rs/tokio-util/latest/tokio_util/codec/index.html).
49/// These specify ways in which the byte stream is broken into individual items, such as with
50/// newlines or with length delineation.
51///
52/// If we only want to write a stream of bytes without delineation we can use the [`BytesCodec`](https://docs.rs/tokio-util/latest/tokio_util/codec/struct.BytesCodec.html).
53///
54/// In this example we use a [`duplex`](https://docs.rs/tokio/latest/tokio/io/fn.duplex.html) as our `AsyncWrite` with a
55/// `BytesCodec`.
56///
57/// ```rustbook
58/// # #[dfir_rs::main]
59/// # async fn main() {
60/// use bytes::Bytes;
61/// use tokio::io::AsyncReadExt;
62///
63/// // Like a channel, but for a stream of bytes instead of discrete objects.
64/// let (asyncwrite, mut asyncread) = tokio::io::duplex(256);
65/// // Now instead handle discrete byte strings by length-encoding them.
66/// let sink = tokio_util::codec::FramedWrite::new(asyncwrite, tokio_util::codec::BytesCodec::new());
67///
68/// let mut flow = dfir_rs::dfir_syntax! {
69///     source_iter([
70///         Bytes::from_static(b"hello"),
71///         Bytes::from_static(b"world"),
72///     ]) -> dest_sink(sink);
73/// };
74/// tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
75///     .await
76///     .expect_err("Expected time out");
77///
78/// let mut buf = Vec::<u8>::new();
79/// asyncread.read_buf(&mut buf).await.unwrap();
80/// assert_eq!(b"helloworld", &*buf);
81/// # }
82/// ```
83pub const DEST_SINK: OperatorConstraints = OperatorConstraints {
84    name: "dest_sink",
85    categories: &[OperatorCategory::Sink],
86    hard_range_inn: RANGE_1,
87    soft_range_inn: RANGE_1,
88    hard_range_out: RANGE_0,
89    soft_range_out: RANGE_0,
90    num_args: 1,
91    persistence_args: RANGE_0,
92    type_args: RANGE_0,
93    is_external_input: false,
94    has_singleton_output: false,
95    flo_type: None,
96    ports_inn: None,
97    ports_out: None,
98    input_delaytype_fn: |_| None,
99    write_fn: |wc @ &WriteContextArgs {
100                   root,
101                   df_ident,
102                   op_span,
103                   ident,
104                   is_pull,
105                   arguments,
106                   work_fn,
107                   ..
108               },
109               _| {
110        assert!(!is_pull);
111
112        let sink_arg = &arguments[0];
113
114        let send_ident = wc.make_ident("item_send");
115        let recv_ident = wc.make_ident("item_recv");
116
117        let write_prologue = quote_spanned! {op_span=>
118            let (#send_ident, #recv_ident) = #root::tokio::sync::mpsc::unbounded_channel();
119            {
120                /// Function is needed so `Item` is so no ambiguity for what `Item` is used
121                /// when calling `.flush()`.
122                #[allow(non_snake_case)]
123                async fn #work_fn<Sink, Item>(
124                    mut recv: #root::tokio::sync::mpsc::UnboundedReceiver<Item>,
125                    mut sink: Sink,
126                ) where
127                    Sink: ::std::marker::Unpin + #root::futures::Sink<Item>,
128                    Sink::Error: ::std::fmt::Debug,
129                {
130                    use #root::futures::SinkExt;
131                    while let Some(item) = recv.recv().await {
132                        sink.feed(item)
133                            .await
134                            .expect("Error processing async sink item.");
135                        while let Ok(item) = recv.try_recv() {
136                            sink.feed(item)
137                                .await
138                                .expect("Error processing async sink item.");
139                        }
140                        sink.flush().await.expect("Failed to flush sink.");
141                    }
142                }
143                #df_ident
144                    .request_task(#work_fn(#recv_ident, #sink_arg));
145            }
146        };
147
148        let write_iterator = quote_spanned! {op_span=>
149            let #ident = #root::pusherator::for_each::ForEach::new(|item| {
150                if let Err(err) = #send_ident.send(item) {
151                    panic!("Failed to send async write item for processing.: {}", err);
152                }
153            });
154        };
155
156        Ok(OperatorWriteOutput {
157            write_prologue,
158            write_iterator,
159            ..Default::default()
160        })
161    },
162};