Skip to content

Commit 4de885d

Browse files
committed
PARQUET-2261: Implement SizeStatistics
1 parent 4f78c86 commit 4de885d

File tree

21 files changed

+1390
-98
lines changed

21 files changed

+1390
-98
lines changed

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.parquet.column.ParquetProperties;
2727
import org.apache.parquet.column.page.DictionaryPage;
2828
import org.apache.parquet.column.page.PageWriter;
29+
import org.apache.parquet.column.statistics.SizeStatistics;
2930
import org.apache.parquet.column.statistics.Statistics;
3031
import org.apache.parquet.column.values.ValuesWriter;
3132
import org.apache.parquet.column.values.bloomfilter.AdaptiveBlockSplitBloomFilter;
@@ -55,6 +56,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
5556
private int valueCount;
5657

5758
private Statistics<?> statistics;
59+
private SizeStatistics.Builder sizeStatisticsBuilder;
5860
private long rowsWrittenSoFar = 0;
5961
private int pageRowCount;
6062

@@ -112,6 +114,8 @@ private void log(Object value, int r, int d) {
112114

113115
private void resetStatistics() {
114116
this.statistics = Statistics.createStats(path.getPrimitiveType());
117+
this.sizeStatisticsBuilder = new SizeStatistics.Builder(
118+
path.getPrimitiveType(), path.getMaxRepetitionLevel(), path.getMaxDefinitionLevel());
115119
}
116120

117121
private void definitionLevel(int definitionLevel) {
@@ -138,6 +142,7 @@ public void writeNull(int repetitionLevel, int definitionLevel) {
138142
repetitionLevel(repetitionLevel);
139143
definitionLevel(definitionLevel);
140144
statistics.incrementNumNulls();
145+
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
141146
++valueCount;
142147
}
143148

@@ -201,6 +206,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
201206
definitionLevel(definitionLevel);
202207
dataColumn.writeDouble(value);
203208
statistics.updateStats(value);
209+
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
204210
updateBloomFilter(value);
205211
++valueCount;
206212
}
@@ -219,6 +225,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
219225
definitionLevel(definitionLevel);
220226
dataColumn.writeFloat(value);
221227
statistics.updateStats(value);
228+
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
222229
updateBloomFilter(value);
223230
++valueCount;
224231
}
@@ -237,6 +244,7 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
237244
definitionLevel(definitionLevel);
238245
dataColumn.writeBytes(value);
239246
statistics.updateStats(value);
247+
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel, value);
240248
updateBloomFilter(value);
241249
++valueCount;
242250
}
@@ -255,6 +263,7 @@ public void write(boolean value, int repetitionLevel, int definitionLevel) {
255263
definitionLevel(definitionLevel);
256264
dataColumn.writeBoolean(value);
257265
statistics.updateStats(value);
266+
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
258267
++valueCount;
259268
}
260269

@@ -272,6 +281,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
272281
definitionLevel(definitionLevel);
273282
dataColumn.writeInteger(value);
274283
statistics.updateStats(value);
284+
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
275285
updateBloomFilter(value);
276286
++valueCount;
277287
}
@@ -290,6 +300,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) {
290300
definitionLevel(definitionLevel);
291301
dataColumn.writeLong(value);
292302
statistics.updateStats(value);
303+
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
293304
updateBloomFilter(value);
294305
++valueCount;
295306
}
@@ -389,7 +400,15 @@ void writePage() {
389400
this.rowsWrittenSoFar += pageRowCount;
390401
if (DEBUG) LOG.debug("write page");
391402
try {
392-
writePage(pageRowCount, valueCount, statistics, repetitionLevelColumn, definitionLevelColumn, dataColumn);
403+
SizeStatistics sizeStatistics = sizeStatisticsBuilder.build();
404+
writePage(
405+
pageRowCount,
406+
valueCount,
407+
statistics,
408+
sizeStatistics,
409+
repetitionLevelColumn,
410+
definitionLevelColumn,
411+
dataColumn);
393412
} catch (IOException e) {
394413
throw new ParquetEncodingException("could not write page for " + path, e);
395414
}
@@ -401,6 +420,7 @@ void writePage() {
401420
pageRowCount = 0;
402421
}
403422

423+
@Deprecated
404424
abstract void writePage(
405425
int rowCount,
406426
int valueCount,
@@ -409,4 +429,14 @@ abstract void writePage(
409429
ValuesWriter definitionLevels,
410430
ValuesWriter values)
411431
throws IOException;
432+
433+
abstract void writePage(
434+
int rowCount,
435+
int valueCount,
436+
Statistics<?> statistics,
437+
SizeStatistics sizeStatistics,
438+
ValuesWriter repetitionLevels,
439+
ValuesWriter definitionLevels,
440+
ValuesWriter values)
441+
throws IOException;
412442
}

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.parquet.column.ColumnDescriptor;
2525
import org.apache.parquet.column.ParquetProperties;
2626
import org.apache.parquet.column.page.PageWriter;
27+
import org.apache.parquet.column.statistics.SizeStatistics;
2728
import org.apache.parquet.column.statistics.Statistics;
2829
import org.apache.parquet.column.values.ValuesWriter;
2930
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
@@ -56,6 +57,7 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
5657
}
5758

