Skip to content

Commit 9c9f9c2

Browse files
committed
Fix
1 parent 80f3b34 commit 9c9f9c2

File tree

2 files changed

+27
-4
lines changed

2 files changed

+27
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ case class CachedRDDBuilder(
6868
synchronized {
6969
if (_cachedColumnBuffers != null) {
7070
_cachedColumnBuffers.unpersist(blocking)
71+
_cachedColumnBuffers = null
7172
}
7273
}
7374
}

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.concurrent.duration._
2222
import scala.language.postfixOps
2323

2424
import org.apache.spark.CleanerListener
25+
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
2526
import org.apache.spark.sql.catalyst.TableIdentifier
2627
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
2728
import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
@@ -801,10 +802,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
801802
nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null)
802803
}
803804

805+
private def checkIfNoJobTriggered(f: => DataFrame): DataFrame = {
806+
var numJobTrigered = 0
807+
val jobListener = new SparkListener {
808+
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
809+
numJobTrigered += 1
810+
}
811+
}
812+
sparkContext.addSparkListener(jobListener)
813+
try {
814+
val df = f
815+
assert(numJobTrigered === 0)
816+
df
817+
} finally {
818+
sparkContext.removeSparkListener(jobListener)
819+
}
820+
}
821+
804822
test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
805-
val df1 = Seq((1, 2), (2, 3), (3, 4)).toDF("a", "b").filter('a > 1).groupBy().sum("b").cache()
806-
assert(!isMaterialized(df1))
807-
checkAnswer(df1, Row(7L))
808-
assert(isMaterialized(df1))
823+
val cachedDf = checkIfNoJobTriggered {
824+
val df = spark.range(3L).selectExpr("id", "id AS value")
825+
.filter('id > 0).orderBy('id.asc).cache()
826+
assert(!isMaterialized(df))
827+
df
828+
}
829+
checkAnswer(cachedDf, Row(1L, 1L) :: Row(2L, 2L) :: Nil)
830+
assert(isMaterialized(cachedDf))
809831
}
810832
}

0 commit comments

Comments (0)