sinktools/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(not(any(test, feature = "std")), no_std)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![warn(missing_docs)]
5
6use core::marker::PhantomData;
7
8use futures_util::Stream;
9pub use futures_util::sink;
10pub use futures_util::sink::Sink;
11#[cfg(feature = "variadics")]
12#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
13pub use variadics;
14
15pub mod filter;
16pub mod filter_map;
17pub mod flat_map;
18pub mod flatten;
19pub mod for_each;
20pub mod inspect;
21pub mod map;
22pub mod send_iter;
23pub mod send_stream;
24pub mod try_for_each;
25pub mod unzip;
26
27use filter::Filter;
28use filter_map::FilterMap;
29use flat_map::FlatMap;
30use flatten::Flatten;
31use for_each::ForEach;
32use inspect::Inspect;
33use map::Map;
34use send_iter::SendIter;
35use send_stream::SendStream;
36use try_for_each::TryForEach;
37use unzip::Unzip;
38
39#[cfg(feature = "std")]
40#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
41pub mod demux_map;
42#[cfg(feature = "std")]
43#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
44pub use demux_map::demux_map;
45
46#[cfg(feature = "variadics")]
47#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
48pub mod demux_var;
49#[cfg(feature = "variadics")]
50#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
51pub use demux_var::{SinkVariadic, demux_var};
52
53/// A helper trait for building [`Sink`]s in forward order, unlike with `Sinktools`.
54///
55/// To start a sink adaptor chain, use [`SinkBuilder`].
56pub trait SinkBuild {
57    /// The output item type.
58    type Item;
59
60    /// The output [`Sink`] type, if it is prepended to `Next`.
61    type Output<Next: Sink<Self::Item>>;
62    /// Complete this sink adaptor chain by connecting it to `next` as the output.
63    ///
64    /// This method may be used directly if you're trying to connect to an existing sink. Otherwise, use methods like
65    /// `Self::for_each` directly to complete a chain.
66    fn send_to<Next>(self, next: Next) -> Self::Output<Next>
67    where
68        Next: Sink<Self::Item>;
69
70    /// Clone each item and send to both `sink0` and `sink1`, completing this sink adaptor chain.
71    fn fanout<Si0, Si1>(self, sink0: Si0, sink1: Si1) -> Self::Output<sink::Fanout<Si0, Si1>>
72    where
73        Self: Sized,
74        Self::Item: Clone,
75        Si0: Sink<Self::Item>,
76        Si1: Sink<Self::Item, Error = Si0::Error>,
77    {
78        self.send_to(sink::SinkExt::fanout(sink0, sink1))
79    }
80
81    /// Appends a function which consumes each element, completing this sink adaptor chain.
82    fn for_each<Func>(self, func: Func) -> Self::Output<ForEach<Func>>
83    where
84        Self: Sized,
85        Func: FnMut(Self::Item),
86    {
87        self.send_to(ForEach::new(func))
88    }
89
90    /// Appends a function which consumes each element and returns a result, completing this sink
91    /// adaptor chain.
92    fn try_for_each<Func, Error>(self, func: Func) -> Self::Output<TryForEach<Func>>
93    where
94        Self: Sized,
95        Func: FnMut(Self::Item) -> Result<(), Error>,
96    {
97        self.send_to(TryForEach::new(func))
98    }
99
100    /// Appends a function which is called on each element and pases along each output.
101    fn map<Func, Out>(self, func: Func) -> map::MapBuilder<Self, Func>
102    where
103        Self: Sized,
104        Func: FnMut(Self::Item) -> Out,
105    {
106        map::MapBuilder { prev: self, func }
107    }
108
109    /// Appends a predicate function which filters items.
110    fn filter<Func>(self, func: Func) -> filter::FilterBuilder<Self, Func>
111    where
112        Self: Sized,
113        Func: FnMut(&Self::Item) -> bool,
114    {
115        filter::FilterBuilder { prev: self, func }
116    }
117
118    /// Appends a function which both filters and maps items.
119    fn filter_map<Func, Out>(self, func: Func) -> filter_map::FilterMapBuilder<Self, Func>
120    where
121        Self: Sized,
122        Func: FnMut(Self::Item) -> Option<Out>,
123    {
124        filter_map::FilterMapBuilder { prev: self, func }
125    }
126
127    /// Appends a function which maps each item to an iterator and flattens the results.
128    fn flat_map<Func, IntoIter>(self, func: Func) -> flat_map::FlatMapBuilder<Self, Func>
129    where
130        Self: Sized,
131        Func: FnMut(Self::Item) -> IntoIter,
132        IntoIter: IntoIterator,
133    {
134        flat_map::FlatMapBuilder { prev: self, func }
135    }
136
137    /// Flattens items that are iterators.
138    fn flatten<IntoIter>(self) -> flatten::FlattenBuilder<Self>
139    where
140        Self: Sized,
141        Self::Item: IntoIterator,
142    {
143        flatten::FlattenBuilder { prev: self }
144    }
145
146    /// Appends a function which inspects each item without modifying it.
147    fn inspect<Func>(self, func: Func) -> inspect::InspectBuilder<Self, Func>
148    where
149        Self: Sized,
150        Func: FnMut(&Self::Item),
151    {
152        inspect::InspectBuilder { prev: self, func }
153    }
154
155    /// Splits items into two sinks based on tuple structure.
156    fn unzip<Si0, Si1, Item0, Item1>(self, sink0: Si0, sink1: Si1) -> Self::Output<Unzip<Si0, Si1>>
157    where
158        Self: Sized + SinkBuild<Item = (Item0, Item1)>,
159        Si0: Sink<Item0>,
160        Si1: Sink<Item1>,
161        Si0::Error: From<Si1::Error>,
162    {
163        self.send_to(Unzip::new(sink0, sink1))
164    }
165
166    /// Sends each item into one sink depending on the key, where the sinks are in a [`HashMap`](std::collections::HashMap).
167    ///
168    /// This requires sinks `Si` to be `Unpin`. If your sinks are not `Unpin`, first wrap them in `Box::pin` to make them `Unpin`.
169    #[cfg(feature = "std")]
170    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
171    fn demux_map<Key, ItemVal, Si>(
172        self,
173        sinks: impl Into<std::collections::HashMap<Key, Si>>,
174    ) -> Self::Output<demux_map::DemuxMap<Key, Si>>
175    where
176        Self: Sized + SinkBuild<Item = (Key, ItemVal)>,
177        Key: Eq + core::hash::Hash + core::fmt::Debug + Unpin,
178        Si: Sink<ItemVal> + Unpin,
179    {
180        self.send_to(demux_map(sinks))
181    }
182
183    /// Sends each item into one sink depending on the index, where the sinks are a variadic.
184    #[cfg(feature = "variadics")]
185    #[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
186    fn demux_var<Sinks, ItemVal, Error>(
187        self,
188        sinks: Sinks,
189    ) -> Self::Output<demux_var::DemuxVar<Sinks, Error>>
190    where
191        Self: Sized + SinkBuild<Item = (usize, ItemVal)>,
192        Sinks: SinkVariadic<ItemVal, Error>,
193    {
194        self.send_to(demux_var(sinks))
195    }
196}
197
/// Start a [`SinkBuild`] adaptor chain, with `Item` as the input item type.
///
/// The builder is a zero-sized marker: it stores no `Item`, only the type parameter. Using
/// `PhantomData<fn() -> Item>` rather than `PhantomData<Item>` keeps the builder `Send` and
/// `Sync` no matter what `Item` is.
pub struct SinkBuilder<Item>(PhantomData<fn() -> Item>);
impl<Item> Default for SinkBuilder<Item> {
    /// Equivalent to [`SinkBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<Item> SinkBuilder<Item> {
    /// Create a new sink builder.
    ///
    /// This is a `const fn`, so a builder can also be created in `const` and `static` contexts.
    pub const fn new() -> Self {
        Self(PhantomData)
    }
}
// `SinkBuilder` is the empty head of an adaptor chain: it contributes no adaptor of its own, so
// completing a chain that consists only of the builder yields exactly the sink that was passed in.
impl<Item> SinkBuild for SinkBuilder<Item> {
    // Items enter the chain unchanged, so the builder's output item type is its input item type.
    type Item = Item;

