hydro_test/external_client/
chat.rs1use std::hash::{DefaultHasher, Hash, Hasher};
2
3use colored::{Color, Colorize};
4use hydro_lang::keyed_stream::KeyedStream;
5use hydro_lang::location::MembershipEvent;
6use hydro_lang::unsafety::NonDet;
7use hydro_lang::*;
8use hydro_std::membership::track_membership;
9use palette::{FromColor, Hsv, Srgb};
10
11fn hash_to_color<T: Hash>(input: T) -> Color {
12 let mut hasher = DefaultHasher::new();
13 input.hash(&mut hasher);
14 let hash = hasher.finish();
15
16 let hue = (hash % 360) as f32;
18 let hsv = Hsv::new(hue, 1.0, 1.0);
19 let rgb: Srgb<u8> = Srgb::from_color(hsv).into_format();
20
21 Color::TrueColor {
22 r: rgb.red,
23 g: rgb.green,
24 b: rgb.blue,
25 }
26}
27
28pub fn chat_server<'a, P>(
33 process: &Process<'a, P>,
34 in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded>,
35 membership: KeyedStream<u64, MembershipEvent, Process<'a, P>, Unbounded>,
36 nondet_user_arrival_broadcast: NonDet,
37) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, NoOrder> {
38 let current_members = track_membership(membership);
39
40 let tick = process.tick();
41
42 current_members
43 .snapshot(&tick, nondet_user_arrival_broadcast)
44 .keys()
45 .cross_product(
46 in_stream
47 .entries()
48 .batch(&tick, nondet_user_arrival_broadcast),
49 )
50 .into_keyed()
51 .all_ticks()
52 .filter_map_with_key(q!(|(recipient_id, (sender_id, line))| {
53 if sender_id != recipient_id {
54 Some(format!(
55 "From {}: {:}",
56 sender_id,
57 line.color(self::hash_to_color(sender_id + 10))
58 ))
59 } else {
60 None
61 }
62 }))
63}