1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use dfir_rs::dfir_syntax;
7use dfir_rs::scheduled::ticks::TickInstant;
8use dfir_rs::serde::{Deserialize, Serialize};
9use dfir_rs::util::deploy::{
10 ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
11};
12use dfir_rs::util::{deserialize_from_bytes, serialize_to_bytes};
13use topolotree::protocol::*;
14
/// Input message for the replica's state fold: either a batch of counter
/// values gossiped by a peer, or a single increment request.
#[derive(Serialize, Deserialize, Clone, Debug)]
enum GossipOrIncrement {
    /// Batch of gossiped entries, each shaped as
    /// `(counter id, (sender replica index, sender's positive total, sender's negative total))`.
    /// The receiver keeps the larger of its stored value and the gossiped
    /// value in the sender's slot.
    Gossip(Vec<(u64, (usize, u64, u64))>),
    /// `(counter id, signed delta)` to apply to this replica's own slot.
    Increment(u64, i64),
}
20
/// One modified counter as emitted by the state fold:
/// `(counter id, modified locally this tick, shared (positive, negative) per-replica totals)`.
/// The `Rc<RefCell<..>>` is a clone of the handle stored in the fold's state map,
/// so it aliases the live state rather than copying the vectors.
type NextStateType = (u64, bool, Rc<RefCell<(Vec<u64>, Vec<u64>)>>);
22
23#[dfir_rs::main]
24async fn main() {
25 let ports = dfir_rs::util::deploy::init::<()>().await;
26
27 let my_id: Vec<usize> = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap();
28 let my_id = my_id[0];
29 let num_replicas: Vec<usize> = serde_json::from_str(&std::env::args().nth(2).unwrap()).unwrap();
30 let num_replicas = num_replicas[0];
31
32 let increment_requests = ports
33 .port("increment_requests")
34 .connect::<ConnectedDirect>()
35 .into_source();
36
37 let query_responses = ports
38 .port("query_responses")
39 .connect::<ConnectedDirect>()
40 .into_sink();
41
42 let to_peer = ports
43 .port("to_peer")
44 .connect::<ConnectedDemux<ConnectedDirect>>()
45 .into_sink();
46
47 let from_peer = ports
48 .port("from_peer")
49 .connect::<ConnectedTagged<ConnectedDirect>>()
50 .into_source();
51
52 let f1 = async move {
53 #[cfg(target_os = "linux")]
54 loop {
55 let x = procinfo::pid::stat_self().unwrap();
56 let bytes = x.rss * 1024 * 4;
57 println!("memory,{}", bytes);
58 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
59 }
60 };
61
62 let df = dfir_syntax! {
63 next_state = union()
64 -> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>>::new(), HashMap::new(), TickInstant::default()), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
65 if context.current_tick() != *last_tick {
66 modified_tweets.clear();
67 }
68
69 match goi {
70 GossipOrIncrement::Gossip(gossip) => {
71 for (counter_id, (gossip_i, gossip_pos, gossip_neg)) in gossip.iter() {
72 let gossip_i = *gossip_i;
73 let cur_value = cur_state.entry(*counter_id).or_insert(Rc::new(RefCell::new((
74 vec![0; num_replicas], vec![0; num_replicas]
75 ))));
76 let mut cur_value = cur_value.as_ref().borrow_mut();
77
78 if *gossip_pos > cur_value.0[gossip_i] {
79 cur_value.0[gossip_i] = *gossip_pos;
80 modified_tweets.entry(*counter_id).or_insert(false);
81 }
82
83 if *gossip_neg > cur_value.1[gossip_i] {
84 cur_value.1[gossip_i] = *gossip_neg;
85 modified_tweets.entry(*counter_id).or_insert(false);
86 }
87 }
88 }
89 GossipOrIncrement::Increment(counter_id, delta) => {
90 let cur_value = cur_state.entry(counter_id).or_insert(Rc::new(RefCell::new((
91 vec![0; num_replicas], vec![0; num_replicas]
92 ))));
93 let mut cur_value = cur_value.as_ref().borrow_mut();
94
95 if delta > 0 {
96 cur_value.0[my_id] += delta as u64;
97 } else {
98 cur_value.1[my_id] += (-delta) as u64;
99 }
100
101 *modified_tweets.entry(counter_id).or_insert(false) |= true;
102 }
103 }
104
105 *last_tick = context.current_tick();
106 })
107 -> filter(|(_, _, tick)| *tick == context.current_tick())
108 -> filter(|(_, modified_tweets, _)| !modified_tweets.is_empty())
109 -> map(|(state, modified_tweets, _)| modified_tweets.iter().map(|(t, is_local)| (*t, *is_local, state.get(t).unwrap().clone())).collect::<Vec<_>>())
110 -> tee();
111
112 source_stream(from_peer)
113 -> map(|x| deserialize_from_bytes::<GossipOrIncrement>(&x.unwrap().1).unwrap())
114 -> next_state;
115
116 source_stream(increment_requests)
117 -> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
118 -> map(|t| GossipOrIncrement::Increment(t.key, t.change))
119 -> next_state;
120
121 all_peers = source_iter(0..num_replicas)
122 -> filter(|x| *x != my_id);
123
124 all_peers -> [0] broadcaster;
125 next_state -> [1] broadcaster;
126 broadcaster = cross_join::<'static, 'tick>()
127 -> map(|(peer, state): (_, Vec<NextStateType>)| {
128 (peer as u32, state.iter().filter(|t| t.1).map(|(k, _, v)| (*k, (my_id, v.as_ref().borrow().0[my_id], v.as_ref().borrow().1[my_id]))).collect())
129 })
130 -> filter(|(_, gossip): &(_, Vec<_>)| !gossip.is_empty())
131 -> map(|(peer, gossip): (_, _)| {
132 (peer, serialize_to_bytes(GossipOrIncrement::Gossip(gossip)))
133 })
134 -> dest_sink(to_peer);
135
136 next_state
137 -> flat_map(|a: Vec<NextStateType>| {
138 a.into_iter().map(|(k, _, rc_array)| {
139 let rc_borrowed = rc_array.as_ref().borrow();
140 let (pos, neg) = rc_borrowed.deref();
141 QueryResponse {
142 key: k,
143 value: pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64
144 }
145 }).collect::<Vec<_>>()
146 })
147 -> map(serialize_to_bytes::<QueryResponse>)
148 -> dest_sink(query_responses);
149 };
150
151 #[cfg(target_os = "linux")]
153 {
154 let x = procinfo::pid::stat_self().unwrap();
155 let bytes = x.rss * 1024 * 4;
156 println!("memory,{}", bytes);
157 }
158
159 let f1_handle = tokio::spawn(f1);
160 dfir_rs::util::deploy::launch_flow(df).await;
161 f1_handle.abort();
162}