[SPARK-26205][SQL] Optimize InSet Expression for bytes, shorts, ints, dates #23171
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

```diff
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -375,6 +376,25 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
   }

+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    if (canBeComputedUsingSwitch && hset.size <= SQLConf.get.optimizerInSetSwitchThreshold) {
+      genCodeWithSwitch(ctx, ev)
+    } else {
+      genCodeWithSet(ctx, ev)
+    }
+  }
+
+  override def sql: String = {
+    val valueSQL = child.sql
+    val listSQL = hset.toSeq.map(Literal(_).sql).mkString(", ")
+    s"($valueSQL IN ($listSQL))"
+  }
+
+  private def canBeComputedUsingSwitch: Boolean = child.dataType match {
+    case ByteType | ShortType | IntegerType | DateType => true
+    case _ => false
+  }
+
+  private def genCodeWithSet(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     nullSafeCodeGen(ctx, ev, c => {
       val setTerm = ctx.addReferenceObj("set", set)
       val setIsNull = if (hasNull) {
@@ -389,10 +409,30 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
     })
   }

-  override def sql: String = {
-    val valueSQL = child.sql
-    val listSQL = hset.toSeq.map(Literal(_).sql).mkString(", ")
-    s"($valueSQL IN ($listSQL))"
+  private def genCodeWithSwitch(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val caseValuesGen = hset.filter(_ != null).map(Literal(_).genCode(ctx))
+    val valueGen = child.genCode(ctx)
+
+    val caseBranches = caseValuesGen.map(literal =>
+      code"""
+        case ${literal.value}:
+          ${ev.value} = true;
+          break;
+       """)
+
+    ev.copy(code =
+      code"""
+        ${valueGen.code}
+        ${CodeGenerator.JAVA_BOOLEAN} ${ev.isNull} = ${valueGen.isNull};
+        ${CodeGenerator.JAVA_BOOLEAN} ${ev.value} = false;
+        if (!${valueGen.isNull}) {
+          switch (${valueGen.value}) {
+            ${caseBranches.mkString("")}
+            default:
+              ${ev.isNull} = $hasNull;
+          }
+        }
+       """)
   }
 }
```
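For quick reference while reviewing: the `default` branch above sets `ev.isNull` to `hasNull`, which is what preserves SQL's three-valued IN semantics. A minimal pure-Scala sketch of the contract both `genCodeWithSet` and `genCodeWithSwitch` must satisfy (illustration only, not PR code):

```scala
// Three-valued IN semantics, with Option standing in for SQL NULL.
// `hasNull` mirrors InSet's field of the same name (true when hset contains null).
def inSetSemantics(value: Option[Int], set: Set[Int], hasNull: Boolean): Option[Boolean] =
  value match {
    case None                       => None        // null input => null result
    case Some(v) if set.contains(v) => Some(true)  // a case branch matched
    case Some(_) if hasNull         => None        // default branch: ev.isNull = hasNull
    case Some(_)                    => Some(false) // default branch, no null in the set
  }

assert(inSetSemantics(Some(2), Set(1, 2), hasNull = false).contains(true))
assert(inSetSemantics(Some(3), Set(1, 2), hasNull = true).isEmpty)
assert(inSetSemantics(Some(3), Set(1, 2), hasNull = false).contains(false))
```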
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

```diff
@@ -171,6 +171,16 @@ object SQLConf {
     .intConf
     .createWithDefault(10)

+  val OPTIMIZER_INSET_SWITCH_THRESHOLD =
+    buildConf("spark.sql.optimizer.inSetSwitchThreshold")
+      .internal()
+      .doc("Configures the max set size in InSet for which Spark will generate code with " +
+        "switch statements. This is applicable only to bytes, shorts, ints, dates.")
+      .intConf
```
Member:
To prevent user configuration errors, can we have a meaningful min/max check? `.checkValue(v => v > 0 && v < ???, ...)`

Contributor (Author):
@kiszk @mgaido91 we had a discussion about generating code bigger than 64KB. I am wondering if we still want to split the switch-based logic into multiple methods if we have this check suggested by @dongjoon-hyun. I've implemented the split logic locally. However, the code looks more complicated and we will need some extensions to

Contributor:
I am not sure why you'd need any extension. We have other parts of the code with switch which are split. I think in general it is safer to have it.

Contributor (Author):
@mgaido91 could you point me to an example?

Contributor:
Ah, you're right, sorry, I was remembering wrongly. There were switch-based expressions, and for splitting them we migrated them to a do-while approach. Since the whole point of this PR is to introduce the switch construct, I agree with you that the best way is to add a constraint here, in order to keep the number small enough not to cause issues with code generation.

Contributor (Author):
What about the default and max values then? The switch logic was faster than

Contributor:
Yes, sounds fine to me. Please add a comment in the codegen part to explain why we are not splitting the code. Thanks.

Contributor (Author):
Yeah, I'll add a comment.
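A back-of-envelope check of why the cap can stand in for method splitting (the per-branch cost is an assumption for illustration, not a measurement):

```scala
// Each generated `case ...: ev.value = true; break;` compiles to a small,
// roughly constant amount of bytecode. Assuming ~40 bytes per branch
// (hypothetical figure), even the maximum allowed number of branches stays
// far below the JVM's 64KB-per-method limit that SPARK-22501 guards against.
val assumedBytesPerBranch = 40
val maxBranches = 600 // the upper bound enforced by checkValue below
println(assumedBytesPerBranch * maxBranches) // 24000, well under 65536
```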
```diff
+      .checkValue(threshold => threshold >= 0 && threshold <= 600, "The max set size " +
+        "for using switch statements in InSet must be positive and less than or equal to 600")
+      .createWithDefault(400)
+
   val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
     .internal()
     .doc("Configures the log level for logging the change from the original plan to the new " +
@@ -1701,6 +1711,8 @@ class SQLConf extends Serializable with Logging {
   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)

+  def optimizerInSetSwitchThreshold: Int = getConf(OPTIMIZER_INSET_SWITCH_THRESHOLD)
+
   def optimizerPlanChangeLogLevel: String = getConf(OPTIMIZER_PLAN_CHANGE_LOG_LEVEL)

   def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)
```
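In case someone wants to exercise the new knob while benchmarking, a sketch (assuming a live `SparkSession` named `spark`; the conf is internal, so this is for testing rather than end-user docs):

```scala
// Accepted: within the [0, 600] range enforced by checkValue.
spark.conf.set("spark.sql.optimizer.inSetSwitchThreshold", "300")

// Disable the switch path entirely to compare against the set-based codegen.
spark.conf.set("spark.sql.optimizer.inSetSwitchThreshold", "0")

// Out-of-range values should be rejected by checkValue:
// spark.conf.set("spark.sql.optimizer.inSetSwitchThreshold", "601") // IllegalArgumentException
```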
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala

```diff
@@ -23,11 +23,12 @@ import scala.collection.immutable.HashSet

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.RandomDataGenerator
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.encoders.ExamplePointUDT
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -241,6 +242,52 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }

+  test("SPARK-26205: Optimize InSet for bytes, shorts, ints, dates using switch statements") {
+    val byteValues = Set[Any](1.toByte, 2.toByte, Byte.MinValue, Byte.MaxValue)
+    val shortValues = Set[Any](-10.toShort, 20.toShort, Short.MinValue, Short.MaxValue)
+    val intValues = Set[Any](20, -100, 30, Int.MinValue, Int.MaxValue)
+    val dateValues = Set[Any](
+      CatalystTypeConverters.convertToCatalyst(Date.valueOf("2017-01-01")),
+      CatalystTypeConverters.convertToCatalyst(Date.valueOf("1950-01-02")))
+
+    def check(presentValue: Expression, absentValue: Expression, values: Set[Any]): Unit = {
+      require(presentValue.dataType == absentValue.dataType)
+
+      val nullLiteral = Literal(null, presentValue.dataType)
+
+      checkEvaluation(InSet(nullLiteral, values), expected = null)
+      checkEvaluation(InSet(nullLiteral, values + null), expected = null)
+      checkEvaluation(InSet(presentValue, values), expected = true)
+      checkEvaluation(InSet(presentValue, values + null), expected = true)
+      checkEvaluation(InSet(absentValue, values), expected = false)
+      checkEvaluation(InSet(absentValue, values + null), expected = null)
+    }
+
+    def checkAllTypes(): Unit = {
+      check(presentValue = Literal(2.toByte), absentValue = Literal(3.toByte), byteValues)
+      check(presentValue = Literal(Byte.MinValue), absentValue = Literal(5.toByte), byteValues)
+      check(presentValue = Literal(20.toShort), absentValue = Literal(-14.toShort), shortValues)
+      check(presentValue = Literal(Short.MaxValue), absentValue = Literal(30.toShort), shortValues)
+      check(presentValue = Literal(20), absentValue = Literal(-14), intValues)
+      check(presentValue = Literal(Int.MinValue), absentValue = Literal(2), intValues)
+      check(
+        presentValue = Literal(Date.valueOf("2017-01-01")),
+        absentValue = Literal(Date.valueOf("2017-01-02")),
+        dateValues)
+      check(
+        presentValue = Literal(Date.valueOf("1950-01-02")),
+        absentValue = Literal(Date.valueOf("2017-10-02")),
+        dateValues)
+    }
+
+    withSQLConf(SQLConf.OPTIMIZER_INSET_SWITCH_THRESHOLD.key -> "0") {
```
Member:
After https://github.com/apache/spark/pull/23171/files#r261888276, we need to increase this from
```diff
+      checkAllTypes()
+    }
+    withSQLConf(SQLConf.OPTIMIZER_INSET_SWITCH_THRESHOLD.key -> "20") {
+      checkAllTypes()
+    }
+  }
```
Member:
Could you please add a test case that

Contributor (Author):
Do you mean testing that if the set size is 100 and

Member:
My question addressed what you are talking about here. The current implementation can accept a large int value (e.g. Integer.MAX) for
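Since the thread above is cut off in this view, here is a guess at the shape of the boundary test being requested (hypothetical test body; the exception type is assumed from the checkValue contract):

```scala
// Hypothetical: out-of-range thresholds should fail fast with the checkValue message.
val e = intercept[IllegalArgumentException] {
  withSQLConf(SQLConf.OPTIMIZER_INSET_SWITCH_THRESHOLD.key -> Int.MaxValue.toString) {}
}
assert(e.getMessage.contains("max set size"))
```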
```diff
   test("SPARK-22501: In should not generate codes beyond 64KB") {
     val N = 3000
     val sets = (1 to N).map(i => Literal(i.toDouble))
```
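For end-to-end context (not part of the diff): `In` is rewritten to `InSet` once the literal list exceeds `spark.sql.optimizer.inSetConversionThreshold` (default 10), so a filter like the one below on an int column is what the new switch codegen targets. A sketch assuming a `SparkSession` named `spark`:

```scala
import org.apache.spark.sql.functions.col

val df = spark.range(100).select(col("id").cast("int").as("x"))

// 20 int literals: above the In-to-InSet conversion threshold (10) and far
// below the switch threshold (400), so this filter should hit genCodeWithSwitch.
val filtered = df.where(col("x").isin(1 to 20: _*))
filtered.show()
```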
Reviewer:
This function is not changed. To reduce the code diff more clearly, could you move `override def sql` and `private def canBeComputedUsingSwitch` after `genCodeWithSwitch`?