dfir_lang/graph/ops/dest_sink.rs
1use quote::quote_spanned;
2
3use super::{
4 OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
5};
6
7/// > Arguments: An [async `Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html).
8///
9/// Consumes items by sending them to an [async `Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html).
10/// A `Sink` is a thing into which values can be sent, asynchronously. For example, sending items
11/// into a bounded channel.
12///
13/// Note this operator must be used within a Tokio runtime, and the DFIR program must be launched with `run_async`.
14///
15/// ```rustbook
16/// # #[dfir_rs::main]
17/// # async fn main() {
18/// // In this example we use a _bounded_ channel for our `Sink`. This is for demonstration only,
19/// // instead you should use [`dfir_rs::util::unbounded_channel`]. A bounded channel results in
20/// // buffering items internally instead of within the channel. (We can't use
21/// // unbounded here since unbounded channels are synchonous to write to and therefore not
22/// // `Sink`s.)
23/// let (send, recv) = tokio::sync::mpsc::channel::<usize>(5);
24/// // `PollSender` adapts the send half of the bounded channel into a `Sink`.
25/// let send = tokio_util::sync::PollSender::new(send);
26///
27/// let mut flow = dfir_rs::dfir_syntax! {
28/// source_iter(0..10) -> dest_sink(send);
29/// };
30/// // Call `run_async()` to allow async events to propagate, run for one second.
31/// tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
32/// .await
33/// .expect_err("Expected time out");
34///
35/// let mut recv = tokio_stream::wrappers::ReceiverStream::new(recv);
36/// // Only 5 elements received due to buffer size.
37/// // (Note that if we were using a multi-threaded executor instead of `current_thread` it would
38/// // be possible for more items to be added as they're removed, resulting in >5 collected.)
39/// let out: Vec<_> = dfir_rs::util::ready_iter(&mut recv).collect();
40/// assert_eq!(&[0, 1, 2, 3, 4], &*out);
41/// # }
42/// ```
43///
44/// `Sink` is different from [`AsyncWrite`](https://docs.rs/futures/latest/futures/io/trait.AsyncWrite.html).
45/// Instead of discrete values we send arbitrary streams of bytes into an `AsyncWrite` value. For
46/// example, writings a stream of bytes to a file, a socket, or stdout.
47///
48/// To handle those situations we can use a codec from [`tokio_util::codec`](https://docs.rs/tokio-util/latest/tokio_util/codec/index.html).
49/// These specify ways in which the byte stream is broken into individual items, such as with
50/// newlines or with length delineation.
51///
52/// If we only want to write a stream of bytes without delineation we can use the [`BytesCodec`](https://docs.rs/tokio-util/latest/tokio_util/codec/struct.BytesCodec.html).
53///
54/// In this example we use a [`duplex`](https://docs.rs/tokio/latest/tokio/io/fn.duplex.html) as our `AsyncWrite` with a
55/// `BytesCodec`.
56///
57/// ```rustbook
58/// # #[dfir_rs::main]
59/// # async fn main() {
60/// use bytes::Bytes;
61/// use tokio::io::AsyncReadExt;
62///
63/// // Like a channel, but for a stream of bytes instead of discrete objects.
64/// let (asyncwrite, mut asyncread) = tokio::io::duplex(256);
65/// // Now instead handle discrete byte strings by length-encoding them.
66/// let sink = tokio_util::codec::FramedWrite::new(asyncwrite, tokio_util::codec::BytesCodec::new());
67///
68/// let mut flow = dfir_rs::dfir_syntax! {
69/// source_iter([
70/// Bytes::from_static(b"hello"),
71/// Bytes::from_static(b"world"),
72/// ]) -> dest_sink(sink);
73/// };
74/// tokio::time::timeout(std::time::Duration::from_secs(1), flow.run_async())
75/// .await
76/// .expect_err("Expected time out");
77///
78/// let mut buf = Vec::<u8>::new();
79/// asyncread.read_buf(&mut buf).await.unwrap();
80/// assert_eq!(b"helloworld", &*buf);
81/// # }
82/// ```
83pub const DEST_SINK: OperatorConstraints = OperatorConstraints {
84 name: "dest_sink",
85 categories: &[OperatorCategory::Sink],
86 hard_range_inn: RANGE_1,
87 soft_range_inn: RANGE_1,
88 hard_range_out: RANGE_0,
89 soft_range_out: RANGE_0,
90 num_args: 1,
91 persistence_args: RANGE_0,
92 type_args: RANGE_0,
93 is_external_input: false,
94 has_singleton_output: false,
95 flo_type: None,
96 ports_inn: None,
97 ports_out: None,
98 input_delaytype_fn: |_| None,
99 write_fn: |wc @ &WriteContextArgs {
100 root,
101 df_ident,
102 op_span,
103 ident,
104 is_pull,
105 arguments,
106 work_fn,
107 ..
108 },
109 _| {
110 assert!(!is_pull);
111
112 let sink_arg = &arguments[0];
113
114 let send_ident = wc.make_ident("item_send");
115 let recv_ident = wc.make_ident("item_recv");
116
117 let write_prologue = quote_spanned! {op_span=>
118 let (#send_ident, #recv_ident) = #root::tokio::sync::mpsc::unbounded_channel();
119 {
120 /// Function is needed so `Item` is so no ambiguity for what `Item` is used
121 /// when calling `.flush()`.
122 #[allow(non_snake_case)]
123 async fn #work_fn<Sink, Item>(
124 mut recv: #root::tokio::sync::mpsc::UnboundedReceiver<Item>,
125 mut sink: Sink,
126 ) where
127 Sink: ::std::marker::Unpin + #root::futures::Sink<Item>,
128 Sink::Error: ::std::fmt::Debug,
129 {
130 use #root::futures::SinkExt;
131 while let Some(item) = recv.recv().await {
132 sink.feed(item)
133 .await
134 .expect("Error processing async sink item.");
135 while let Ok(item) = recv.try_recv() {
136 sink.feed(item)
137 .await
138 .expect("Error processing async sink item.");
139 }
140 sink.flush().await.expect("Failed to flush sink.");
141 }
142 }
143 #df_ident
144 .request_task(#work_fn(#recv_ident, #sink_arg));
145 }
146 };
147
148 let write_iterator = quote_spanned! {op_span=>
149 let #ident = #root::pusherator::for_each::ForEach::new(|item| {
150 if let Err(err) = #send_ident.send(item) {
151 panic!("Failed to send async write item for processing.: {}", err);
152 }
153 });
154 };
155
156 Ok(OperatorWriteOutput {
157 write_prologue,
158 write_iterator,
159 ..Default::default()
160 })
161 },
162};