// sinktools/demux_map.rs

//! [`DemuxMap`] and related items.
use core::fmt::Debug;
use core::hash::Hash;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::collections::HashMap;

use crate::{Sink, ready_both};

10/// Sink which receives keys paired with items `(Key, Item)`, and pushes to the corresponding output sink in a [`HashMap`] of sinks.
11pub struct DemuxMap<Key, Si> {
12    sinks: HashMap<Key, Si>,
13}
14
15impl<Key, Si> DemuxMap<Key, Si> {
16    /// Create with the given next `sinks` map.
17    pub fn new<Item>(sinks: impl Into<HashMap<Key, Si>>) -> Self
18    where
19        Self: Sink<(Key, Item)>,
20    {
21        Self {
22            sinks: sinks.into(),
23        }
24    }
25}
26
27impl<Key, Si, Item> Sink<(Key, Item)> for DemuxMap<Key, Si>
28where
29    Key: Eq + Hash + Debug + Unpin,
30    Si: Sink<Item> + Unpin,
31{
32    type Error = Si::Error;
33
34    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
35        self.get_mut()
36            .sinks
37            .values_mut()
38            .try_fold(Poll::Ready(()), |poll, sink| {
39                ready_both!(poll, Pin::new(sink).poll_ready(cx)?);
40                Poll::Ready(Ok(()))
41            })
42    }
43
44    fn start_send(self: Pin<&mut Self>, item: (Key, Item)) -> Result<(), Self::Error> {
45        let sink = self
46            .get_mut()
47            .sinks
48            .get_mut(&item.0)
49            .unwrap_or_else(|| panic!("`DemuxMap` missing key {:?}", item.0));
50        Pin::new(sink).start_send(item.1)
51    }
52
53    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
54        self.get_mut()
55            .sinks
56            .values_mut()
57            .try_fold(Poll::Ready(()), |poll, sink| {
58                ready_both!(poll, Pin::new(sink).poll_flush(cx)?);
59                Poll::Ready(Ok(()))
60            })
61    }
62
63    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64        self.get_mut()
65            .sinks
66            .values_mut()
67            .try_fold(Poll::Ready(()), |poll, sink| {
68                ready_both!(poll, Pin::new(sink).poll_close(cx)?);
69                Poll::Ready(Ok(()))
70            })
71    }
72}
73
74/// Creates a `DemuxMap` sink that sends each item to one of many outputs, depending on the key.
75///
76/// This requires sinks `Si` to be `Unpin`. If your sinks are not `Unpin`, first wrap them in `Box::pin` to make them `Unpin`.
77pub fn demux_map<Key, Si, Item>(sinks: impl Into<HashMap<Key, Si>>) -> DemuxMap<Key, Si>
78where
79    Key: Eq + Hash + Debug + Unpin,
80    Si: Sink<Item> + Unpin,
81{
82    DemuxMap::new(sinks)
83}