forked from dathere/qsv
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdedup.rs
More file actions
209 lines (184 loc) · 6.99 KB
/
Copy pathdedup.rs
File metadata and controls
209 lines (184 loc) · 6.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
use std::cmp;
use crate::config::{Config, Delimiter};
use crate::select::SelectColumns;
use crate::util;
use crate::CliResult;
use csv::ByteRecord;
use rayon::prelude::*;
use serde::Deserialize;
use crate::cmd::sort::iter_cmp;
// Usage/help text handed to `util::get_args` in `run` to populate `Args`.
// The layout is significant for docopt-style parsing: the usage pattern ends
// at the first blank line, and each option must be separated from its
// description by at least two spaces.
static USAGE: &str = r#"
Deduplicates CSV rows.

Note that this requires reading all of the CSV data into memory because the
rows need to be sorted first.

That is, unless the --sorted option is used to indicate the CSV is already sorted
(typically, with the extsort command). This will make dedup run in streaming mode
with constant memory.

Either way, the output will not only be deduplicated, it will also be sorted.

A duplicate count will also be sent to <stderr>.

Usage:
    qsv dedup [options] [<input>]

sort options:
    -s, --select <arg>         Select a subset of columns to dedup.
                               Note that the outputs will remain at the full width
                               of the CSV.
                               See 'qsv select --help' for the format details.
    -C, --no-case              Compare strings disregarding case
    --sorted                   The input is already sorted. Do not load the CSV into
                               memory to sort it first. Meant to be used in tandem and
                               after an extsort.
    -D, --dupes-output <file>  Write duplicates to <file>.
    -H, --human-readable       Comma separate duplicate count.
    -j, --jobs <arg>           The number of jobs to run in parallel when sorting
                               an unsorted CSV, before deduping.
                               When not set, the number of jobs is set to the
                               number of CPUs detected.
                               Does not work with --sorted option as it's not
                               multithreaded.

Common options:
    -h, --help                 Display this message
    -o, --output <file>        Write output to <file> instead of stdout.
    -n, --no-headers           When set, the first row will not be interpreted
                               as headers. That is, it will be sorted with the rest
                               of the rows. Otherwise, the first row will always
                               appear as the header row in the output.
    -d, --delimiter <arg>      The field delimiter for reading CSV data.
                               Must be a single character. (default: ,)
