Merge remote-tracking branch 'upstream/master' into native-ddl
Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
viirya committed Feb 25, 2016
commit 5f8e70d7dfd7c7c3efb640210e9a0798dffad964
@@ -77,7 +77,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     // This test verifies parts of the plan. Disable whole stage codegen.
     withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
       val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
-      val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
+      val BucketSpec(numBuckets, bucketColumnNames, _, _) = bucketSpec
       // Limit: bucket pruning only works when the bucket column has one and only one column
       assert(bucketColumnNames.length == 1)
       val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
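
The only substantive change in this hunk is the extra wildcard in the extractor: the native-ddl branch adds a fourth field to BucketSpec, and a Scala constructor pattern must bind every field of the case class. A minimal sketch of the mechanics, with a hypothetical stand-in for the added field (the branch's actual field name is not shown in this view):

    // Hypothetical four-field BucketSpec; only the arity matters for the pattern.
    case class BucketSpec(
        numBuckets: Int,
        bucketColumnNames: Seq[String],
        sortColumnNames: Seq[String],
        addedField: Option[String]) // stand-in for the field added on this branch

    val bucketSpec = BucketSpec(8, Seq("j"), Nil, None)

    // The old three-placeholder pattern no longer compiles against four fields:
    //   val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
    // so the suite now binds, and ignores, the extra field:
    val BucketSpec(numBuckets, bucketColumnNames, _, _) = bucketSpec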
@@ -87,15 +87,20 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
         matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
       }

-    val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
-    val BucketSpec(numBuckets, bucketColumnNames, _, _) = bucketSpec
-    // Limit: bucket pruning only works when the bucket column has one and only one column
-    assert(bucketColumnNames.length == 1)
-    val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
-    val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
-    val matchedBuckets = new BitSet(numBuckets)
-    bucketValues.foreach { value =>
-      matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
-    }
-
       // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
       val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
       val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
       assert(rdd.isDefined, plan)

       val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
         if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
       }
       // checking if all the pruned buckets are empty
       assert(checkedResult.collect().forall(_ == true))

       checkAnswer(
         bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
         originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
     }
   }
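
The assertion strategy above relies on two facts: DataSourceStrategy.getBucketId must reproduce the writer's bucket assignment for a literal value, and the bucketed scan emits partitions so that partition index maps to bucket id modulo numBuckets. A self-contained sketch of the invariant being checked, not the suite's code, with plain hashCode standing in for Spark's Murmur3-based bucketing hash:

    // Which bucket a value should land in, assuming hash-mod assignment.
    def expectedBucket(value: Any, numBuckets: Int): Int = {
      val h = value.hashCode() // stand-in; Spark bucketing uses a Murmur3-based hash
      ((h % numBuckets) + numBuckets) % numBuckets // non-negative modulo
    }

    // Every partition whose bucket was pruned must come back empty.
    def prunedBucketsEmpty(
        rowsPerPartition: Seq[Int], // row count surviving in each scan partition
        matchedBuckets: Set[Int],
        numBuckets: Int): Boolean =
      rowsPerPartition.zipWithIndex.forall { case (rows, index) =>
        matchedBuckets.contains(index % numBuckets) || rows == 0
      }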

You are viewing a condensed version of this merge commit.