@@ -17,11 +17,9 @@

package org.apache.spark.sql.execution

import org.apache.spark.metrics.source.CodegenMetrics
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.expressions.scalalang.typed
@@ -145,10 +143,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
.select("int")

val plan = df.queryExecution.executedPlan
assert(!plan.find(p =>
assert(plan.find(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.children(0)
.isInstanceOf[SortMergeJoinExec]).isDefined)
.isInstanceOf[SortMergeJoinExec]).isEmpty)
assert(df.collect() === Array(Row(1), Row(2)))
}
}
@@ -181,6 +179,13 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._2
}

def genCode(ds: Dataset[_]): Seq[CodeAndComment] = {
val plan = ds.queryExecution.executedPlan
val wholeStageCodeGenExecs = plan.collect { case p: WholeStageCodegenExec => p }
assert(wholeStageCodeGenExecs.nonEmpty, "WholeStageCodegenExec is expected")
wholeStageCodeGenExecs.map(_.doCodeGen()._2)
}
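For illustration only (not part of the patch): the helper returns a Seq because an executed plan can contain several WholeStageCodegenExec stages, for example a self-join, where the assumed sketch below yields one CodeAndComment per stage.

val selfJoin = spark.range(10).join(spark.range(10), "id")
val codes = genCode(selfJoin)   // one CodeAndComment per WholeStageCodegenExec in the plan
assert(codes.nonEmpty)          // a join usually produces more than one codegen stage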

ignore("SPARK-21871 check if we can get large code size when compiling too long functions") {
val codeWithShortFunctions = genGroupByCode(3)
val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
@@ -241,9 +246,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
val df = spark.range(100)
val join = df.join(df, "id")
val plan = join.queryExecution.executedPlan
assert(!plan.find(p =>
assert(plan.find(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].codegenStageId == 0).isDefined,
p.asInstanceOf[WholeStageCodegenExec].codegenStageId == 0).isEmpty,
"codegen stage IDs should be preserved through ReuseExchange")
checkAnswer(join, df.toDF)
}
@@ -253,18 +258,12 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
import testImplicits._

withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME.key -> "true") {
val bytecodeSizeHisto = CodegenMetrics.METRIC_COMPILATION_TIME

// the same query run twice should hit the codegen cache
spark.range(3).select('id + 2).collect
val after1 = bytecodeSizeHisto.getCount
spark.range(3).select('id + 2).collect
val after2 = bytecodeSizeHisto.getCount // same query shape as above, deliberately
// bytecodeSizeHisto's count is always monotonically increasing if new compilation to
// bytecode had occurred. If the count stayed the same that means we've got a cache hit.
assert(after1 == after2, "Should hit codegen cache. No new compilation to bytecode expected")
Contributor:
Why can this be flaky? If the generated code is the same, then we should hit the cache. Also cc @rednaxelafx

Member Author:
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 288738 did not equal 288749 Should hit codegen cache. No new compilation to bytecode expected
	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
	at org.apache.spark.sql.execution.WholeStageCodegenSuite$$anonfun$8$$anonfun$apply$mcV$sp$4.apply$mcV$sp(WholeStageCodegenSuite.scala:354)
	at org.apache.spark.sql.catalyst.plans.PlanTestBase$class.withSQLConf(PlanTest.scala:176)
	at org.apache.spark.sql.execution.WholeStageCodegenSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(WholeStageCodegenSuite.scala:35)
	at org.apache.spark.sql.test.SQLTestUtilsBase$class.withSQLConf(SQLTestUtils.scala:175)
	at org.apache.spark.sql.execution.WholeStageCodegenSuite.withSQLConf(WholeStageCodegenSuite.scala:35)
	at org.apache.spark.sql.execution.WholeStageCodegenSuite$$anonfun$8.apply$mcV$sp(WholeStageCodegenSuite.scala:344)
	at org.apache.spark.sql.execution.WholeStageCodegenSuite$$anonfun$8.apply(WholeStageCodegenSuite.scala:341)
	at org.apache.spark.sql.execution.WholeStageCodegenSuite$$anonfun$8.apply(WholeStageCodegenSuite.scala:341)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:74)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
	at org.apache.spark.sql.execution.WholeStageCodegenSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(WholeStageCodegenSuite.scala:35)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
	at org.apache.spark.sql.execution.WholeStageCodegenSuite.runTest(WholeStageCodegenSuite.scala:35)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite$class.run(Suite.scala:1147)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:35)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:35)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Member Author:
The histogram of the time it took to compile source code text (in milliseconds) is not stable?

Member Author:

The original version of this test shouldn't be flaky if it's run alone; but yes, if other Spark tests run in parallel in the same JVM, the CodegenMetrics can be modified as a side effect of those tests, so it's better to fix it.

The above comment is from @rednaxelafx
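To make the race concrete, a sketch reusing the names from the removed test (an illustration, not code from the patch; it assumes testImplicits._ is in scope and that other suites may trigger codegen in the same JVM):

val bytecodeSizeHisto = CodegenMetrics.METRIC_COMPILATION_TIME   // JVM-wide, shared across suites
spark.range(3).select('id + 2).collect
val after1 = bytecodeSizeHisto.getCount
// any codegen triggered by a concurrently running suite can bump the count right here
spark.range(3).select('id + 2).collect   // this query itself hits the codegen cache
val after2 = bytecodeSizeHisto.getCount
// after1 == after2 only holds if nothing else compiled in between, hence the flakiness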


// a different query can result in codegen cache miss, that's by design
// the same query run twice should produce identical code
Contributor:
Could we please add a phrase at the end of this line, something like:

, which would imply a hit in the generated code cache.

val ds1 = spark.range(3).select('id + 2)
val code1 = genCode(ds1)
val ds2 = spark.range(3).select('id + 2)
val code2 = genCode(ds2) // same query shape as above, deliberately
assert(code1 == code2, "Should produce same code")
}
}
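For reference, and in line with the reviewer's suggested wording: the CodeGenerator compilation cache appears to be keyed by CodeAndComment, so identical generated code should be served from the cache on a second compile. A hedged sketch continuing from code1 and code2 above (not part of the patch):

// equal CodeAndComment keys should map to the same cached entry (assuming no eviction in between)
val (clazz1, _) = CodeGenerator.compile(code1.head)
val (clazz2, _) = CodeGenerator.compile(code2.head)
assert(clazz1 eq clazz2, "identical generated code should be served from the compilation cache")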
