sinktools/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(not(any(test, feature = "std")), no_std)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![warn(missing_docs)]
5
6use core::marker::PhantomData;
7
8use futures_util::Stream;
9pub use futures_util::sink;
10pub use futures_util::sink::Sink;
11#[cfg(feature = "variadics")]
12#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
13pub use variadics;
14
15pub mod filter;
16pub mod filter_map;
17pub mod flat_map;
18pub mod flatten;
19pub mod for_each;
20pub mod inspect;
21pub mod lazy;
22pub mod lazy_sink_source;
23pub mod map;
24pub mod send_iter;
25pub mod send_stream;
26pub mod try_for_each;
27pub mod unzip;
28
29use filter::Filter;
30use filter_map::FilterMap;
31use flat_map::FlatMap;
32use flatten::Flatten;
33use for_each::ForEach;
34use inspect::Inspect;
35use map::Map;
36use send_iter::SendIter;
37use send_stream::SendStream;
38use try_for_each::TryForEach;
39use unzip::Unzip;
40
41#[cfg(feature = "std")]
42#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
43pub mod demux_map;
44#[cfg(feature = "std")]
45#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
46pub use demux_map::demux_map;
47
48#[cfg(feature = "std")]
49#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
50pub mod demux_map_lazy;
51#[cfg(feature = "std")]
52#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
53pub use demux_map_lazy::demux_map_lazy;
54
55#[cfg(feature = "variadics")]
56#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
57pub mod demux_var;
58#[cfg(feature = "variadics")]
59#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
60pub use demux_var::{SinkVariadic, demux_var};
61
62/// A helper trait for building [`Sink`]s in forward order, unlike with `Sinktools`.
63///
64/// To start a sink adaptor chain, use [`SinkBuilder`].
65pub trait SinkBuild {
66    /// The output item type.
67    type Item;
68
69    /// The output [`Sink`] type, if it is prepended to `Next`.
70    type Output<Next: Sink<Self::Item>>;
71    /// Complete this sink adaptor chain by connecting it to `next` as the output.
72    ///
73    /// This method may be used directly if you're trying to connect to an existing sink. Otherwise, use methods like
74    /// `Self::for_each` directly to complete a chain.
75    fn send_to<Next>(self, next: Next) -> Self::Output<Next>
76    where
77        Next: Sink<Self::Item>;
78
79    /// Clone each item and send to both `sink0` and `sink1`, completing this sink adaptor chain.
80    fn fanout<Si0, Si1>(self, sink0: Si0, sink1: Si1) -> Self::Output<sink::Fanout<Si0, Si1>>
81    where
82        Self: Sized,
83        Self::Item: Clone,
84        Si0: Sink<Self::Item>,
85        Si1: Sink<Self::Item, Error = Si0::Error>,
86    {
87        self.send_to(sink::SinkExt::fanout(sink0, sink1))
88    }
89
90    /// Appends a function which consumes each element, completing this sink adaptor chain.
91    fn for_each<Func>(self, func: Func) -> Self::Output<ForEach<Func>>
92    where
93        Self: Sized,
94        Func: FnMut(Self::Item),
95    {
96        self.send_to(ForEach::new(func))
97    }
98
99    /// Appends a function which consumes each element and returns a result, completing this sink
100    /// adaptor chain.
101    fn try_for_each<Func, Error>(self, func: Func) -> Self::Output<TryForEach<Func>>
102    where
103        Self: Sized,
104        Func: FnMut(Self::Item) -> Result<(), Error>,
105    {
106        self.send_to(TryForEach::new(func))
107    }
108
109    /// Appends a function which is called on each element and pases along each output.
110    fn map<Func, Out>(self, func: Func) -> map::MapBuilder<Self, Func>
111    where
112        Self: Sized,
113        Func: FnMut(Self::Item) -> Out,
114    {
115        map::MapBuilder { prev: self, func }
116    }
117
118    /// Appends a predicate function which filters items.
119    fn filter<Func>(self, func: Func) -> filter::FilterBuilder<Self, Func>
120    where
121        Self: Sized,
122        Func: FnMut(&Self::Item) -> bool,
123    {
124        filter::FilterBuilder { prev: self, func }
125    }
126
127    /// Appends a function which both filters and maps items.
128    fn filter_map<Func, Out>(self, func: Func) -> filter_map::FilterMapBuilder<Self, Func>
129    where
130        Self: Sized,
131        Func: FnMut(Self::Item) -> Option<Out>,
132    {
133        filter_map::FilterMapBuilder { prev: self, func }
134    }
135
136    /// Appends a function which maps each item to an iterator and flattens the results.
137    fn flat_map<Func, IntoIter>(self, func: Func) -> flat_map::FlatMapBuilder<Self, Func>
138    where
139        Self: Sized,
140        Func: FnMut(Self::Item) -> IntoIter,
141        IntoIter: IntoIterator,
142    {
143        flat_map::FlatMapBuilder { prev: self, func }
144    }
145
146    /// Flattens items that are iterators.
147    fn flatten<IntoIter>(self) -> flatten::FlattenBuilder<Self>
148    where
149        Self: Sized,
150        Self::Item: IntoIterator,
151    {
152        flatten::FlattenBuilder { prev: self }
153    }
154
155    /// Appends a function which inspects each item without modifying it.
156    fn inspect<Func>(self, func: Func) -> inspect::InspectBuilder<Self, Func>
157    where
158        Self: Sized,
159        Func: FnMut(&Self::Item),
160    {
161        inspect::InspectBuilder { prev: self, func }
162    }
163
164    /// Splits items into two sinks based on tuple structure.
165    fn unzip<Si0, Si1, Item0, Item1>(self, sink0: Si0, sink1: Si1) -> Self::Output<Unzip<Si0, Si1>>
166    where
167        Self: Sized + SinkBuild<Item = (Item0, Item1)>,
168        Si0: Sink<Item0>,
169        Si1: Sink<Item1>,
170        Si0::Error: From<Si1::Error>,
171    {
172        self.send_to(Unzip::new(sink0, sink1))
173    }
174
175    /// Sends each item into one sink depending on the key, where the sinks are in a [`HashMap`](std::collections::HashMap).
176    ///
177    /// This requires sinks `Si` to be `Unpin`. If your sinks are not `Unpin`, first wrap them in `Box::pin` to make them `Unpin`.
178    #[cfg(feature = "std")]
179    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
180    fn demux_map<Key, ItemVal, Si>(
181        self,
182        sinks: impl Into<std::collections::HashMap<Key, Si>>,
183    ) -> Self::Output<demux_map::DemuxMap<Key, Si>>
184    where
185        Self: Sized + SinkBuild<Item = (Key, ItemVal)>,
186        Key: Eq + core::hash::Hash + core::fmt::Debug + Unpin,
187        Si: Sink<ItemVal> + Unpin,
188    {
189        self.send_to(demux_map(sinks))
190    }
191
192    /// Sends each item into one sink depending on the key, lazily creating sinks on first use.
193    ///
194    /// This requires sinks `Si` to be `Unpin`. If your sinks are not `Unpin`, first wrap them in `Box::pin` to make them `Unpin`.
195    #[cfg(feature = "std")]
196    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
197    fn demux_map_lazy<Key, ItemVal, Si, Func>(
198        self,
199        func: Func,
200    ) -> Self::Output<demux_map_lazy::LazyDemuxSink<Key, Si, Func>>
201    where
202        Self: Sized + SinkBuild<Item = (Key, ItemVal)>,
203        Key: Eq + core::hash::Hash + core::fmt::Debug + Unpin,
204        Si: Sink<ItemVal> + Unpin,
205        Func: FnMut(&Key) -> Si + Unpin,
206    {
207        self.send_to(demux_map_lazy(func))
208    }
209
210    /// Sends each item into one sink depending on the index, where the sinks are a variadic.
211    #[cfg(feature = "variadics")]
212    #[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
213    fn demux_var<Sinks, ItemVal, Error>(
214        self,
215        sinks: Sinks,
216    ) -> Self::Output<demux_var::DemuxVar<Sinks, Error>>
217    where
218        Self: Sized + SinkBuild<Item = (usize, ItemVal)>,
219        Sinks: SinkVariadic<ItemVal, Error>,
220    {
221        self.send_to(demux_var(sinks))
222    }
223}
224
/// Entry point of a [`SinkBuild`] adaptor chain, where `Item` is the input item type.
///
/// The `PhantomData<fn() -> Item>` field only records the item type; no `Item` value is stored.
pub struct SinkBuilder<Item>(PhantomData<fn() -> Item>);

