Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private[sql] class CacheManager extends Logging {
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
val planToCache = query.queryExecution.analyzed.canonicalized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need these changes? LogicalPlan.canonicalized is a lazy val and we don't have performance penalty even we use un-canonicalized plan as key.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't need them for now.
I'll revert the changes.

if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
Expand All @@ -106,7 +106,7 @@ private[sql] class CacheManager extends Logging {

/** Removes the data for the given [[Dataset]] from the cache */
private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
val planToCache = query.queryExecution.analyzed.canonicalized
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
require(dataIndex >= 0, s"Table $query is not cached.")
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
Expand All @@ -120,7 +120,7 @@ private[sql] class CacheManager extends Logging {
private[sql] def tryUncacheQuery(
query: Dataset[_],
blocking: Boolean = true): Boolean = writeLock {
val planToCache = query.queryExecution.analyzed
val planToCache = query.queryExecution.analyzed.canonicalized
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
val found = dataIndex >= 0
if (found) {
Expand All @@ -137,7 +137,8 @@ private[sql] class CacheManager extends Logging {

/** Optionally returns cached data for the given [[LogicalPlan]]. */
private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
cachedData.find(cd => plan.sameResult(cd.plan))
val canonicalized = plan.canonicalized
cachedData.find(cd => canonicalized.sameResult(cd.plan))
}

/** Replaces segments of the given logical plan with cached versions where possible. */
Expand All @@ -155,8 +156,9 @@ private[sql] class CacheManager extends Logging {
* function will over invalidate.
*/
private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
val canonicalized = plan.canonicalized
cachedData.foreach {
case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
case data if data.plan.collect { case p if p.sameResult(canonicalized) => p }.nonEmpty =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is redundant, sameResult already compares the canonicalized plan.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so.
For example, if the cached plan is LocalRelation (which is canonicalized) and the plan argument is SubqueryAlias(LocalRelation) (which is not canonicalized), it will fail to find the same-result plan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the implementation of sameResult:

def sameResult(plan: PlanType): Boolean = {
    val left = this.canonicalized
    val right = plan.canonicalized
    ......

Seems we do canonicalize plans?

Copy link
Member Author

@ueshin ueshin Jun 13, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some plans like LocalRelation, LogicalRDD, etc. override the sameResult and they don't use canonicalized plan to compare.

For example, LocalRelation overrides like as follows:

  override def sameResult(plan: LogicalPlan): Boolean = plan match {
    case LocalRelation(otherOutput, otherData) =>
      otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
    case _ => false
  }

If the plan is SubqueryAlias(LocalRelation), the method returns false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just fix those implementations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yes, we should for now, but we can't force to use canonicalized plan and it is difficult to check all implementations so I thought CacheManager should canonicalize plans.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I would prefer to fix in in sameResult if possible so that all uses of that function will benefit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I'll fix them, too.

data.cachedRepresentation.recache()
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,15 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
selectStar,
Seq(Row(1, "1")))
}

test("SPARK-15915 CacheManager should use canonicalized plan for planToCache") {
val localRelation = Seq(1, 2, 3).toDF()
localRelation.createOrReplaceTempView("localRelation")

spark.catalog.cacheTable("localRelation")
assert(
localRelation.queryExecution.withCachedData.collect {
case i: InMemoryRelation => i
}.size == 1)
}
}