dfir_lang/graph/ops/
unique.rs

1use quote::quote_spanned;
2
3use super::{
4    OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
5};
6
7/// Takes one stream as input and filters out any duplicate occurrences. The output
8/// contains all unique values from the input.
9///
10/// ```dfir
11/// source_iter(vec![1, 1, 2, 3, 2, 1, 3])
12///     -> unique()
13///     -> assert_eq([1, 2, 3]);
14/// ```
15///
16/// `unique` can also be provided with one generic lifetime persistence argument, either
17/// `'tick` or `'static`, to specify how data persists. The default is `'tick`.
18/// With `'tick`, uniqueness is only considered within the current tick, so across multiple ticks
19/// duplicate values may be emitted.
20/// With `'static`, values will be remembered across ticks and no duplicates will ever be emitted.
21///
22/// ```rustbook
23/// let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<usize>();
24/// let mut flow = dfir_rs::dfir_syntax! {
25///     source_stream(input_recv)
26///         -> unique::<'tick>()
27///         -> for_each(|n| println!("{}", n));
28/// };
29///
30/// input_send.send(3).unwrap();
31/// input_send.send(3).unwrap();
32/// input_send.send(4).unwrap();
33/// input_send.send(3).unwrap();
34/// flow.run_available();
35/// // 3, 4
36///
37/// input_send.send(3).unwrap();
38/// input_send.send(5).unwrap();
39/// flow.run_available();
40/// // 3, 5
41/// // Note: 3 is emitted again.
42/// ```
43pub const UNIQUE: OperatorConstraints = OperatorConstraints {
44    name: "unique",
45    categories: &[OperatorCategory::Persistence],
46    hard_range_inn: RANGE_1,
47    soft_range_inn: RANGE_1,
48    hard_range_out: RANGE_1,
49    soft_range_out: RANGE_1,
50    num_args: 0,
51    persistence_args: &(0..=1),
52    type_args: RANGE_0,
53    is_external_input: false,
54    has_singleton_output: false,
55    flo_type: None,
56    ports_inn: None,
57    ports_out: None,
58    input_delaytype_fn: |_| None,
59    write_fn: |wc @ &WriteContextArgs {
60                   root,
61                   op_span,
62                   context,
63                   df_ident,
64                   ident,
65                   inputs,
66                   outputs,
67                   is_pull,
68                   ..
69               },
70               diagnostics| {
71        let [persistence] = wc.persistence_args_disallow_mutable(diagnostics);
72
73        let input = &inputs[0];
74        let output = &outputs[0];
75
76        let uniquedata_ident = wc.make_ident("uniquedata");
77
78        let write_prologue = quote_spanned! {op_span=>
79            let #uniquedata_ident = #df_ident.add_state(::std::cell::RefCell::new(#root::rustc_hash::FxHashSet::default()));
80        };
81        let write_prologue_after = wc
82            .persistence_as_state_lifespan(persistence)
83            .map(|lifespan| quote_spanned! {op_span=>
84                #df_ident.set_state_lifespan_hook(#uniquedata_ident, #lifespan, |rcell| { rcell.take(); });
85            }).unwrap_or_default();
86
87        let filter_fn = quote_spanned! {op_span=>
88            |item| {
89                let mut set = unsafe {
90                    // SAFETY: handle from `#df_ident.add_state(..)`.
91                    #context.state_ref_unchecked(#uniquedata_ident)
92                }.borrow_mut();
93
94                if !set.contains(item) {
95                    set.insert(::std::clone::Clone::clone(item));
96                    true
97                } else {
98                    false
99                }
100            }
101        };
102        let write_iterator = if is_pull {
103            quote_spanned! {op_span=>
104                let #ident = #input.filter(#filter_fn);
105            }
106        } else {
107            quote_spanned! {op_span=>
108                let #ident = #root::pusherator::filter::Filter::new(#filter_fn, #output);
109            }
110        };
111
112        Ok(OperatorWriteOutput {
113            write_prologue,
114            write_prologue_after,
115            write_iterator,
116            ..Default::default()
117        })
118    },
119};