1use std::collections::{HashMap, HashSet};
2use std::fmt::Debug;
3use std::hash::Hash;
4
5use dfir_rs::dfir_syntax;
6use dfir_rs::futures::{Sink, Stream};
7use dfir_rs::itertools::Itertools;
8use dfir_rs::lattices::map_union::{KeyedBimorphism, MapUnionHashMap, MapUnionSingletonMap};
9use dfir_rs::lattices::set_union::SetUnionHashSet;
10use dfir_rs::lattices::{Lattice, PairBimorphism};
11use dfir_rs::scheduled::graph::Dfir;
12use lattices::set_union::SetUnion;
13use lattices::{IsTop, Max, Pair};
14use lazy_static::lazy_static;
15use prometheus::{IntCounter, register_int_counter};
16use rand::seq::IteratorRandom;
17use rand::thread_rng;
18use serde::de::DeserializeOwned;
19use serde::{Deserialize, Serialize};
20use tracing::{info, trace};
21
22use crate::GossipMessage::{Ack, Nack};
23use crate::lattices::BoundedSetLattice;
24use crate::membership::{MemberData, MemberId};
25use crate::model::{
26 Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName, delete_row, upsert_row,
27};
28use crate::util::{ClientRequestWithAddress, GossipRequestWithAddress};
29use crate::{ClientRequest, ClientResponse, GossipMessage, Key, Namespace};
30
31pub trait Address: Hash + Debug + Clone + Eq + Serialize {}
34impl<A> Address for A where A: Hash + Debug + Clone + Eq + Serialize {}
35
36#[derive(Debug, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
37pub struct SeedNode<A>
38where
39 A: Address,
40{
41 pub id: MemberId,
42 pub address: A,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize, Lattice)]
46pub struct InfectingWrite {
47 write: Namespaces<Clock>,
48 members: BoundedSetLattice<MemberId, 2>,
49}
50
51pub type MessageId = String;
52
53lazy_static! {
54 pub static ref SETS_COUNTER: IntCounter =
55 register_int_counter!("sets", "Counts the number of SET requests processed.").unwrap();
56}
57
58#[expect(clippy::too_many_arguments)]
66pub fn server<
67 ClientInput,
68 ClientOutput,
69 ClientOutputError,
70 GossipInput,
71 GossipOutput,
72 GossipOutputError,
73 GossipTrigger,
74 SeedNodeStream,
75 Addr,
76>(
77 client_inputs: ClientInput,
78 client_outputs: ClientOutput,
79 gossip_inputs: GossipInput,
80 gossip_outputs: GossipOutput,
81 gossip_trigger: GossipTrigger,
82 member_info: MemberData<Addr>,
83 seed_nodes: Vec<SeedNode<Addr>>,
84 seed_node_stream: SeedNodeStream,
85) -> Dfir<'static>
86where
87 ClientInput: Stream<Item = (ClientRequest, Addr)> + Unpin + 'static,
88 ClientOutput: Sink<(ClientResponse, Addr), Error = ClientOutputError> + Unpin + 'static,
89 GossipInput: Stream<Item = (GossipMessage, Addr)> + Unpin + 'static,
90 GossipOutput: Sink<(GossipMessage, Addr), Error = GossipOutputError> + Unpin + 'static,
91 GossipTrigger: Stream<Item = ()> + Unpin + 'static,
92 SeedNodeStream: Stream<Item = Vec<SeedNode<Addr>>> + Unpin + 'static,
93 Addr: Address + DeserializeOwned + 'static,
94 ClientOutputError: Debug + 'static,
95 GossipOutputError: Debug + 'static,
96{
97 let my_member_id = member_info.id.clone();
98 let member_id_2 = my_member_id.clone();
100 let member_id_3 = my_member_id.clone();
101 let member_id_4 = my_member_id.clone();
102 let member_id_5 = my_member_id.clone();
103 let member_id_6 = my_member_id.clone();
104
105 dfir_syntax! {
106
107 on_start = initialize() -> tee();
108 on_start -> for_each(|_| info!("{:?}: Process {} started.", context.current_tick(), member_id_6));
109
110 seed_nodes = source_stream(seed_node_stream)
111 -> fold::<'static>(|| Box::new(seed_nodes), |last_seed_nodes, new_seed_nodes: Vec<SeedNode<Addr>>| {
112 **last_seed_nodes = new_seed_nodes;
113 info!("Updated seed nodes: {:?}", **last_seed_nodes);
114 });
115
116 on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap()))
118 -> writes;
119
120 client_out =
121 inspect(|(resp, addr)| trace!("{:?}: Sending response: {:?} to {:?}.", context.current_tick(), resp, addr))
122 -> dest_sink(client_outputs);
123
124 client_in = source_stream(client_inputs)
125 -> map(|(msg, addr)| ClientRequestWithAddress::from_request_and_address(msg, addr))
126 -> demux_enum::<ClientRequestWithAddress<Addr>>();
127
128 client_in[Get]
129 -> inspect(|req| trace!("{:?}: Received Get request: {:?}.", context.current_tick(), req))
130 -> map(|(key, addr) : (Key, Addr)| {
131 let row = MapUnionHashMap::new_from([
132 (
133 key.row_key,
134 SetUnionHashSet::new_from([addr ])
135 ),
136 ]);
137 let table = MapUnionHashMap::new_from([(key.table, row)]);
138 MapUnionHashMap::new_from([(key.namespace, table)])
139 })
140 -> reads;
141
142 client_in[Set]
143 -> inspect(|request| trace!("{:?}: Received Set request: {:?}.", context.current_tick(), request))
144 -> map(|(key, value, _addr) : (Key, String, Addr)| upsert_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key, value))
145 -> inspect(|_| {
146 SETS_COUNTER.inc(); })
148 -> writes;
149
150 client_in[Delete]
151 -> inspect(|req| trace!("{:?}: Received Delete request: {:?}.", context.current_tick(), req))
152 -> map(|(key, _addr) : (Key, Addr)| delete_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key))
153 -> writes;
154
155 gossip_in = source_stream(gossip_inputs)
156 -> map(|(msg, addr)| GossipRequestWithAddress::from_request_and_address(msg, addr))
157 -> demux_enum::<GossipRequestWithAddress<Addr>>();
158
159 incoming_gossip_messages = gossip_in[Gossip]
160 -> inspect(|request| trace!("{:?}: Received gossip request: {:?}.", context.current_tick(), request))
161 -> tee();
162
163 gossip_in[Ack]
164 -> inspect(|request| trace!("{:?}: Received gossip ack: {:?}.", context.current_tick(), request))
165 -> null();
166
167 gossip_in[Nack]
168 -> inspect(|request| trace!("{:?}: Received gossip nack: {:?}.", context.current_tick(), request))
169 -> map( |(message_id, member_id, _addr)| {
170 MapUnionSingletonMap::new_from((message_id, InfectingWrite { write: Default::default(), members: BoundedSetLattice::new_from([member_id]) }))
171 })
172 -> infecting_writes;
173
174 gossip_out = union() -> dest_sink(gossip_outputs);
175
176 incoming_gossip_messages
177 -> map(|(_msg_id, _member_id, writes, _addr)| writes )
178 -> writes;
179
180 gossip_processing_pipeline = incoming_gossip_messages
181 -> map(|(msg_id, _member_id, writes, sender_address) : (String, MemberId, Namespaces<Max<u64>>, Addr)| {
182 let namespaces = &#namespaces;
183 let all_data: &HashMap<Namespace, TableMap<RowValue<Clock>>> = namespaces.as_reveal_ref();
184 let possible_new_data: &HashMap<Namespace, TableMap<RowValue<Max<u64>>>>= writes.as_reveal_ref();
185
186 let gossip_has_new_data = possible_new_data.iter()
192 .flat_map(|(namespace, tables)| {
193 tables.as_reveal_ref().iter().flat_map(move |(table, rows)|{
194 rows.as_reveal_ref().iter().map(move |(row_key, row_value)| (namespace, table, row_key, row_value.as_reveal_ref().0.as_reveal_ref()))
195 })
196 })
197 .any(|(ns,table, row_key, new_ts)| {
198 let existing_tables = all_data.get(ns);
199 let existing_rows = existing_tables.and_then(|tables| tables.as_reveal_ref().get(table));
200 let existing_row = existing_rows.and_then(|rows| rows.as_reveal_ref().get(row_key));
201 let existing_ts = existing_row.map(|row| row.as_reveal_ref().0.as_reveal_ref());
202
203 if let Some(existing_ts) = existing_ts {
204 trace!("Comparing timestamps: {:?} vs {:?}", new_ts, existing_ts);
205 new_ts > existing_ts
206 } else {
207 true
208 }
209 });
210
211 if gossip_has_new_data {
212 (Ack { message_id: msg_id, member_id: member_id_2.clone()}, sender_address, Some(writes))
213 } else {
214 (Nack { message_id: msg_id, member_id: member_id_3.clone()}, sender_address, None)
215 }
216 })
217 -> tee();
218
219 gossip_processing_pipeline
220 -> map(|(response, address, _writes)| (response, address))
221 -> inspect( |(msg, addr)| trace!("{:?}: Sending gossip response: {:?} to {:?}.", context.current_tick(), msg, addr))
222 -> gossip_out;
223
224 gossip_processing_pipeline
225 -> filter(|(_, _, writes)| writes.is_some())
226 -> map(|(_, _, writes)| writes.unwrap())
227 -> writes;
228
229 writes = union();
230
231 writes -> namespaces;
232
233 namespaces = state::<'static, Namespaces::<Clock>>();
234 new_writes = namespaces -> tee(); reads = state::<'tick, MapUnionHashMap<Namespace, MapUnionHashMap<TableName, MapUnionHashMap<RowKey, SetUnionHashSet<Addr>>>>>();
237
238 new_writes -> [0]process_system_table_reads;
239 reads -> [1]process_system_table_reads;
240
241 process_system_table_reads = lattice_bimorphism(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(PairBimorphism))), #namespaces, #reads)
242 -> lattice_reduce::<'tick>() -> flat_map(|result: NamespaceMap<Pair<RowValue<Clock>, SetUnion<HashSet<Addr>>>>| {
244
245 let mut response: Vec<(ClientResponse, Addr)> = vec![];
246
247 let result = result.as_reveal_ref();
248
249 for (namespace, tables) in result.iter() {
250 for (table_name, table) in tables.as_reveal_ref().iter() {
251 for (row_key, join_results) in table.as_reveal_ref().iter() {
252 let key = Key {
253 namespace: *namespace,
254 table: table_name.clone(),
255 row_key: row_key.clone(),
256 };
257
258 let timestamped_values = join_results.as_reveal_ref().0;
259 let all_values = timestamped_values.as_reveal_ref().1.as_reveal_ref();
260
261 let all_addresses = join_results.as_reveal_ref().1.as_reveal_ref();
262 let socket_addr = all_addresses.iter().find_or_first(|_| true).unwrap();
263
264 response.push((
265 ClientResponse::Get {key, value: all_values.clone()},
266 socket_addr.clone(),
267 ));
268 }
269 }
270 }
271 response
272 }) -> client_out;
273
274 new_writes -> for_each(|x| trace!("NEW WRITE: {:?}", x));
275
276 infecting_writes = union() -> state_by::<'static, MapUnionHashMap<MessageId, InfectingWrite>>(std::convert::identity, std::default::Default::default);
278
279 new_writes -> map(|write| {
280 let id = uuid::Uuid::new_v4().to_string();
284 MapUnionSingletonMap::new_from((id, InfectingWrite { write, members: BoundedSetLattice::new() }))
285 }) -> infecting_writes;
286
287 gossip_trigger = source_stream(gossip_trigger);
288
289 gossip_messages = gossip_trigger
290 -> flat_map( |_|
291 {
292 let infecting_writes = #infecting_writes.as_reveal_ref().clone();
293 trace!("{:?}: Currently gossipping {} infecting writes.", context.current_tick(), infecting_writes.iter().filter(|(_, write)| !write.members.is_top()).count());
294 infecting_writes
295 }
296 )
297 -> filter(|(_id, infecting_write)| !infecting_write.members.is_top())
298 -> map(|(id, infecting_write)| {
299 trace!("{:?}: Choosing a peer to gossip to. {:?}:{:?}", context.current_tick(), id, infecting_write);
300 let peers = #namespaces.as_reveal_ref().get(&Namespace::System).unwrap().as_reveal_ref().get("members").unwrap().as_reveal_ref().clone();
301
302 let mut peer_names = HashSet::new();
303 peers.iter().for_each(|(row_key, _)| {
304 peer_names.insert(row_key.clone());
305 });
306
307 let seed_nodes = &#seed_nodes;
308 seed_nodes.iter().for_each(|seed_node| {
309 peer_names.insert(seed_node.id.clone());
310 });
311
312 peer_names.remove(&member_id_5);
314
315 trace!("{:?}: Peers: {:?}", context.current_tick(), peer_names);
316
317 let chosen_peer_name = peer_names.iter().choose(&mut thread_rng());
318
319 if chosen_peer_name.is_none() {
320 trace!("{:?}: No peers to gossip to.", context.current_tick());
321 return None;
322 }
323
324 let chosen_peer_name = chosen_peer_name.unwrap();
325 let gossip_address = if peers.contains_key(chosen_peer_name) {
326 let peer_info_value = peers.get(chosen_peer_name).unwrap().as_reveal_ref().1.as_reveal_ref().iter().next().unwrap().clone();
327 let peer_info_deserialized = serde_json::from_str::<MemberData<Addr>>(&peer_info_value).unwrap();
328 peer_info_deserialized.protocols.iter().find(|protocol| protocol.name == "gossip").unwrap().clone().endpoint
329 } else {
330 seed_nodes.iter().find(|seed_node| seed_node.id == *chosen_peer_name).unwrap().address.clone()
331 };
332
333 trace!("Chosen peer: {:?}:{:?}", chosen_peer_name, gossip_address);
334 Some((id, infecting_write, gossip_address))
335 })
336 -> flatten()
337 -> inspect(|(message_id, infecting_write, peer_gossip_address)| trace!("{:?}: Sending write:\nMessageId:{:?}\nWrite:{:?}\nPeer Address:{:?}", context.current_tick(), message_id, infecting_write, peer_gossip_address))
338 -> map(|(message_id, infecting_write, peer_gossip_address): (String, InfectingWrite, Addr)| {
339 let gossip_request = GossipMessage::Gossip {
340 message_id: message_id.clone(),
341 member_id: member_id_4.clone(),
342 writes: infecting_write.write.clone(),
343 };
344 (gossip_request, peer_gossip_address)
345 })
346 -> gossip_out;
347 }
348}
349
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use dfir_rs::tokio_stream::empty;
    use dfir_rs::util::simulation::{Address, Fleet, Hostname};

    use super::*;
    use crate::membership::{MemberDataBuilder, Protocol};

    /// Upper bound on the number of simulated ticks a test waits for a response.
    /// A regression then fails the test instead of hanging the test runner.
    const MAX_TICKS: usize = 1_000;

    /// Builds the membership record of a server that exposes a `client` and a
    /// `gossip` protocol at the given addresses.
    fn member_data_for(
        host: &Hostname,
        client_address: &Address,
        gossip_address: &Address,
    ) -> MemberData<Address> {
        MemberDataBuilder::new(host.clone())
            .add_protocol(Protocol::new("client".into(), client_address.clone()))
            .add_protocol(Protocol::new("gossip".into(), gossip_address.clone()))
            .build()
    }

    /// A freshly started server must expose its own membership record under
    /// `/sys/members/<id>`.
    #[dfir_rs::test]
    async fn test_member_init() {
        let mut fleet = Fleet::new();

        let server_name: Hostname = "server".to_string();

        let server_client_address = Address::new(server_name.clone(), "client".to_string());
        let server_gossip_address = Address::new(server_name.clone(), "gossip".to_string());

        // The sender is dropped immediately: this server never gossips.
        let (_, gossip_trigger_rx) = dfir_rs::util::unbounded_channel::<()>();

        fleet.add_host(server_name.clone(), |ctx| {
            let client_input = ctx.new_inbox::<ClientRequest>("client".to_string());
            let client_output = ctx.new_outbox::<ClientResponse>("client".to_string());

            let gossip_input = ctx.new_inbox::<GossipMessage>("gossip".to_string());
            let gossip_output = ctx.new_outbox::<GossipMessage>("gossip".to_string());

            server(
                client_input,
                client_output,
                gossip_input,
                gossip_output,
                gossip_trigger_rx,
                member_data_for(&server_name, &server_client_address, &server_gossip_address),
                vec![],
                empty(),
            )
        });

        let client_name: Hostname = "client".to_string();

        let key = "/sys/members/server".parse::<Key>().unwrap();

        let (trigger_tx, trigger_rx) = dfir_rs::util::unbounded_channel::<()>();
        let (response_tx, mut response_rx) = dfir_rs::util::unbounded_channel::<ClientResponse>();

        let key_clone = key.clone();
        let server_client_address_clone = server_client_address.clone();

        fleet.add_host(client_name.clone(), |ctx| {
            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
            let client_rx = ctx.new_inbox::<ClientResponse>("client".to_string());

            dfir_syntax! {

                client_output = dest_sink(client_tx);

                source_stream(trigger_rx)
                    -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_client_address_clone.clone()) )
                    -> client_output;

                client_input = source_stream(client_rx)
                    -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());

            }
        });

        trigger_tx.send(()).unwrap();

        let expected_member_data =
            member_data_for(&server_name, &server_client_address, &server_gossip_address);
        let expected = vec![ClientResponse::Get {
            key: key.clone(),
            value: HashSet::from([serde_json::to_string(&expected_member_data).unwrap()]),
        }];

        let mut responses: Vec<ClientResponse> = Vec::new();
        for _ in 0..MAX_TICKS {
            fleet.run_single_tick_all_hosts().await;

            responses = dfir_rs::util::collect_ready_async::<Vec<_>, _>(&mut response_rx).await;
            if !responses.is_empty() {
                break;
            }
        }

        assert_eq!(
            responses, expected,
            "unexpected (or no) response within {} ticks",
            MAX_TICKS
        );
    }

    /// Two values written to the same row within one server tick must both be
    /// returned by a subsequent read.
    #[dfir_rs::test]
    async fn test_multiple_values_same_tick() {
        let mut fleet = Fleet::new();

        let server_name: Hostname = "server".to_string();

        let server_client_address = Address::new(server_name.clone(), "client".to_string());
        let server_gossip_address = Address::new(server_name.clone(), "gossip".to_string());

        // The sender is dropped immediately: this server never gossips.
        let (_, gossip_trigger_rx) = dfir_rs::util::unbounded_channel::<()>();

        fleet.add_host(server_name.clone(), |ctx| {
            let client_input = ctx.new_inbox::<ClientRequest>("client".to_string());
            let client_output = ctx.new_outbox::<ClientResponse>("client".to_string());

            let gossip_input = ctx.new_inbox::<GossipMessage>("gossip".to_string());
            let gossip_output = ctx.new_outbox::<GossipMessage>("gossip".to_string());

            server(
                client_input,
                client_output,
                gossip_input,
                gossip_output,
                gossip_trigger_rx,
                member_data_for(&server_name, &server_client_address, &server_gossip_address),
                vec![],
                empty(),
            )
        });

        let key = Key {
            namespace: Namespace::System,
            table: "table".to_string(),
            row_key: "row".to_string(),
        };
        let val_a = "A".to_string();
        let val_b = "B".to_string();

        let writer_name: Hostname = "writer".to_string();

        let (writer_trigger_tx, writer_trigger_rx) = dfir_rs::util::unbounded_channel::<String>();
        let key_clone = key.clone();
        let server_client_address_clone = server_client_address.clone();

        fleet.add_host(writer_name.clone(), |ctx| {
            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
            dfir_syntax! {
                client_output = dest_sink(client_tx);

                source_stream(writer_trigger_rx)
                    -> map(|value| (ClientRequest::Set { key: key_clone.clone(), value: value.clone()}, server_client_address_clone.clone()) )
                    -> client_output;
            }
        });

        // Emit both writes from the writer in separate writer ticks ...
        let writer = fleet.get_host_mut(&writer_name).unwrap();
        writer_trigger_tx.send(val_a.clone()).unwrap();
        writer.run_tick();

        writer_trigger_tx.send(val_b.clone()).unwrap();
        writer.run_tick();

        // ... deliver them together ...
        fleet.process_network().await;

        // ... and let the server process them in a single tick.
        let server_host = fleet.get_host_mut(&server_name).unwrap();
        server_host.run_tick();

        let reader_name: Hostname = "reader".to_string();

        let (reader_trigger_tx, reader_trigger_rx) = dfir_rs::util::unbounded_channel::<()>();
        let (response_tx, mut response_rx) = dfir_rs::util::unbounded_channel::<ClientResponse>();

        let key_clone = key.clone();
        let server_client_address_clone = server_client_address.clone();

        fleet.add_host(reader_name.clone(), |ctx| {
            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
            let client_rx = ctx.new_inbox::<ClientResponse>("client".to_string());

            dfir_syntax! {
                client_output = dest_sink(client_tx);

                source_stream(reader_trigger_rx)
                    -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_client_address_clone.clone()) )
                    -> client_output;

                client_input = source_stream(client_rx)
                    -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());

            }
        });

        reader_trigger_tx.send(()).unwrap();

        let expected = vec![ClientResponse::Get {
            key,
            value: HashSet::from([val_a, val_b]),
        }];

        let mut responses: Vec<ClientResponse> = Vec::new();
        for _ in 0..MAX_TICKS {
            fleet.run_single_tick_all_hosts().await;

            responses = dfir_rs::util::collect_ready_async::<Vec<_>, _>(&mut response_rx).await;
            if !responses.is_empty() {
                break;
            }
        }

        assert_eq!(
            responses, expected,
            "unexpected (or no) response within {} ticks",
            MAX_TICKS
        );
    }

    /// A value written to `server_a` must become readable from `server_b` once
    /// `server_a` has gossiped it.
    #[dfir_rs::test]
    async fn test_gossip() {
        let subscriber = tracing_subscriber::FmtSubscriber::builder()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_test_writer()
            .finish();

        // Another test may already have installed a subscriber; that is fine.
        let _ = tracing::subscriber::set_global_default(subscriber);

        let mut fleet = Fleet::new();

        let server_a: Hostname = "server_a".to_string();
        let server_b: Hostname = "server_b".to_string();

        let server_a_client_address = Address::new(server_a.clone(), "client".to_string());
        let server_b_client_address = Address::new(server_b.clone(), "client".to_string());

        let server_a_gossip_address = Address::new(server_a.clone(), "gossip".to_string());
        let server_b_gossip_address = Address::new(server_b.clone(), "gossip".to_string());

        let seed_nodes = vec![
            SeedNode {
                id: server_a.clone(),
                address: server_a_gossip_address.clone(),
            },
            SeedNode {
                id: server_b.clone(),
                address: server_b_gossip_address.clone(),
            },
        ];

        let (gossip_trigger_tx_a, gossip_trigger_rx_a) = dfir_rs::util::unbounded_channel::<()>();

        let seed_nodes_clone = seed_nodes.clone();
        fleet.add_host(server_a.clone(), |ctx| {
            let client_input = ctx.new_inbox::<ClientRequest>("client".to_string());
            let client_output = ctx.new_outbox::<ClientResponse>("client".to_string());

            let gossip_input = ctx.new_inbox::<GossipMessage>("gossip".to_string());
            let gossip_output = ctx.new_outbox::<GossipMessage>("gossip".to_string());

            server(
                client_input,
                client_output,
                gossip_input,
                gossip_output,
                gossip_trigger_rx_a,
                member_data_for(&server_a, &server_a_client_address, &server_a_gossip_address),
                seed_nodes_clone,
                empty(),
            )
        });

        // The sender is dropped immediately: server_b never initiates gossip.
        let (_, gossip_trigger_rx_b) = dfir_rs::util::unbounded_channel::<()>();

        let seed_nodes_clone = seed_nodes.clone();
        fleet.add_host(server_b.clone(), |ctx| {
            let client_input = ctx.new_inbox::<ClientRequest>("client".to_string());
            let client_output = ctx.new_outbox::<ClientResponse>("client".to_string());

            let gossip_input = ctx.new_inbox::<GossipMessage>("gossip".to_string());
            let gossip_output = ctx.new_outbox::<GossipMessage>("gossip".to_string());

            server(
                client_input,
                client_output,
                gossip_input,
                gossip_output,
                gossip_trigger_rx_b,
                member_data_for(&server_b, &server_b_client_address, &server_b_gossip_address),
                seed_nodes_clone,
                empty(),
            )
        });

        let key = Key {
            namespace: Namespace::User,
            table: "table".to_string(),
            row_key: "row".to_string(),
        };

        let writer_name: Hostname = "writer".to_string();

        let (writer_trigger_tx, writer_trigger_rx) = dfir_rs::util::unbounded_channel::<String>();

        let key_clone = key.clone();
        let server_a_client_address_clone = server_a_client_address.clone();

        fleet.add_host(writer_name.clone(), |ctx| {
            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
            dfir_syntax! {
                client_output = dest_sink(client_tx);

                source_stream(writer_trigger_rx)
                    -> map(|value| (ClientRequest::Set { key: key_clone.clone(), value: value.clone()}, server_a_client_address_clone.clone()) )
                    -> client_output;
            }
        });

        let reader_name: Hostname = "reader".to_string();

        let (reader_trigger_tx, reader_trigger_rx) = dfir_rs::util::unbounded_channel::<()>();
        let (response_tx, mut response_rx) = dfir_rs::util::unbounded_channel::<ClientResponse>();

        let key_clone = key.clone();
        let server_b_client_address_clone = server_b_client_address.clone();

        fleet.add_host(reader_name.clone(), |ctx| {
            let client_tx = ctx.new_outbox::<ClientRequest>("client".to_string());
            let client_rx = ctx.new_inbox::<ClientResponse>("client".to_string());

            dfir_syntax! {
                client_output = dest_sink(client_tx);

                source_stream(reader_trigger_rx)
                    -> map(|_| (ClientRequest::Get { key: key_clone.clone() }, server_b_client_address_clone.clone()) )
                    -> client_output;

                client_input = source_stream(client_rx)
                    -> for_each(|(resp, _addr)| response_tx.send(resp).unwrap());

            }
        });

        let value = "VALUE".to_string();
        writer_trigger_tx.send(value.clone()).unwrap();

        let expected = vec![ClientResponse::Get {
            key,
            value: HashSet::from([value.clone()]),
        }];

        // Keep asking server_b for the key and keep triggering gossip on
        // server_a until the value has propagated.
        let mut responses: Vec<ClientResponse> = Vec::new();
        for _ in 0..MAX_TICKS {
            reader_trigger_tx.send(()).unwrap();
            fleet.run_single_tick_all_hosts().await;
            responses = dfir_rs::util::collect_ready_async::<Vec<_>, _>(&mut response_rx).await;

            if !responses.is_empty() {
                break;
            }

            gossip_trigger_tx_a.send(()).unwrap();
        }

        assert_eq!(
            responses, expected,
            "unexpected (or no) response within {} ticks",
            MAX_TICKS
        );
    }
}