pn/
pn.rs

1use std::cell::RefCell;
2use std::collections::{HashMap, HashSet};
3use std::ops::Deref;
4use std::rc::Rc;
5
6use dfir_rs::dfir_syntax;
7use dfir_rs::scheduled::ticks::TickInstant;
8use dfir_rs::serde::{Deserialize, Serialize};
9use dfir_rs::util::deploy::{
10    ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged,
11};
12use dfir_rs::util::{deserialize_from_bytes, serialize_to_bytes};
13pub use topolotree::protocol::*;
14
15type NextStateType = (u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>);
16
17#[derive(Serialize, Deserialize, Clone, Debug)]
18enum GossipOrIncrement {
19    Gossip(Vec<NextStateType>),
20    Increment(u64, i64),
21}
22
23#[dfir_rs::main]
24async fn main() {
25    let ports = dfir_rs::util::deploy::init::<()>().await;
26
27    let my_id: Vec<usize> = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap();
28    let my_id = my_id[0];
29    let num_replicas: Vec<usize> = serde_json::from_str(&std::env::args().nth(2).unwrap()).unwrap();
30    let num_replicas = num_replicas[0];
31
32    let increment_requests = ports
33        .port("increment_requests")
34        .connect::<ConnectedDirect>()
35        .into_source();
36
37    let query_responses = ports
38        .port("query_responses")
39        .connect::<ConnectedDirect>()
40        .into_sink();
41
42    let to_peer = ports
43        .port("to_peer")
44        .connect::<ConnectedDemux<ConnectedDirect>>()
45        .into_sink();
46
47    let from_peer = ports
48        .port("from_peer")
49        .connect::<ConnectedTagged<ConnectedDirect>>()
50        .into_source();
51
52    let f1 = async move {
53        #[cfg(target_os = "linux")]
54        loop {
55            let x = procinfo::pid::stat_self().unwrap();
56            let bytes = x.rss * 1024 * 4;
57            println!("memory,{}", bytes);
58            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
59        }
60    };
61
62    let df = dfir_syntax! {
63        next_state = union()
64            -> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u64>, Vec<u64>)>>>::new(), HashSet::new(), TickInstant::default()), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
65                if context.current_tick() != *last_tick {
66                    modified_tweets.clear();
67                }
68
69                match goi {
70                    GossipOrIncrement::Gossip(gossip) => {
71                        for (counter_id, gossip_rc) in gossip.iter() {
72                            let gossip_borrowed = gossip_rc.as_ref().borrow();
73                            let (pos, neg) = gossip_borrowed.deref();
74                            let cur_value = cur_state.entry(*counter_id).or_insert(Rc::new(RefCell::new((
75                                vec![0; num_replicas], vec![0; num_replicas]
76                            ))));
77                            let mut cur_value = cur_value.as_ref().borrow_mut();
78
79                            for i in 0..num_replicas {
80                                if pos[i] > cur_value.0[i] {
81                                    cur_value.0[i] = pos[i];
82                                    modified_tweets.insert(*counter_id);
83                                }
84
85                                if neg[i] > cur_value.1[i] {
86                                    cur_value.1[i] = neg[i];
87                                    modified_tweets.insert(*counter_id);
88                                }
89                            }
90                        }
91                    }
92                    GossipOrIncrement::Increment(counter_id, delta) => {
93                        let cur_value = cur_state.entry(counter_id).or_insert(Rc::new(RefCell::new((
94                            vec![0; num_replicas], vec![0; num_replicas]
95                        ))));
96                        let mut cur_value = cur_value.as_ref().borrow_mut();
97
98                        if delta > 0 {
99                            cur_value.0[my_id] += delta as u64;
100                        } else {
101                            cur_value.1[my_id] += (-delta) as u64;
102                        }
103
104                        modified_tweets.insert(counter_id);
105                    }
106                }
107
108                *last_tick = context.current_tick();
109            })
110            -> filter(|(_, _, tick)| *tick == context.current_tick())
111            -> filter(|(_, modified_tweets, _)| !modified_tweets.is_empty())
112            -> map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, state.get(t).unwrap().clone())).collect::<Vec<_>>())
113            -> tee();
114
115        source_stream(from_peer)
116            -> map(|x| deserialize_from_bytes::<GossipOrIncrement>(&x.unwrap().1).unwrap())
117            -> next_state;
118
119        source_stream(increment_requests)
120            -> map(|x| deserialize_from_bytes::<OperationPayload>(&x.unwrap()).unwrap())
121            -> map(|t| GossipOrIncrement::Increment(t.key, t.change))
122            -> next_state;
123
124        all_peers = source_iter(0..num_replicas)
125            -> filter(|x| *x != my_id);
126
127        all_peers -> [0] broadcaster;
128        next_state -> [1] broadcaster;
129        broadcaster = cross_join::<'static, 'tick>()
130            -> map(|(peer, state)| {
131                (peer as u32, serialize_to_bytes(GossipOrIncrement::Gossip(state)))
132            })
133            -> dest_sink(to_peer);
134
135        next_state
136            -> flat_map(|a: Vec<NextStateType>| {
137                a.into_iter().map(|(k, rc_array)| {
138                    let rc_borrowed = rc_array.as_ref().borrow();
139                    let (pos, neg) = rc_borrowed.deref();
140                    QueryResponse {
141                        key: k,
142                        value: pos.iter().sum::<u64>() as i64 - neg.iter().sum::<u64>() as i64
143                    }
144                }).collect::<Vec<_>>()
145            })
146            -> map(serialize_to_bytes::<QueryResponse>)
147            -> dest_sink(query_responses);
148    };
149
150    // initial memory
151    #[cfg(target_os = "linux")]
152    {
153        let x = procinfo::pid::stat_self().unwrap();
154        let bytes = x.rss * 1024 * 4;
155        println!("memory,{}", bytes);
156    }
157
158    let f1_handle = tokio::spawn(f1);
159    dfir_rs::util::deploy::launch_flow(df).await;
160    f1_handle.abort();
161}