Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
PARQUET-306: Base next row group size on bytes remaining.
This uses the getNextRowGroupSize in InternalParquetRecordWriter to set
the target size of the next row group when a row group is flushed. The
actual target size is either this value (the remaining bytes in the
block) or the row group size set by the memory manager, whichever is
smaller.
  • Loading branch information
rdblue committed Jun 22, 2015
commit f1dc6598e9a8ed8261566721bcaffc9cfcbf6ed2
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.Ints;
import org.apache.parquet.Log;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
Expand All @@ -54,6 +51,7 @@ class InternalParquetRecordWriter<T> {
private final Map<String, String> extraMetaData;
private final long rowGroupSize;
private long rowGroupSizeThreshold;
private long nextRowGroupSize;
private final int pageSize;
private final BytesCompressor compressor;
private final boolean validating;
Expand Down Expand Up @@ -92,6 +90,7 @@ public InternalParquetRecordWriter(
this.extraMetaData = extraMetaData;
this.rowGroupSize = rowGroupSize;
this.rowGroupSizeThreshold = rowGroupSize;
this.nextRowGroupSize = rowGroupSizeThreshold;
this.pageSize = pageSize;
this.compressor = compressor;
this.validating = validating;
Expand Down Expand Up @@ -126,15 +125,17 @@ public void write(T value) throws IOException, InterruptedException {
private void checkBlockSizeReached() throws IOException {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = columnStore.getBufferedSize();
if (memSize > rowGroupSizeThreshold) {
LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
long recordSize = memSize / recordCount;
// flush the row group if it is within ~2 records of the limit
// it is much better to be slightly under size than to be over at all
if (memSize > (nextRowGroupSize - 2 * recordSize)) {
LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, nextRowGroupSize, recordCount));
flushRowGroupToStore();
initStore();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
} else {
float recordSize = (float) memSize / recordCount;
recordCountForNextMemCheck = min(
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
);
if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
Expand All @@ -145,7 +146,7 @@ private void checkBlockSizeReached() throws IOException {
private void flushRowGroupToStore()
throws IOException {
LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSizeThreshold) {
if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
LOG.warn("Too much memory used: " + columnStore.memUsageString());
}

Expand All @@ -155,6 +156,9 @@ private void flushRowGroupToStore()
pageStore.flushToFileWriter(parquetFileWriter);
recordCount = 0;
parquetFileWriter.endBlock();
this.nextRowGroupSize = Math.min(
parquetFileWriter.getNextRowGroupSize(),
rowGroupSizeThreshold);
}

columnStore = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ public void alignForRowGroup(FSDataOutputStream out) throws IOException {

if (isPaddingNeeded(remaining)) {
if (DEBUG) LOG.debug("Adding " + remaining + " bytes of padding (" +
"row group size=" + rowGroupSize + "," +
"block size=" + dfsBlockSize + ")");
"row group size=" + rowGroupSize + "B, " +
"block size=" + dfsBlockSize + "B)");
for (; remaining > 0; remaining -= zeros.length) {
out.write(zeros, 0, (int) Math.min((long) zeros.length, remaining));
}
Expand Down