Revert "Fix"
This reverts commit 2b58189.
maropu committed Apr 20, 2018
commit 10f83f78684ff0405287857f3554101c55a3c8d7
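Context for the diff below: this revert backs out the SPARK-23880 rework, so `InMemoryRelation` goes back to building and persisting its cached column buffer RDD in the constructor, and `CacheManager` goes back to calling `cachedColumnBuffers.unpersist()` directly instead of the `clearCache()`/`materializeCacheData()` indirection. The test deleted at the bottom of the core CachedTableSuite captures the behavior the reverted change had been after; a hedged sketch of that repro, assuming an active SparkSession named `spark` with its implicits imported:

import spark.implicits._

// Per the removed SPARK-23880 test: .cache() itself should stay lazy and trigger no jobs,
// even though defining the buffers can require one (e.g. a sampling job for a range
// partitioner, per the removed materializeCacheData() doc comment).
val df = spark.range(100L).filter('id > 10).groupBy().sum("id").cache()
// Only the first action materializes the in-memory buffers:
df.collect()   // the removed test checks the answer is Row(4895L)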
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -71,7 +71,7 @@ class CacheManager extends Logging {

/** Clears all cached tables. */
def clearCache(): Unit = writeLock {
- cachedData.asScala.foreach(_.cachedRepresentation.clearCache())
+ cachedData.asScala.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
cachedData.clear()
}

@@ -119,39 +119,12 @@ class CacheManager extends Logging {
while (it.hasNext) {
val cd = it.next()
if (cd.plan.find(_.sameResult(plan)).isDefined) {
- cd.cachedRepresentation.clearCache(blocking)
+ cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
it.remove()
}
}
}

- /**
- * Materialize the cache that refers to the given physical plan.
- * [[InMemoryTableScanExec]] calls this function to build a [[RDD]] for the cache and
- * it may trigger a job like sampling job for range partitioner, or broadcast job.
- */
- private[execution] def materializeCacheData(plan: SparkPlan)
- : Option[InMemoryRelation] = writeLock {
- val it = cachedData.iterator()
- val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
- while (it.hasNext) {
- val cd = it.next()
- if (cd.cachedRepresentation.child.find(_.sameResult(plan)).isDefined &&
- cd.cachedRepresentation._cachedColumnBuffers == null) {
- it.remove()
- val newCache = cd.cachedRepresentation.copy()(
- _cachedColumnBuffers = cd.cachedRepresentation.cachedColumnBuffers,
- sizeInBytesStats = cd.cachedRepresentation.sizeInBytesStats,
- statsOfPlanToCache = cd.cachedRepresentation.statsOfPlanToCache,
- outputOrdering = cd.cachedRepresentation.outputOrdering)
- needToRecache += cd.copy(cachedRepresentation = newCache)
- }
- }
-
- needToRecache.foreach(cachedData.add)
- needToRecache.headOption.map(_.cachedRepresentation)
- }
-
/**
* Tries to re-cache all the cache entries that refer to the given plan.
*/
@@ -165,7 +138,7 @@ class CacheManager extends Logging {
while (it.hasNext) {
val cd = it.next()
if (condition(cd.plan)) {
- cd.cachedRepresentation.clearCache()
+ cd.cachedRepresentation.cachedColumnBuffers.unpersist()
// Remove the cache entry before we create a new one, so that we can have a different
// physical plan.
it.remove()
@@ -176,14 +149,7 @@ class CacheManager extends Logging {
child = spark.sessionState.executePlan(cd.plan).executedPlan,
tableName = cd.cachedRepresentation.tableName,
logicalPlan = cd.plan)
-
- val newCacheData = cd.copy(cachedRepresentation = newCache)
-
- // Since `FileIndex` is used to build the `SparkPlan` for this cache, we need to refresh
- // file indices again to pass the existing tests.
- newCacheData.plan.foreach(refreshMetadata)
-
- needToRecache += newCacheData
+ needToRecache += cd.copy(cachedRepresentation = newCache)
}
}

@@ -235,15 +201,6 @@ class CacheManager extends Logging {
recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
}

- /**
- * If `HadoopFsRelation` found, we refresh the metadata. Otherwise, this method does nothing.
- */
- private def refreshMetadata(plan: LogicalPlan): Unit = plan match {
- case LogicalRelation(hr: HadoopFsRelation, _, _, _) =>
- hr.location.refresh()
- case _ =>
- }
-
/**
* Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
* [[org.apache.spark.sql.execution.datasources.FileIndex]] of any [[HadoopFsRelation]] nodes
@@ -252,13 +209,16 @@
*/
private def lookupAndRefresh(plan: LogicalPlan, fs: FileSystem, qualifiedPath: Path): Boolean = {
plan match {
- case LogicalRelation(hr: HadoopFsRelation, _, _, _) =>
- val prefixToInvalidate = qualifiedPath.toString
- val invalidate = hr.location.rootPaths
- .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
- .exists(_.startsWith(prefixToInvalidate))
- if (invalidate) hr.location.refresh()
- invalidate
+ case lr: LogicalRelation => lr.relation match {
+ case hr: HadoopFsRelation =>
+ val prefixToInvalidate = qualifiedPath.toString
+ val invalidate = hr.location.rootPaths
+ .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+ .exists(_.startsWith(prefixToInvalidate))
+ if (invalidate) hr.location.refresh()
+ invalidate
+ case _ => false
+ }
case _ => false
}
}
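A note on the CacheManager hunks above: the removed `clearCache()` was guarded on `_cachedColumnBuffers != null`, so uncaching a relation whose buffers were never materialized was a no-op, whereas the restored `cachedRepresentation.cachedColumnBuffers.unpersist()` goes through the accessor. Under the reverted eager scheme the buffers always exist by construction, so the guard is unneeded; under the lazy scheme it avoided building an RDD just to unpersist it. A minimal sketch of that guard pattern with stand-in types (not Spark's):

// Hypothetical mini-model: println stands in for RDD.unpersist().
class Buffers { def unpersist(): Unit = println("unpersisted") }
class Relation(build: () => Buffers) {
  @transient private var _buffers: Buffers = null
  def cachedColumnBuffers: Buffers = {
    if (_buffers == null) _buffers = build()   // lazy accessor may build first
    _buffers
  }
  // The guard the revert removes: do nothing if nothing was ever materialized.
  def clearCache(): Unit = if (_buffers != null) _buffers.unpersist()
}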
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -62,9 +62,9 @@ case class InMemoryRelation(
storageLevel: StorageLevel,
@transient child: SparkPlan,
tableName: Option[String])(
- @transient val _cachedColumnBuffers: RDD[CachedBatch] = null,
+ @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
- val statsOfPlanToCache: Statistics,
+ statsOfPlanToCache: Statistics,
override val outputOrdering: Seq[SortOrder])
extends logical.LeafNode with MultiInstanceRelation {

@@ -98,17 +98,13 @@ case class InMemoryRelation(
}
}

- @transient lazy val cachedColumnBuffers: RDD[CachedBatch] = if (_cachedColumnBuffers != null) {
- _cachedColumnBuffers
- } else {
+ // If the cached column buffers were not passed in, we calculate them in the constructor.
+ // As in Spark, the actual work of caching is lazy.
+ if (_cachedColumnBuffers == null) {
buildBuffers()
}

- def clearCache(blocking: Boolean = true): Unit = if (_cachedColumnBuffers != null) {
- _cachedColumnBuffers.unpersist(blocking)
- }
-
- private def buildBuffers(): RDD[CachedBatch] = {
+ private def buildBuffers(): Unit = {
val output = child.output
val cached = child.execute().mapPartitionsInternal { rowIterator =>
new Iterator[CachedBatch] {
@@ -159,7 +155,7 @@ case class InMemoryRelation(
cached.setName(
tableName.map(n => s"In-memory table $n")
.getOrElse(StringUtils.abbreviate(child.toString, 1024)))
- cached
+ _cachedColumnBuffers = cached
}

def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
@@ -182,6 +178,8 @@ case class InMemoryRelation(
outputOrdering).asInstanceOf[this.type]
}

+ def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
+
override protected def otherCopyArgs: Seq[AnyRef] =
Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
}
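The heart of the revert is in this file: buffer construction moves from a `lazy val` back into the constructor body, which is why `_cachedColumnBuffers` becomes a `var` and `buildBuffers()` returns `Unit` and assigns the field instead of returning the RDD. A simplified sketch of the two initialization styles, using plain Scala collections rather than `RDD[CachedBatch]`:

// Reverted (eager) style: the build runs while the object is being constructed. In Spark
// this only defines and persists the RDD (computing it stays lazy), but defining it can
// already fire jobs (sampling or broadcast jobs, per the removed doc comment).
class EagerStyle(build: () => Seq[Int]) {
  private var _buffers: Seq[Int] = null
  if (_buffers == null) { _buffers = build() }  // constructor-time side effect
  def buffers: Seq[Int] = _buffers
}

// "Fix" (lazy) style: nothing runs until the first access.
class LazyStyle(build: () => Seq[Int]) {
  lazy val buffers: Seq[Int] = build()
}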
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -247,27 +247,14 @@ case class InMemoryTableScanExec(

private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning

- private[sql] lazy val cachedColumnBuffers: RDD[CachedBatch] = {
- // If `relation._cachedColumnBuffers` is null, the cache in `CacheManager` has not been
- // materialized yet. So, we try to materialize the cache here.
- if (relation._cachedColumnBuffers == null) {
- val cacheManager = sqlContext.sparkSession.sharedState.cacheManager
- cacheManager.materializeCacheData(relation.child).map(_.cachedColumnBuffers).getOrElse {
- // In the case where the cache already has been cleared, we need to build a RDD here
- relation.cachedColumnBuffers
- }
- } else {
- relation._cachedColumnBuffers
- }
- }
-
private def filteredCachedBatches(): RDD[CachedBatch] = {
// Using these variables here to avoid serialization of entire objects (if referenced directly)
// within the map Partitions closure.
val schema = stats.schema
val schemaIndex = schema.zipWithIndex
+ val buffers = relation.cachedColumnBuffers

- cachedColumnBuffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) =>
+ buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) =>
val partitionFilter = newPredicate(
partitionFilters.reduceOption(And).getOrElse(Literal(true)),
schema)
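The restored `val buffers = relation.cachedColumnBuffers` follows the pattern the surrounding comment describes: copy what the closure needs into locals so the mapPartitions closure does not capture the whole operator. A self-contained illustration with hypothetical classes (not Spark's):

import org.apache.spark.rdd.RDD

class Holder(val id: Int)                     // deliberately not Serializable
class Op(holder: Holder, rdd: RDD[Int]) {
  // Referencing `holder` inside the lambda pulls in `this`, so the whole Op (and its
  // Holder) must serialize; Spark rejects this with "Task not serializable".
  def bad: RDD[Int] = rdd.map(_ + holder.id)
  // Copying the needed value into a local means the closure captures only an Int.
  def good: RDD[Int] = {
    val localId = holder.id
    rdd.map(_ + localId)
  }
}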
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -48,13 +48,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
}

- def rddIdOf(tableName: String): Int = rddIdOf(spark.table(tableName))
-
- def rddIdOf(df: DataFrame): Int = {
- val plan = df.queryExecution.sparkPlan
+ def rddIdOf(tableName: String): Int = {
+ val plan = spark.table(tableName).queryExecution.sparkPlan
plan.collect {
- case m: InMemoryTableScanExec =>
- m.cachedColumnBuffers.id
+ case InMemoryTableScanExec(_, _, relation) =>
+ relation.cachedColumnBuffers.id
case _ =>
fail(s"Table $tableName is not cached\n" + plan)
}.head
@@ -267,8 +265,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext

spark.catalog.uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
- assert(!isMaterialized(rddId),
- "Uncached in-memory table should have been unpersisted")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
}
@@ -294,20 +291,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
sql("CACHE LAZY TABLE testData")
assertCached(spark.table("testData"))

+ val rddId = rddIdOf("testData")
assert(
- !isMaterialized(rddIdOf("testData")),
+ !isMaterialized(rddId),
"Lazily cached in-memory table shouldn't be materialized eagerly")

sql("SELECT COUNT(*) FROM testData").collect()
- val rddId = rddIdOf("testData")
assert(
isMaterialized(rddId),
"Lazily cached in-memory table should have been materialized")

spark.catalog.uncacheTable("testData")
eventually(timeout(10 seconds)) {
- assert(!isMaterialized(rddId),
- "Uncached in-memory table should have been unpersisted")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}

@@ -798,18 +794,4 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
}
}
-
- def isMaterialized(df: DataFrame): Boolean = {
- val cache = spark.sharedState.cacheManager.lookupCachedData(df)
- assert(cache.isDefined, "DataFrame is not cached\n" + df.queryExecution.analyzed)
- cache.get.cachedRepresentation._cachedColumnBuffers != null &&
- isMaterialized(rddIdOf(df))
- }
-
- test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
- val df = spark.range(100L).filter('id > 10).groupBy().sum("id").cache
- assert(!isMaterialized(df))
- checkAnswer(df, Row(4895L))
- assert(isMaterialized(df))
- }
}
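The `isMaterialized(rddId: Int)` helper these tests rely on is not shown in the hunks above. A sketch of how such a check is commonly written against the block manager (assumed API shape, mirroring Spark's own test helpers):

import org.apache.spark.SparkEnv
import org.apache.spark.storage.RDDBlockId

def isMaterialized(rddId: Int): Boolean = {
  val blockId = RDDBlockId(rddId, 0)                // the first partition's cached block
  val bm = SparkEnv.get.blockManager
  val maybeBlock = bm.get(blockId)
  maybeBlock.foreach(_ => bm.releaseLock(blockId))  // get() pins the block; release it
  maybeBlock.nonEmpty
}

Note also why `val rddId = rddIdOf("testData")` moves back above the materialization assertions: after the revert, the buffer RDD (and hence its id) exists as soon as the relation is constructed, so the test no longer needs to run an action before looking the id up.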
sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -37,8 +37,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
def rddIdOf(tableName: String): Int = {
val plan = table(tableName).queryExecution.sparkPlan
plan.collect {
- case m: InMemoryTableScanExec =>
- m.cachedColumnBuffers.id
+ case InMemoryTableScanExec(_, _, relation) =>
+ relation.cachedColumnBuffers.id
case _ =>
fail(s"Table $tableName is not cached\n" + plan)
}.head
@@ -172,12 +172,12 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
sql("CACHE LAZY TABLE src")
assertCached(table("src"))

+ val rddId = rddIdOf("src")
assert(
- !isMaterialized(rddIdOf("src")),
+ !isMaterialized(rddId),
"Lazily cached in-memory table shouldn't be materialized eagerly")

sql("SELECT COUNT(*) FROM src").collect()
- val rddId = rddIdOf("src")
assert(
isMaterialized(rddId),
"Lazily cached in-memory table should have been materialized")
sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -228,9 +228,8 @@ class PartitionedTablePerfStatsSuite
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)

spark.catalog.cacheTable("test")
- spark.catalog.refreshByPath(dir.getAbsolutePath)
- // `refreshByPath` does file listing internally, so we need to reset here
HiveCatalogMetrics.reset()
+ spark.catalog.refreshByPath(dir.getAbsolutePath)
assert(spark.sql("select * from test").count() == 5)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)