Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
Use withSQLConf in JoinSuite
  • Loading branch information
adrian-wang committed Aug 6, 2015
commit 6a771e9d9aa771ccbf6ef7cc594246956687f8e5
65 changes: 24 additions & 41 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number | Diff line number | Diff line change
@@ -22,13 +22,14 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.types.BinaryType
import org.apache.spark.sql.test.SQLTestUtils


class JoinSuite extends QueryTest with BeforeAndAfterEach {
class JoinSuite extends QueryTest with SQLTestUtils with BeforeAndAfterEach {
// Ensures tables are loaded.
TestData

override def sqlContext: SQLContext = org.apache.spark.sql.test.TestSQLContext
lazy val ctx = org.apache.spark.sql.test.TestSQLContext
import ctx.implicits._
import ctx.logicalPlanToSparkQuery
@@ -66,7 +67,6 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("join operator selection") {
ctx.cacheManager.clearCache()

val SORTMERGEJOIN_ENABLED: Boolean = ctx.conf.sortMergeJoinEnabled
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash]),
("SELECT * FROM testData LEFT SEMI JOIN testData2", classOf[LeftSemiJoinBNL]),
@@ -96,8 +96,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
classOf[BroadcastNestedLoopJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
try {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, false)
withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
Seq(
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2",
@@ -112,45 +111,35 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
("SELECT * FROM testData full outer join testData2 ON key = a",
classOf[ShuffledHashOuterJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
}
}

test("SortMergeJoin shouldn't work on unsortable columns") {
val SORTMERGEJOIN_ENABLED: Boolean = ctx.conf.sortMergeJoinEnabled
try {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
Seq(
("SELECT * FROM arrayData JOIN complexData ON data = a", classOf[ShuffledHashJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
}
}

test("broadcasted hash join operator selection") {
ctx.cacheManager.clearCache()
ctx.sql("CACHE TABLE testData")

val SORTMERGEJOIN_ENABLED: Boolean = ctx.conf.sortMergeJoinEnabled
Seq(
("SELECT * FROM testData join testData2 ON key = a", classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a and key = 2", classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a where key = 2",
classOf[BroadcastHashJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
try {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
Seq(
("SELECT * FROM testData join testData2 ON key = a", classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a and key = 2",
classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a where key = 2",
classOf[BroadcastHashJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
}

ctx.sql("UNCACHE TABLE testData")
@@ -160,25 +149,21 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
ctx.cacheManager.clearCache()
ctx.sql("CACHE TABLE testData")

val SORTMERGEJOIN_ENABLED: Boolean = ctx.conf.sortMergeJoinEnabled
Seq(
("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeJoin]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
classOf[SortMergeJoin]),
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
classOf[SortMergeJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
try {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, false)
withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
Seq(
("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[ShuffledHashOuterJoin]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
classOf[BroadcastHashOuterJoin]),
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
classOf[BroadcastHashOuterJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
}

ctx.sql("UNCACHE TABLE testData")
@@ -220,9 +205,9 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
checkAnswer(
x.join(y).where($"x.a" === $"y.a"),
Row(1, 1, 1, 1) ::
Row(1, 1, 1, 2) ::
Row(1, 2, 1, 1) ::
Row(1, 2, 1, 2) :: Nil
Row(1, 1, 1, 2) ::
Row(1, 2, 1, 1) ::
Row(1, 2, 1, 2) :: Nil
)
}

@@ -465,25 +450,24 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("broadcasted left semi join operator selection") {
ctx.cacheManager.clearCache()
ctx.sql("CACHE TABLE testData")
val tmp = ctx.conf.autoBroadcastJoinThreshold

ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=1000000000")
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
classOf[BroadcastLeftSemiJoinHash])
).foreach {
case (query, joinClass) => assertJoin(query, joinClass)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") {
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
classOf[BroadcastLeftSemiJoinHash])
).foreach {
case (query, joinClass) => assertJoin(query, joinClass)
}
}

ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1")

Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash])
).foreach {
case (query, joinClass) => assertJoin(query, joinClass)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash])
).foreach {
case (query, joinClass) => assertJoin(query, joinClass)
}
}

ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp)
ctx.sql("UNCACHE TABLE testData")
}

@@ -496,6 +480,5 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
Row(2, 2) ::
Row(3, 1) ::
Row(3, 2) :: Nil)

}
}
}