diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java index e719afcbef..d0c063325e 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java @@ -145,67 +145,12 @@ private static WriteSupport writeSupport(Configuration conf, new AvroSchemaConverter(conf).convert(avroSchema), avroSchema, model); } - public static class Builder { - private final Path file; - private Configuration conf = new Configuration(); - private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME; - private int blockSize = DEFAULT_BLOCK_SIZE; - private int pageSize = DEFAULT_PAGE_SIZE; - private boolean enableDictionary = DEFAULT_IS_DICTIONARY_ENABLED; - private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED; - private WriterVersion writerVersion = DEFAULT_WRITER_VERSION; - - // avro-specific + public static class Builder extends ParquetWriter.Builder> { private Schema schema = null; private GenericData model = SpecificData.get(); private Builder(Path file) { - this.file = file; - } - - public Builder withConf(Configuration conf) { - this.conf = conf; - return this; - } - - public Builder withCompressionCodec(CompressionCodecName codecName) { - this.codecName = codecName; - return this; - } - - public Builder withBlockSize(int blockSize) { - this.blockSize = blockSize; - return this; - } - - public Builder withPageSize(int pageSize) { - this.pageSize = pageSize; - return this; - } - - public Builder enableDictionaryEncoding() { - this.enableDictionary = true; - return this; - } - - public Builder withDictionaryEncoding(boolean enableDictionary) { - this.enableDictionary = enableDictionary; - return this; - } - - public Builder enableValidation() { - this.enableValidation = true; - return this; - } - - public Builder withValidation(boolean enableValidation) { - this.enableValidation = enableValidation; - return this; - } - - public Builder withWriterVersion(WriterVersion version) { - this.writerVersion = version; - return this; + super(file); } public Builder withSchema(Schema schema) { @@ -218,14 +163,14 @@ public Builder withDataModel(GenericData model) { return this; } - private WriteSupport getWriteSupport() { - return AvroParquetWriter.writeSupport(conf, schema, model); + @Override + protected Builder self() { + return this; } - public ParquetWriter build() throws IOException { - return new AvroParquetWriter(file, getWriteSupport(), codecName, - blockSize, pageSize, enableDictionary, enableValidation, - writerVersion, conf); + @Override + protected WriteSupport getWriteSupport(Configuration conf) { + return AvroParquetWriter.writeSupport(conf, schema, model); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index c9b9eac950..70abdaca79 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -61,6 +61,7 @@ public class ParquetWriter implements Closeable { * @throws IOException * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, boolean, boolean) */ + @Deprecated public ParquetWriter(Path file, WriteSupport writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException { this(file, writeSupport, compressionCodecName, blockSize, pageSize, DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED); @@ -79,6 +80,7 @@ public ParquetWriter(Path file, WriteSupport writeSupport, CompressionCodecNa * @throws IOException * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean) */ + @Deprecated public ParquetWriter( Path file, WriteSupport writeSupport, @@ -104,6 +106,7 @@ public ParquetWriter( * @throws IOException * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, WriterVersion) */ + @Deprecated public ParquetWriter( Path file, WriteSupport writeSupport, @@ -136,6 +139,7 @@ public ParquetWriter( * @throws IOException * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, WriterVersion, Configuration) */ + @Deprecated public ParquetWriter( Path file, WriteSupport writeSupport, @@ -164,6 +168,7 @@ public ParquetWriter( * @param conf Hadoop configuration to use while accessing the filesystem * @throws IOException */ + @Deprecated public ParquetWriter( Path file, WriteSupport writeSupport, @@ -179,6 +184,7 @@ public ParquetWriter( compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, conf); } + /** * Create a new ParquetWriter. * @@ -195,6 +201,7 @@ public ParquetWriter( * @param conf Hadoop configuration to use while accessing the filesystem * @throws IOException */ + @Deprecated public ParquetWriter( Path file, ParquetFileWriter.Mode mode, @@ -207,29 +214,9 @@ public ParquetWriter( boolean validating, WriterVersion writerVersion, Configuration conf) throws IOException { - - WriteSupport.WriteContext writeContext = writeSupport.init(conf); - MessageType schema = writeContext.getSchema(); - - // TODO: in a follow-up issue, add max padding to the builder - ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file, - mode, blockSize, MAX_PADDING_SIZE_DEFAULT); - fileWriter.start(); - - CodecFactory codecFactory = new CodecFactory(conf); - CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0); - this.writer = new InternalParquetRecordWriter( - fileWriter, - writeSupport, - schema, - writeContext.getExtraMetaData(), - blockSize, - pageSize, - compressor, - dictionaryPageSize, - enableDictionary, - validating, - writerVersion); + this(file, mode, writeSupport, compressionCodecName, blockSize, pageSize, + dictionaryPageSize, enableDictionary, validating, writerVersion, conf, + MAX_PADDING_SIZE_DEFAULT); } /** @@ -240,10 +227,12 @@ public ParquetWriter( * @param writeSupport the implementation to write a record to a RecordConsumer * @throws IOException */ + @Deprecated public ParquetWriter(Path file, WriteSupport writeSupport) throws IOException { this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); } + @Deprecated public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport) throws IOException { this(file, writeSupport, @@ -257,6 +246,43 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport conf); } + ParquetWriter( + Path file, + ParquetFileWriter.Mode mode, + WriteSupport writeSupport, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize, + int dictionaryPageSize, + boolean enableDictionary, + boolean validating, + WriterVersion writerVersion, + Configuration conf, + int maxPaddingSize) throws IOException { + + WriteSupport.WriteContext writeContext = writeSupport.init(conf); + MessageType schema = writeContext.getSchema(); + + ParquetFileWriter fileWriter = new ParquetFileWriter( + conf, schema, file, mode, blockSize, maxPaddingSize); + fileWriter.start(); + + CodecFactory codecFactory = new CodecFactory(conf); + CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0); + this.writer = new InternalParquetRecordWriter( + fileWriter, + writeSupport, + schema, + writeContext.getExtraMetaData(), + blockSize, + pageSize, + compressor, + dictionaryPageSize, + enableDictionary, + validating, + writerVersion); + } + public void write(T object) throws IOException { try { writer.write(object); @@ -273,4 +299,189 @@ public void close() throws IOException { throw new IOException(e); } } + + /** + * An abstract builder class for ParquetWriter instances. + * + * Object models should extend this builder to provide writer configuration + * options. + * + * @param The type of objects written by the constructed ParquetWriter. + * @param The type of this builder that is returned by builder methods + */ + public abstract static class Builder> { + private final Path file; + private Configuration conf = new Configuration(); + private ParquetFileWriter.Mode mode; + private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME; + private int rowGroupSize = DEFAULT_BLOCK_SIZE; + private int pageSize = DEFAULT_PAGE_SIZE; + private int dictionaryPageSize = DEFAULT_PAGE_SIZE; + private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT; + private boolean enableDictionary = DEFAULT_IS_DICTIONARY_ENABLED; + private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED; + private WriterVersion writerVersion = DEFAULT_WRITER_VERSION; + + protected Builder(Path file) { + this.file = file; + } + + /** + * @return this as the correct subclass of ParquetWriter.Builder. + */ + protected abstract SELF self(); + + /** + * @return an appropriate WriteSupport for the object model. + */ + protected abstract WriteSupport getWriteSupport(Configuration conf); + + /** + * Set the {@link Configuration} used by the constructed writer. + * + * @param conf a {@code Configuration} + * @return this builder for method chaining. + */ + public SELF withConf(Configuration conf) { + this.conf = conf; + return self(); + } + + /** + * Set the {@link ParquetFileWriter.Mode write mode} used when creating the + * backing file for this writer. + * + * @param mode a {@code ParquetFileWriter.Mode} + * @return this builder for method chaining. + */ + public SELF withWriteMode(ParquetFileWriter.Mode mode) { + this.mode = mode; + return self(); + } + + /** + * Set the {@link CompressionCodecName compression codec} used by the + * constructed writer. + * + * @param codecName a {@code CompressionCodecName} + * @return this builder for method chaining. + */ + public SELF withCompressionCodec(CompressionCodecName codecName) { + this.codecName = codecName; + return self(); + } + + /** + * Set the Parquet format row group size used by the constructed writer. + * + * @param rowGroupSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withRowGroupSize(int rowGroupSize) { + this.rowGroupSize = rowGroupSize; + return self(); + } + + /** + * Set the Parquet format page size used by the constructed writer. + * + * @param pageSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withPageSize(int pageSize) { + this.pageSize = pageSize; + return self(); + } + + /** + * Set the Parquet format dictionary page size used by the constructed + * writer. + * + * @param dictionaryPageSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withDictionaryPageSize(int dictionaryPageSize) { + this.dictionaryPageSize = dictionaryPageSize; + return self(); + } + + /** + * Set the maximum amount of padding, in bytes, that will be used to align + * row groups with blocks in the underlying filesystem. If the underlying + * filesystem is not a block filesystem like HDFS, this has no effect. + * + * @param maxPaddingSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withMaxPaddingSize(int maxPaddingSize) { + this.maxPaddingSize = maxPaddingSize; + return self(); + } + + /** + * Enables dictionary encoding for the constructed writer. + * + * @return this builder for method chaining. + */ + public SELF enableDictionaryEncoding() { + this.enableDictionary = true; + return self(); + } + + /** + * Enable or disable dictionary encoding for the constructed writer. + * + * @param enableDictionary whether dictionary encoding should be enabled + * @return this builder for method chaining. + */ + public SELF withDictionaryEncoding(boolean enableDictionary) { + this.enableDictionary = enableDictionary; + return self(); + } + + /** + * Enables validation for the constructed writer. + * + * @return this builder for method chaining. + */ + public SELF enableValidation() { + this.enableValidation = true; + return self(); + } + + /** + * Enable or disable validation for the constructed writer. + * + * @param enableValidation whether validation should be enabled + * @return this builder for method chaining. + */ + public SELF withValidation(boolean enableValidation) { + this.enableValidation = enableValidation; + return self(); + } + + /** + * Set the {@link WriterVersion format version} used by the constructed + * writer. + * + * @param version a {@code WriterVersion} + * @return this builder for method chaining. + */ + public SELF withWriterVersion(WriterVersion version) { + this.writerVersion = version; + return self(); + } + + /** + * Build a {@link ParquetWriter} with the accumulated configuration. + * + * @return a configured {@code ParquetWriter} instance. + * @throws IOException + */ + public ParquetWriter build() throws IOException { + return new ParquetWriter(file, mode, getWriteSupport(conf), codecName, + rowGroupSize, pageSize, dictionaryPageSize, enableDictionary, + enableValidation, writerVersion, conf, maxPaddingSize); + } + } }