dfir_rs/scheduled/
query.rs

1//! Old query API, where each operator is a single subgraph. Deprecated.
2#![allow(clippy::allow_attributes, missing_docs, reason = "deprecated")]
3
4use std::borrow::Cow;
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use super::context::Context;
9use super::graph_ext::GraphExt;
10use super::handoff::Iter;
11use super::port::{RecvPort, SendCtx};
12use crate::scheduled::graph::Dfir;
13use crate::scheduled::handoff::VecHandoff;
14
15const QUERY_EDGE_NAME: Cow<'static, str> = Cow::Borrowed("query handoff");
16
17#[derive(Default)]
18pub struct Query<'a> {
19    df: Rc<RefCell<Dfir<'a>>>,
20}
21
22impl<'a> Query<'a> {
23    pub fn new() -> Self {
24        Default::default()
25    }
26
27    pub fn source<F, T>(&mut self, f: F) -> Operator<'a, T>
28    where
29        T: 'static,
30        F: 'static + FnMut(&Context, &SendCtx<VecHandoff<T>>),
31    {
32        let mut df = self.df.borrow_mut();
33
34        let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
35        df.add_subgraph_source("source", send_port, f);
36
37        Operator {
38            df: self.df.clone(),
39            recv_port,
40        }
41    }
42
43    pub fn concat<T>(&mut self, ops: Vec<Operator<T>>) -> Operator<'a, T>
44    where
45        T: 'static,
46    {
47        let mut df = self.df.borrow_mut();
48
49        let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
50        df.add_subgraph_n_m(
51            "concat",
52            ops.into_iter().map(|op| op.recv_port).collect(),
53            vec![send_port],
54            |_ctx, ins, out| {
55                for &input in ins {
56                    out[0].give(input.take_inner());
57                }
58            },
59        );
60
61        Operator {
62            df: self.df.clone(),
63            recv_port,
64        }
65    }
66
67    pub fn run_available(&mut self) {
68        (*self.df).borrow_mut().run_available();
69    }
70}
71
72pub struct Operator<'a, T>
73where
74    T: 'static,
75{
76    df: Rc<RefCell<Dfir<'a>>>,
77    recv_port: RecvPort<VecHandoff<T>>,
78}
79
80impl<'a, T> Operator<'a, T>
81where
82    T: 'static,
83{
84    pub fn map<U, F>(self, mut f: F) -> Operator<'a, U>
85    where
86        F: 'static + Fn(T) -> U,
87        U: 'static,
88    {
89        let mut df = self.df.borrow_mut();
90
91        let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
92        df.add_subgraph_in_out("map", self.recv_port, send_port, move |_ctx, recv, send| {
93            send.give(Iter(recv.take_inner().into_iter().map(&mut f)));
94        });
95
96        drop(df);
97        Operator {
98            df: self.df,
99            recv_port,
100        }
101    }
102
103    #[must_use]
104    pub fn filter<F>(self, mut f: F) -> Operator<'a, T>
105    where
106        F: 'static + Fn(&T) -> bool,
107    {
108        let mut df = self.df.borrow_mut();
109
110        let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
111        df.add_subgraph_in_out(
112            "filter",
113            self.recv_port,
114            send_port,
115            move |_ctx, recv, send| {
116                send.give(Iter(recv.take_inner().into_iter().filter(&mut f)));
117            },
118        );
119
120        drop(df);
121        Operator {
122            df: self.df,
123            recv_port,
124        }
125    }
126
127    #[must_use]
128    pub fn concat(self, other: Operator<'a, T>) -> Operator<'a, T> {
129        // TODO(justin): this is very slow.
130
131        let mut df = self.df.borrow_mut();
132
133        let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
134        df.add_subgraph_2in_out(
135            "concat",
136            self.recv_port,
137            other.recv_port,
138            send_port,
139            |_ctx, recv1, recv2, send| {
140                send.give(recv1.take_inner());
141                send.give(recv2.take_inner());
142            },
143        );
144
145        drop(df);
146        Operator {
147            df: self.df,
148            recv_port,
149        }
150    }
151
152    pub fn sink<F>(self, f: F)
153    where
154        F: 'static + Fn(T),
155    {
156        self.df
157            .borrow_mut()
158            .add_subgraph_sink("sink", self.recv_port, move |_ctx, recv| {
159                for v in recv.take_inner() {
160                    f(v)
161                }
162            });
163    }
164}
165
166impl<'a, T: Clone> Operator<'a, T> {
167    pub fn tee(self, n: usize) -> Vec<Operator<'a, T>>
168    where
169        T: 'static,
170    {
171        // TODO(justin): this is very slow. TODO(mingwei) use teeing handoff once its added.
172
173        let mut df = self.df.borrow_mut();
174
175        let mut sends = Vec::with_capacity(n);
176        let mut recvs = Vec::with_capacity(n);
177        for _ in 0..n {
178            let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
179            sends.push(send_port);
180            recvs.push(Operator {
181                df: self.df.clone(),
182                recv_port,
183            });
184        }
185
186        df.add_subgraph_n_m(
187            "tee",
188            vec![self.recv_port],
189            sends,
190            move |_ctx, recvs, sends| {
191                let input = recvs.iter().next().unwrap().take_inner();
192                if let Some((&last_output, outputs)) = sends.split_last() {
193                    for output in outputs {
194                        output.give(Iter(input.iter().cloned()));
195                    }
196                    last_output.give(input);
197                }
198            },
199        );
200
201        recvs
202    }
203}