Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
PARQUET-2261: Implement SizeStatistics
  • Loading branch information
wgtmac committed Feb 24, 2024
commit 6039c93bd96b7b083d07ecd6b19bd4bb7b1b9311
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.AdaptiveBlockSplitBloomFilter;
Expand Down Expand Up @@ -55,6 +56,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
private int valueCount;

private Statistics<?> statistics;
private SizeStatistics.Builder sizeStatisticsBuilder;
private long rowsWrittenSoFar = 0;
private int pageRowCount;

Expand Down Expand Up @@ -112,6 +114,8 @@ private void log(Object value, int r, int d) {

private void resetStatistics() {
this.statistics = Statistics.createStats(path.getPrimitiveType());
this.sizeStatisticsBuilder = new SizeStatistics.Builder(
path.getPrimitiveType(), path.getMaxRepetitionLevel(), path.getMaxDefinitionLevel());
}

private void definitionLevel(int definitionLevel) {
Expand All @@ -138,6 +142,7 @@ public void writeNull(int repetitionLevel, int definitionLevel) {
repetitionLevel(repetitionLevel);
definitionLevel(definitionLevel);
statistics.incrementNumNulls();
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
++valueCount;
}

Expand Down Expand Up @@ -201,6 +206,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeDouble(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -219,6 +225,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeFloat(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -237,6 +244,7 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeBytes(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel, value);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -255,6 +263,7 @@ public void write(boolean value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeBoolean(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
++valueCount;
}

Expand All @@ -272,6 +281,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeInteger(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -290,6 +300,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeLong(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand Down Expand Up @@ -389,7 +400,15 @@ void writePage() {
this.rowsWrittenSoFar += pageRowCount;
if (DEBUG) LOG.debug("write page");
try {
writePage(pageRowCount, valueCount, statistics, repetitionLevelColumn, definitionLevelColumn, dataColumn);
SizeStatistics sizeStatistics = sizeStatisticsBuilder.build();
writePage(
pageRowCount,
valueCount,
statistics,
sizeStatistics,
repetitionLevelColumn,
definitionLevelColumn,
dataColumn);
} catch (IOException e) {
throw new ParquetEncodingException("could not write page for " + path, e);
}
Expand All @@ -409,4 +428,14 @@ abstract void writePage(
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException;

abstract void writePage(
int rowCount,
int valueCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels,
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
Expand Down Expand Up @@ -64,11 +65,25 @@ void writePage(
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException {
writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values);
}

@Override
void writePage(
int rowCount,
int valueCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels,
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException {
pageWriter.writePage(
concat(repetitionLevels.getBytes(), definitionLevels.getBytes(), values.getBytes()),
valueCount,
rowCount,
statistics,
sizeStatistics,
repetitionLevels.getEncoding(),
definitionLevels.getEncoding(),
values.getEncoding());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
Expand Down Expand Up @@ -90,6 +91,19 @@ void writePage(
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException {
writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values);
}

@Override
void writePage(
int rowCount,
int valueCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels,
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException {
// TODO: rework this API. The bytes shall be retrieved before the encoding (encoding might be different
// otherwise)
BytesInput bytes = values.getBytes();
Expand All @@ -102,6 +116,7 @@ void writePage(
definitionLevels.getBytes(),
encoding,
bytes,
statistics);
statistics,
sizeStatistics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;

/**
Expand Down Expand Up @@ -73,6 +74,29 @@ void writePage(
Encoding valuesEncoding)
throws IOException;

/**
* writes a single page
* @param bytesInput the bytes for the page
* @param valueCount the number of values in that page
* @param rowCount the number of rows in that page
* @param statistics the statistics for that page
* @param sizeStatistics the size statistics for that page
* @param rlEncoding repetition level encoding
* @param dlEncoding definition level encoding
* @param valuesEncoding values encoding
* @throws IOException
*/
void writePage(
BytesInput bytesInput,
int valueCount,
int rowCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding)
throws IOException;

/**
* writes a single page in the new format
*
Expand All @@ -97,6 +121,31 @@ void writePageV2(
Statistics<?> statistics)
throws IOException;

/**
* writes a single page in the new format
* @param rowCount the number of rows in this page
* @param nullCount the number of null values (out of valueCount)
* @param valueCount the number of values in that page (there could be multiple values per row for repeated fields)
* @param repetitionLevels the repetition levels encoded in RLE without any size header
* @param definitionLevels the definition levels encoded in RLE without any size header
* @param dataEncoding the encoding for the data
* @param data the data encoded with dataEncoding
* @param statistics optional stats for this page
* @param sizeStatistics optional size stats for this page
* @throws IOException if there is an exception while writing page data
*/
void writePageV2(
int rowCount,
int nullCount,
int valueCount,
BytesInput repetitionLevels,
BytesInput definitionLevels,
Encoding dataEncoding,
BytesInput data,
Statistics<?> statistics,
SizeStatistics sizeStatistics)
throws IOException;

/**
* @return the current size used in the memory buffer for that column chunk
*/
Expand Down
Loading