[SPARK-23880][SQL] Do not trigger any jobs for caching data #21018
Conversation
Test build #89083 has finished for PR 21018 at commit
I'll fix the failure tonight (JST).
Test build #89162 has finished for PR 21018 at commit
why do we do this?
The current approach of this PR is: cache() just registers an entry in CacheManager without building an RDD, and then InMemoryTableScanExec re-registers the entry in CacheManager to build (materialize) the RDD. So, I added this function so that InMemoryTableScanExec can re-register these entries in CacheManager. I don't think this is the best approach, though, so I'd welcome any suggestions.
Why do we need to involve CacheManager? Shall we just make the creation of the RDD lazy in InMemoryRelation and trigger the materialization in InMemoryTableScanExec?
I thought that, since InMemoryRelation is sometimes copied within a tree, the lazy update of _cachedColumnBuffers wouldn't always materialize the corresponding cache entry in CacheManager (maybe...). If so, subsequent queries might repeatedly do unnecessary materialization. Therefore, I thought we needed to update the entry in CacheManager directly.
something like
class InMemoryRelation(private var _cachedColumnBuffers: RDD[CachedBatch] = null) {
  def cachedColumnBuffers = {
    if (_cachedColumnBuffers == null) {
      // Double-checked locking: skip the lock on the hot path,
      // and build the buffers at most once.
      synchronized {
        if (_cachedColumnBuffers == null) {
          _cachedColumnBuffers = buildBuffer()
        }
      }
    }
    _cachedColumnBuffers
  }
}
once it's materialized, it's still materialized after copy
I'll update today
I checked the suggested approach, but some queries didn't work well: the query in the description was OK, but the query below wrongly cached two different RDDs (I checked the Storage tab in the web UI):
scala> sql("SET spark.sql.crossJoin.enabled=true")
scala> val df = spark.range(100000000L).cache()
scala> df.join(df).show
This is because the Analyzer copies the InMemoryRelation node (with _cachedColumnBuffers = null) via newInstance, and then each copy builds its own RDD. Thoughts?
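To illustrate the failure mode with a minimal, self-contained sketch (hypothetical classes, not the actual Spark code): when the lazily-built state lives directly on the relation and the relation is copied before first access, each copy materializes independently.

// Illustrative reproduction of the duplication problem.
class Relation(private var buf: String = null) {
  def buffers: String = {
    if (buf == null) buf = s"built by ${System.identityHashCode(this)}"
    buf
  }
  def newInstance: Relation = new Relation(buf) // copies whatever is built so far
}

val r = new Relation()
val copy = r.newInstance           // copied while buf is still null
println(r.buffers == copy.buffers) // false: each copy built its own buffer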
How about something like this:

class CachedRDDBuilder(private var _cachedColumnBuffers: RDD[CachedBatch] = null) {
  def cachedColumnBuffers = {
    if (_cachedColumnBuffers == null) {
      synchronized {
        if (_cachedColumnBuffers == null) {
          _cachedColumnBuffers = buildBuffer()
        }
      }
    }
    _cachedColumnBuffers
  }
}

class InMemoryRelation(cacheBuilder: CachedRDDBuilder = new CachedRDDBuilder()) {
  // newInstance should keep the existing CachedRDDBuilder
  def newInstance() ...
}
Then, in the physical plan and the cache manager, just call relation.cacheBuilder.cachedColumnBuffers.
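A minimal, self-contained sketch of why sharing the builder fixes the duplication (hypothetical classes, not the actual Spark code): the builder is the single owner of the lazy state, so copies of the relation share one materialization.

class Builder {
  private var buf: String = _
  def buffers: String = synchronized {
    if (buf == null) buf = s"built by ${System.identityHashCode(this)}"
    buf
  }
}
class Relation(val cacheBuilder: Builder = new Builder) {
  def newInstance: Relation = new Relation(cacheBuilder) // share the builder, don't copy state
}

val r = new Relation()
val copy = r.newInstance
println(r.cacheBuilder.buffers == copy.cacheBuilder.buffers) // true: built once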
aha, I'll fix it that way. Thanks!
Test build #89265 has finished for PR 21018 at commit
Force-pushed from 7ae9a5c to 50dc700.
Test build #89537 has finished for PR 21018 at commit
retest this please
Test build #89563 has finished for PR 21018 at commit
retest this please
Test build #89598 has finished for PR 21018 at commit
This reverts commit 2b58189.
Test build #89632 has finished for PR 21018 at commit
Test build #89666 has finished for PR 21018 at commit
@cloud-fan @viirya could you check this? Thanks!
if (_cachedColumnBuffers != null) {
  synchronized {
    if (_cachedColumnBuffers != null) {
      _cachedColumnBuffers.unpersist(blocking)
shall we also do _cachedColumnBuffers = null so that unpersist won't be called twice?
ok
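A minimal sketch of the suggested change, inside CachedRDDBuilder (clearCache and blocking are illustrative names, assuming the double-checked pattern above):

def clearCache(blocking: Boolean = true): Unit = {
  if (_cachedColumnBuffers != null) {
    synchronized {
      if (_cachedColumnBuffers != null) {
        _cachedColumnBuffers.unpersist(blocking)
        // Drop the reference so a second clearCache call is a no-op
        // and unpersist is never invoked twice on the same RDD.
        _cachedColumnBuffers = null
      }
    }
  }
}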
nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null)
}

test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
how does this test prove we don't trigger jobs?
I feel it's clearer to create a listener and explicitly show that we don't trigger any jobs after calling Dataset.cache.
ok
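A sketch of such a listener-based check (assuming it runs inside Spark's own test sources, since listenerBus is private[spark]; numJobs and the query are illustrative):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Count jobs started after Dataset.cache; caching should trigger none.
var numJobs = 0
val jobListener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = numJobs += 1
}
spark.sparkContext.addSparkListener(jobListener)
try {
  val df = spark.range(10).cache() // should not submit any job
  // Drain the listener bus so no in-flight event is missed by the assert.
  spark.sparkContext.listenerBus.waitUntilEmpty(10000)
  assert(numJobs == 0)
} finally {
  spark.sparkContext.removeSparkListener(jobListener)
}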
Don't forget to update PR description too. :)
@transient val partitionStatistics = new PartitionStatistics(output)

val child: SparkPlan = cacheBuilder.child
Do we need to expose child: SparkPlan? As it is a logical.LeafNode, it's a bit weird to have it.
Since InMemoryTableScanExec and other places reference this variable, I kept it public. But yeah, I agree the name is a little weird, so I renamed child to cachedPlan.
Statistics(sizeInBytes = sizeInBytesStats.value.longValue)

def cachedColumnBuffers: RDD[CachedBatch] = {
  if (_cachedColumnBuffers == null) {
    synchronized {
_cachedColumnBuffers is private[sql], so I'm not sure if this synchronized can be very effective.
I feel thread contention is low here, so I prefer simpler code. But I welcome suggestions for code that is both more efficient and simpler.
We should either not care about thread-safety at all, or do it right. Please either prove that CachedRDDBuilder will never be accessed by multiple threads and remove these synchronized blocks, or make _cachedColumnBuffers private.
ok, I'll recheck and update.
In this PR, without synchronized, I found that multi-threaded queries wrongly built four RDDs for a single cache:
val cachedDf = spark.range(1000000).selectExpr("id AS k", "id AS v").cache
for (i <- 0 to 3) {
  val thread = new Thread {
    override def run(): Unit = {
      // Start a job in each thread
      val df = cachedDf.filter('k > 5).groupBy().sum("v")
      df.collect()
    }
  }
  thread.start()
}
Either way, I think we should make _cachedColumnBuffers private, so I fixed it.
Test build #89710 has finished for PR 21018 at commit
Test build #89718 has finished for PR 21018 at commit
Test build #89762 has finished for PR 21018 at commit
retest this please.
@transient val partitionStatistics = new PartitionStatistics(output)

val cachedPlan: SparkPlan = cacheBuilder.cachedPlan
this should be def, or it will be serialized.
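A minimal sketch of the distinction (hypothetical classes, not the Spark code): a val becomes a field of the object and is written out by Java serialization, while a def is just a method and adds no serialized state.

class WithVal(builder: () => Array[Byte]) extends Serializable {
  val payload: Array[Byte] = builder() // stored as a field, hence serialized
}
class WithDef(builder: () => Array[Byte]) extends Serializable {
  def payload: Array[Byte] = builder() // recomputed on demand, never serialized
}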
sparkContext.addSparkListener(jobListener)
try {
  val df = f
  assert(numJobTrigered === 0)
before this assert, we should make sure the event queue is empty, via sparkContext.listenerBus.waitUntilEmpty
LGTM
Test build #89768 has finished for PR 21018 at commit
}
}

test("SPARK-23880 table cache should be lazy and don't trigger any jobs") {
Without the changes in this PR, this test can still pass. :)
oh, I'll recheck. Thanks!
Test build #89812 has finished for PR 21018 at commit
retest this please
Test build #89814 has finished for PR 21018 at commit
retest this please
Test build #89823 has finished for PR 21018 at commit
retest this please
Test build #89828 has finished for PR 21018 at commit
Test build #89829 has finished for PR 21018 at commit
thanks, merging to master!
What changes were proposed in this pull request?
This PR fixed the code so that cache does not trigger any jobs. For example, in the current master, an operation like the one below triggers an actual job:
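For instance, the following kind of query (an illustrative stand-in, since building a sort's range partitioner requires a sampling job):

val df = spark.range(10000000L).sort("id").cache()
// Before this fix, cache() built the cached RDD eagerly, so planning the sort
// submitted a range-partitioner sampling job immediately; with the fix, no job
// runs until the cached data is first used.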
This triggers a job even though the cache should be lazy. The problem is that, when creating InMemoryRelation, we build the RDD, which calls SparkPlan.execute and may trigger jobs, like a sampling job for a range partitioner, or a broadcast job. This PR removed the code that builds the cached RDD in the constructor of InMemoryRelation and added CachedRDDBuilder to lazily build the RDD in InMemoryRelation. Then, the first call of CachedRDDBuilder.cachedColumnBuffers triggers a job to materialize the cache in InMemoryTableScanExec.

How was this patch tested?
Added tests in CachedTableSuite.