1use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use pin_project_lite::pin_project;
6
7use crate::{Sink, SinkBuild};
8
9pin_project! {
10 #[must_use = "sinks do nothing unless polled"]
14 pub struct Inspect<Si, Func> {
15 #[pin]
16 sink: Si,
17 func: Func,
18 }
19}
20
21impl<Si, Func> Inspect<Si, Func> {
22 pub fn new<Item>(func: Func, sink: Si) -> Self
24 where
25 Self: Sink<Item>,
26 {
27 Self { sink, func }
28 }
29}
30
31impl<Si, Func, Item> Sink<Item> for Inspect<Si, Func>
32where
33 Si: Sink<Item>,
34 Func: FnMut(&Item),
35{
36 type Error = Si::Error;
37
38 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
39 self.project().sink.poll_ready(cx)
40 }
41 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
42 let this = self.project();
43 (this.func)(&item);
44 this.sink.start_send(item)
45 }
46 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47 self.project().sink.poll_flush(cx)
48 }
49 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50 self.project().sink.poll_close(cx)
51 }
52}
53
54pub struct InspectBuilder<Prev, Func> {
56 pub(crate) prev: Prev,
57 pub(crate) func: Func,
58}
59impl<Prev, Func> SinkBuild for InspectBuilder<Prev, Func>
60where
61 Prev: SinkBuild,
62 Func: FnMut(&Prev::Item),
63{
64 type Item = Prev::Item;
65
66 type Output<Next: Sink<Prev::Item>> = Prev::Output<Inspect<Next, Func>>;
67
68 fn send_to<Next>(self, next: Next) -> Self::Output<Next>
69 where
70 Next: Sink<Prev::Item>,
71 {
72 self.prev.send_to(Inspect::new(self.func, next))
73 }
74}