    // With nothing placed in front of it, the composed sink type is `Next` itself.
    type Output<Next: Sink<Self::Item>> = Next;
    fn send_to<Next>(self, next: Next) -> Self::Output<Next>
    where
        Next: Sink<Self::Item>,
    {
        // Identity: `self` is discarded and `next` is returned as-is.
        next
    }
}
222
223/// Blanket trait for sending items from `Self` into a [`SinkBuild`].
224pub trait ToSinkBuild {
225    /// Starts a [`SinkBuild`] adaptor chain to send all items from `self` as an [`Iterator`].
226    fn iter_to_sink_build(self) -> send_iter::SendIterBuild<Self>
227    where
228        Self: Sized + Iterator,
229    {
230        send_iter::SendIterBuild { iter: self }
231    }
232
233    /// Starts a [`SinkBuild`] adaptor chain to send all items from `self` as a [`Stream`].
234    fn stream_to_sink_build(self) -> send_stream::SendStreamBuild<Self>
235    where
236        Self: Sized + Stream,
237    {
238        send_stream::SendStreamBuild { stream: self }
239    }
240}
241impl<T> ToSinkBuild for T {}
242
/// Generates poll-style sink methods that forward to `self.project().sink`.
///
/// For every listed method name this expands to
/// `fn name(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>`,
/// so it must be invoked inside an `impl` where `Self::Error` is defined, and `Self` must offer
/// a `project()` method whose result has a `sink` field providing a method of the same name.
macro_rules! forward_sink {
    ( $( $method:ident ),+ ) => {
        $(
            fn $method(
                self: ::core::pin::Pin<&mut Self>,
                cx: &mut ::core::task::Context<'_>,
            ) -> ::core::task::Poll<::core::result::Result<(), Self::Error>> {
                let this = self.project();
                this.sink.$method(cx)
            }
        )+
    }
}
255use forward_sink;
256
/// Evaluates both `Poll<()>` expressions, then returns `Poll::Pending` from the enclosing
/// function unless both of them are `Poll::Ready(())`.
///
/// The two expressions are evaluated as a tuple, so the second one runs even when the first is
/// already pending.
macro_rules! ready_both {
    ($a:expr, $b:expr $(,)?) => {
        match ($a, $b) {
            (::core::task::Poll::Ready(()), ::core::task::Poll::Ready(())) => {}
            _ => return ::core::task::Poll::Pending,
        }
    };
}
268use ready_both;
269
/// Creates a [`Map`] sink that applies a function to each item.
///
/// Thin wrapper around [`Map::new`]. Note the argument order: the function comes first and the
/// downstream sink second.
///
/// * `func` - takes each `In` item and produces an `Out`.
/// * `sink` - the downstream sink; it must accept `Out` items.
pub fn map<Func, In, Out, Si>(func: Func, sink: Si) -> Map<Si, Func>
where
    Func: FnMut(In) -> Out,
    Si: Sink<Out>,
{
    Map::new(func, sink)
}
278
/// Creates a [`Filter`] sink that filters items based on a predicate.
///
/// Thin wrapper around [`Filter::new`].
///
/// * `func` - predicate that receives a reference to each `Item` and returns a `bool`
///   (presumably `true` keeps the item, as with `Iterator::filter`; see the `filter` module).
/// * `sink` - the downstream sink; it must accept `Item` values.
pub fn filter<Func, Item, Si>(func: Func, sink: Si) -> Filter<Si, Func>
where
    Func: FnMut(&Item) -> bool,
    Si: Sink<Item>,
{
    Filter::new(func, sink)
}
287
/// Creates a [`FilterMap`] sink that filters and maps items in one step.
///
/// Thin wrapper around [`FilterMap::new`].
///
/// * `func` - takes each `In` item and returns an `Option<Out>`.
/// * `sink` - the downstream sink; it must accept `Out` items.
pub fn filter_map<Func, In, Out, Si>(func: Func, sink: Si) -> FilterMap<Si, Func>
where
    Func: FnMut(In) -> Option<Out>,
    Si: Sink<Out>,
{
    FilterMap::new(func, sink)
}
296
/// Creates a [`FlatMap`] sink that maps each item to an iterator and flattens the results.
///
/// Thin wrapper around [`FlatMap::new`].
///
/// * `func` - takes each `In` item and returns something iterable (`IntoIter`).
/// * `sink` - the downstream sink; it must accept the elements of that iterable
///   (`IntoIter::Item`).
pub fn flat_map<Func, In, IntoIter, Si>(func: Func, sink: Si) -> FlatMap<Si, Func, IntoIter>
where
    Func: FnMut(In) -> IntoIter,
    IntoIter: IntoIterator,
    Si: Sink<IntoIter::Item>,
{
    FlatMap::new(func, sink)
}
306
/// Creates a [`Flatten`] sink that flattens items that are iterators.
///
/// Thin wrapper around [`Flatten::new`]. `sink` is the downstream sink and must accept the
/// elements of the incoming iterables (`IntoIter::Item`).
///
/// Note: Due to type inference limitations, you may need to specify the item type:
/// `flatten::<Vec<i32>, _>(sink)` where `Vec<i32>` is the input item type. (`IntoIter` occurs
/// only in the return type and the bounds, never in an argument.)
pub fn flatten<IntoIter, Si>(sink: Si) -> Flatten<Si, IntoIter>
where
    IntoIter: IntoIterator,
    Si: Sink<IntoIter::Item>,
{
    Flatten::new(sink)
}
318
/// Creates an [`Inspect`] sink that inspects each item without modifying it.
///
/// Thin wrapper around [`Inspect::new`].
///
/// * `func` - receives a shared reference to each `Item`; its return value is `()`.
/// * `sink` - the downstream sink; it must accept `Item` values.
pub fn inspect<Func, Item, Si>(func: Func, sink: Si) -> Inspect<Si, Func>
where
    Func: FnMut(&Item),
    Si: Sink<Item>,
{
    Inspect::new(func, sink)
}
327
/// Creates an [`Unzip`] sink that splits tuple items into two separate sinks.
///
/// Thin wrapper around [`Unzip::new`].
///
/// * `sink0` - sink for `Item0` values.
/// * `sink1` - sink for `Item1` values; its error type must be convertible into the error type
///   of `sink0` (`Si0::Error: From<Si1::Error>`).
pub fn unzip<Si0, Si1, Item0, Item1>(sink0: Si0, sink1: Si1) -> Unzip<Si0, Si1>
where
    Si0: Sink<Item0>,
    Si1: Sink<Item1>,
    Si0::Error: From<Si1::Error>,
{
    Unzip::new(sink0, sink1)
}
337
/// Creates a [`ForEach`] sink that consumes each item with a function.
///
/// Thin wrapper around [`ForEach::new`]. There is no downstream sink: `func` takes ownership of
/// every `Item` and returns `()`.
pub fn for_each<Func, Item>(func: Func) -> ForEach<Func>
where
    Func: FnMut(Item),
{
    ForEach::new(func)
}
345
/// Creates a [`TryForEach`] sink that consumes each item with a fallible function.
///
/// Thin wrapper around [`TryForEach::new`]. There is no downstream sink: `func` takes ownership
/// of every `Item` and returns `Result<(), Error>`.
pub fn try_for_each<Func, Item, Error>(func: Func) -> TryForEach<Func>
where
    Func: FnMut(Item) -> Result<(), Error>,
{
    TryForEach::new(func)
}
353
354/// Creates a [`SendIter`] future that sends all items from an iterator to a sink.
355pub fn send_iter<I, Si>(iter: I, sink: Si) -> SendIter<I::IntoIter, Si>
356where
357    I: IntoIterator,
358    Si: Sink<I::Item>,
359{
360    SendIter::new(iter.into_iter(), sink)
361}
362
/// Creates a [`SendStream`] future that sends all items from a stream to a sink.
///
/// Thin wrapper around [`SendStream::new`].
///
/// * `stream` - the source [`Stream`].
/// * `sink` - the destination sink; it must accept the stream's items (`St::Item`).
pub fn send_stream<St, Si>(stream: St, sink: Si) -> SendStream<St, Si>
where
    St: Stream,
    Si: Sink<St::Item>,
{
    SendStream::new(stream, sink)
}