1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use std::cell::RefCell;
use std::rc::Rc;

use dfir_rs::scheduled::graph::Dfir;
use dfir_rs::scheduled::graph_ext::GraphExt;
use dfir_rs::scheduled::handoff::{Iter, VecHandoff};
use dfir_rs::scheduled::port::RecvPort;

use crate::{Datum, RelExpr};

pub(crate) fn run_dataflow(r: RelExpr) -> Vec<Vec<Datum>> {
    let mut df = Dfir::new();

    let output_port = render_relational(&mut df, r);

    let output = Rc::new(RefCell::new(Vec::new()));
    let inner = output.clone();

    df.add_subgraph_sink("output sink", output_port, move |_ctx, recv| {
        for v in recv.take_inner() {
            (*inner).borrow_mut().push(v);
        }
    });

    df.run_available();

    let v = (*output).borrow();
    v.clone()
}

fn render_relational(df: &mut Dfir, r: RelExpr) -> RecvPort<VecHandoff<Vec<Datum>>> {
    let (send_port, recv_port) = df.make_edge("handoff");
    match r {
        RelExpr::Values(mut v) => {
            // TODO: drip-feed data?
            let scope = Vec::new();
            df.add_subgraph_source("value source", send_port, move |_ctx, send| {
                send.give(Iter(
                    v.drain(..)
                        .map(|row| row.into_iter().map(|e| e.eval(&scope)).collect()),
                ));
            });
        }
        RelExpr::Filter(preds, v) => {
            let input = render_relational(df, *v);
            df.add_subgraph_in_out("filter", input, send_port, move |_ctx, recv, send| {
                send.give(Iter(recv.take_inner().into_iter().filter(|row| {
                    preds.iter().all(|p| p.eval(row) == Datum::Bool(true))
                })));
            });
        }
        RelExpr::Project(exprs, v) => {
            let input = render_relational(df, *v);
            df.add_subgraph_in_out("project", input, send_port, move |_ctx, recv, send| {
                send.give(Iter(
                    recv.take_inner()
                        .into_iter()
                        .map(|row| exprs.iter().map(|e| e.eval(&row)).collect()),
                ));
            });
        }
    }
    recv_port
}