Update flag settings and tests.
ooq committed Aug 24, 2016
commit a58314cc110899b19499b1d06bdc04cf8439a79f
HashAggregateExec.scala
@@ -282,13 +282,12 @@ case class HashAggregateExec(

  // The name for Fast HashMap
  private var fastHashMapTerm: String = _
Contributor

Should we use a more descriptive name than "fast"? There can always be a faster implementation.

  // whether the vectorized hashmap or the row-based hashmap is enabled
  // we make sure that at most one of the two flags is true
  // i.e., assertFalse(isVectorizedHashMapEnabled && isRowBasedHashMapEnabled)
  private var isFastHashMapEnabled: Boolean = false
Member

isFastHashMapEnabled = isVectorizedHashMapEnabled || isRowBasedHashMapEnabled?

Contributor Author

This flag checks whether either of them is enabled. Since some of the generated code is the same for both hash maps, the flag makes the condition checks clearer.

Member

Sure, what I meant was that we can even initialize it with isVectorizedHashMapEnabled || isRowBasedHashMapEnabled to make the implied semantics clear.
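
For illustration, a minimal sketch of that initialization, with the implied invariant spelled out; the helper updateFastHashMapFlags is hypothetical and not part of this patch:

  private var isVectorizedHashMapEnabled: Boolean = false
  private var isRowBasedHashMapEnabled: Boolean = false

  // auxiliary flag, derived from the two concrete flags so the implied
  // semantics are visible at the definition site
  private var isFastHashMapEnabled: Boolean =
    isVectorizedHashMapEnabled || isRowBasedHashMapEnabled

  // hypothetical helper: re-derive the flag and check the "at most one
  // enabled" invariant whenever either concrete flag is updated
  private def updateFastHashMapFlags(): Unit = {
    assert(!(isVectorizedHashMapEnabled && isRowBasedHashMapEnabled),
      "at most one fast hash map implementation may be enabled")
    isFastHashMapEnabled = isVectorizedHashMapEnabled || isRowBasedHashMapEnabled
  }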


  // whether a vectorized hashmap is used instead
  // we have decided to always use the row-based hashmap,
  // but the vectorized hashmap can still be switched on for testing and benchmarking purposes.
  private var isVectorizedHashMapEnabled: Boolean = false
Contributor

If it is only used for testing/benchmarking, is it worth putting this piece in the production code?

  private var isRowBasedHashMapEnabled: Boolean = false
  // auxiliary flag; true if either of the two above is true
  private var isFastHashMapEnabled: Boolean = isVectorizedHashMapEnabled || isRowBasedHashMapEnabled

  // The name for UnsafeRow HashMap
  private var hashMapTerm: String = _
@@ -499,63 +498,35 @@ case class HashAggregateExec(
    isSupported && isNotByteArrayDecimalType
  }

  /**
   * Requirement check for vectorized hash map.
   */
  private def enableVectorizedHashMap(ctx: CodegenContext): Boolean = {
    checkIfFastHashMapSupported(ctx)
  }

  /**
   * Requirement check for row-based hash map.
   */
  private def enableRowBasedHashMap(ctx: CodegenContext): Boolean = {
    checkIfFastHashMapSupported(ctx)
  }
  private def enableTwoLevelHashMap(ctx: CodegenContext) = {
    if (!checkIfFastHashMapSupported(ctx)) {
      if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
        logInfo("spark.sql.codegen.aggregate.map.twolevel.enable is set to true, but"
          + " current version of codegened fast hashmap does not support this aggregate.")
      }
    } else {
      isFastHashMapEnabled = true

  private def setFastHashMapImpl(ctx: CodegenContext) = {
    sqlContext.conf.enforceFastAggHashMapImpl match {
      case "rowbased" =>
        if (!enableRowBasedHashMap(ctx)) {
          if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
            logWarning("spark.sql.codegen.aggregate.map.enforce.impl is set to rowbased, but"
              + " current version of codegened row-based hashmap does not support this aggregate.")
          }
        } else {
          isRowBasedHashMapEnabled = true
        }
      case "vectorized" =>
        if (!enableVectorizedHashMap(ctx)) {
          if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
            logWarning("spark.sql.codegen.aggregate.map.enforce.impl is set to vectorized, but"
              + " current version of codegened vectorized hashmap does not support this aggregate.")
          }
        } else {
          isVectorizedHashMapEnabled = true
        }
      case "skip" =>
        // no need to do anything; the default leaves all flags false
      case _ =>
        if (sqlContext.conf.enforceFastAggHashMapImpl != "auto") {
          logWarning("spark.sql.codegen.aggregate.map.enforce.impl should be set to one of the "
            + "following: rowbased, vectorized, skip, auto(default).")
        }
        if (enableRowBasedHashMap(ctx)) {
          isRowBasedHashMapEnabled = true
        } else if (enableVectorizedHashMap(ctx)) {
          // Because enableVectorizedHashMap() and enableRowBasedHashMap() are currently
          // identical, this branch should never be reached. We envision this code path
          // becoming useful as our support for the two fast hash maps extends.
          isVectorizedHashMapEnabled = true
        }
        // This is for testing/benchmarking only.
        // We enforce the first level to be a vectorized hashmap, instead of the default
        // row-based one.
        sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
          case "true" => isVectorizedHashMapEnabled = true
          case null | "" | "false" => None
        }
    }
    isFastHashMapEnabled = isVectorizedHashMapEnabled || isRowBasedHashMapEnabled
  }

  private def doProduceWithKeys(ctx: CodegenContext): String = {
    val initAgg = ctx.freshName("initAgg")
    ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
    setFastHashMapImpl(ctx)
    if (sqlContext.conf.enableTwoLevelAggMap) {
      enableTwoLevelHashMap(ctx)
    } else {
      sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
        case "true" => logWarning("Two level hashmap is disabled but vectorized hashmap is " +
          "enabled.")
        case null | "" | "false" => None
      }
    }
Contributor

Maybe you want to wrap lines 521-529 in a function like checkAndEnableTwoLevelHashMap, and do a call like

isFastHashMapEnabled = checkAndEnableTwoLevelHashMap(ctx)

The current impl of enableTwoLevelHashMap has the side effect of updating the boolean isFastHashMapEnabled; the side effect is not apparent to a caller who doesn't inspect the code of enableTwoLevelHashMap.
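
A sketch of what that refactor could look like, reusing the helpers already in the diff (checkIfFastHashMapSupported, modes, Utils.isTesting) and the name proposed above; the vectorized-map override handling is omitted for brevity:

  // Returns whether the fast hash map is supported instead of mutating
  // isFastHashMapEnabled internally, so the flag update is visible at the call site.
  private def checkAndEnableTwoLevelHashMap(ctx: CodegenContext): Boolean = {
    val supported = checkIfFastHashMapSupported(ctx)
    if (!supported &&
        modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
      logInfo("spark.sql.codegen.aggregate.map.twolevel.enable is set to true, but"
        + " current version of codegened fast hashmap does not support this aggregate.")
    }
    supported
  }

  // Call site, with the side effect now explicit:
  // isFastHashMapEnabled = checkAndEnableTwoLevelHashMap(ctx)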

    fastHashMapTerm = ctx.freshName("fastHashMap")
    val fastHashMapClassName = ctx.freshName("FastHashMap")
    val fastHashMapGenerator =

SQLConf.scala
@@ -499,16 +499,15 @@ object SQLConf {
    .intConf
    .createWithDefault(40)

  val FAST_AGG_MAP_IMPL =
    SQLConfigBuilder("spark.sql.codegen.aggregate.map.enforce.impl")
  val ENABLE_TWOLEVEL_AGG_MAP =
    SQLConfigBuilder("spark.sql.codegen.aggregate.map.twolevel.enable")
      .internal()
      .doc("Sets the implementation for the fast hash map during aggregation. Can be one of " +
        "the following: rowbased, vectorized, skip, auto. Defaults to auto, and should only " +
        "be set to other values for testing purposes.")
      .stringConf
      .transform(_.toLowerCase())
      .checkValues(Set("rowbased", "vectorized", "skip", "auto"))
      .createWithDefault("auto")
      .doc("Enable the two-level aggregate hash map. When enabled, records will first be " +
        "inserted/looked up in a 1st-level, small, fast map, and then fall back to a " +
        "2nd-level, larger, slower map when the 1st level is full or keys cannot be found. " +
        "When disabled, records go directly to the 2nd-level map. Defaults to true.")
      .booleanConf
      .createWithDefault(true)

  val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
    .internal()
@@ -675,7 +674,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

  override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)

  def enforceFastAggHashMapImpl: String = getConf(FAST_AGG_MAP_IMPL)
  def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

  def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
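
The ENABLE_TWOLEVEL_AGG_MAP doc above describes the lookup order of the two-level map. As a toy, self-contained sketch of that flow (FastMap and RegularMap are illustrative stand-ins, not Spark's codegened classes):

  object TwoLevelAggMapSketch {
    import scala.collection.mutable

    // Stand-in for the small, fast 1st-level map: bounded capacity;
    // returns None when the key is absent and the map is already full.
    final class FastMap(capacity: Int) {
      private val buf = mutable.HashMap.empty[String, Array[Long]]
      def findOrInsert(key: String): Option[Array[Long]] =
        buf.get(key).orElse {
          if (buf.size < capacity) Some(buf.getOrElseUpdate(key, Array(0L)))
          else None // 1st level full: caller falls back to the 2nd level
        }
    }

    // Stand-in for the larger, slower 2nd-level (UnsafeRow-based) map.
    final class RegularMap {
      private val buf = mutable.HashMap.empty[String, Array[Long]]
      def findOrInsert(key: String): Array[Long] = buf.getOrElseUpdate(key, Array(0L))
    }

    // The order the conf doc describes: try the 1st level, then fall back.
    def aggBufferFor(fast: FastMap, regular: RegularMap, key: String): Array[Long] =
      fast.findOrInsert(key).getOrElse(regular.findOrInsert(key))

    def main(args: Array[String]): Unit = {
      val fast = new FastMap(capacity = 2)
      val regular = new RegularMap
      Seq("a", "b", "c", "a").foreach(k => aggBufferFor(fast, regular, k)(0) += 1)
      // "a" and "b" land in the 1st level; "c" overflows into the 2nd.
    }
  }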


AggregateHashMapSuite.scala
@@ -17,25 +17,63 @@

package org.apache.spark.sql

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType
import org.scalatest.BeforeAndAfter

abstract class AggregateHashMapSuite extends DataFrameAggregateSuite {
  import testImplicits._
class SingleLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {

  protected def setAggregateHashMapImpl(): Unit
  protected override def beforeAll(): Unit = {
    sparkConf.set("spark.sql.codegen.fallback", "false")
    sparkConf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
    super.beforeAll()
  }

  // Add some checks after each test is run to assert that the configs are not changed
  // in the test body.
  after {
    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
      "configuration parameter changed in test body")
    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "false",
      "configuration parameter changed in test body")
  }
}

class TwoLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {

  protected override def beforeAll(): Unit = {
    sparkConf.set("spark.sql.codegen.fallback", "false")
    sparkConf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
    super.beforeAll()
  }

  // Add some checks after each test is run to assert that the configs are not changed
  // in the test body.
  after {
    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
      "configuration parameter changed in test body")
    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "true",
      "configuration parameter changed in test body")
  }
}

class TwoLevelAggregateHashMapWithVectorizedMapSuite extends DataFrameAggregateSuite with
  BeforeAndAfter {

  protected override def beforeAll(): Unit = {
    setAggregateHashMapImpl()
    sparkConf.set("spark.sql.codegen.fallback", "false")
    super.beforeAll()
    sparkConf.set("spark.sql.codegen.fallback", "false")
    sparkConf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
    sparkConf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
    super.beforeAll()
  }

  test("SQL decimal test") {
    checkAnswer(
      decimalData.groupBy('a cast DecimalType(10, 2)).agg(avg('b cast DecimalType(10, 2))),
      Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(1.5)),
        Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(1.5)),
        Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(1.5))))

  // Add some checks after each test is run to assert that the configs are not changed
  // in the test body.
  after {
    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
      "configuration parameter changed in test body")
    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "true",
      "configuration parameter changed in test body")
    assert(sparkConf.get("spark.sql.codegen.aggregate.map.vectorized.enable") == "true",
      "configuration parameter changed in test body")
  }
}

DataFrameAggregateSuite.scala
@@ -475,4 +475,12 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
      spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"),
      Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil)
  }

  test("SQL decimal test (used for catching certain decimal handling bugs in aggregates)") {
    checkAnswer(
      decimalData.groupBy('a cast DecimalType(10, 2)).agg(avg('b cast DecimalType(10, 2))),
      Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(1.5)),
        Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(1.5)),
        Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(1.5))))
  }
}

This file was deleted.

This file was deleted.

AggregateBenchmark.scala
@@ -106,13 +106,14 @@ class AggregateBenchmark extends BenchmarkBase {

    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
      f()
    }

    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
      f()
    }

@@ -146,13 +147,14 @@ class AggregateBenchmark extends BenchmarkBase {

    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", 0)
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
      f()
    }

    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", 3)
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
      f()
    }

@@ -184,13 +186,14 @@ class AggregateBenchmark extends BenchmarkBase {

    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
      f()
    }

    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
      f()
    }

@@ -221,13 +224,14 @@ class AggregateBenchmark extends BenchmarkBase {

    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
      f()
    }

    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
      f()
    }

@@ -268,13 +272,14 @@ class AggregateBenchmark extends BenchmarkBase {

    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
      f()
    }

    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "10")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
      f()
    }


AggregationQuerySuite.scala
@@ -998,9 +998,9 @@ class HashAggregationQuerySuite extends AggregationQuerySuite
class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {

  override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
    Seq(0, 10).foreach { maxColumnarHashMapColumns =>
      withSQLConf("spark.sql.codegen.aggregate.map.columns.max" ->
          maxColumnarHashMapColumns.toString) {
    Seq("true", "false").foreach { enableTwoLevelMaps =>
      withSQLConf("spark.sql.codegen.aggregate.map.twolevel.enable" ->
          enableTwoLevelMaps) {
        (1 to 3).foreach { fallbackStartsAt =>
          withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
              s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") {