@@ -28,13 +28,10 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.Ints;
import org.apache.parquet.Log;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -54,6 +51,7 @@ class InternalParquetRecordWriter<T> {
private final Map<String, String> extraMetaData;
private final long rowGroupSize;
private long rowGroupSizeThreshold;
private long nextRowGroupSize;
private final int pageSize;
private final BytesCompressor compressor;
private final boolean validating;
@@ -92,6 +90,7 @@ public InternalParquetRecordWriter(
this.extraMetaData = extraMetaData;
this.rowGroupSize = rowGroupSize;
this.rowGroupSizeThreshold = rowGroupSize;
this.nextRowGroupSize = rowGroupSizeThreshold;
this.pageSize = pageSize;
this.compressor = compressor;
this.validating = validating;
@@ -126,15 +125,17 @@ public void write(T value) throws IOException, InterruptedException {
private void checkBlockSizeReached() throws IOException {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = columnStore.getBufferedSize();
if (memSize > rowGroupSizeThreshold) {
LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
long recordSize = memSize / recordCount;
// flush the row group if it is within ~2 records of the limit
// it is much better to be slightly under size than to be over at all
if (memSize > (nextRowGroupSize - 2 * recordSize)) {
LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, nextRowGroupSize, recordCount));
flushRowGroupToStore();
initStore();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
} else {
float recordSize = (float) memSize / recordCount;
recordCountForNextMemCheck = min(
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
);
if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
@@ -145,7 +146,7 @@ private void checkBlockSizeReached() throws IOException {
private void flushRowGroupToStore()
throws IOException {
LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSizeThreshold) {
if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
LOG.warn("Too much memory used: " + columnStore.memUsageString());
}

Expand All @@ -155,6 +156,9 @@ private void flushRowGroupToStore()
pageStore.flushToFileWriter(parquetFileWriter);
recordCount = 0;
parquetFileWriter.endBlock();
this.nextRowGroupSize = Math.min(
parquetFileWriter.getNextRowGroupSize(),
rowGroupSizeThreshold);
}

columnStore = null;
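
The change above replaces the hard threshold with an adaptive check: it estimates the average record size from what is currently buffered and flushes once the buffer is within about two records of the next row group target, on the theory that ending slightly under size is better than going over at all. A minimal standalone sketch of that heuristic (the method names and parameters below are illustrative, not part of the patch):

// Illustrative only; mirrors the arithmetic in checkBlockSizeReached().
static boolean shouldFlush(long bufferedBytes, long recordCount, long nextRowGroupSize) {
  long avgRecordSize = bufferedBytes / recordCount;      // rough per-record size so far
  // flush when within ~2 records of the target
  return bufferedBytes > nextRowGroupSize - 2 * avgRecordSize;
}

// When not flushing, schedule the next check roughly halfway to the projected limit,
// bounded by the minimum and maximum check intervals.
static long nextCheckAt(long recordCount, long bufferedBytes, long nextRowGroupSize,
                        long minCheck, long maxCheck) {
  float avgRecordSize = (float) bufferedBytes / recordCount;
  return Math.min(
      Math.max(minCheck, (recordCount + (long) (nextRowGroupSize / avgRecordSize)) / 2),
      recordCount + maxCheck);
}

With 10,000 buffered records averaging 10 KB each and a 128 MB target, for example, the flush fires once the buffer passes roughly 128 MB minus 20 KB.
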
@@ -20,6 +20,8 @@

import static org.apache.parquet.Log.DEBUG;
import static org.apache.parquet.format.Util.writeFileMetaData;
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;

import java.io.IOException;
import java.nio.charset.Charset;
@@ -41,6 +43,7 @@
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -69,6 +72,22 @@ public class ParquetFileWriter {
public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
public static final int CURRENT_VERSION = 1;

// need to supply a buffer size when setting block size. this is the default
// for hadoop 1 to present. copying it avoids loading DFSConfigKeys.
private static final int DFS_BUFFER_SIZE_DEFAULT = 4096;
Contributor:
this is normally overridable via the hadoop config right? by hardcoding it, now requests for non default buffer sizes won't be respected right?

Contributor Author:
We weren't exposing a way to set this when we create the file anyway, so this isn't removing an option that users had. It is setting an option that the library had and wasn't using. Hard-coding it to the value we would always use anyway. I need to have a value for this because there isn't a method call that allows us to set the block size and default the buffer size. This default is very stable so I think this is perfectly safe.

Contributor:
> so this isn't removing an option that users had

Yeah that's what I'm wondering about -- we were using a method before that didn't ask for this info, did that method pull this info from the hadoop configuration? If so -- then users were previously able to change this by messing w/ their hadoop configuration, but now they can't.

On the other hand, if the method we were using before just grabbed the hardcoded default, then this makes sense as it's no different (I'm not worried about duplicating the default, just about whether we've taken away the ability to change this via hadoop configuration)

Contributor Author:
No, this is something you set through the API, it wasn't exposed before, and the default hasn't changed since Hadoop-1.

Contributor:
ok thanks
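
For reference, the API constraint being discussed: Hadoop's FileSystem only lets a caller pick the block size through the create() overload that also takes a buffer size and replication factor, which is why a buffer-size constant is needed at all. A minimal sketch of that overload in use (the class name, path, and block size below are made up):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class CreateWithBlockSizeSketch {                                  // hypothetical helper
  static FSDataOutputStream open(Configuration conf) throws IOException {
    Path file = new Path("hdfs://nn:8020/tmp/example.parquet");    // hypothetical path
    FileSystem fs = file.getFileSystem(conf);
    // No create() overload takes a block size while defaulting the buffer size,
    // so a buffer size (4096, the long-standing Hadoop default) is passed explicitly.
    return fs.create(
        file,
        false,                           // do not overwrite
        4096,                            // buffer size
        fs.getDefaultReplication(file),  // replication factor
        256L * 1024 * 1024);             // desired block size for this file
  }
}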


// visible for testing
static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>();
static {
BLOCK_FS_SCHEMES.add("hdfs");
Contributor:
do we need hftp too?

Contributor:
should we extract this info from the path, or just make it a required argument to the constructor, and pass the right one in all the hadoop related usages?

Contributor Author:
Since we can tell easily, I think it makes sense to check this way. No need to complicate the API when you wouldn't ever change it.

Contributor Author:
What's HFTP?

Contributor:
HFTP is a FileSystem implementation for HDFS over HTTP. It's used to read data across clusters when they have mismatched versions of HDFS.

I guess more of what I was getting at is, there's HDFS, HFTP, and VIEWFS, and (possibly?) more. So locking in to just hdfs maybe is too strict? I wonder if there's a property of FileSystem that describes whether it supports blocks / or if there's some flag that's set (like block size) that indicates that the FS uses blocks?

Contributor Author:
Looks like HFTP is a read-only FS so we shouldn't need to add that. But I think we should add viewfs as you suggested and webhdfs. Anything backed by HDFS should probably have it but we can add more over time.

BLOCK_FS_SCHEMES.add("webhdfs");
BLOCK_FS_SCHEMES.add("viewfs");
}

private static boolean supportsBlockSize(FileSystem fs) {
return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme());
}
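
Tying the scheme discussion above to this check, a small illustration of how different paths resolve (the paths are made up, and conf stands for any Hadoop Configuration):

Path onHdfs  = new Path("hdfs://nn:8020/warehouse/part-0.parquet");
Path onLocal = new Path("file:///tmp/part-0.parquet");
supportsBlockSize(onHdfs.getFileSystem(conf));   // true:  row groups are padded to block boundaries
supportsBlockSize(onLocal.getFileSystem(conf));  // false: NoAlignment, plain fs.create()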

// File creation modes
public static enum Mode {
CREATE,
@@ -79,13 +98,13 @@ public static enum Mode {

private final MessageType schema;
private final FSDataOutputStream out;
private final AlignmentStrategy alignment;
private BlockMetaData currentBlock;
private ColumnChunkMetaData currentColumn;
private long currentRecordCount;
private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
private long uncompressedLength;
private long compressedLength;
private Set<org.apache.parquet.column.Encoding> currentEncodings;
private Set<Encoding> currentEncodings;

private CompressionCodecName currentChunkCodec;
private ColumnPath currentChunkPath;
@@ -157,7 +176,8 @@ private final STATE error() throws IOException {
*/
public ParquetFileWriter(Configuration configuration, MessageType schema,
Path file) throws IOException {
this(configuration, schema, file, Mode.CREATE);
this(configuration, schema, file, Mode.CREATE, DEFAULT_BLOCK_SIZE,
MAX_PADDING_SIZE_DEFAULT);
}

/**
@@ -168,12 +188,60 @@ public ParquetFileWriter(Configuration configuration, MessageType schema,
* @throws IOException if the file can not be created
*/
public ParquetFileWriter(Configuration configuration, MessageType schema,
Path file, Mode mode) throws IOException {
super();
Path file, Mode mode) throws IOException {
this(configuration, schema, file, mode, DEFAULT_BLOCK_SIZE,
MAX_PADDING_SIZE_DEFAULT);
}

/**
* @param configuration Hadoop configuration
* @param schema the schema of the data
* @param file the file to write to
* @param mode file creation mode
* @param rowGroupSize the row group size
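* @param maxPaddingSize the maximum padding, in bytes, that may be written to align row groups to DFS blocks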
* @throws IOException if the file can not be created
*/
public ParquetFileWriter(Configuration configuration, MessageType schema,
Path file, Mode mode, long rowGroupSize,
int maxPaddingSize)
throws IOException {
this.schema = schema;
FileSystem fs = file.getFileSystem(configuration);
boolean overwriteFlag = (mode == Mode.OVERWRITE);
this.out = fs.create(file, overwriteFlag);

if (supportsBlockSize(fs)) {
// use the default block size, unless row group size is larger
long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);
Contributor:
by using the default block size, are we ignoring user's requests for non-default block sizes?

Contributor Author:
No, users control their HDFS block size by setting dfs.block.size, which is what this method accesses. So this is getting the user-configured block size, checking that it is valid, and using it.

Contributor:
ok that's what I wanted to double check, thanks


this.alignment = PaddingAlignment.get(
dfsBlockSize, rowGroupSize, maxPaddingSize);
this.out = fs.create(file, overwriteFlag, DFS_BUFFER_SIZE_DEFAULT,
fs.getDefaultReplication(file), dfsBlockSize);

} else {
this.alignment = NoAlignment.get(rowGroupSize);
this.out = fs.create(file, overwriteFlag);
}
}
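
To make the block-size discussion above concrete, a minimal sketch of how the user's setting reaches this constructor (the 256 MB value and the path are made up; dfs.block.size is the classic Hadoop key the comments above refer to):

Configuration conf = new Configuration();
conf.setLong("dfs.block.size", 256L * 1024 * 1024);          // user-configured HDFS block size
Path file = new Path("hdfs://nn:8020/tmp/example.parquet");  // hypothetical path
FileSystem fs = file.getFileSystem(conf);
long rowGroupSize = 128L * 1024 * 1024;                      // requested Parquet row group size
// getDefaultBlockSize(file) reflects the configured value; the row group size only
// wins when the caller asked for row groups larger than a DFS block.
long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);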

/**
* FOR TESTING ONLY.
*
* @param configuration Hadoop configuration
* @param schema the schema of the data
* @param file the file to write to
* @param rowAndBlockSize the row group size, also used as the DFS block size
* @throws IOException if the file can not be created
*/
ParquetFileWriter(Configuration configuration, MessageType schema,
Path file, long rowAndBlockSize, int maxPaddingSize)
throws IOException {
FileSystem fs = file.getFileSystem(configuration);
this.schema = schema;
this.alignment = PaddingAlignment.get(
rowAndBlockSize, rowAndBlockSize, maxPaddingSize);
this.out = fs.create(file, true, DFS_BUFFER_SIZE_DEFAULT,
fs.getDefaultReplication(file), rowAndBlockSize);
}

/**
Expand All @@ -195,6 +263,9 @@ public void startBlock(long recordCount) throws IOException {
state = state.startBlock();
if (DEBUG) LOG.debug(out.getPos() + ": start block");
// out.write(MAGIC); // TODO: add a magic delimiter

alignment.alignForRowGroup(out);

currentBlock = new BlockMetaData();
currentRecordCount = recordCount;
}
@@ -203,16 +274,14 @@ public void startBlock(long recordCount) throws IOException {
* start a column inside a block
* @param descriptor the column descriptor
* @param valueCount the value count in this column
* @param statistics the statistics in this column
* @param compressionCodecName
* @throws IOException
*/
public void startColumn(ColumnDescriptor descriptor,
long valueCount,
CompressionCodecName compressionCodecName) throws IOException {
state = state.startColumn();
if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
currentEncodings = new HashSet<org.apache.parquet.column.Encoding>();
currentEncodings = new HashSet<Encoding>();
currentChunkPath = ColumnPath.get(descriptor.getPath());
currentChunkType = descriptor.getType();
currentChunkCodec = compressionCodecName;
@@ -263,9 +332,9 @@ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
public void writeDataPage(
int valueCount, int uncompressedPageSize,
BytesInput bytes,
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding) throws IOException {
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding) throws IOException {
state = state.write();
long beforeHeader = out.getPos();
if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
@@ -300,9 +369,9 @@ public void writeDataPage(
int valueCount, int uncompressedPageSize,
BytesInput bytes,
Statistics statistics,
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding) throws IOException {
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding) throws IOException {
state = state.write();
long beforeHeader = out.getPos();
if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
@@ -337,7 +406,7 @@ void writeDataPages(BytesInput bytes,
long uncompressedTotalPageSize,
long compressedTotalPageSize,
Statistics totalStats,
List<org.apache.parquet.column.Encoding> encodings) throws IOException {
List<Encoding> encodings) throws IOException {
state = state.write();
if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
long headersSize = bytes.size() - compressedTotalPageSize;
@@ -367,8 +436,6 @@ public void endColumn() throws IOException {
currentChunkValueCount,
compressedLength,
uncompressedLength));
if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
currentColumn = null;
this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
this.uncompressedLength = 0;
this.compressedLength = 0;
@@ -464,6 +531,10 @@ public long getPos() throws IOException {
return out.getPos();
}

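/**
 * @return the size to target for the next row group; when writing to a
 *         block-based file system this is capped by the space remaining in
 *         the current DFS block unless that space will be padded
 * @throws IOException if the current file position cannot be determined
 */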
public long getNextRowGroupSize() throws IOException {
return alignment.nextRowGroupSize(out);
}

/**
* Will merge the metadata of all the footers together
* @param footers the list files footers to merge
@@ -550,4 +621,83 @@ static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
return mergedSchema.union(toMerge, strict);
}

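/**
 * Strategy for positioning row groups relative to the file system's blocks:
 * either pad to the next block boundary or shrink the next row group so it
 * fits in the space that is left.
 */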
private interface AlignmentStrategy {
void alignForRowGroup(FSDataOutputStream out) throws IOException;

long nextRowGroupSize(FSDataOutputStream out) throws IOException;
}

private static class NoAlignment implements AlignmentStrategy {
public static NoAlignment get(long rowGroupSize) {
return new NoAlignment(rowGroupSize);
}

private final long rowGroupSize;

private NoAlignment(long rowGroupSize) {
this.rowGroupSize = rowGroupSize;
}

@Override
public void alignForRowGroup(FSDataOutputStream out) {
}

@Override
public long nextRowGroupSize(FSDataOutputStream out) {
return rowGroupSize;
}
}

/**
* Alignment strategy that pads to the next DFS block boundary when no more
* than maxPaddingSize bytes remain in the current block.
*/
private static class PaddingAlignment implements AlignmentStrategy {
private static final byte[] zeros = new byte[4096];

public static PaddingAlignment get(long dfsBlockSize, long rowGroupSize,
int maxPaddingSize) {
return new PaddingAlignment(dfsBlockSize, rowGroupSize, maxPaddingSize);
}

protected final long dfsBlockSize;
protected final long rowGroupSize;
protected final int maxPaddingSize;

private PaddingAlignment(long dfsBlockSize, long rowGroupSize,
int maxPaddingSize) {
this.dfsBlockSize = dfsBlockSize;
this.rowGroupSize = rowGroupSize;
this.maxPaddingSize = maxPaddingSize;
}

@Override
public void alignForRowGroup(FSDataOutputStream out) throws IOException {
long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);

if (isPaddingNeeded(remaining)) {
if (DEBUG) LOG.debug("Adding " + remaining + " bytes of padding (" +
"row group size=" + rowGroupSize + "B, " +
"block size=" + dfsBlockSize + "B)");
for (; remaining > 0; remaining -= zeros.length) {
out.write(zeros, 0, (int) Math.min((long) zeros.length, remaining));
}
}
}

@Override
public long nextRowGroupSize(FSDataOutputStream out) throws IOException {
long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);

if (isPaddingNeeded(remaining)) {
return rowGroupSize;
}

return Math.min(remaining, rowGroupSize);
}

protected boolean isPaddingNeeded(long remaining) {
return (remaining <= maxPaddingSize);
}
}
}
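
A worked example of the padding math above (the sizes are hypothetical):

// Worked example mirroring PaddingAlignment; all numbers are made up.
long dfsBlockSize   = 128L * 1024 * 1024;  // 128 MB HDFS block
long rowGroupSize   = 128L * 1024 * 1024;  // 128 MB target row group
int  maxPaddingSize =   8  * 1024 * 1024;  //   8 MB maximum padding

long pos = 122L * 1024 * 1024;             // current file position
long remaining = dfsBlockSize - (pos % dfsBlockSize);   // 6 MB left in the block
// remaining (6 MB) <= maxPaddingSize (8 MB): write 6 MB of zeros,
// then the next row group targets the full rowGroupSize (128 MB).

pos = 100L * 1024 * 1024;                  // a different position
remaining = dfsBlockSize - (pos % dfsBlockSize);        // 28 MB left
// remaining (28 MB) > maxPaddingSize: no padding; the next row group is
// capped at min(remaining, rowGroupSize) = 28 MB so it still ends on the boundary.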