Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove duplicating code, use configuration key instead of string literal
  • Loading branch information
HeartSaVioR committed Jul 20, 2018
commit abec57f331bbdad6ef4689e2790d4a9fbb989715
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode._
import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock}
import org.apache.spark.sql.types.StructType
Expand All @@ -53,13 +54,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest

import testImplicits._

def testWithAggrOptions(testName: String, pairs: (String, String)*)(testFun: => Any): Unit = {
val confAndTestNamePostfixMatrix = List(
(Seq("spark.sql.streaming.advanced.removeRedundantInStatefulAggregation" -> "false"), ""),
(Seq("spark.sql.streaming.advanced.removeRedundantInStatefulAggregation" -> "true"),
" : enable remove redundant in stateful aggregation")
)
val confAndTestNamePostfixMatrix = List(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this within beforeAll and afterAll with spark.conf.set / spark.conf.unset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withSQLConf appears to be widely used across SQL unit tests, and it does additional work (SparkSession.setActiveSession), so I'm not sure the two approaches would be technically equivalent. Moreover, we need to run the same test "multiple times", each time with a different configuration.

Could you propose your code, if you don't mind? Thanks in advance!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. "we need to run same test "multiple times", with changing configuration." I missed this.
We could consider something like:

override def beforeEach(): Unit = {
super.beforeEach()
// Note that there are many tests here that require record-level filtering set to be true.
spark.conf.set(SQLConf.PARQUET_RECORD_FILTER_ENABLED.key, "true")
}
override def afterEach(): Unit = {
try {
spark.conf.unset(SQLConf.PARQUET_RECORD_FILTER_ENABLED.key)
} finally {
super.afterEach()
}

e.g.,

StreamingAggregationSuite {
  override def afterAll(): Unit = {
    // false
  }

  override def beforeAll(): Unit = {
    // false
  }
}

RemoveRedundantStreamingAggregationSuite extends StreamingAggregationSuite {
  override def afterAll(): Unit = {
    // true
  }

  override def beforeAll(): Unit = {
    // true
  }
}

but I believe the current implementation works too in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I'd like to wait for other reviewers' opinions/suggestions on this. Let me keep this as it is until then.

(Seq(SQLConf.ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION.key -> "false"), ""),
(Seq(SQLConf.ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION.key -> "true"),
" : enable remove redundant in stateful aggregation")
)

def testWithAggrOptions(testName: String, pairs: (String, String)*)(testFun: => Any): Unit = {
confAndTestNamePostfixMatrix.foreach {
case (conf, testNamePostfix) => withSQLConf(pairs ++ conf: _*) {
test(testName + testNamePostfix)(testFun)
Expand All @@ -69,12 +70,6 @@ class StreamingAggregationSuite extends StateStoreMetricsTest

def testQuietlyWithAggrOptions(testName: String, pairs: (String, String)*)
(testFun: => Any): Unit = {
val confAndTestNamePostfixMatrix = List(
(Seq("spark.sql.streaming.advanced.removeRedundantInStatefulAggregation" -> "false"), ""),
(Seq("spark.sql.streaming.advanced.removeRedundantInStatefulAggregation" -> "true"),
" : enable remove redundant in stateful aggregation")
)

confAndTestNamePostfixMatrix.foreach {
case (conf, testNamePostfix) => withSQLConf(pairs ++ conf: _*) {
testQuietly(testName + testNamePostfix)(testFun)
Expand Down Expand Up @@ -568,7 +563,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
}

testWithAggrOptions("SPARK-23004: Ensure that TypedImperativeAggregate functions " +
"do not throw errors", "spark.sql.shuffle.partitions" -> "1") {
"do not throw errors", SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
// See the JIRA SPARK-23004 for more details. In short, this test reproduces the error
// by ensuring the following.
// - A streaming query with a streaming aggregation.
Expand Down