1use core::fmt::Debug;
3use core::hash::Hash;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use std::collections::HashMap;
7
8use crate::{Sink, ready_both};
9
10pub struct DemuxMap<Key, Si> {
12 sinks: HashMap<Key, Si>,
13}
14
15impl<Key, Si> DemuxMap<Key, Si> {
16 pub fn new<Item>(sinks: impl Into<HashMap<Key, Si>>) -> Self
18 where
19 Self: Sink<(Key, Item)>,
20 {
21 Self {
22 sinks: sinks.into(),
23 }
24 }
25}
26
27impl<Key, Si, Item> Sink<(Key, Item)> for DemuxMap<Key, Si>
28where
29 Key: Eq + Hash + Debug + Unpin,
30 Si: Sink<Item> + Unpin,
31{
32 type Error = Si::Error;
33
34 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
35 self.get_mut()
36 .sinks
37 .values_mut()
38 .try_fold(Poll::Ready(()), |poll, sink| {
39 ready_both!(poll, Pin::new(sink).poll_ready(cx)?);
40 Poll::Ready(Ok(()))
41 })
42 }
43
44 fn start_send(self: Pin<&mut Self>, item: (Key, Item)) -> Result<(), Self::Error> {
45 let sink = self
46 .get_mut()
47 .sinks
48 .get_mut(&item.0)
49 .unwrap_or_else(|| panic!("`DemuxMap` missing key {:?}", item.0));
50 Pin::new(sink).start_send(item.1)
51 }
52
53 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
54 self.get_mut()
55 .sinks
56 .values_mut()
57 .try_fold(Poll::Ready(()), |poll, sink| {
58 ready_both!(poll, Pin::new(sink).poll_flush(cx)?);
59 Poll::Ready(Ok(()))
60 })
61 }
62
63 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64 self.get_mut()
65 .sinks
66 .values_mut()
67 .try_fold(Poll::Ready(()), |poll, sink| {
68 ready_both!(poll, Pin::new(sink).poll_close(cx)?);
69 Poll::Ready(Ok(()))
70 })
71 }
72}
73
74pub fn demux_map<Key, Si, Item>(sinks: impl Into<HashMap<Key, Si>>) -> DemuxMap<Key, Si>
78where
79 Key: Eq + Hash + Debug + Unpin,
80 Si: Sink<Item> + Unpin,
81{
82 DemuxMap::new(sinks)
83}