1#![allow(clippy::allow_attributes, missing_docs, reason = "deprecated")]
3
4use std::borrow::Cow;
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use super::context::Context;
9use super::graph_ext::GraphExt;
10use super::handoff::Iter;
11use super::port::{RecvPort, SendCtx};
12use crate::scheduled::graph::Dfir;
13use crate::scheduled::handoff::VecHandoff;
14
15const QUERY_EDGE_NAME: Cow<'static, str> = Cow::Borrowed("query handoff");
16
17#[derive(Default)]
18pub struct Query<'a> {
19 df: Rc<RefCell<Dfir<'a>>>,
20}
21
22impl<'a> Query<'a> {
23 pub fn new() -> Self {
24 Default::default()
25 }
26
27 pub fn source<F, T>(&mut self, f: F) -> Operator<'a, T>
28 where
29 T: 'static,
30 F: 'static + FnMut(&Context, &SendCtx<VecHandoff<T>>),
31 {
32 let mut df = self.df.borrow_mut();
33
34 let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
35 df.add_subgraph_source("source", send_port, f);
36
37 Operator {
38 df: self.df.clone(),
39 recv_port,
40 }
41 }
42
43 pub fn concat<T>(&mut self, ops: Vec<Operator<T>>) -> Operator<'a, T>
44 where
45 T: 'static,
46 {
47 let mut df = self.df.borrow_mut();
48
49 let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
50 df.add_subgraph_n_m(
51 "concat",
52 ops.into_iter().map(|op| op.recv_port).collect(),
53 vec![send_port],
54 |_ctx, ins, out| {
55 for &input in ins {
56 out[0].give(input.take_inner());
57 }
58 },
59 );
60
61 Operator {
62 df: self.df.clone(),
63 recv_port,
64 }
65 }
66
67 pub fn run_available(&mut self) {
68 (*self.df).borrow_mut().run_available();
69 }
70}
71
72pub struct Operator<'a, T>
73where
74 T: 'static,
75{
76 df: Rc<RefCell<Dfir<'a>>>,
77 recv_port: RecvPort<VecHandoff<T>>,
78}
79
80impl<'a, T> Operator<'a, T>
81where
82 T: 'static,
83{
84 pub fn map<U, F>(self, mut f: F) -> Operator<'a, U>
85 where
86 F: 'static + Fn(T) -> U,
87 U: 'static,
88 {
89 let mut df = self.df.borrow_mut();
90
91 let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
92 df.add_subgraph_in_out("map", self.recv_port, send_port, move |_ctx, recv, send| {
93 send.give(Iter(recv.take_inner().into_iter().map(&mut f)));
94 });
95
96 drop(df);
97 Operator {
98 df: self.df,
99 recv_port,
100 }
101 }
102
103 #[must_use]
104 pub fn filter<F>(self, mut f: F) -> Operator<'a, T>
105 where
106 F: 'static + Fn(&T) -> bool,
107 {
108 let mut df = self.df.borrow_mut();
109
110 let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
111 df.add_subgraph_in_out(
112 "filter",
113 self.recv_port,
114 send_port,
115 move |_ctx, recv, send| {
116 send.give(Iter(recv.take_inner().into_iter().filter(&mut f)));
117 },
118 );
119
120 drop(df);
121 Operator {
122 df: self.df,
123 recv_port,
124 }
125 }
126
127 #[must_use]
128 pub fn concat(self, other: Operator<'a, T>) -> Operator<'a, T> {
129 let mut df = self.df.borrow_mut();
132
133 let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
134 df.add_subgraph_2in_out(
135 "concat",
136 self.recv_port,
137 other.recv_port,
138 send_port,
139 |_ctx, recv1, recv2, send| {
140 send.give(recv1.take_inner());
141 send.give(recv2.take_inner());
142 },
143 );
144
145 drop(df);
146 Operator {
147 df: self.df,
148 recv_port,
149 }
150 }
151
152 pub fn sink<F>(self, f: F)
153 where
154 F: 'static + Fn(T),
155 {
156 self.df
157 .borrow_mut()
158 .add_subgraph_sink("sink", self.recv_port, move |_ctx, recv| {
159 for v in recv.take_inner() {
160 f(v)
161 }
162 });
163 }
164}
165
166impl<'a, T: Clone> Operator<'a, T> {
167 pub fn tee(self, n: usize) -> Vec<Operator<'a, T>>
168 where
169 T: 'static,
170 {
171 let mut df = self.df.borrow_mut();
174
175 let mut sends = Vec::with_capacity(n);
176 let mut recvs = Vec::with_capacity(n);
177 for _ in 0..n {
178 let (send_port, recv_port) = df.make_edge(QUERY_EDGE_NAME);
179 sends.push(send_port);
180 recvs.push(Operator {
181 df: self.df.clone(),
182 recv_port,
183 });
184 }
185
186 df.add_subgraph_n_m(
187 "tee",
188 vec![self.recv_port],
189 sends,
190 move |_ctx, recvs, sends| {
191 let input = recvs.iter().next().unwrap().take_inner();
192 if let Some((&last_output, outputs)) = sends.split_last() {
193 for output in outputs {
194 output.give(Iter(input.iter().cloned()));
195 }
196 last_output.give(input);
197 }
198 },
199 );
200
201 recvs
202 }
203}