gossip_kv/
server.rs

1use std::collections::{HashMap, HashSet};
2use std::fmt::Debug;
3use std::hash::Hash;
4
5use dfir_rs::dfir_syntax;
6use dfir_rs::futures::{Sink, Stream};
7use dfir_rs::itertools::Itertools;
8use dfir_rs::lattices::map_union::{KeyedBimorphism, MapUnionHashMap, MapUnionSingletonMap};
9use dfir_rs::lattices::set_union::SetUnionHashSet;
10use dfir_rs::lattices::{Lattice, PairBimorphism};
11use dfir_rs::scheduled::graph::Dfir;
12use lattices::set_union::SetUnion;
13use lattices::{IsTop, Max, Pair};
14use lazy_static::lazy_static;
15use prometheus::{IntCounter, register_int_counter};
16use rand::seq::IteratorRandom;
17use rand::thread_rng;
18use serde::de::DeserializeOwned;
19use serde::{Deserialize, Serialize};
20use tracing::{info, trace};
21
22use crate::GossipMessage::{Ack, Nack};
23use crate::lattices::BoundedSetLattice;
24use crate::membership::{MemberData, MemberId};
25use crate::model::{
26    Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName, delete_row, upsert_row,
27};
28use crate::util::{ClientRequestWithAddress, GossipRequestWithAddress};
29use crate::{ClientRequest, ClientResponse, GossipMessage, Key, Namespace};
30
31/// A trait that represents an abstract network address. In production, this will typically be
32/// SocketAddr.
33pub trait Address: Hash + Debug + Clone + Eq + Serialize {}
34impl<A> Address for A where A: Hash + Debug + Clone + Eq + Serialize {}
35
36#[derive(Debug, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
37pub struct SeedNode<A>
38where
39    A: Address,
40{
41    pub id: MemberId,
42    pub address: A,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize, Lattice)]
46pub struct InfectingWrite {
47    write: Namespaces<Clock>,
48    members: BoundedSetLattice<MemberId, 2>,
49}
50
51pub type MessageId = String;
52
53lazy_static! {
54    pub static ref SETS_COUNTER: IntCounter =
55        register_int_counter!("sets", "Counts the number of SET requests processed.").unwrap();
56}
57
58/// Creates a L0 key-value store server using DFIR.
59///
60/// # Arguments
61/// -- `client_inputs`: The input stream of client requests for the client protocol.
62/// -- `client_outputs`: The output sink of client responses for the client protocol.
63/// -- `member_info`: The membership information of the server.
64/// -- `seed_nodes`: A list of seed nodes that can be used to bootstrap the gossip cluster.
65#[expect(clippy::too_many_arguments)]
66pub fn server<
67    ClientInput,
68    ClientOutput,
69    ClientOutputError,
70    GossipInput,
71    GossipOutput,
72    GossipOutputError,
73    GossipTrigger,
74    SeedNodeStream,
75    Addr,
76>(
77    client_inputs: ClientInput,
78    client_outputs: ClientOutput,
79    gossip_inputs: GossipInput,
80    gossip_outputs: GossipOutput,
81    gossip_trigger: GossipTrigger,
82    member_info: MemberData<Addr>,
83    seed_nodes: Vec<SeedNode<Addr>>,
84    seed_node_stream: SeedNodeStream,
85) -> Dfir<'static>
86where
87    ClientInput: Stream<Item = (ClientRequest, Addr)> + Unpin + 'static,
88    ClientOutput: Sink<(ClientResponse, Addr), Error = ClientOutputError> + Unpin + 'static,
89    GossipInput: Stream<Item = (GossipMessage, Addr)> + Unpin + 'static,
90    GossipOutput: Sink<(GossipMessage, Addr), Error = GossipOutputError> + Unpin + 'static,
91    GossipTrigger: Stream<Item = ()> + Unpin + 'static,
92    SeedNodeStream: Stream<Item = Vec<SeedNode<Addr>>> + Unpin + 'static,
93    Addr: Address + DeserializeOwned + 'static,
94    ClientOutputError: Debug + 'static,
95    GossipOutputError: Debug + 'static,
96{
97    let my_member_id = member_info.id.clone();
98    // TODO: This is ugly, but the only way this works at the moment.
99    let member_id_2 = my_member_id.clone();
100    let member_id_3 = my_member_id.clone();
101    let member_id_4 = my_member_id.clone();
102    let member_id_5 = my_member_id.clone();
103    let member_id_6 = my_member_id.clone();
104
105    dfir_syntax! {
106
107        on_start = initialize() -> tee();
108        on_start -> for_each(|_| info!("{:?}: Process {} started.", context.current_tick(), member_id_6));
109
110        seed_nodes = source_stream(seed_node_stream)
111            -> fold::<'static>(|| Box::new(seed_nodes), |last_seed_nodes, new_seed_nodes: Vec<SeedNode<Addr>>| {
112                **last_seed_nodes = new_seed_nodes;
113                info!("Updated seed nodes: {:?}", **last_seed_nodes);
114            });
115
116        // Setup member metadata for this process.
117        on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap()))
118            -> writes;
119
120        client_out =
121            inspect(|(resp, addr)| trace!("{:?}: Sending response: {:?} to {:?}.", context.current_tick(), resp, addr))
122            -> dest_sink(client_outputs);
123
124        client_in = source_stream(client_inputs)
125            -> map(|(msg, addr)| ClientRequestWithAddress::from_request_and_address(msg, addr))
126            -> demux_enum::<ClientRequestWithAddress<Addr>>();
127
128        client_in[Get]
129            -> inspect(|req| trace!("{:?}: Received Get request: {:?}.", context.current_tick(), req))
130            -> map(|(key, addr) : (Key, Addr)| {
131                let row = MapUnionHashMap::new_from([
132                        (
133                            key.row_key,
134                            SetUnionHashSet::new_from([addr /* to respond with the result later*/])
135                        ),
136                ]);
137                let table = MapUnionHashMap::new_from([(key.table, row)]);
138                MapUnionHashMap::new_from([(key.namespace, table)])
139            })
140            -> reads;
141
142        client_in[Set]
143            -> inspect(|request| trace!("{:?}: Received Set request: {:?}.", context.current_tick(), request))
144            -> map(|(key, value, _addr) : (Key, String, Addr)| upsert_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key, value))
145            -> inspect(|_| {
146                SETS_COUNTER.inc(); // Bump SET metrics
147            })
148            -> writes;
149
150        client_in[Delete]
151            -> inspect(|req| trace!("{:?}: Received Delete request: {:?}.", context.current_tick(), req))
152            -> map(|(key, _addr) : (Key, Addr)| delete_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key))
153            -> writes;
154
155        gossip_in = source_stream(gossip_inputs)
156            -> map(|(msg, addr)| GossipRequestWithAddress::from_request_and_address(msg, addr))
157            -> demux_enum::<GossipRequestWithAddress<Addr>>();
158
159        incoming_gossip_messages = gossip_in[Gossip]
160            -> inspect(|request| trace!("{:?}: Received gossip request: {:?}.", context.current_tick(), request))
161            -> tee();
162
163        gossip_in[Ack]
164            -> inspect(|request| trace!("{:?}: Received gossip ack: {:?}.", context.current_tick(), request))
165            -> null();
166
167        gossip_in[Nack]
168            -> inspect(|request| trace!("{:?}: Received gossip nack: {:?}.", context.current_tick(), request))
169            -> map( |(message_id, member_id, _addr)| {
170                MapUnionSingletonMap::new_from((message_id, InfectingWrite { write: Default::default(), members: BoundedSetLattice::new_from([member_id]) }))
171            })
172            -> infecting_writes;
173
174        gossip_out = union() -> dest_sink(gossip_outputs);
175
176        incoming_gossip_messages
177            -> map(|(_msg_id, _member_id, writes, _addr)| writes )
178            -> writes;
179
180        gossip_processing_pipeline = incoming_gossip_messages
181            -> map(|(msg_id, _member_id, writes, sender_address) : (String, MemberId, Namespaces<Max<u64>>, Addr)| {
182                let namespaces = &#namespaces;
183                let all_data: &HashMap<Namespace, TableMap<RowValue<Clock>>> = namespaces.as_reveal_ref();
184                let possible_new_data: &HashMap<Namespace, TableMap<RowValue<Max<u64>>>>= writes.as_reveal_ref();
185
186                // Check if any of the data is new
187                /* TODO: This logic is duplicated in MapUnion::Merge and ideally should be accessed
188                   from the pass-through streaming output from `state`. See
189                   https://www.notion.so/hydro-project/Proposal-for-State-API-10a2a586262f8080b981d1a2948a69ac
190                   for more. */
191                let gossip_has_new_data = possible_new_data.iter()
192                    .flat_map(|(namespace, tables)| {
193                        tables.as_reveal_ref().iter().flat_map(move |(table, rows)|{
194                            rows.as_reveal_ref().iter().map(move |(row_key, row_value)| (namespace, table, row_key, row_value.as_reveal_ref().0.as_reveal_ref()))
195                        })
196                    })
197                    .any(|(ns,table, row_key, new_ts)| {
198                        let existing_tables = all_data.get(ns);
199                        let existing_rows = existing_tables.and_then(|tables| tables.as_reveal_ref().get(table));
200                        let existing_row = existing_rows.and_then(|rows| rows.as_reveal_ref().get(row_key));
201                        let existing_ts = existing_row.map(|row| row.as_reveal_ref().0.as_reveal_ref());
202
203                        if let Some(existing_ts) = existing_ts {
204                            trace!("Comparing timestamps: {:?} vs {:?}", new_ts, existing_ts);
205                            new_ts > existing_ts
206                        } else {
207                            true
208                        }
209                    });
210
211                if gossip_has_new_data {
212                    (Ack { message_id: msg_id, member_id: member_id_2.clone()}, sender_address, Some(writes))
213                } else {
214                    (Nack { message_id: msg_id, member_id: member_id_3.clone()}, sender_address, None)
215                }
216             })
217            -> tee();
218
219        gossip_processing_pipeline
220            -> map(|(response, address, _writes)| (response, address))
221            -> inspect( |(msg, addr)| trace!("{:?}: Sending gossip response: {:?} to {:?}.", context.current_tick(), msg, addr))
222            -> gossip_out;
223
224        gossip_processing_pipeline
225            -> filter(|(_, _, writes)| writes.is_some())
226            -> map(|(_, _, writes)| writes.unwrap())
227            -> writes;
228
229        writes = union();
230
231        writes -> namespaces;
232
233        namespaces = state::<'static, Namespaces::<Clock>>();
234        new_writes = namespaces -> tee(); // TODO: Use the output from here to generate NACKs / ACKs
235
236        reads = state::<'tick, MapUnionHashMap<Namespace, MapUnionHashMap<TableName, MapUnionHashMap<RowKey, SetUnionHashSet<Addr>>>>>();
237
238        new_writes -> [0]process_system_table_reads;
239        reads -> [1]process_system_table_reads;
240
241        process_system_table_reads = lattice_bimorphism(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(PairBimorphism))), #namespaces, #reads)
242            -> lattice_reduce::<'tick>() // TODO: This can be removed if we fix https://github.com/hydro-project/hydro/issues/1401. Otherwise the result can be returned twice if get & gossip arrive in the same tick.
243            -> flat_map(|result: NamespaceMap<Pair<RowValue<Clock>, SetUnion<HashSet<Addr>>>>| {
244
245                let mut response: Vec<(ClientResponse, Addr)> = vec![];
246
247                    let result = result.as_reveal_ref();
248
249                    for (namespace, tables) in result.iter() {
250                        for (table_name, table) in tables.as_reveal_ref().iter() {
251                            for (row_key, join_results) in table.as_reveal_ref().iter() {
252                                let key = Key {
253                                    namespace: *namespace,
254                                    table: table_name.clone(),
255                                    row_key: row_key.clone(),
256                                };
257
258                                let timestamped_values = join_results.as_reveal_ref().0;
259                                let all_values = timestamped_values.as_reveal_ref().1.as_reveal_ref();
260
261                                let all_addresses = join_results.as_reveal_ref().1.as_reveal_ref();
262                                let socket_addr = all_addresses.iter().find_or_first(|_| true).unwrap();
263
264                                response.push((
265                                    ClientResponse::Get {key, value: all_values.clone()},
266                                    socket_addr.clone(),
267                            ));
268                        }
269                    }
270                }
271                response
272            }) -> client_out;
273
274        new_writes -> for_each(|x| trace!("NEW WRITE: {:?}", x));
275
276        // Step 1: Put the new writes in a map, with the write as the key and a SetBoundedLattice as the value.
277        infecting_writes = union() -> state_by::<'static, MapUnionHashMap<MessageId, InfectingWrite>>(std::convert::identity, std::default::Default::default);
278
279        new_writes -> map(|write| {
280            // Ideally, the write itself is the key, but writes are a hashmap and hashmaps don't
281            // have a hash implementation. So we just generate a GUID identifier for the write
282            // for now.
283            let id = uuid::Uuid::new_v4().to_string();
284            MapUnionSingletonMap::new_from((id, InfectingWrite { write, members: BoundedSetLattice::new() }))
285        }) -> infecting_writes;
286
287        gossip_trigger = source_stream(gossip_trigger);
288
289        gossip_messages = gossip_trigger
290        -> flat_map( |_|
291            {
292                let infecting_writes = #infecting_writes.as_reveal_ref().clone();
293                trace!("{:?}: Currently gossipping {} infecting writes.", context.current_tick(), infecting_writes.iter().filter(|(_, write)| !write.members.is_top()).count());
294                infecting_writes
295            }
296        )
297        -> filter(|(_id, infecting_write)| !infecting_write.members.is_top())
298        -> map(|(id, infecting_write)| {
299            trace!("{:?}: Choosing a peer to gossip to. {:?}:{:?}", context.current_tick(), id, infecting_write);
300            let peers = #namespaces.as_reveal_ref().get(&Namespace::System).unwrap().as_reveal_ref().get("members").unwrap().as_reveal_ref().clone();
301
302            let mut peer_names = HashSet::new();
303            peers.iter().for_each(|(row_key, _)| {
304                peer_names.insert(row_key.clone());
305            });
306
307            let seed_nodes = &#seed_nodes;
308            seed_nodes.iter().for_each(|seed_node| {
309                peer_names.insert(seed_node.id.clone());
310            });
311
312            // Exclude self from the list of peers.
313            peer_names.remove(&member_id_5);
314
315            trace!("{:?}: Peers: {:?}", context.current_tick(), peer_names);
316
317            let chosen_peer_name = peer_names.iter().choose(&mut thread_rng());
318
319            if chosen_peer_name.is_none() {
320                trace!("{:?}: No peers to gossip to.", context.current_tick());
321                return None;
322            }
323
324            let chosen_peer_name = chosen_peer_name.unwrap();
325            let gossip_address = if peers.contains_key(chosen_peer_name) {
326                let peer_info_value = peers.get(chosen_peer_name).unwrap().as_reveal_ref().1.as_reveal_ref().iter().next().unwrap().clone();
327                let peer_info_deserialized = serde_json::from_str::<MemberData<Addr>>(&peer_info_value).unwrap();
328                peer_info_deserialized.protocols.iter().find(|protocol| protocol.name == "gossip").unwrap().clone().endpoint
329            } else {
330                seed_nodes.iter().find(|seed_node| seed_node.id == *chosen_peer_name).unwrap().address.clone()
331            };
332
333            trace!("Chosen peer: {:?}:{:?}", chosen_peer_name, gossip_address);
334            Some((id, infecting_write, gossip_address))
335        })
336        -> flatten()
337        -> inspect(|(message_id, infecting_write, peer_gossip_address)| trace!("{:?}: Sending write:\nMessageId:{:?}\nWrite:{:?}\nPeer Address:{:?}", context.current_tick(), message_id, infecting_write, peer_gossip_address))
338        -> map(|(message_id, infecting_write, peer_gossip_address): (String, InfectingWrite, Addr)| {
339            let gossip_request = GossipMessage::Gossip {
340                message_id: message_id.clone(),
341                member_id: member_id_4.clone(),
342                writes: infecting_write.write.clone(),
343            };
344            (gossip_request, peer_gossip_address)
345        })
346        -> gossip_out;
347    }
348}
349
350#[cfg(test)]
351mod tests {
352    use std::collections::HashSet;
353
354    use dfir_rs::tokio_stream::empty;
355    use dfir_rs::util::simulation::{Address, Fleet, Hostname};
356
357    use super::*;
358    use crate::membership::{MemberDataBuilder, Protocol};
359
360    #[dfir_rs::test]
361    async fn test_member_init() {
362        let mut fleet = Fleet::new();
363
364        let server_name: Hostname = "server".to_string();
365
366        let server_client_address = Address::new(server_name.clone(), "client".to_string());
367        let server_gossip_address = Address::new(server_name.clone(), "gossip".to_string());
368
369        let (_, gossip_trigger_rx) = dfir_rs::util::unbounded_channel::<()>();
370
371        // Create the kv server
372        fleet.add_host(server_name.clone(), |ctx| {
373            let client_input = ctx.new_inbox::<ClientRequest>("client".to_string());
374            let client_output = ctx.new_outbox::<ClientResponse>("client".to_string());
375
376            let gossip_input = ctx.new_inbox::<GossipMessage>("gossip".to_string());
377            let gossip_output = ctx.new_outbox::<GossipMessage>("gossip".to_string());
378
379            let member_data = MemberDataBuilder::new(server_name.clone())
380                .add_protocol(Protocol::new(
381                    "client".into(),
382                    server_client_address.clone(),
383                ))
384                .add_protocol(Protocol::new(
385                    "gossip".into(),
386                    server_gossip_address.clone(),
387                ))
388                .build();
389
390            server(
391                client_input,
392                client_output,
393                gossip_input,
394                gossip_output,
395                gossip_trigger_rx,
396                member_data,
397                vec![],
398                empty(),
399            )
400        });
401
402        let client_name: Hostname = "client".to_string();
403
404        let key = "/sys/members/server".parse::<Key>().unwrap();
405
406        let (trigger_tx, trigger_rx) = dfir_rs::util::unbounded_channel::<()>();
407        let (response_tx, mut response_rx) = dfir_rs::util::unbounded_channel::<ClientResponse>();
408
409        let key_clone = key.clone();
410        let server_client_address_clone = server_client_address.clone();
411
412        fleet.add_host(client_name.clone(), |ctx| {
413            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
414            let client_rx = ctx.new_inbox::<ClientResponse>("client".to_string());
415
416            dfir_syntax! {
417
418                client_output = dest_sink(client_tx);
419
420                source_stream(trigger_rx)
421                    -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_client_address_clone.clone()) )
422                    -> client_output;
423
424                client_input = source_stream(client_rx)
425                    -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());
426
427            }
428        });
429
430        // Send a trigger to the client to send a get request.
431        trigger_tx.send(()).unwrap();
432
433        let expected_member_data = MemberDataBuilder::new(server_name.clone())
434            .add_protocol(Protocol::new(
435                "client".to_string(),
436                server_client_address.clone(),
437            ))
438            .add_protocol(Protocol::new(
439                "gossip".to_string(),
440                server_gossip_address.clone(),
441            ))
442            .build();
443
444        loop {
445            fleet.run_single_tick_all_hosts().await;
446
447            let responses = dfir_rs::util::collect_ready_async::<Vec<_>, _>(&mut response_rx).await;
448
449            if !responses.is_empty() {
450                assert_eq!(
451                    responses,
452                    &[(ClientResponse::Get {
453                        key: key.clone(),
454                        value: HashSet::from([
455                            serde_json::to_string(&expected_member_data).unwrap()
456                        ])
457                    })]
458                );
459                break;
460            }
461        }
462    }
463
464    #[dfir_rs::test]
465    async fn test_multiple_values_same_tick() {
466        let mut fleet = Fleet::new();
467
468        let server_name: Hostname = "server".to_string();
469
470        let server_client_address = Address::new(server_name.clone(), "client".to_string());
471
472        let (_, gossip_trigger_rx) = dfir_rs::util::unbounded_channel::<()>();
473
474        // Create the kv server
475        fleet.add_host(server_name.clone(), |ctx| {
476            let client_input = ctx.new_inbox::<ClientRequest>("client".to_string());
477            let client_output = ctx.new_outbox::<ClientResponse>("client".to_string());
478
479            let gossip_input = ctx.new_inbox::<GossipMessage>("gossip".to_string());
480            let gossip_output = ctx.new_outbox::<GossipMessage>("gossip".to_string());
481            let server_gossip_address = Address::new(server_name.clone(), "gossip".to_string());
482
483            let member_data = MemberDataBuilder::new(server_name.clone())
484                .add_protocol(Protocol::new(
485                    "client".into(),
486                    server_client_address.clone(),
487                ))
488                .add_protocol(Protocol::new(
489                    "gossip".into(),
490                    server_gossip_address.clone(),
491                ))
492                .build();
493
494            server(
495                client_input,
496                client_output,
497                gossip_input,
498                gossip_output,
499                gossip_trigger_rx,
500                member_data,
501                vec![],
502                empty(),
503            )
504        });
505
506        let key = Key {
507            namespace: Namespace::System,
508            table: "table".to_string(),
509            row_key: "row".to_string(),
510        };
511        let val_a = "A".to_string();
512        let val_b = "B".to_string();
513
514        let writer_name: Hostname = "writer".to_string();
515
516        let (writer_trigger_tx, writer_trigger_rx) = dfir_rs::util::unbounded_channel::<String>();
517        let key_clone = key.clone();
518        let server_client_address_clone = server_client_address.clone();
519
520        fleet.add_host(writer_name.clone(), |ctx| {
521            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
522            dfir_syntax! {
523                client_output = dest_sink(client_tx);
524
525                source_stream(writer_trigger_rx)
526                    -> map(|value| (ClientRequest::Set { key: key_clone.clone(), value: value.clone()}, server_client_address_clone.clone()) )
527                    -> client_output;
528            }
529        });
530
531        // Send two messages from the writer.
532        let writer = fleet.get_host_mut(&writer_name).unwrap();
533        writer_trigger_tx.send(val_a.clone()).unwrap();
534        writer.run_tick();
535
536        writer_trigger_tx.send(val_b.clone()).unwrap();
537        writer.run_tick();
538
539        // Transmit messages across the network.
540        fleet.process_network().await;
541
542        // Run the server.
543        let server = fleet.get_host_mut(&server_name).unwrap();
544        server.run_tick();
545
546        // Read the value back.
547        let reader_name: Hostname = "reader".to_string();
548
549        let (reader_trigger_tx, reader_trigger_rx) = dfir_rs::util::unbounded_channel::<()>();
550        let (response_tx, mut response_rx) = dfir_rs::util::unbounded_channel::<ClientResponse>();
551
552        let key_clone = key.clone();
553        let server_client_address_clone = server_client_address.clone();
554
555        fleet.add_host(reader_name.clone(), |ctx| {
556            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
557            let client_rx = ctx.new_inbox::<ClientResponse>("client".to_string());
558
559            dfir_syntax! {
560                client_output = dest_sink(client_tx);
561
562                source_stream(reader_trigger_rx)
563                    -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_client_address_clone.clone()) )
564                    -> client_output;
565
566                client_input = source_stream(client_rx)
567                    -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());
568
569            }
570        });
571
572        reader_trigger_tx.send(()).unwrap();
573
574        loop {
575            fleet.run_single_tick_all_hosts().await;
576
577            let responses = dfir_rs::util::collect_ready_async::<Vec<_>, _>(&mut response_rx).await;
578
579            if !responses.is_empty() {
580                assert_eq!(
581                    responses,
582                    &[ClientResponse::Get {
583                        key,
584                        value: HashSet::from([val_a, val_b])
585                    }]
586                );
587                break;
588            }
589        }
590    }
591
592    #[dfir_rs::test]
593    async fn test_gossip() {
594        let subscriber = tracing_subscriber::FmtSubscriber::builder()
595            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
596            .with_test_writer()
597            .finish();
598
599        let _ = tracing::subscriber::set_global_default(subscriber);
600
601        let mut fleet = Fleet::new();
602
603        let server_a: Hostname = "server_a".to_string();
604        let server_b: Hostname = "server_b".to_string();
605
606        let server_a_client_address = Address::new(server_a.clone(), "client".to_string());
607        let server_b_client_address = Address::new(server_b.clone(), "client".to_string());
608
609        let server_a_gossip_address = Address::new(server_a.clone(), "gossip".to_string());
610        let server_b_gossip_address = Address::new(server_b.clone(), "gossip".to_string());
611
612        let seed_nodes = vec![
613            SeedNode {
614                id: server_a.clone(),
615                address: server_a_gossip_address.clone(),
616            },
617            SeedNode {
618                id: server_b.clone(),
619                address: server_b_gossip_address.clone(),
620            },
621        ];
622
623        let (gossip_trigger_tx_a, gossip_trigger_rx_a) = dfir_rs::util::unbounded_channel::<()>();
624
625        let seed_nodes_clone = seed_nodes.clone();
626        fleet.add_host(server_a.clone(), |ctx| {
627            let client_input = ctx.new_inbox::<ClientRequest>("client".to_string());
628            let client_output = ctx.new_outbox::<ClientResponse>("client".to_string());
629
630            let gossip_input = ctx.new_inbox::<GossipMessage>("gossip".to_string());
631            let gossip_output = ctx.new_outbox::<GossipMessage>("gossip".to_string());
632
633            let member_data = MemberDataBuilder::new(server_a.clone())
634                .add_protocol(Protocol::new(
635                    "client".into(),
636                    server_a_client_address.clone(),
637                ))
638                .add_protocol(Protocol::new(
639                    "gossip".into(),
640                    server_a_gossip_address.clone(),
641                ))
642                .build();
643
644            server(
645                client_input,
646                client_output,
647                gossip_input,
648                gossip_output,
649                gossip_trigger_rx_a,
650                member_data,
651                seed_nodes_clone,
652                empty(),
653            )
654        });
655
656        let (_, gossip_trigger_rx_b) = dfir_rs::util::unbounded_channel::<()>();
657
658        let seed_nodes_clone = seed_nodes.clone();
659        fleet.add_host(server_b.clone(), |ctx| {
660            let client_input = ctx.new_inbox::<ClientRequest>("client".to_string());
661            let client_output = ctx.new_outbox::<ClientResponse>("client".to_string());
662
663            let gossip_input = ctx.new_inbox::<GossipMessage>("gossip".to_string());
664            let gossip_output = ctx.new_outbox::<GossipMessage>("gossip".to_string());
665
666            let member_data = MemberDataBuilder::new(server_b.clone())
667                .add_protocol(Protocol::new(
668                    "client".into(),
669                    server_b_client_address.clone(),
670                ))
671                .add_protocol(Protocol::new(
672                    "gossip".into(),
673                    server_b_gossip_address.clone(),
674                ))
675                .build();
676
677            server(
678                client_input,
679                client_output,
680                gossip_input,
681                gossip_output,
682                gossip_trigger_rx_b,
683                member_data,
684                seed_nodes_clone,
685                empty(),
686            )
687        });
688
689        let key = Key {
690            namespace: Namespace::User,
691            table: "table".to_string(),
692            row_key: "row".to_string(),
693        };
694
695        let writer_name: Hostname = "writer".to_string();
696
697        let (writer_trigger_tx, writer_trigger_rx) = dfir_rs::util::unbounded_channel::<String>();
698
699        let key_clone = key.clone();
700        let server_a_client_address_clone = server_a_client_address.clone();
701
702        fleet.add_host(writer_name.clone(), |ctx| {
703            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
704            dfir_syntax! {
705                client_output = dest_sink(client_tx);
706
707                source_stream(writer_trigger_rx)
708                    -> map(|value| (ClientRequest::Set { key: key_clone.clone(), value: value.clone()}, server_a_client_address_clone.clone()) )
709                    -> client_output;
710            }
711        });
712
713        let reader_name: Hostname = "reader".to_string();
714
715        let (reader_trigger_tx, reader_trigger_rx) = dfir_rs::util::unbounded_channel::<()>();
716        let (response_tx, mut response_rx) = dfir_rs::util::unbounded_channel::<ClientResponse>();
717
718        let key_clone = key.clone();
719        let server_b_client_address_clone = server_b_client_address.clone();
720
721        fleet.add_host(reader_name.clone(), |ctx| {
722            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
723            let client_rx = ctx.new_inbox::<ClientResponse>("client".to_string());
724
725            dfir_syntax! {
726                client_output = dest_sink(client_tx);
727
728                source_stream(reader_trigger_rx)
729                    -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_b_client_address_clone.clone()) )
730                    -> client_output;
731
732                client_input = source_stream(client_rx)
733                    -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());
734
735            }
736        });
737
738        let value = "VALUE".to_string();
739        writer_trigger_tx.send(value.clone()).unwrap();
740
741        loop {
742            reader_trigger_tx.send(()).unwrap();
743            fleet.run_single_tick_all_hosts().await;
744            let responses = dfir_rs::util::collect_ready_async::<Vec<_>, _>(&mut response_rx).await;
745
746            if !responses.is_empty() {
747                assert_eq!(
748                    responses,
749                    &[ClientResponse::Get {
750                        key,
751                        value: HashSet::from([value.clone()])
752                    }]
753                );
754                break;
755            }
756
757            gossip_trigger_tx_a.send(()).unwrap();
758        }
759    }
760}