@@ -26,6 +26,7 @@ import scala.collection.mutable
import scala.util.{Failure, Try}

import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -281,20 +282,27 @@ private[sql] class ParquetRelation(
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec

// Parquet row group size. We will use this value as the value for
// mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the values
// of these flags are smaller than the Parquet row group size.
val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)

// Create the function to set various Parquet confs at both driver and executor side.
val initLocalJobFuncOpt =
ParquetRelation.initializeLocalJobFunc(
requiredColumns,
filters,
dataSchema,
parquetBlockSize,
useMetadataCache,
parquetFilterPushDown,
assumeBinaryIsString,
assumeInt96IsTimestamp,
followParquetFormatSpec) _

// Create the function to set input paths at the driver side.
- val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
+ val setInputPaths =
+ ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _

Utils.withDummyCallSite(sqlContext.sparkContext) {
new SqlNewHadoopRDD(
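The comment on parquetBlockSize in the hunk above captures the bug: Hadoop's FileInputFormat computes each split size as max(minSplitSize, min(maxSplitSize, fsBlockSize)), so when the filesystem block size and the configured minimum are both smaller than the Parquet row group size, most splits contain no row-group starting offset and their tasks read nothing. The following standalone sketch (illustrative sizes only, not Spark or Parquet API) walks through that arithmetic:

object SplitSizeIllustration {
  // Hadoop FileInputFormat's split size rule: max(min, min(max, fsBlock)).
  def splitSize(minSplit: Long, maxSplit: Long, fsBlock: Long): Long =
    math.max(minSplit, math.min(maxSplit, fsBlock))

  def main(args: Array[String]): Unit = {
    val mb            = 1024L * 1024
    val rowGroupSize  = 256 * mb            // assumed parquet.block.size
    val fsBlockSize   = 64 * mb             // assumed HDFS block size
    val fileSize      = 512 * mb            // a file with two row groups
    val rowGroupStart = Seq(0L, rowGroupSize)

    // Without the override: 64 MB splits, but only the splits that contain a
    // row-group starting offset actually read data.
    val small  = splitSize(minSplit = 1, maxSplit = Long.MaxValue, fsBlock = fsBlockSize)
    val splits = 0L until fileSize by small
    val busy   = splits.count(s => rowGroupStart.exists(o => o >= s && o < s + small))
    println(s"split size = $small, splits = ${splits.size}, non-empty = $busy") // 8 splits, 2 non-empty

    // With the override: the min split size is raised to the row group size,
    // so every split covers at least one row-group start.
    val large = splitSize(minSplit = rowGroupSize, maxSplit = Long.MaxValue, fsBlock = fsBlockSize)
    println(s"split size = $large, splits = ${fileSize / large}")               // 2 splits
  }
}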
@@ -482,11 +490,35 @@ private[sql] object ParquetRelation extends Logging {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"

/**
* If Parquet's block size (row group size) setting is larger than the min split size,
* we use Parquet's block size setting as the min split size. Otherwise, we may create
* tasks that process nothing, because a split that does not cover the starting point of
* any Parquet block reads no row groups. See https://issues.apache.org/jira/browse/SPARK-10143
* for more information.
*/
private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
val minSplitSize =
math.max(
conf.getLong("mapred.min.split.size", 0L),
conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
if (parquetBlockSize > minSplitSize) {
val message =
s"Parquet's block size (row group size) is larger than " +
s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " +
s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " +
s"$parquetBlockSize."
logDebug(message)
conf.set("mapred.min.split.size", parquetBlockSize.toString)
conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
}
}

/** This closure sets various Parquet configurations at both driver side and executor side. */
private[parquet] def initializeLocalJobFunc(
requiredColumns: Array[String],
filters: Array[Filter],
dataSchema: StructType,
parquetBlockSize: Long,
useMetadataCache: Boolean,
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
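Note that overrideMinSplitSize only ever raises the minimum split size, never lowers it, and it writes both the old (mapred.min.split.size) and new (mapreduce.input.fileinputformat.split.minsize) keys because either may be consulted depending on the input format. Below is a minimal standalone sketch of the same idea against a plain Hadoop Configuration; the method itself is private, so this mirrors rather than calls it:

import org.apache.hadoop.conf.Configuration

object MinSplitSizeSketch {
  // Mirror of the override logic: raise both min-split-size keys to the
  // Parquet row group size when they are currently smaller.
  def raiseMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
    val current = math.max(
      conf.getLong("mapred.min.split.size", 0L),
      conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
    if (parquetBlockSize > current) {
      conf.setLong("mapred.min.split.size", parquetBlockSize)
      conf.setLong("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.setLong("mapreduce.input.fileinputformat.split.minsize", 32L * 1024 * 1024)
    raiseMinSplitSize(parquetBlockSize = 128L * 1024 * 1024, conf = conf)
    // Both keys now read 134217728 (128 MB).
    println(conf.get("mapred.min.split.size"))
    println(conf.get("mapreduce.input.fileinputformat.split.minsize"))
  }
}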
@@ -522,16 +554,21 @@ private[sql] object ParquetRelation extends Logging {
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)

overrideMinSplitSize(parquetBlockSize, conf)
}

/** This closure sets input paths at the driver side. */
private[parquet] def initializeDriverSideJobFunc(
- inputFiles: Array[FileStatus])(job: Job): Unit = {
+ inputFiles: Array[FileStatus],
+ parquetBlockSize: Long)(job: Job): Unit = {
// We set the input paths at the driver side.
logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}

overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
}

private[parquet] def readSchema(
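Both closures are handed to the scan's Hadoop RDD in buildInternalScan (the hunk near the top), so the override is applied on the driver when input paths are set and again on each executor when the local job configuration is built. A quick, hedged way to observe the effect end to end, assuming a SQLContext named sqlContext in scope and a hypothetical file written with a 256 MB row group on a filesystem with 64 MB blocks: after this change the scan should produce roughly one partition per row group rather than one per filesystem block.

// Hypothetical check: /tmp/wide.parquet is assumed to have been written with a
// 256 MB row group on a filesystem with 64 MB blocks.
val df = sqlContext.read.parquet("/tmp/wide.parquet")
// Before the fix: ~fileSize / 64 MB partitions, most of them reading no row group.
// After the fix:  ~fileSize / 256 MB partitions, each covering a row-group start.
println(df.rdd.partitions.length)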