Skip to main content

hydro_lang/networking/
mod.rs

1//! Types for configuring network channels with serialization formats, transports, etc.
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
9use crate::nondet::NonDet;
10
11#[sealed::sealed]
12trait SerKind<T: ?Sized> {
13    fn serialize_thunk(is_demux: bool) -> syn::Expr;
14
15    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr;
16}
17
18/// Serialize items using the [`bincode`] crate.
19pub enum Bincode {}
20
21#[sealed::sealed]
22impl<T: Serialize + DeserializeOwned> SerKind<T> for Bincode {
23    fn serialize_thunk(is_demux: bool) -> syn::Expr {
24        serialize_bincode::<T>(is_demux)
25    }
26
27    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
28        deserialize_bincode::<T>(tagged)
29    }
30}
31
32/// An unconfigured serialization backend.
33pub enum NoSer {}
34
35#[sealed::sealed]
36trait TransportKind {
37    /// Returns the [`NetworkingInfo`] describing this transport's configuration.
38    fn networking_info() -> NetworkingInfo;
39}
40
41#[sealed::sealed]
42#[diagnostic::on_unimplemented(
43    message = "TCP transport requires a failure policy. For example, `TCP.fail_stop()` stops sending messages after a failed connection."
44)]
45trait TcpFailPolicy {
46    /// Returns the [`TcpFault`] variant for this failure policy.
47    fn tcp_fault() -> TcpFault;
48}
49
50/// A TCP failure policy that stops sending messages after a failed connection.
51pub enum FailStop {}
52#[sealed::sealed]
53impl TcpFailPolicy for FailStop {
54    fn tcp_fault() -> TcpFault {
55        TcpFault::FailStop
56    }
57}
58
59/// A TCP failure policy that allows messages to be lost.
60pub enum Lossy {}
61#[sealed::sealed]
62impl TcpFailPolicy for Lossy {
63    fn tcp_fault() -> TcpFault {
64        TcpFault::Lossy
65    }
66}
67
68/// Send items across a length-delimited TCP channel.
69pub struct Tcp<F> {
70    _phantom: PhantomData<F>,
71}
72
73#[sealed::sealed]
74impl<F: TcpFailPolicy> TransportKind for Tcp<F> {
75    fn networking_info() -> NetworkingInfo {
76        NetworkingInfo::Tcp {
77            fault: F::tcp_fault(),
78        }
79    }
80}
81
82/// A networking backend implementation that supports items of type `T`.
83#[sealed::sealed]
84pub trait NetworkFor<T: ?Sized> {
85    /// Generates serialization logic for sending `T`.
86    fn serialize_thunk(is_demux: bool) -> syn::Expr;
87
88    /// Generates deserialization logic for receiving `T`.
89    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr;
90
91    /// Returns the optional name of the network channel.
92    fn name(&self) -> Option<&str>;
93
94    /// Returns the [`NetworkingInfo`] describing this network channel's transport and fault model.
95    fn networking_info() -> NetworkingInfo;
96}
97
98/// The fault model for a TCP connection.
99#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
100pub enum TcpFault {
101    /// Stops sending messages after a failed connection.
102    FailStop,
103    /// Messages may be lost (e.g. due to network partitions).
104    Lossy,
105}
106
107/// Describes the networking configuration for a network channel at the IR level.
108#[derive(Debug, Clone, PartialEq, Eq, Hash)]
109pub enum NetworkingInfo {
110    /// A TCP-based network channel with a specific fault model.
111    Tcp {
112        /// The fault model for this TCP connection.
113        fault: TcpFault,
114    },
115}
116
117/// A network channel configuration with `T` as transport backend and `S` as the serialization
118/// backend.
119pub struct NetworkingConfig<Tr: ?Sized, S: ?Sized, Name = ()> {
120    name: Option<Name>,
121    _phantom: (PhantomData<Tr>, PhantomData<S>),
122}
123
124impl<Tr: ?Sized, S: ?Sized> NetworkingConfig<Tr, S> {
125    /// Names the network channel and enables stable communication across multiple service versions.
126    pub fn name(self, name: impl Into<String>) -> NetworkingConfig<Tr, S, String> {
127        NetworkingConfig {
128            name: Some(name.into()),
129            _phantom: (PhantomData, PhantomData),
130        }
131    }
132}
133
134impl<Tr: ?Sized, N> NetworkingConfig<Tr, NoSer, N> {
135    /// Configures the network channel to use [`bincode`] to serialize items.
136    pub const fn bincode(mut self) -> NetworkingConfig<Tr, Bincode, N> {
137        let taken_name = self.name.take();
138        std::mem::forget(self); // nothing else is stored
139        NetworkingConfig {
140            name: taken_name,
141            _phantom: (PhantomData, PhantomData),
142        }
143    }
144}
145
146impl<S: ?Sized> NetworkingConfig<Tcp<()>, S> {
147    /// Configures the TCP transport to stop sending messages after a failed connection.
148    ///
149    /// Note that the Hydro simulator will not simulate connection failures that impact the
150    /// *liveness* of a program. If an output assertion depends on a `fail_stop` channel
151    /// making progress, that channel will not experience a failure that would cause the test to
152    /// block indefinitely. However, any *safety* issues caused by connection failures will still
153    /// be caught, such as a race condition between a failed connection and some other message.
154    pub const fn fail_stop(self) -> NetworkingConfig<Tcp<FailStop>, S> {
155        NetworkingConfig {
156            name: self.name,
157            _phantom: (PhantomData, PhantomData),
158        }
159    }
160
161    /// Configures the TCP transport to allow messages to be lost.
162    ///
163    /// This is appropriate for networks where messages may be dropped, such as when
164    /// running under a Maelstrom partition nemesis. Unlike `fail_stop`, which guarantees
165    /// a prefix of messages is delivered, `lossy` makes no such guarantee.
166    ///
167    /// # Non-Determinism
168    /// A lossy TCP channel will non-deterministically drop messages during execution.
169    pub const fn lossy(self, nondet: NonDet) -> NetworkingConfig<Tcp<Lossy>, S> {
170        let _ = nondet;
171        NetworkingConfig {
172            name: self.name,
173            _phantom: (PhantomData, PhantomData),
174        }
175    }
176}
177
178#[sealed::sealed]
179impl<Tr: ?Sized, S: ?Sized, T: ?Sized> NetworkFor<T> for NetworkingConfig<Tr, S>
180where
181    Tr: TransportKind,
182    S: SerKind<T>,
183{
184    fn serialize_thunk(is_demux: bool) -> syn::Expr {
185        S::serialize_thunk(is_demux)
186    }
187
188    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
189        S::deserialize_thunk(tagged)
190    }
191
192    fn name(&self) -> Option<&str> {
193        None
194    }
195
196    fn networking_info() -> NetworkingInfo {
197        Tr::networking_info()
198    }
199}
200
201#[sealed::sealed]
202impl<Tr: ?Sized, S: ?Sized, T: ?Sized> NetworkFor<T> for NetworkingConfig<Tr, S, String>
203where
204    Tr: TransportKind,
205    S: SerKind<T>,
206{
207    fn serialize_thunk(is_demux: bool) -> syn::Expr {
208        S::serialize_thunk(is_demux)
209    }
210
211    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
212        S::deserialize_thunk(tagged)
213    }
214
215    fn name(&self) -> Option<&str> {
216        self.name.as_deref()
217    }
218
219    fn networking_info() -> NetworkingInfo {
220        Tr::networking_info()
221    }
222}
223
224/// A network channel that uses length-delimited TCP for transport.
225pub const TCP: NetworkingConfig<Tcp<()>, NoSer> = NetworkingConfig {
226    name: None,
227    _phantom: (PhantomData, PhantomData),
228};