1use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::{Sink, SinkBuild, forward_sink};
7
8pin_project! {
9 #[must_use = "sinks do nothing unless polled"]
13 pub struct Map<Si, Func> {
14 #[pin]
15 sink: Si,
16 func: Func,
17 }
18}
19
20impl<Si, Func> Map<Si, Func> {
21 pub fn new<Item>(func: Func, sink: Si) -> Self
23 where
24 Self: Sink<Item>,
25 {
26 Self { sink, func }
27 }
28}
29
30impl<Si, Func, Item, ItemOut> Sink<Item> for Map<Si, Func>
31where
32 Si: Sink<ItemOut>,
33 Func: FnMut(Item) -> ItemOut,
34{
35 type Error = Si::Error;
36
37 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
38 let this = self.project();
39 let item = (this.func)(item);
40 this.sink.start_send(item)
41 }
42
43 forward_sink!(poll_ready, poll_flush, poll_close);
44}
45
46pub struct MapBuilder<Prev, Func> {
48 pub(crate) prev: Prev,
49 pub(crate) func: Func,
50}
51impl<Prev, ItemOut, Func> SinkBuild for MapBuilder<Prev, Func>
52where
53 Prev: SinkBuild,
54 Func: FnMut(Prev::Item) -> ItemOut,
55{
56 type Item = ItemOut;
57
58 type Output<Next: Sink<ItemOut>> = Prev::Output<Map<Next, Func>>;
59
60 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
61 where
62 Next: Sink<ItemOut>,
63 {
64 self.prev.send_to(Map::new(self.func, next))
65 }
66}