5859
@Override
60+
@Deprecated
5961
void writePage(
6062
int rowCount,
6163
int valueCount,
@@ -64,11 +66,25 @@ void writePage(
6466
ValuesWriter definitionLevels,
6567
ValuesWriter values)
6668
throws IOException {
69+
writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values);
70+
}
71+
72+
@Override
73+
void writePage(
74+
int rowCount,
75+
int valueCount,
76+
Statistics<?> statistics,
77+
SizeStatistics sizeStatistics,
78+
ValuesWriter repetitionLevels,
79+
ValuesWriter definitionLevels,
80+
ValuesWriter values)
81+
throws IOException {
6782
pageWriter.writePage(
6883
concat(repetitionLevels.getBytes(), definitionLevels.getBytes(), values.getBytes()),
6984
valueCount,
7085
rowCount,
7186
statistics,
87+
sizeStatistics,
7288
repetitionLevels.getEncoding(),
7389
definitionLevels.getEncoding(),
7490
values.getEncoding());

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.parquet.column.Encoding;
2525
import org.apache.parquet.column.ParquetProperties;
2626
import org.apache.parquet.column.page.PageWriter;
27+
import org.apache.parquet.column.statistics.SizeStatistics;
2728
import org.apache.parquet.column.statistics.Statistics;
2829
import org.apache.parquet.column.values.ValuesWriter;
2930
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
@@ -82,6 +83,7 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
8283
}
8384

8485
@Override
86+
@Deprecated
8587
void writePage(
8688
int rowCount,
8789
int valueCount,
@@ -90,6 +92,19 @@ void writePage(
9092
ValuesWriter definitionLevels,
9193
ValuesWriter values)
9294
throws IOException {
95+
writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values);
96+
}
97+
98+
@Override
99+
void writePage(
100+
int rowCount,
101+
int valueCount,
102+
Statistics<?> statistics,
103+
SizeStatistics sizeStatistics,
104+
ValuesWriter repetitionLevels,
105+
ValuesWriter definitionLevels,
106+
ValuesWriter values)
107+
throws IOException {
93108
// TODO: rework this API. The bytes shall be retrieved before the encoding (encoding might be different
94109
// otherwise)
95110
BytesInput bytes = values.getBytes();
@@ -102,6 +117,7 @@ void writePage(
102117
definitionLevels.getBytes(),
103118
encoding,
104119
bytes,
105-
statistics);
120+
statistics,
121+
sizeStatistics);
106122
}
107123
}

parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import org.apache.parquet.bytes.BytesInput;
2323
import org.apache.parquet.column.Encoding;
24+
import org.apache.parquet.column.statistics.SizeStatistics;
2425
import org.apache.parquet.column.statistics.Statistics;
2526

2627
/**
@@ -63,6 +64,7 @@ void writePage(
6364
* @param valuesEncoding values encoding
6465
* @throws IOException
6566
*/
67+
@Deprecated
6668
void writePage(
6769
BytesInput bytesInput,
6870
int valueCount,
@@ -73,6 +75,29 @@ void writePage(
7375
Encoding valuesEncoding)
7476
throws IOException;
7577

78+
/**
79+
* Writes a single page together with its size statistics.
80+
* @param bytesInput the bytes for the page
81+
* @param valueCount the number of values in that page
82+
* @param rowCount the number of rows in that page
83+
* @param statistics the statistics for that page
84+
* @param sizeStatistics the size statistics for that page
85+
* @param rlEncoding repetition level encoding
86+
* @param dlEncoding definition level encoding
87+
* @param valuesEncoding values encoding
88+
* @throws IOException if there is an exception while writing page data
89+
*/
90+
void writePage(
91+
BytesInput bytesInput,
92+
int valueCount,
93+
int rowCount,
94+
Statistics<?> statistics,
95+
SizeStatistics sizeStatistics,
96+
Encoding rlEncoding,
97+
Encoding dlEncoding,
98+
Encoding valuesEncoding)
99+
throws IOException;
100+
76101
/**
77102
* writes a single page in the new format
78103
*
@@ -86,6 +111,7 @@ void writePage(
86111
* @param statistics optional stats for this page
87112
* @throws IOException if there is an exception while writing page data
88113
*/
114+
@Deprecated
89115
void writePageV2(
90116
int rowCount,
91117
int nullCount,
@@ -97,6 +123,31 @@ void writePageV2(
97123
Statistics<?> statistics)
98124
throws IOException;
99125

126+
/**
127+
* Writes a single page in the new format, with optional size statistics.
128+
* @param rowCount the number of rows in this page
129+
* @param nullCount the number of null values (out of valueCount)
130+
* @param valueCount the number of values in that page (there could be multiple values per row for repeated fields)
131+
* @param repetitionLevels the repetition levels encoded in RLE without any size header
132+
* @param definitionLevels the definition levels encoded in RLE without any size header
133+
* @param dataEncoding the encoding for the data
134+
* @param data the data encoded with dataEncoding
135+
* @param statistics optional stats for this page
136+
* @param sizeStatistics optional size stats for this page
137+
* @throws IOException if there is an exception while writing page data
138+
*/
139+
void writePageV2(
140+
int rowCount,
141+
int nullCount,
142+
int valueCount,
143+
BytesInput repetitionLevels,
144+
BytesInput definitionLevels,
145+
Encoding dataEncoding,
146+
BytesInput data,
147+
Statistics<?> statistics,
148+
SizeStatistics sizeStatistics)
149+
throws IOException;
150+
100151
/**
101152
* @return the current size used in the memory buffer for that column chunk
102153
*/

0 commit comments

Comments
 (0)