dfir_lang/graph/ops/unique.rs
1use quote::quote_spanned;
2
3use super::{
4 OperatorCategory, OperatorConstraints, OperatorWriteOutput, RANGE_0, RANGE_1, WriteContextArgs,
5};
6
7/// Takes one stream as input and filters out any duplicate occurrences. The output
8/// contains all unique values from the input.
9///
10/// ```dfir
11/// source_iter(vec![1, 1, 2, 3, 2, 1, 3])
12/// -> unique()
13/// -> assert_eq([1, 2, 3]);
14/// ```
15///
16/// `unique` can also be provided with one generic lifetime persistence argument, either
17/// `'tick` or `'static`, to specify how data persists. The default is `'tick`.
18/// With `'tick`, uniqueness is only considered within the current tick, so across multiple ticks
19/// duplicate values may be emitted.
20/// With `'static`, values will be remembered across ticks and no duplicates will ever be emitted.
21///
22/// ```rustbook
23/// let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<usize>();
24/// let mut flow = dfir_rs::dfir_syntax! {
25/// source_stream(input_recv)
26/// -> unique::<'tick>()
27/// -> for_each(|n| println!("{}", n));
28/// };
29///
30/// input_send.send(3).unwrap();
31/// input_send.send(3).unwrap();
32/// input_send.send(4).unwrap();
33/// input_send.send(3).unwrap();
34/// flow.run_available();
35/// // 3, 4
36///
37/// input_send.send(3).unwrap();
38/// input_send.send(5).unwrap();
39/// flow.run_available();
40/// // 3, 5
41/// // Note: 3 is emitted again.
42/// ```
43pub const UNIQUE: OperatorConstraints = OperatorConstraints {
44 name: "unique",
45 categories: &[OperatorCategory::Persistence],
46 hard_range_inn: RANGE_1,
47 soft_range_inn: RANGE_1,
48 hard_range_out: RANGE_1,
49 soft_range_out: RANGE_1,
50 num_args: 0,
51 persistence_args: &(0..=1),
52 type_args: RANGE_0,
53 is_external_input: false,
54 has_singleton_output: false,
55 flo_type: None,
56 ports_inn: None,
57 ports_out: None,
58 input_delaytype_fn: |_| None,
59 write_fn: |wc @ &WriteContextArgs {
60 root,
61 op_span,
62 context,
63 df_ident,
64 ident,
65 inputs,
66 outputs,
67 is_pull,
68 ..
69 },
70 diagnostics| {
71 let [persistence] = wc.persistence_args_disallow_mutable(diagnostics);
72
73 let input = &inputs[0];
74 let output = &outputs[0];
75
76 let uniquedata_ident = wc.make_ident("uniquedata");
77
78 let write_prologue = quote_spanned! {op_span=>
79 let #uniquedata_ident = #df_ident.add_state(::std::cell::RefCell::new(#root::rustc_hash::FxHashSet::default()));
80 };
81 let write_prologue_after = wc
82 .persistence_as_state_lifespan(persistence)
83 .map(|lifespan| quote_spanned! {op_span=>
84 #df_ident.set_state_lifespan_hook(#uniquedata_ident, #lifespan, |rcell| { rcell.take(); });
85 }).unwrap_or_default();
86
87 let filter_fn = quote_spanned! {op_span=>
88 |item| {
89 let mut set = unsafe {
90 // SAFETY: handle from `#df_ident.add_state(..)`.
91 #context.state_ref_unchecked(#uniquedata_ident)
92 }.borrow_mut();
93
94 if !set.contains(item) {
95 set.insert(::std::clone::Clone::clone(item));
96 true
97 } else {
98 false
99 }
100 }
101 };
102 let write_iterator = if is_pull {
103 quote_spanned! {op_span=>
104 let #ident = #input.filter(#filter_fn);
105 }
106 } else {
107 quote_spanned! {op_span=>
108 let #ident = #root::pusherator::filter::Filter::new(#filter_fn, #output);
109 }
110 };
111
112 Ok(OperatorWriteOutput {
113 write_prologue,
114 write_prologue_after,
115 write_iterator,
116 ..Default::default()
117 })
118 },
119};