[SPARK-14796][SQL] Add spark.sql.optimizer.minSetSize config option.
dongjoon-hyun committed Apr 21, 2016
commit 64989b4c42ed53f0123b79d970a656352baa385e
CatalystConf.scala
@@ -29,6 +29,7 @@ trait CatalystConf {
def groupByOrdinal: Boolean

def optimizerMaxIterations: Int
+  def optimizerMinSetSize: Int
def maxCaseBranchesForCodegen: Int

/**
@@ -47,6 +48,7 @@ case class SimpleCatalystConf(
orderByOrdinal: Boolean = true,
groupByOrdinal: Boolean = true,
optimizerMaxIterations: Int = 100,
+    optimizerMinSetSize: Int = 10,
maxCaseBranchesForCodegen: Int = 20)
extends CatalystConf {
}
Optimizer.scala
@@ -87,7 +87,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
CombineUnions,
// Constant folding and strength reduction
NullPropagation,
-      OptimizeIn,
+      OptimizeIn(conf),
ConstantFolding,
LikeSimplification,
BooleanSimplification,
@@ -682,10 +682,11 @@ object ConstantFolding extends Rule[LogicalPlan] {
* Replaces [[In (value, seq[Literal])]] with an optimized version [[InSet (value, HashSet[Literal])]],
* which is much faster.
*/
-object OptimizeIn extends Rule[LogicalPlan] {
+case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && list.size > 10 =>
+      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
+          list.size > conf.optimizerMinSetSize =>
val hSet = list.map(e => e.eval(EmptyRow))
InSet(v, HashSet() ++ hSet)
}
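To make the rewrite concrete, here is a minimal standalone sketch of the threshold logic (simplified types and names, not Spark's actual Expression classes): an IN over literals becomes a hash-set membership test only when the value list is larger than the configured minimum size.

    // Illustrative stand-in for the OptimizeIn rule; all names here are hypothetical.
    import scala.collection.immutable.HashSet

    object OptimizeInSketch extends App {
      sealed trait Predicate
      case class In(attr: String, values: Seq[Int]) extends Predicate
      case class InSet(attr: String, hset: Set[Int]) extends Predicate

      // Mirrors `list.size > conf.optimizerMinSetSize` in the rule above.
      def optimizeIn(p: Predicate, minSetSize: Int): Predicate = p match {
        case In(a, vs) if vs.size > minSetSize => InSet(a, HashSet(vs: _*))
        case other => other
      }

      // With the default threshold of 10, a three-element list is left as In...
      assert(optimizeIn(In("a", Seq(1, 2, 3)), 10) == In("a", Seq(1, 2, 3)))
      // ...but lowering the threshold to 2 converts it to an InSet lookup.
      assert(optimizeIn(In("a", Seq(1, 2, 3)), 2) == InSet("a", HashSet(1, 2, 3)))
    }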
ConstantFoldingSuite.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,7 @@ class ConstantFoldingSuite extends PlanTest {
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("ConstantFolding", Once,
-        OptimizeIn,
+        OptimizeIn(SimpleCatalystConf(true)),
ConstantFolding,
BooleanSimplification) :: Nil
}
OptimizeInSuite.scala
@@ -17,11 +17,14 @@

package org.apache.spark.sql.catalyst.optimizer

+import scala.collection.immutable.HashSet
Contributor comment: Do you use this import?


+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types._
@@ -36,7 +39,7 @@ class OptimizeInSuite extends PlanTest {
NullPropagation,
ConstantFolding,
BooleanSimplification,
-        OptimizeIn) :: Nil
+        OptimizeIn(SimpleCatalystConf(true))) :: Nil
Contributor comment: Use named arguments for boolean parameters.

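A named-argument version might read as follows (assuming the positional Boolean here is SimpleCatalystConf's caseSensitiveAnalysis parameter, which is not visible in this hunk):

    OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil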
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -128,4 +131,21 @@ class OptimizeInSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("OptimizedIn test: Use configuration.") {
Contributor comment: I'd give this a more descriptive name, and explicitly say it sets the threshold for turning In into InSet.

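For example, a more explicit name along the lines suggested (hypothetical wording):

    test("OptimizeIn: In is converted to InSet only when the list size exceeds spark.sql.optimizer.minSetSize") {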
+    val plan =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3))))
+        .analyze
+
+    val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(true))(plan)
+    comparePlans(notOptimizedPlan, plan)
+
+    val optimizedPlan = OptimizeIn(SimpleCatalystConf(true, optimizerMinSetSize = 2))(plan)
+    optimizedPlan match {
+      case Filter(cond, _)
+          if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
+        // pass
+      case _ => fail("Unexpected result for OptimizedIn")
+    }
+  }
}
SQLConf.scala
@@ -54,10 +54,16 @@ object SQLConf {

val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
.internal()
.doc("The max number of iterations the optimizer and analyzer runs")
.doc("The max number of iterations the optimizer and analyzer runs.")
.intConf
.createWithDefault(100)

+  val OPTIMIZER_MIN_SET_SIZE = SQLConfigBuilder("spark.sql.optimizer.minSetSize")
+    .internal()
+    .doc("The minimum threshold of set size to be optimized.")
+    .intConf
+    .createWithDefault(10)
Contributor comment: It's not great that this default is defined in two different places.

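One hedged way to address this, sketched with hypothetical names not taken from the PR: hoist the literal into a single shared constant and reference it from both defaults.

    // Hypothetical single source of truth for the default threshold.
    object CatalystConfDefaults {
      val OptimizerMinSetSize: Int = 10
    }
    // SimpleCatalystConf:  optimizerMinSetSize: Int = CatalystConfDefaults.OptimizerMinSetSize,
    // SQLConf:             .createWithDefault(CatalystConfDefaults.OptimizerMinSetSize)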

val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
.doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
"When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -529,6 +535,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)

+  def optimizerMinSetSize: Int = getConf(OPTIMIZER_MIN_SET_SIZE)

def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)

def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
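As a usage sketch, assuming the change is merged as shown (note the option is marked internal, so it is not part of the public configuration surface): lowering the threshold at runtime makes even a short IN list eligible for the InSet conversion.

    // Spark 2.0-era API; assumes an existing SQLContext named sqlContext
    // and a registered table t with an integer column a.
    sqlContext.setConf("spark.sql.optimizer.minSetSize", "2")
    // With the threshold at 2, the three-element IN list is planned as an InSet.
    sqlContext.sql("SELECT * FROM t WHERE a IN (1, 2, 3)")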