"#;
/// Command-line arguments for `dedup`, deserialized from `USAGE` by
/// `util::get_args`. Field names presumably follow the docopt convention
/// (`arg_` for positionals, `flag_` for options) -- confirm against `util`.
#[derive(Deserialize)]
struct Args {
/// Input path handed to `Config::new`; `None` when no `<input>` is given.
arg_input: Option<String>,
/// Column selection (`--select`) used as the dedup key.
flag_select: SelectColumns,
/// `--no-case`: compare with `iter_cmp_no_case` instead of `iter_cmp`.
flag_no_case: bool,
/// `--sorted`: take the streaming path instead of loading and sorting all rows.
flag_sorted: bool,
/// `--dupes-output`: when set, duplicate rows are also written to this destination.
flag_dupes_output: Option<String>,
/// `--output`: destination handed to `Config::new` for the deduplicated rows.
flag_output: Option<String>,
/// `--no-headers`: forwarded to `Config::no_headers`.
flag_no_headers: bool,
/// `--delimiter`: forwarded to `Config::delimiter`.
flag_delimiter: Option<Delimiter>,
/// `--human-readable`: print the duplicate count with comma separators.
flag_human_readable: bool,
/// `--jobs`: forwarded to `util::njobs` before the parallel sort.
flag_jobs: Option<usize>,
}
pub fn run(argv: &[&str]) -> CliResult<()> {
let args: Args = util::get_args(USAGE, argv)?;
let no_case = args.flag_no_case;
let rconfig = Config::new(&args.arg_input)
.delimiter(args.flag_delimiter)
.no_headers(args.flag_no_headers)
.select(args.flag_select);
let mut rdr = rconfig.reader()?;
let mut wtr = Config::new(&args.flag_output).writer()?;
let dupes_output = args.flag_dupes_output.is_some();
let mut dupewtr = Config::new(&args.flag_dupes_output).writer()?;
let headers = rdr.byte_headers()?.clone();
if dupes_output {
dupewtr.write_byte_record(&headers)?;
}
let sel = rconfig.selection(&headers)?;
rconfig.write_headers(&mut rdr, &mut wtr)?;
let mut dupe_count = 0_usize;
if args.flag_sorted {
let mut record = ByteRecord::new();
let mut next_record = ByteRecord::new();
rdr.read_byte_record(&mut record)?;
loop {
let more_records = rdr.read_byte_record(&mut next_record)?;
if !more_records {
wtr.write_byte_record(&record)?;
break;
};
let a = sel.select(&record);
let b = sel.select(&next_record);
let comparison = if no_case {
iter_cmp_no_case(a, b)
} else {
iter_cmp(a, b)
};
match comparison {
cmp::Ordering::Equal => {
dupe_count += 1;
if dupes_output {
dupewtr.write_byte_record(&record)?;
}
}
cmp::Ordering::Less => {
wtr.write_byte_record(&record)?;
record.clone_from(&next_record);
}
cmp::Ordering::Greater => {
let fail_msg = format!(
"Aborting! Input not sorted! {record:?} is greater than {next_record:?}"
);
return fail!(fail_msg);
}
}
}
} else {
util::njobs(args.flag_jobs);
let mut all = rdr.byte_records().collect::<Result<Vec<_>, _>>()?;
all.par_sort_unstable_by(|r1, r2| {
let a = sel.select(r1);
let b = sel.select(r2);
iter_cmp(a, b)
});
let mut current = 0;
while current + 1 < all.len() {
let a = sel.select(&all[current]);
let b = sel.select(&all[current + 1]);
if no_case {
if iter_cmp_no_case(a, b) == cmp::Ordering::Equal {
dupe_count += 1;
if dupes_output {
dupewtr.write_byte_record(&all[current])?;
}
} else {
wtr.write_byte_record(&all[current])?;
}
} else if iter_cmp(a, b) == cmp::Ordering::Equal {
dupe_count += 1;
if dupes_output {
dupewtr.write_byte_record(&all[current])?;
}
} else {
wtr.write_byte_record(&all[current])?;
}
current += 1;
}
wtr.write_byte_record(&all[current])?;
}
dupewtr.flush()?;
if args.flag_human_readable {
use thousands::Separable;
eprintln!("{}", dupe_count.separate_with_commas());
} else {
eprintln!("{dupe_count}");
}
Ok(wtr.flush()?)
}
/// Compares two field sequences lexicographically, ignoring letter case.
///
/// Fields are drawn pairwise from `a` and `b` (lowercased by `next_no_case`).
/// The first pair that differs decides the result. If one sequence ends
/// before the other, the shorter one orders first; if both end together the
/// sequences are `Equal`.
#[inline]
pub fn iter_cmp_no_case<'a, L, R>(mut a: L, mut b: R) -> cmp::Ordering
where
    L: Iterator<Item = &'a [u8]>,
    R: Iterator<Item = &'a [u8]>,
{
    loop {
        let left = next_no_case(&mut a);
        let right = next_no_case(&mut b);

        // Both sides ran out at the same time without a differing field.
        if left.is_none() && right.is_none() {
            return cmp::Ordering::Equal;
        }

        // `Option` orders `None` before any `Some`, which yields `Less` when
        // only the left side is exhausted and `Greater` when only the right is.
        let verdict = left.cmp(&right);
        if verdict != cmp::Ordering::Equal {
            return verdict;
        }
    }
}
/// Pulls the next field from `xs` and returns it lowercased.
///
/// Returns `None` once `xs` is exhausted.
///
/// The field bytes are validated as UTF-8 before lowercasing. Any invalid
/// sequence is replaced with U+FFFD rather than being reinterpreted as a
/// `str` without checking, which would be undefined behaviour for CSV data
/// that is not valid UTF-8. Valid UTF-8 input is lowercased unchanged.
#[inline]
fn next_no_case<'a, X>(xs: &mut X) -> Option<String>
where
    X: Iterator<Item = &'a [u8]>,
{
    xs.next()
        .map(|bytes| String::from_utf8_lossy(bytes).to_lowercase())
}