Commits (52)
2ca2c38  init commit (lianhuiwang, Jun 3, 2016)
edea710  fix unit test (lianhuiwang, Jun 3, 2016)
8426522  Merge branch 'apache-master' into metadata-only (lianhuiwang, Jun 3, 2016)
153293e  fix unit test (lianhuiwang, Jun 3, 2016)
7dfb743  update (lianhuiwang, Jun 24, 2016)
68e6d6d  Revert "fix unit test" (lianhuiwang, Jun 24, 2016)
595ef36  Revert "fix unit test" (lianhuiwang, Jun 24, 2016)
7d7ece0  Merge branch 'apache-master' into metadata-only (lianhuiwang, Jun 24, 2016)
2e55a9d  Merge branch 'apache-master' into metadata-only (lianhuiwang, Jun 24, 2016)
b2b6eba  update (lianhuiwang, Jun 24, 2016)
c5a291e  Merge branch 'apache-master' into metadata-only (lianhuiwang, Jun 24, 2016)
6404c1f  update opt for core (lianhuiwang, Jun 24, 2016)
1bb5812  refactor (lianhuiwang, Jun 24, 2016)
7e3729e  add ut (lianhuiwang, Jun 24, 2016)
fbf5d61  fix ut (lianhuiwang, Jun 24, 2016)
3411fd6  fix project (lianhuiwang, Jun 26, 2016)
aefab7f  address comments (lianhuiwang, Jun 27, 2016)
c5ccdea  fix cube/rollup (lianhuiwang, Jun 27, 2016)
ae6cf9f  fix style (lianhuiwang, Jun 27, 2016)
159331b  refactor (lianhuiwang, Jun 27, 2016)
3a1438b  refactor (lianhuiwang, Jun 28, 2016)
c0a7d59  update (lianhuiwang, Jun 29, 2016)
a4045ca  add comments (lianhuiwang, Jun 29, 2016)
0a023e7  fix minor (lianhuiwang, Jun 29, 2016)
a9b38ab  rename (lianhuiwang, Jun 29, 2016)
a5ea995  update (lianhuiwang, Jun 29, 2016)
1bed08d  fix monir (lianhuiwang, Jun 29, 2016)
a22e962  refactor (lianhuiwang, Jul 1, 2016)
41fef2c  update (lianhuiwang, Jul 1, 2016)
bd53678  Merge branch 'apache-master' into metadata-only (lianhuiwang, Jul 1, 2016)
88f7308  update (lianhuiwang, Jul 1, 2016)
2568193  add ut (lianhuiwang, Jul 1, 2016)
26a97f4  address comments (lianhuiwang, Jul 4, 2016)
4297f9f  update name (lianhuiwang, Jul 6, 2016)
1a65aa7  address comments (lianhuiwang, Jul 6, 2016)
d5e0df4  update (lianhuiwang, Jul 6, 2016)
9d6dd76  update2 (lianhuiwang, Jul 6, 2016)
9cb01d8  update (lianhuiwang, Jul 6, 2016)
3e2687d  doc improve (cloud-fan, Jul 6, 2016)
2b4faf3  update (cloud-fan, Jul 7, 2016)
88fd3bf  Merge pull request #2 from cloud-fan/metadata-only (lianhuiwang, Jul 7, 2016)
a894bb7  delete cases (lianhuiwang, Jul 7, 2016)
9546b40  Merge branch 'metadata-only' of https://github.com/lianhuiwang/spark … (lianhuiwang, Jul 7, 2016)
85b695b  update ut (lianhuiwang, Jul 7, 2016)
bcfe8e5  Merge branch 'master' of https://github.com/apache/spark into metadat… (lianhuiwang, Jul 7, 2016)
67211be  Merge branch 'master' of https://github.com/apache/spark into metadat… (lianhuiwang, Jul 7, 2016)
501f93b  address commetns (lianhuiwang, Jul 11, 2016)
8ee2a8c  refactor (lianhuiwang, Jul 11, 2016)
d888c85  fix minor (lianhuiwang, Jul 11, 2016)
ff16509  update (lianhuiwang, Jul 12, 2016)
358ad13  remove duplicate code (lianhuiwang, Jul 12, 2016)
030776a  fix minor (lianhuiwang, Jul 12, 2016)
DataSourceScanExec.scala
@@ -340,7 +340,8 @@ private[sql] object DataSourceScanExec {
rdd: RDD[InternalRow],
relation: BaseRelation,
metadata: Map[String, String] = Map.empty,
metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = {
metastoreTableIdentifier: Option[TableIdentifier] = None,
isSupportBatch: Boolean = true): DataSourceScanExec = {
val outputPartitioning = {
val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
@@ -364,7 +365,8 @@

relation match {
case r: HadoopFsRelation
if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
if isSupportBatch &&
r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
BatchedDataSourceScanExec(
output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
case _ =>
FileSourceStrategy.scala
@@ -22,9 +22,11 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.DataSourceScanExec
@@ -109,108 +111,45 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")

val readFile = files.fileFormat.buildReaderWithPartitionValues(
sparkSession = files.sparkSession,
dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
requiredSchema = prunedDataSchema,
filters = pushedDownFilters,
options = files.options,
hadoopConf = files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))

val plannedPartitions = files.bucketSpec match {
case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
val bucketed =
selectedPartitions.flatMap { p =>
p.files.map { f =>
val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
}
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}

(0 until bucketing.numBuckets).map { bucketId =>
FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
}

case _ =>
val defaultMaxSplitBytes = files.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = files.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = files.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math.min(defaultMaxSplitBytes,
Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts)
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/** Add the given file to the current partition. */
def addFile(file: PartitionedFile): Unit = {
currentSize += file.length + openCostInBytes
currentFiles.append(file)
}

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
FilePartition(
partitions.size,
currentFiles.toArray.toSeq) // Copy to a new Array.
partitions.append(newPartition)
}
currentFiles.clear()
currentSize = 0
}

// Assign files to partitions using "First Fit Decreasing" (FFD)
// TODO: consider adding a slop factor here?
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
addFile(file)
}
closePartition()
partitions
val optimizerMetadataOnly =
readDataColumns.isEmpty && files.sparkSession.sessionState.conf.optimizerMetadataOnly
val scanRdd: RDD[InternalRow] = if (optimizerMetadataOnly) {
val partitionSchema = files.partitionSchema.toAttributes
lazy val converter = GenerateUnsafeProjection.generate(partitionSchema, partitionSchema)
val partitionValues = selectedPartitions.map(_.values)
files.sqlContext.sparkContext.parallelize(partitionValues, 1).map(converter(_))
Contributor: what if this partition has more than one data file?

Contributor Author: In this PR the default of spark.sql.optimizer.metadataOnly is false, so if a user needs this feature, they have to set spark.sql.optimizer.metadataOnly=true explicitly.

Contributor: I think the optimizer should never affect the correctness of the query result. If this optimization is too hard to implement with the current code base, we should improve the code base first instead of rushing in a partial implementation.

Contributor Author: Yes, I will rethink this and add a metadataOnly optimizer rule to the optimizer list. Thanks.
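As an aside, a minimal sketch of the correctness concern raised above (illustrative only, not part of the patch; the table t, its partition column p1, the row counts, and an active SparkSession named spark are all assumptions):

// Suppose t is partitioned by p1 and partition p1=1 holds two data files of 10 rows each.
// Safe for a one-row-per-partition scan: the answer depends only on distinct partition values.
spark.sql("SELECT DISTINCT p1 FROM t").show()                 // 1

// Not safe: no data columns are read (readDataColumns is empty), so this plan would take the
// metadata-only path, yet the correct answer depends on how many rows the data files contain.
spark.sql("SELECT p1, count(*) FROM t GROUP BY p1").show()    // should be (1, 20), not (1, 1)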

} else {
val readFile = files.fileFormat.buildReaderWithPartitionValues(
sparkSession = files.sparkSession,
dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
requiredSchema = prunedDataSchema,
filters = pushedDownFilters,
options = files.options,
hadoopConf = files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))

val plannedPartitions = getFilePartitions(files, selectedPartitions)
new FileScanRDD(
files.sparkSession,
readFile,
plannedPartitions)
}

val meta = Map(
"Format" -> files.fileFormat.toString,
"ReadSchema" -> prunedDataSchema.simpleString,
"metadataOnly" -> optimizerMetadataOnly.toString,
PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
INPUT_PATHS -> files.location.paths.mkString(", "))

val scan =
DataSourceScanExec.create(
readDataColumns ++ partitionColumns,
new FileScanRDD(
files.sparkSession,
readFile,
plannedPartitions),
scanRdd,
files,
meta,
table)
table,
!optimizerMetadataOnly)

val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
@@ -225,6 +164,85 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case _ => Nil
}

private def getFilePartitions(
files: HadoopFsRelation,
selectedPartitions: Seq[Partition]): Seq[FilePartition] = files.bucketSpec match {
case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
val bucketed =
selectedPartitions.flatMap { p =>
p.files.map { f =>
val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
}
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}

(0 until bucketing.numBuckets).map { bucketId =>
FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
}

case _ =>
val defaultMaxSplitBytes = files.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = files.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = files.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math.min(defaultMaxSplitBytes,
Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts)
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/** Add the given file to the current partition. */
def addFile(file: PartitionedFile): Unit = {
currentSize += file.length + openCostInBytes
currentFiles.append(file)
}

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
FilePartition(
partitions.size,
currentFiles.toArray.toSeq) // Copy to a new Array.
partitions.append(newPartition)
}
currentFiles.clear()
currentSize = 0
}

// Assign files to partitions using "First Fit Decreasing" (FFD)
// TODO: consider adding a slop factor here?
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
addFile(file)
}
closePartition()
partitions
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
case f: LocatedFileStatus => f.getBlockLocations
case f => Array.empty[BlockLocation]
SQLConf.scala
@@ -258,6 +258,11 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization.")
Contributor: Please update the doc to explain what a metadata-only query means.

.booleanConf
.createWithDefault(false)
Contributor: Can we turn it on by default?
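To illustrate the doc request above, a sketch of what a metadata-only query means in practice (assumptions, not part of the patch: a SparkSession named spark and a table t partitioned by ds and hr). These queries touch only partitioning columns, so they can in principle be answered from the catalog's partition metadata without reading any data files:

spark.conf.set("spark.sql.optimizer.metadataOnly", "true")     // off by default in this PR
spark.sql("SELECT DISTINCT ds FROM t")                         // distinct partition values
spark.sql("SELECT MAX(ds) FROM t")                             // MIN/MAX over a partition column
spark.sql("SELECT ds, COUNT(DISTINCT hr) FROM t GROUP BY ds")  // distinct aggregate on partition columns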


val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView")
.internal()
.doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
@@ -599,6 +604,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)

def nativeView: Boolean = getConf(NATIVE_VIEW)

def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
FileSourceStrategySuite.scala
@@ -340,6 +340,21 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}

test("optimize metadataOnly") {
withSQLConf("spark.sql.optimizer.metadataOnly" -> "true") {
val table =
createTable(
files = Seq(
"p1=1/file1" -> 10,
"p1=2/file2" -> 10))

checkDataset(table.select($"p1"), Row(1), Row(2))
checkDataset(table.where("p1 = 1").select($"p1"), Row(1))
val df = table.where("p1 = 1 AND (p1 + c1) = 2 AND c1 = 1")
assert(getPhysicalFilters(df) contains resolve(df, "c1 = 1"))
}
}

// Helpers for checking the arguments passed to the FileFormat.

protected val checkPartitionSchema =
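Beyond the unit test, a hedged end-to-end usage sketch of the new flag (the table name, partition column, and app name are assumptions, not from the patch):

import org.apache.spark.sql.SparkSession

// Assumes a partitioned table t with partition column p1 already exists in the catalog.
val spark = SparkSession.builder().appName("metadata-only-demo").getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
val distinctPartitions = spark.table("t").select($"p1").distinct()
// With the flag on and no data columns read, the scan's metadata map gains
// "metadataOnly" -> "true", which should show up in the explain output.
distinctPartitions.explain()
distinctPartitions.show()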