Fixes test failures
liancheng committed Aug 1, 2015
commit 821e9ec25b755b4f46d715cbee465f2cb4432afb

CatalystWriteSupport.scala

@@ -78,13 +78,10 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
   }
 
   override def write(row: InternalRow): Unit = {
-    assert(row.numFields == schema.length)
-    recordConsumer.startMessage()
-    writeFields(row)
-    recordConsumer.endMessage()
+    consumeMessage(writeFields(row, schema))
   }
 
-  private def writeFields(row: InternalRow): Unit = {
+  private def writeFields(row: InternalRow, schema: StructType): Unit = {
     val consumers = schema.map(_.dataType).map(makeConsumer)
     var i = 0
 
@@ -145,9 +142,6 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
         (row: InternalRow, ordinal: Int) =>
           recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
 
-      case DecimalType.Unlimited =>
-        sys.error(s"Unsupported data type $dataType. Decimal precision must be specified.")
-
       case DecimalType.Fixed(precision, _) if precision > 18 =>
         sys.error(s"Unsupported data type $dataType. Decimal precision cannot be greater than 18.")
 
@@ -169,9 +163,9 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
           recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
         }
 
-      case StructType(fields) =>
+      case structType @ StructType(fields) =>
         (row: InternalRow, ordinal: Int) =>
-          consumeGroup(writeFields(row.getStruct(ordinal, fields.length)))
+          consumeGroup(writeFields(row.getStruct(ordinal, fields.length), structType))
 
       case arrayType: ArrayType if followParquetFormatSpec =>
         makeStandardArrayConsumer(arrayType.elementType)
@@ -214,7 +208,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
     (row: InternalRow, ordinal: Int) => {
       consumeGroup {
         consumeField(repeatedGroupName, 0) {
-          val array = row.get(ordinal).asInstanceOf[Array[_]]
+          val array = row.get(ordinal).asInstanceOf[Seq[_]]
           var i = 0
 
           while (i < array.length) {
Expand All @@ -241,7 +235,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
     (row: InternalRow, ordinal: Int) => {
       consumeGroup {
         consumeField(repeatedFieldName, 0) {
-          val array = row.get(ordinal).asInstanceOf[Array[_]]
+          val array = row.get(ordinal).asInstanceOf[Seq[_]]
           var i = 0
 
           while (i < array.length) {
@@ -281,6 +275,12 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
     }
   }
 
+  private def consumeMessage(f: => Unit): Unit = {
+    recordConsumer.startMessage()
+    f
+    recordConsumer.endMessage()
+  }
+
   private def consumeGroup(f: => Unit): Unit = {
     recordConsumer.startGroup()
     f
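
The new consumeMessage helper mirrors the existing consumeGroup, and the consumeField calls visible in the array hunks above presumably follow the same pattern: each helper brackets a by-name block between the matching start/end callbacks of Parquet's RecordConsumer. Below is a minimal standalone sketch of that bracketing pattern, not this PR's code; the body of consumeField is an assumption, since it is not shown in this commit.

import org.apache.parquet.io.api.RecordConsumer

// Sketch only: bracketing helpers around Parquet's low-level RecordConsumer callbacks.
class RecordConsumerHelpers(recordConsumer: RecordConsumer) {
  // One Parquet record (message) per row.
  def consumeMessage(f: => Unit): Unit = {
    recordConsumer.startMessage()
    f
    recordConsumer.endMessage()
  }

  // A nested group, e.g. a struct or the wrapper group of a LIST/MAP.
  def consumeGroup(f: => Unit): Unit = {
    recordConsumer.startGroup()
    f
    recordConsumer.endGroup()
  }

  // A named field at a given index within the current message or group.
  // (Assumed shape; the hunks above only show call sites such as consumeField(repeatedGroupName, 0) { ... }.)
  def consumeField(field: String, index: Int)(f: => Unit): Unit = {
    recordConsumer.startField(field, index)
    f
    recordConsumer.endField(field, index)
  }
}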

DirectParquetOutputCommitter.scala

@@ -39,7 +39,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
  *
  * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
  * no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- * left * empty).
+ * left empty).
  */
 private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
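
To make the corrected sentence concrete, "left empty" means the abort hooks are no-ops. A rough sketch of such empty overrides in a Hadoop output committer follows; the exact signatures used by DirectParquetOutputCommitter are not shown in this diff and are assumed here.

import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}

// Sketch only: the abort callbacks deliberately do nothing, because a partially
// written append cannot be rolled back safely.
trait NoOpAbort {
  def abortTask(taskContext: TaskAttemptContext): Unit = {}                // nothing to clean up
  def abortJob(jobContext: JobContext, state: JobStatus.State): Unit = {}  // nothing to roll back
}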

ParquetIOSuite.scala

@@ -29,7 +29,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
+import org.apache.parquet.hadoop._
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -203,14 +203,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
   }
 
   test("compression codec") {
-    def compressionCodecFor(path: String): String = {
-      val codecs = readMetadata(new Path(path), configuration)
-        .getBlocks
-        .flatMap(_.getColumns)
-        .map(_.getCodec.name())
-        .distinct
-
-      assert(codecs.size === 1)
+    def compressionCodecFor(path: String, codecName: String): String = {
+      val codecs = for {
+        footer <- readAllFootersWithoutSummaryFiles(new Path(path), configuration)
+        block <- footer.getParquetMetadata.getBlocks
+        column <- block.getColumns
+      } yield column.getCodec.name()
+
+      assert(codecs.distinct === Seq(codecName))
       codecs.head
     }
 
@@ -220,7 +220,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
       withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
         withParquetFile(data) { path =>
           assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) {
-            compressionCodecFor(path)
+            compressionCodecFor(path, codec.name())
           }
         }
       }
@@ -282,9 +282,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
 
-      val metaData = readMetadata(path, configuration)
-      val actualSchema = metaData.getFileMetaData.getSchema
       val expectedSchema = new CatalystSchemaConverter(configuration).convert(schema)
+      val actualSchema = readFooter(path, configuration).getFileMetaData.getSchema
 
       actualSchema.checkContains(expectedSchema)
       expectedSchema.checkContains(actualSchema)

ParquetTest.scala

@@ -19,7 +19,7 @@ package org.apache.spark.sql.parquet
 
 import java.io.File
 
-import scala.collection.JavaConverters.{mapAsJavaMapConverter, seqAsJavaListConverter}
+import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
@@ -30,7 +30,6 @@ import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetM
 import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SaveMode}
@@ -117,19 +116,16 @@ private[sql] trait ParquetTest extends SQLTestUtils { this: SparkFunSuite =>
     ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava)
   }
 
-
-  def readMetadata(path: Path, configuration: Configuration): ParquetMetadata = {
-    val summaryFileNames = Seq(
-      ParquetFileWriter.PARQUET_METADATA_FILE,
-      ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
+  def readAllFootersWithoutSummaryFiles(
+      path: Path, configuration: Configuration): Seq[Footer] = {
     val fs = path.getFileSystem(configuration)
-    val leaves = SparkHadoopUtil.get.listLeafStatuses(fs, path).filter { f =>
-      val name = f.getPath.getName
-      name.startsWith(".") && name.startsWith("_") || summaryFileNames.contains(name)
-    }
+    ParquetFileReader.readAllFootersInParallel(configuration, fs.getFileStatus(path)).asScala.toSeq
+  }
 
+  def readFooter(path: Path, configuration: Configuration): ParquetMetadata = {
     ParquetFileReader.readFooter(
-      configuration, leaves.head, ParquetMetadataConverter.SKIP_ROW_GROUPS)
+      configuration,
+      new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE),
+      ParquetMetadataConverter.NO_FILTER)
   }
 }
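
For reference, the snippet below consolidates what the two new helpers do at their call sites in ParquetIOSuite: collect compression codec names from every footer, row group, and column chunk, and read the schema recorded in the _metadata summary file. It is a hypothetical standalone sketch (the directory argument, the explicit asScala conversions, and the main wrapper are assumptions), not code from this PR.

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}

// Sketch only: inspect the footers of a Parquet directory written with a _metadata summary file.
object FooterInspection {
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration()
    val dir = new Path(args(0))          // hypothetical Parquet output directory
    val fs = dir.getFileSystem(configuration)

    // Like readAllFootersWithoutSummaryFiles: footers of the actual part-files, read in parallel.
    val footers = ParquetFileReader
      .readAllFootersInParallel(configuration, fs.getFileStatus(dir))
      .asScala

    // Distinct compression codecs across every column chunk of every row group.
    val codecs = for {
      footer <- footers
      block <- footer.getParquetMetadata.getBlocks.asScala
      column <- block.getColumns.asScala
    } yield column.getCodec.name()
    println(s"codecs: ${codecs.distinct.mkString(", ")}")

    // Like readFooter: metadata (including the full schema) from the _metadata summary file.
    val summaryMetadata = ParquetFileReader.readFooter(
      configuration,
      new Path(dir, ParquetFileWriter.PARQUET_METADATA_FILE),
      ParquetMetadataConverter.NO_FILTER)
    println(summaryMetadata.getFileMetaData.getSchema)
  }
}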