impl<Item> SinkBuilder<Item> {
    /// Create a new sink builder.
    pub fn new() -> Self {
        Self(PhantomData)
    }
}

impl<Item> Default for SinkBuilder<Item> {
    /// Equivalent to [`SinkBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
238impl<Item> SinkBuild for SinkBuilder<Item> {
239    type Item = Item;
240
241    type Output<Next: Sink<Self::Item>> = Next;
242    fn send_to<Next>(self, next: Next) -> Self::Output<Next>
243    where
244        Next: Sink<Self::Item>,
245    {
246        next
247    }
248}
249
250/// Blanket trait for sending items from `Self` into a [`SinkBuild`].
251pub trait ToSinkBuild {
252    /// Starts a [`SinkBuild`] adaptor chain to send all items from `self` as an [`Iterator`].
253    fn iter_to_sink_build(self) -> send_iter::SendIterBuild<Self>
254    where
255        Self: Sized + Iterator,
256    {
257        send_iter::SendIterBuild { iter: self }
258    }
259
260    /// Starts a [`SinkBuild`] adaptor chain to send all items from `self` as a [`Stream`].
261    fn stream_to_sink_build(self) -> send_stream::SendStreamBuild<Self>
262    where
263        Self: Sized + Stream,
264    {
265        send_stream::SendStreamBuild { stream: self }
266    }
267}
268impl<T> ToSinkBuild for T {}
269
/// Generates poll-style sink methods that delegate to `self.project().sink`.
///
/// Takes a comma-separated list of method names. Each generated method takes
/// `Pin<&mut Self>` and a task `Context`, and returns `Poll<Result<(), Self::Error>>` by calling
/// the same-named method on the projected `sink` field.
macro_rules! forward_sink {
    ( $( $name:ident ),+ ) => {
        $(
            fn $name(
                self: ::core::pin::Pin<&mut Self>,
                cx: &mut ::core::task::Context<'_>,
            ) -> ::core::task::Poll<::core::result::Result<(), Self::Error>> {
                self.project()
                    .sink
                    .$name(cx)
            }
        )+
    };
}
// Re-export by path so the macro can be imported from other modules of the crate.
use forward_sink;
283
/// Evaluates both `Poll<()>` expressions and returns `Poll::Pending` from the enclosing function
/// if either of them is pending.
///
/// Both expressions are always evaluated (first `$a`, then `$b`) before the check, so neither is
/// skipped when the other is pending.
macro_rules! ready_both {
    ($a:expr, $b:expr $(,)?) => {
        match ($a, $b) {
            (::core::task::Poll::Ready(()), ::core::task::Poll::Ready(())) => {}
            _ => return ::core::task::Poll::Pending,
        }
    };
}
// Re-export by path so the macro can be imported from other modules of the crate.
use ready_both;
296
297/// Creates a [`Map`] sink that applies a function to each item.
298pub fn map<Func, In, Out, Si>(func: Func, sink: Si) -> Map<Si, Func>
299where
300    Func: FnMut(In) -> Out,
301    Si: Sink<Out>,
302{
303    Map::new(func, sink)
304}
305
306/// Creates a [`Filter`] sink that filters items based on a predicate.
307pub fn filter<Func, Item, Si>(func: Func, sink: Si) -> Filter<Si, Func>
308where
309    Func: FnMut(&Item) -> bool,
310    Si: Sink<Item>,
311{
312    Filter::new(func, sink)
313}
314
315/// Creates a [`FilterMap`] sink that filters and maps items in one step.
316pub fn filter_map<Func, In, Out, Si>(func: Func, sink: Si) -> FilterMap<Si, Func>
317where
318    Func: FnMut(In) -> Option<Out>,
319    Si: Sink<Out>,
320{
321    FilterMap::new(func, sink)
322}
323
324/// Creates a [`FlatMap`] sink that maps each item to an iterator and flattens the results.
325pub fn flat_map<Func, In, IntoIter, Si>(func: Func, sink: Si) -> FlatMap<Si, Func, IntoIter>
326where
327    Func: FnMut(In) -> IntoIter,
328    IntoIter: IntoIterator,
329    Si: Sink<IntoIter::Item>,
330{
331    FlatMap::new(func, sink)
332}
333
334/// Creates a [`Flatten`] sink that flattens items that are iterators.
335///
336/// Note: Due to type inference limitations, you may need to specify the item type:
337/// `flatten::<Vec<i32>, _>(sink)` where `Vec<i32>` is the input item type.
338pub fn flatten<IntoIter, Si>(sink: Si) -> Flatten<Si, IntoIter>
339where
340    IntoIter: IntoIterator,
341    Si: Sink<IntoIter::Item>,
342{
343    Flatten::new(sink)
344}
345
346/// Creates an [`Inspect`] sink that inspects each item without modifying it.
347pub fn inspect<Func, Item, Si>(func: Func, sink: Si) -> Inspect<Si, Func>
348where
349    Func: FnMut(&Item),
350    Si: Sink<Item>,
351{
352    Inspect::new(func, sink)
353}
354
355/// Creates an [`Unzip`] sink that splits tuple items into two separate sinks.
356pub fn unzip<Si0, Si1, Item0, Item1>(sink0: Si0, sink1: Si1) -> Unzip<Si0, Si1>
357where
358    Si0: Sink<Item0>,
359    Si1: Sink<Item1>,
360    Si0::Error: From<Si1::Error>,
361{
362    Unzip::new(sink0, sink1)
363}
364
365/// Creates a [`ForEach`] sink that consumes each item with a function.
366pub fn for_each<Func, Item>(func: Func) -> ForEach<Func>
367where
368    Func: FnMut(Item),
369{
370    ForEach::new(func)
371}
372
373/// Creates a [`TryForEach`] sink that consumes each item with a fallible function.
374pub fn try_for_each<Func, Item, Error>(func: Func) -> TryForEach<Func>
375where
376    Func: FnMut(Item) -> Result<(), Error>,
377{
378    TryForEach::new(func)
379}
380
381/// Creates a [`SendIter`] future that sends all items from an iterator to a sink.
382pub fn send_iter<I, Si>(iter: I, sink: Si) -> SendIter<I::IntoIter, Si>
383where
384    I: IntoIterator,
385    Si: Sink<I::Item>,
386{
387    SendIter::new(iter.into_iter(), sink)
388}
389
390/// Creates a [`SendStream`] future that sends all items from a stream to a sink.
391pub fn send_stream<St, Si>(stream: St, sink: Si) -> SendStream<St, Si>
392where
393    St: Stream,
394    Si: Sink<St::Item>,
395{
396    SendStream::new(stream, sink)
397}