1#![doc = include_str!("../README.md")]
2#![cfg_attr(not(any(test, feature = "std")), no_std)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![warn(missing_docs)]
5
6use core::marker::PhantomData;
7
8use futures_util::Stream;
9pub use futures_util::sink;
10pub use futures_util::sink::Sink;
11#[cfg(feature = "variadics")]
12#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
13pub use variadics;
14
15pub mod filter;
16pub mod filter_map;
17pub mod flat_map;
18pub mod flatten;
19pub mod for_each;
20pub mod inspect;
21pub mod lazy;
22pub mod lazy_sink_source;
23pub mod map;
24pub mod send_iter;
25pub mod send_stream;
26pub mod try_for_each;
27pub mod unzip;
28
29use filter::Filter;
30use filter_map::FilterMap;
31use flat_map::FlatMap;
32use flatten::Flatten;
33use for_each::ForEach;
34use inspect::Inspect;
35use map::Map;
36use send_iter::SendIter;
37use send_stream::SendStream;
38use try_for_each::TryForEach;
39use unzip::Unzip;
40
41#[cfg(feature = "std")]
42#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
43pub mod demux_map;
44#[cfg(feature = "std")]
45#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
46pub use demux_map::demux_map;
47
48#[cfg(feature = "std")]
49#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
50pub mod demux_map_lazy;
51#[cfg(feature = "std")]
52#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
53pub use demux_map_lazy::demux_map_lazy;
54
55#[cfg(feature = "variadics")]
56#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
57pub mod demux_var;
58#[cfg(feature = "variadics")]
59#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
60pub use demux_var::{SinkVariadic, demux_var};
61
62pub trait SinkBuild {
66 type Item;
68
69 type Output<Next: Sink<Self::Item>>;
71 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
76 where
77 Next: Sink<Self::Item>;
78
79 fn fanout<Si0, Si1>(self, sink0: Si0, sink1: Si1) -> Self::Output<sink::Fanout<Si0, Si1>>
81 where
82 Self: Sized,
83 Self::Item: Clone,
84 Si0: Sink<Self::Item>,
85 Si1: Sink<Self::Item, Error = Si0::Error>,
86 {
87 self.send_to(sink::SinkExt::fanout(sink0, sink1))
88 }
89
90 fn for_each<Func>(self, func: Func) -> Self::Output<ForEach<Func>>
92 where
93 Self: Sized,
94 Func: FnMut(Self::Item),
95 {
96 self.send_to(ForEach::new(func))
97 }
98
99 fn try_for_each<Func, Error>(self, func: Func) -> Self::Output<TryForEach<Func>>
102 where
103 Self: Sized,
104 Func: FnMut(Self::Item) -> Result<(), Error>,
105 {
106 self.send_to(TryForEach::new(func))
107 }
108
109 fn map<Func, Out>(self, func: Func) -> map::MapBuilder<Self, Func>
111 where
112 Self: Sized,
113 Func: FnMut(Self::Item) -> Out,
114 {
115 map::MapBuilder { prev: self, func }
116 }
117
118 fn filter<Func>(self, func: Func) -> filter::FilterBuilder<Self, Func>
120 where
121 Self: Sized,
122 Func: FnMut(&Self::Item) -> bool,
123 {
124 filter::FilterBuilder { prev: self, func }
125 }
126
127 fn filter_map<Func, Out>(self, func: Func) -> filter_map::FilterMapBuilder<Self, Func>
129 where
130 Self: Sized,
131 Func: FnMut(Self::Item) -> Option<Out>,
132 {
133 filter_map::FilterMapBuilder { prev: self, func }
134 }
135
136 fn flat_map<Func, IntoIter>(self, func: Func) -> flat_map::FlatMapBuilder<Self, Func>
138 where
139 Self: Sized,
140 Func: FnMut(Self::Item) -> IntoIter,
141 IntoIter: IntoIterator,
142 {
143 flat_map::FlatMapBuilder { prev: self, func }
144 }
145
146 fn flatten<IntoIter>(self) -> flatten::FlattenBuilder<Self>
148 where
149 Self: Sized,
150 Self::Item: IntoIterator,
151 {
152 flatten::FlattenBuilder { prev: self }
153 }
154
155 fn inspect<Func>(self, func: Func) -> inspect::InspectBuilder<Self, Func>
157 where
158 Self: Sized,
159 Func: FnMut(&Self::Item),
160 {
161 inspect::InspectBuilder { prev: self, func }
162 }
163
164 fn unzip<Si0, Si1, Item0, Item1>(self, sink0: Si0, sink1: Si1) -> Self::Output<Unzip<Si0, Si1>>
166 where
167 Self: Sized + SinkBuild<Item = (Item0, Item1)>,
168 Si0: Sink<Item0>,
169 Si1: Sink<Item1>,
170 Si0::Error: From<Si1::Error>,
171 {
172 self.send_to(Unzip::new(sink0, sink1))
173 }
174
175 #[cfg(feature = "std")]
179 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
180 fn demux_map<Key, ItemVal, Si>(
181 self,
182 sinks: impl Into<std::collections::HashMap<Key, Si>>,
183 ) -> Self::Output<demux_map::DemuxMap<Key, Si>>
184 where
185 Self: Sized + SinkBuild<Item = (Key, ItemVal)>,
186 Key: Eq + core::hash::Hash + core::fmt::Debug + Unpin,
187 Si: Sink<ItemVal> + Unpin,
188 {
189 self.send_to(demux_map(sinks))
190 }
191
192 #[cfg(feature = "std")]
196 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
197 fn demux_map_lazy<Key, ItemVal, Si, Func>(
198 self,
199 func: Func,
200 ) -> Self::Output<demux_map_lazy::LazyDemuxSink<Key, Si, Func>>
201 where
202 Self: Sized + SinkBuild<Item = (Key, ItemVal)>,
203 Key: Eq + core::hash::Hash + core::fmt::Debug + Unpin,
204 Si: Sink<ItemVal> + Unpin,
205 Func: FnMut(&Key) -> Si + Unpin,
206 {
207 self.send_to(demux_map_lazy(func))
208 }
209
210 #[cfg(feature = "variadics")]
212 #[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
213 fn demux_var<Sinks, ItemVal, Error>(
214 self,
215 sinks: Sinks,
216 ) -> Self::Output<demux_var::DemuxVar<Sinks, Error>>
217 where
218 Self: Sized + SinkBuild<Item = (usize, ItemVal)>,
219 Sinks: SinkVariadic<ItemVal, Error>,
220 {
221 self.send_to(demux_var(sinks))
222 }
223}
224
225pub struct SinkBuilder<Item>(PhantomData<fn() -> Item>);
227impl<Item> Default for SinkBuilder<Item> {
228 fn default() -> Self {
229 Self(PhantomData)
230 }
231}
232impl<Item> SinkBuilder<Item> {
233 pub fn new() -> Self {
235 Self::default()
236 }
237}
238impl<Item> SinkBuild for SinkBuilder<Item> {
239 type Item = Item;
240
241 type Output<Next: Sink<Self::Item>> = Next;
242 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
243 where
244 Next: Sink<Self::Item>,
245 {
246 next
247 }
248}
249
250pub trait ToSinkBuild {
252 fn iter_to_sink_build(self) -> send_iter::SendIterBuild<Self>
254 where
255 Self: Sized + Iterator,
256 {
257 send_iter::SendIterBuild { iter: self }
258 }
259
260 fn stream_to_sink_build(self) -> send_stream::SendStreamBuild<Self>
262 where
263 Self: Sized + Stream,
264 {
265 send_stream::SendStreamBuild { stream: self }
266 }
267}
268impl<T> ToSinkBuild for T {}
269
270macro_rules! forward_sink {
272 (
273 $( $method:ident ),+
274 ) => {
275 $(
276 fn $method(self: ::core::pin::Pin<&mut Self>, cx: &mut ::core::task::Context<'_>) -> ::core::task::Poll<::core::result::Result<(), Self::Error>> {
277 self.project().sink.$method(cx)
278 }
279 )+
280 }
281}
282use forward_sink;
283
284macro_rules! ready_both {
286 ($a:expr, $b:expr $(,)?) => {
287 if !matches!(
288 ($a, $b),
289 (::core::task::Poll::Ready(()), ::core::task::Poll::Ready(())),
290 ) {
291 return ::core::task::Poll::Pending;
292 }
293 };
294}
295use ready_both;
296
297pub fn map<Func, In, Out, Si>(func: Func, sink: Si) -> Map<Si, Func>
299where
300 Func: FnMut(In) -> Out,
301 Si: Sink<Out>,
302{
303 Map::new(func, sink)
304}
305
306pub fn filter<Func, Item, Si>(func: Func, sink: Si) -> Filter<Si, Func>
308where
309 Func: FnMut(&Item) -> bool,
310 Si: Sink<Item>,
311{
312 Filter::new(func, sink)
313}
314
315pub fn filter_map<Func, In, Out, Si>(func: Func, sink: Si) -> FilterMap<Si, Func>
317where
318 Func: FnMut(In) -> Option<Out>,
319 Si: Sink<Out>,
320{
321 FilterMap::new(func, sink)
322}
323
324pub fn flat_map<Func, In, IntoIter, Si>(func: Func, sink: Si) -> FlatMap<Si, Func, IntoIter>
326where
327 Func: FnMut(In) -> IntoIter,
328 IntoIter: IntoIterator,
329 Si: Sink<IntoIter::Item>,
330{
331 FlatMap::new(func, sink)
332}
333
334pub fn flatten<IntoIter, Si>(sink: Si) -> Flatten<Si, IntoIter>
339where
340 IntoIter: IntoIterator,
341 Si: Sink<IntoIter::Item>,
342{
343 Flatten::new(sink)
344}
345
346pub fn inspect<Func, Item, Si>(func: Func, sink: Si) -> Inspect<Si, Func>
348where
349 Func: FnMut(&Item),
350 Si: Sink<Item>,
351{
352 Inspect::new(func, sink)
353}
354
355pub fn unzip<Si0, Si1, Item0, Item1>(sink0: Si0, sink1: Si1) -> Unzip<Si0, Si1>
357where
358 Si0: Sink<Item0>,
359 Si1: Sink<Item1>,
360 Si0::Error: From<Si1::Error>,
361{
362 Unzip::new(sink0, sink1)
363}
364
365pub fn for_each<Func, Item>(func: Func) -> ForEach<Func>
367where
368 Func: FnMut(Item),
369{
370 ForEach::new(func)
371}
372
373pub fn try_for_each<Func, Item, Error>(func: Func) -> TryForEach<Func>
375where
376 Func: FnMut(Item) -> Result<(), Error>,
377{
378 TryForEach::new(func)
379}
380
381pub fn send_iter<I, Si>(iter: I, sink: Si) -> SendIter<I::IntoIter, Si>
383where
384 I: IntoIterator,
385 Si: Sink<I::Item>,
386{
387 SendIter::new(iter.into_iter(), sink)
388}
389
390pub fn send_stream<St, Si>(stream: St, sink: Si) -> SendStream<St, Si>
392where
393 St: Stream,
394 Si: Sink<St::Item>,
395{
396 SendStream::new(stream, sink)
397}