1#![doc = include_str!("../README.md")]
2#![cfg_attr(not(any(test, feature = "std")), no_std)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![warn(missing_docs)]
5
6use core::marker::PhantomData;
7
8use futures_util::Stream;
9pub use futures_util::sink;
10pub use futures_util::sink::Sink;
11#[cfg(feature = "variadics")]
12#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
13pub use variadics;
14
15pub mod filter;
16pub mod filter_map;
17pub mod flat_map;
18pub mod flatten;
19pub mod for_each;
20pub mod inspect;
21pub mod map;
22pub mod send_iter;
23pub mod send_stream;
24pub mod try_for_each;
25pub mod unzip;
26
27use filter::Filter;
28use filter_map::FilterMap;
29use flat_map::FlatMap;
30use flatten::Flatten;
31use for_each::ForEach;
32use inspect::Inspect;
33use map::Map;
34use send_iter::SendIter;
35use send_stream::SendStream;
36use try_for_each::TryForEach;
37use unzip::Unzip;
38
39#[cfg(feature = "std")]
40#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
41pub mod demux_map;
42#[cfg(feature = "std")]
43#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
44pub use demux_map::demux_map;
45
46#[cfg(feature = "variadics")]
47#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
48pub mod demux_var;
49#[cfg(feature = "variadics")]
50#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
51pub use demux_var::{SinkVariadic, demux_var};
52
53pub trait SinkBuild {
57 type Item;
59
60 type Output<Next: Sink<Self::Item>>;
62 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
67 where
68 Next: Sink<Self::Item>;
69
70 fn fanout<Si0, Si1>(self, sink0: Si0, sink1: Si1) -> Self::Output<sink::Fanout<Si0, Si1>>
72 where
73 Self: Sized,
74 Self::Item: Clone,
75 Si0: Sink<Self::Item>,
76 Si1: Sink<Self::Item, Error = Si0::Error>,
77 {
78 self.send_to(sink::SinkExt::fanout(sink0, sink1))
79 }
80
81 fn for_each<Func>(self, func: Func) -> Self::Output<ForEach<Func>>
83 where
84 Self: Sized,
85 Func: FnMut(Self::Item),
86 {
87 self.send_to(ForEach::new(func))
88 }
89
90 fn try_for_each<Func, Error>(self, func: Func) -> Self::Output<TryForEach<Func>>
93 where
94 Self: Sized,
95 Func: FnMut(Self::Item) -> Result<(), Error>,
96 {
97 self.send_to(TryForEach::new(func))
98 }
99
100 fn map<Func, Out>(self, func: Func) -> map::MapBuilder<Self, Func>
102 where
103 Self: Sized,
104 Func: FnMut(Self::Item) -> Out,
105 {
106 map::MapBuilder { prev: self, func }
107 }
108
109 fn filter<Func>(self, func: Func) -> filter::FilterBuilder<Self, Func>
111 where
112 Self: Sized,
113 Func: FnMut(&Self::Item) -> bool,
114 {
115 filter::FilterBuilder { prev: self, func }
116 }
117
118 fn filter_map<Func, Out>(self, func: Func) -> filter_map::FilterMapBuilder<Self, Func>
120 where
121 Self: Sized,
122 Func: FnMut(Self::Item) -> Option<Out>,
123 {
124 filter_map::FilterMapBuilder { prev: self, func }
125 }
126
127 fn flat_map<Func, IntoIter>(self, func: Func) -> flat_map::FlatMapBuilder<Self, Func>
129 where
130 Self: Sized,
131 Func: FnMut(Self::Item) -> IntoIter,
132 IntoIter: IntoIterator,
133 {
134 flat_map::FlatMapBuilder { prev: self, func }
135 }
136
137 fn flatten<IntoIter>(self) -> flatten::FlattenBuilder<Self>
139 where
140 Self: Sized,
141 Self::Item: IntoIterator,
142 {
143 flatten::FlattenBuilder { prev: self }
144 }
145
146 fn inspect<Func>(self, func: Func) -> inspect::InspectBuilder<Self, Func>
148 where
149 Self: Sized,
150 Func: FnMut(&Self::Item),
151 {
152 inspect::InspectBuilder { prev: self, func }
153 }
154
155 fn unzip<Si0, Si1, Item0, Item1>(self, sink0: Si0, sink1: Si1) -> Self::Output<Unzip<Si0, Si1>>
157 where
158 Self: Sized + SinkBuild<Item = (Item0, Item1)>,
159 Si0: Sink<Item0>,
160 Si1: Sink<Item1>,
161 Si0::Error: From<Si1::Error>,
162 {
163 self.send_to(Unzip::new(sink0, sink1))
164 }
165
166 #[cfg(feature = "std")]
170 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
171 fn demux_map<Key, ItemVal, Si>(
172 self,
173 sinks: impl Into<std::collections::HashMap<Key, Si>>,
174 ) -> Self::Output<demux_map::DemuxMap<Key, Si>>
175 where
176 Self: Sized + SinkBuild<Item = (Key, ItemVal)>,
177 Key: Eq + core::hash::Hash + core::fmt::Debug + Unpin,
178 Si: Sink<ItemVal> + Unpin,
179 {
180 self.send_to(demux_map(sinks))
181 }
182
183 #[cfg(feature = "variadics")]
185 #[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
186 fn demux_var<Sinks, ItemVal, Error>(
187 self,
188 sinks: Sinks,
189 ) -> Self::Output<demux_var::DemuxVar<Sinks, Error>>
190 where
191 Self: Sized + SinkBuild<Item = (usize, ItemVal)>,
192 Sinks: SinkVariadic<ItemVal, Error>,
193 {
194 self.send_to(demux_var(sinks))
195 }
196}
197
198pub struct SinkBuilder<Item>(PhantomData<fn() -> Item>);
200impl<Item> Default for SinkBuilder<Item> {
201 fn default() -> Self {
202 Self(PhantomData)
203 }
204}
205impl<Item> SinkBuilder<Item> {
206 pub fn new() -> Self {
208 Self::default()
209 }
210}
211impl<Item> SinkBuild for SinkBuilder<Item> {
212 type Item = Item;
213
214 type Output<Next: Sink<Self::Item>> = Next;
215 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
216 where
217 Next: Sink<Self::Item>,
218 {
219 next
220 }
221}
222
223pub trait ToSinkBuild {
225 fn iter_to_sink_build(self) -> send_iter::SendIterBuild<Self>
227 where
228 Self: Sized + Iterator,
229 {
230 send_iter::SendIterBuild { iter: self }
231 }
232
233 fn stream_to_sink_build(self) -> send_stream::SendStreamBuild<Self>
235 where
236 Self: Sized + Stream,
237 {
238 send_stream::SendStreamBuild { stream: self }
239 }
240}
241impl<T> ToSinkBuild for T {}
242
243macro_rules! forward_sink {
245 (
246 $( $method:ident ),+
247 ) => {
248 $(
249 fn $method(self: ::core::pin::Pin<&mut Self>, cx: &mut ::core::task::Context<'_>) -> ::core::task::Poll<::core::result::Result<(), Self::Error>> {
250 self.project().sink.$method(cx)
251 }
252 )+
253 }
254}
255use forward_sink;
256
257macro_rules! ready_both {
259 ($a:expr, $b:expr $(,)?) => {
260 if !matches!(
261 ($a, $b),
262 (::core::task::Poll::Ready(()), ::core::task::Poll::Ready(())),
263 ) {
264 return ::core::task::Poll::Pending;
265 }
266 };
267}
268use ready_both;
269
270pub fn map<Func, In, Out, Si>(func: Func, sink: Si) -> Map<Si, Func>
272where
273 Func: FnMut(In) -> Out,
274 Si: Sink<Out>,
275{
276 Map::new(func, sink)
277}
278
279pub fn filter<Func, Item, Si>(func: Func, sink: Si) -> Filter<Si, Func>
281where
282 Func: FnMut(&Item) -> bool,
283 Si: Sink<Item>,
284{
285 Filter::new(func, sink)
286}
287
288pub fn filter_map<Func, In, Out, Si>(func: Func, sink: Si) -> FilterMap<Si, Func>
290where
291 Func: FnMut(In) -> Option<Out>,
292 Si: Sink<Out>,
293{
294 FilterMap::new(func, sink)
295}
296
297pub fn flat_map<Func, In, IntoIter, Si>(func: Func, sink: Si) -> FlatMap<Si, Func, IntoIter>
299where
300 Func: FnMut(In) -> IntoIter,
301 IntoIter: IntoIterator,
302 Si: Sink<IntoIter::Item>,
303{
304 FlatMap::new(func, sink)
305}
306
307pub fn flatten<IntoIter, Si>(sink: Si) -> Flatten<Si, IntoIter>
312where
313 IntoIter: IntoIterator,
314 Si: Sink<IntoIter::Item>,
315{
316 Flatten::new(sink)
317}
318
319pub fn inspect<Func, Item, Si>(func: Func, sink: Si) -> Inspect<Si, Func>
321where
322 Func: FnMut(&Item),
323 Si: Sink<Item>,
324{
325 Inspect::new(func, sink)
326}
327
328pub fn unzip<Si0, Si1, Item0, Item1>(sink0: Si0, sink1: Si1) -> Unzip<Si0, Si1>
330where
331 Si0: Sink<Item0>,
332 Si1: Sink<Item1>,
333 Si0::Error: From<Si1::Error>,
334{
335 Unzip::new(sink0, sink1)
336}
337
338pub fn for_each<Func, Item>(func: Func) -> ForEach<Func>
340where
341 Func: FnMut(Item),
342{
343 ForEach::new(func)
344}
345
346pub fn try_for_each<Func, Item, Error>(func: Func) -> TryForEach<Func>
348where
349 Func: FnMut(Item) -> Result<(), Error>,
350{
351 TryForEach::new(func)
352}
353
354pub fn send_iter<I, Si>(iter: I, sink: Si) -> SendIter<I::IntoIter, Si>
356where
357 I: IntoIterator,
358 Si: Sink<I::Item>,
359{
360 SendIter::new(iter.into_iter(), sink)
361}
362
363pub fn send_stream<St, Si>(stream: St, sink: Si) -> SendStream<St, Si>
365where
366 St: Stream,
367 Si: Sink<St::Item>,
368{
369 SendStream::new(stream, sink)
370}