1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#![cfg(not(target_arch = "wasm32"))]

use std::collections::HashMap;

use futures::{SinkExt, StreamExt};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

use crate::scheduled::graph::Dfir;
use crate::scheduled::graph_ext::GraphExt;
use crate::scheduled::handoff::VecHandoff;
use crate::scheduled::port::{RecvPort, SendPort};

/// Address of a remote peer, kept as a plain string. It is the value handed to
/// [TcpStream::connect] when an outbound connection is opened, and it is also
/// the key under which outbound connections are cached.
pub type Address = String;

// These methods can't be wrapped up in a trait because async methods are not
// allowed in traits (yet).

impl Dfir<'_> {
    // TODO(justin): document these, but they're derivatives of inbound_tcp_vertex_internal.
    /// Starts a TCP ingress vertex listening on the requested `port` and
    /// returns the [RecvPort] on which received messages are delivered.
    ///
    /// This is [Self::inbound_tcp_vertex_internal] with an explicit port; the
    /// port number that function reports back is discarded because the caller
    /// already chose it.
    pub async fn inbound_tcp_vertex_port<T>(&mut self, port: u16) -> RecvPort<VecHandoff<T>>
    where
        T: 'static + DeserializeOwned + Send,
    {
        let (_bound_port, recv_port) = self.inbound_tcp_vertex_internal::<T>(Some(port)).await;
        recv_port
    }

    /// Starts a TCP ingress vertex without requesting a particular port.
    ///
    /// This is [Self::inbound_tcp_vertex_internal] with no port given, so the
    /// listener is bound to port `0` and the port it actually ended up on is
    /// returned alongside the [RecvPort] carrying the received messages.
    pub async fn inbound_tcp_vertex<T>(&mut self) -> (u16, RecvPort<VecHandoff<T>>)
    where
        T: 'static + DeserializeOwned + Send,
    {
        let (bound_port, recv_port) = self.inbound_tcp_vertex_internal::<T>(None).await;
        (bound_port, recv_port)
    }
    // TODO(justin): this needs to return a result/get rid of the remaining
    // unwraps, I guess we need a HydroflowError?
    /// Begins listening on some TCP port. Returns the port that was actually
    /// bound together with a [RecvPort] representing the stream of messages
    /// received. Currently there is no notion of identity to the connections
    /// received, if they are to be attached to some participant in the system,
    /// that needs to be included in the message directly.
    ///
    /// If `port` is `None` the listener binds port `0` and the resulting port
    /// is read back from the listener's local address.
    ///
    /// The messages will be interpreted to be bincode-encoded, length-delimited
    /// messages, as produced by [Self::outbound_tcp_vertex].
    ///
    /// Errors on individual connections are logged to stderr and only affect
    /// that connection: a frame that cannot be read or deserialized closes the
    /// connection it arrived on, and a failed `accept` is logged and retried.
    ///
    /// # Panics
    ///
    /// Panics if the listener cannot be bound or if its local address cannot be
    /// determined.
    async fn inbound_tcp_vertex_internal<T>(
        &mut self,
        port: Option<u16>,
    ) -> (u16, RecvPort<VecHandoff<T>>)
    where
        T: 'static + DeserializeOwned + Send,
    {
        let listener = TcpListener::bind(format!("localhost:{}", port.unwrap_or(0)))
            .await
            .unwrap();
        // Read the port back from the listener: when no port was requested
        // this is the only way to learn which one was bound.
        let port = listener.local_addr().unwrap().port();

        // TODO(justin): figure out an appropriate buffer here.
        let (incoming_send, incoming_messages) = futures::channel::mpsc::channel(1024);

        // Listen to incoming connections and spawn a tokio task for each one,
        // which feeds into the channel.
        // TODO(justin): give some way to get a handle into this thing.
        tokio::spawn(async move {
            loop {
                let socket = match listener.accept().await {
                    Ok((socket, _peer)) => socket,
                    Err(e) => {
                        // A failed accept only concerns the connection that
                        // was being accepted; keep the listener alive. Yield
                        // so a persistent error cannot monopolize the
                        // executor.
                        eprintln!("couldn't accept tcp connection: {}", e);
                        tokio::task::yield_now().await;
                        continue;
                    }
                };

                if incoming_send.is_closed() {
                    // Nobody is reading the ingress stream any more, so there
                    // is no point in accepting further connections.
                    break;
                }

                let (reader, _) = socket.into_split();
                let mut reader = FramedRead::new(reader, LengthDelimitedCodec::new());
                let mut incoming_send = incoming_send.clone();
                tokio::spawn(async move {
                    while let Some(frame) = reader.next().await {
                        let frame = match frame {
                            Ok(frame) => frame,
                            Err(e) => {
                                eprintln!("couldn't read tcp frame: {}", e);
                                break;
                            }
                        };
                        let out = match bincode::deserialize(&frame) {
                            Ok(out) => out,
                            Err(e) => {
                                eprintln!("couldn't deserialize tcp message: {}", e);
                                break;
                            }
                        };
                        if incoming_send.send(out).await.is_err() {
                            // The receiving end of the channel is gone, so
                            // nothing can consume this connection's messages.
                            break;
                        }
                    }
                    // TODO(justin): The connection is closed, so we should
                    // clean up its metadata.
                });
            }
        });

        let (send_port, recv_port) = self.make_edge("tcp ingress handoff");
        self.add_input_from_stream("tcp ingress stream", send_port, incoming_messages.map(Some));

        (port, recv_port)
    }

    /// Creates a TCP egress vertex and returns the [SendPort] that feeds it.
    ///
    /// Every `(address, message)` pair pushed into the returned port is
    /// bincode-serialized and written as a length-delimited frame to a TCP
    /// connection to `address`, the format expected by
    /// [Self::inbound_tcp_vertex]. A connection is opened on the first message
    /// to an address and reused afterwards; messages that arrive while the
    /// connection is still being established are buffered and flushed in order
    /// once it is open.
    ///
    /// Failures are logged to stderr rather than reported to the caller:
    /// - if connecting fails, the messages buffered for that address are
    ///   dropped;
    /// - if a message cannot be serialized, that message is dropped;
    /// - if writing to a connection fails, the message is dropped and the
    ///   connection is forgotten, so the next message to that address triggers
    ///   a fresh connection attempt.
    ///
    /// The background work runs in tasks started with `tokio::spawn`.
    pub async fn outbound_tcp_vertex<T>(&mut self) -> SendPort<VecHandoff<(Address, T)>>
    where
        T: 'static + Serialize + Send,
    {
        let (mut connection_reqs_send, mut connection_reqs_recv) =
            futures::channel::mpsc::channel(1024);
        let (mut connections_send, mut connections_recv) = futures::channel::mpsc::channel(1024);

        // Spawn an actor which establishes connections. It reports the outcome
        // of every attempt (success or failure) back to the egress task.
        tokio::spawn(async move {
            while let Some(addr) = connection_reqs_recv.next().await {
                let addr: Address = addr;
                let conn = TcpStream::connect(addr.clone()).await;
                if connections_send.send((addr, conn)).await.is_err() {
                    // The egress task is gone, nobody wants connections.
                    break;
                }
            }
        });

        enum ConnStatus<T> {
            Pending(Vec<T>),
            Connected(FramedWrite<TcpStream, LengthDelimitedCodec>),
        }

        let (mut outbound_messages_send, mut outbound_messages_recv) =
            futures::channel::mpsc::channel(1024);
        tokio::spawn(async move {
            // TODO(justin): this cache should be global to the entire Hydroflow
            // instance so we can reuse connections from inbound connections.
            let mut connections = HashMap::<Address, ConnStatus<T>>::new();

            // Becomes false once the dataflow side has hung up. From then on
            // we only wait for outstanding connection attempts so that
            // already-buffered messages still get flushed, and then exit.
            let mut accepting_messages = true;

            loop {
                tokio::select! {
                    outbound = outbound_messages_recv.next(), if accepting_messages => {
                        match outbound {
                            None => {
                                // No more messages will ever arrive. Closing
                                // the request channel lets the connecting
                                // actor finish, which in turn ends
                                // `connections_recv` and lets this loop reach
                                // its `else` arm.
                                accepting_messages = false;
                                connection_reqs_send.close_channel();
                            }
                            Some((addr, msg)) => {
                                let addr: Address = addr;
                                let msg: T = msg;
                                match connections.get_mut(&addr) {
                                    None => {
                                        // We have not seen this address before, open a
                                        // connection to it and buffer the message to be
                                        // sent once it's open.
                                        match connection_reqs_send.try_send(addr.clone()) {
                                            Ok(()) => {
                                                connections.insert(addr, ConnStatus::Pending(vec![msg]));
                                            }
                                            Err(e) => {
                                                // The request queue is full. Drop the
                                                // message and leave the address unknown
                                                // so a later message retries.
                                                eprintln!(
                                                    "couldn't request connection to {}: {}",
                                                    addr, e
                                                );
                                            }
                                        }
                                    }
                                    Some(ConnStatus::Pending(msgs)) => {
                                        // We have seen this address before but we're
                                        // still trying to connect to it, so buffer this
                                        // message so that when we _do_ connect we will
                                        // send it.
                                        msgs.push(msg);
                                    }
                                    Some(ConnStatus::Connected(conn)) => {
                                        // TODO(justin): move the actual sending here
                                        // into a different task so we don't have to
                                        // wait for the send.
                                        let encoded = bincode::serialize(&msg);
                                        match encoded {
                                            Ok(bytes) => {
                                                let sent = conn.send(bytes.into()).await;
                                                if let Err(e) = sent {
                                                    // The connection is broken. Forget
                                                    // it so the next message reconnects.
                                                    eprintln!("couldn't send to {}: {}", addr, e);
                                                    connections.remove(&addr);
                                                }
                                            }
                                            Err(e) => {
                                                eprintln!(
                                                    "couldn't serialize message for {}: {}",
                                                    addr, e
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    },

                    Some((addr, conn)) = connections_recv.next() => {
                        match conn {
                            Ok(stream) => {
                                match connections.get_mut(&addr) {
                                    Some(ConnStatus::Pending(msgs)) => {
                                        let mut conn =
                                            FramedWrite::new(stream, LengthDelimitedCodec::new());
                                        let mut healthy = true;
                                        for msg in msgs.drain(..) {
                                            // TODO(justin): move the actual sending here
                                            // into a different task so we don't have to
                                            // wait for the send.
                                            let encoded = bincode::serialize(&msg);
                                            match encoded {
                                                Ok(bytes) => {
                                                    let sent = conn.send(bytes.into()).await;
                                                    if let Err(e) = sent {
                                                        eprintln!(
                                                            "couldn't send to {}: {}",
                                                            addr, e
                                                        );
                                                        healthy = false;
                                                        break;
                                                    }
                                                }
                                                Err(e) => {
                                                    eprintln!(
                                                        "couldn't serialize message for {}: {}",
                                                        addr, e
                                                    );
                                                }
                                            }
                                        }
                                        if healthy {
                                            connections.insert(addr, ConnStatus::Connected(conn));
                                        } else {
                                            // The fresh connection already failed. Drop it
                                            // together with whatever was still buffered.
                                            connections.remove(&addr);
                                        }
                                    }
                                    None => {
                                        // This means nobody ever requested this
                                        // connection, so we shouldn't have initiated it
                                        // in the first place.
                                        unreachable!()
                                    }
                                    Some(ConnStatus::Connected(_tcp)) => {
                                        // This means we were already connected, so we
                                        // shouldn't have connected again. If the
                                        // connection cache becomes shared this could
                                        // become reachable.
                                        unreachable!()
                                    }
                                }
                            }
                            Err(e) => {
                                // We couldn't connect to the address for some
                                // reason.
                                // TODO(justin): once we have a clearer picture
                                // of error handling, we could do something like
                                // send this error along a pipe to be handled by
                                // someone else. For now, just log it and drop
                                // any pending messages.
                                eprintln!("couldn't connect to {}: {}", addr, e);
                                connections.remove(&addr);
                            }
                        }
                    },
                    // Both inputs are finished: the dataflow side hung up and
                    // every requested connection attempt has been reported.
                    else => break,
                }
            }
        });

        let mut buffered_messages = Vec::new();
        let mut next_messages = Vec::new();
        let (input_port, output_port) = self.make_edge("tcp egress handoff");
        self.add_subgraph_sink("tcp egress stream", output_port, move |_ctx, recv| {
            buffered_messages.extend(recv.take_inner());

            let mut pending = buffered_messages.drain(..);
            for msg in pending.by_ref() {
                if let Err(e) = outbound_messages_send.try_send(msg) {
                    // If we weren't able to send a message (say, because the
                    // buffer is full), we get handed it back in the error. If
                    // this happens we hang onto the message to try sending it
                    // again next time.
                    next_messages.push(e.into_inner());
                    break;
                }
            }
            // Once one message had to be retained, everything behind it is
            // retained too (without attempting to send it), so that a later
            // message can never overtake an earlier one.
            next_messages.extend(pending);

            // NB. we don't need to flush the channel here due to the use of
            // `try_send`. Every message it accepted has been handed over to
            // the channel.

            // TODO(justin): we do need to make sure we get rescheduled if
            // next_messages is non-empty here.

            std::mem::swap(&mut buffered_messages, &mut next_messages);
        });

        input_port
    }
}