
Conversation

@ivoson
Contributor

@ivoson ivoson commented Jan 12, 2018

What changes were proposed in this pull request?

When concurrent jobs use the same RDD that is marked for checkpointing, one job may finish and start RDD.doCheckpoint while another job is being submitted, which calls submitStage and submitMissingTasks. submitMissingTasks serializes taskBinaryBytes and computes the task partitions, and both are affected by the checkpoint status. If taskBinaryBytes is serialized before doCheckpoint finishes but the partitions are computed after it finishes, then when the task runs, rdd.compute is called with a mismatched partition. RDDs with a particular partition type, such as UnionRDD, cast the partition to that type in compute and throw a ClassCastException, because the partition passed in is actually a CheckpointRDDPartition.
This error occurs because rdd.doCheckpoint runs in the same thread that called sc.runJob, while the task serialization happens in the DAGScheduler's event loop.
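A rough sketch of the shape of the change (simplified; the stage-type match is omitted and the lock object named here is an assumption, not the exact code):

```scala
// Sketch only -- simplified from DAGScheduler.submitMissingTasks.
// taskBinaryBytes and partitions both depend on the checkpoint status, so both
// are read inside one synchronized block; a concurrent job checkpointing this
// RDD can then no longer complete between the two reads.
var taskBinaryBytes: Array[Byte] = null
var partitions: Array[Partition] = null
RDDCheckpointData.synchronized {  // assumed lock: whatever RDD checkpointing itself synchronizes on
  taskBinaryBytes = JavaUtils.bufferToArray(
    closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  partitions = stage.rdd.partitions
}
```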

How was this patch tested?

Existing unit tests, plus a new test case in DAGSchedulerSuite that demonstrates the exception.

…d is the same when calculate taskSerialization and task partitions

Change-Id: Ib9839ca552653343d264135c116742effa6feb60
@ivoson
Contributor Author

ivoson commented Jan 12, 2018

@xuanyuanking could you review this please?

@ivoson ivoson closed this Jan 12, 2018
@xuanyuanking
Member

reopen this...

@ivoson ivoson reopened this Jan 12, 2018
}

def compute(split: Partition, context: TaskContext): Iterator[Int] = {
parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
Member

I think this line is the key point of WrappedPartition and WrappedRDD; please add comments explaining your intention.

Contributor Author

Thanks for the comment, I will work on this.
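For illustration, the wrapper pair with the kind of explanatory comment being asked for might look roughly like this (a sketch; only the compute line above is taken from the actual diff, the rest is assumed):

```scala
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch (assumed shapes): WrappedPartition wraps the parent RDD's partition,
// and WrappedRDD unwraps it in compute() before delegating to the parent.
class WrappedPartition(val partition: Partition) extends Partition {
  override def index: Int = partition.index
}

class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
  override protected def getPartitions: Array[Partition] =
    parent.partitions.map { p => new WrappedPartition(p): Partition }

  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    // The cast below is the key point: it assumes `split` is the WrappedPartition
    // produced by getPartitions. If the RDD is checkpointed between serializing
    // the task and reading its partitions, `split` is a CheckpointRDDPartition
    // instead, and this cast throws a ClassCastException.
    parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
  }
}
```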

* With this test case, we want to show that task serialization and partition calculation
* in submitMissingTasks should be done under the same RDD checkpoint status.
*/
test("task part misType with checkpoint rdd in concurrent execution scenes") {
Member

maybe "SPARK-23053: avoid CastException in concurrent execution with checkpoint" better?

Contributor Author

Thanks for the suggestion.

val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
val part = rdd.partitions(0)
intercept[ClassCastException] {
Member

I think this is not a "test"; it is just a "repro" of the problem you want to fix. We should prove that the code you added in DAGScheduler.scala fixes the problem, and that with the original code base a ClassCastException is raised.

Contributor Author

It is a repro case; I will fix this.

Change-Id: Id791079358808d3f6732f0c4fdb2703a75b0677e
Member

@xuanyuanking xuanyuanking left a comment

LGTM, cc @zsxwing @jerryshao @gatorsmile @cloud-fan
Hi Shixiong, Saisai, Xiao and Wenchen. We found this streaming job problem in production at Baidu, on Spark 2.1. It is triggered by a streaming checkpoint workload, and the fix patch is mainly in the core module. Please give it a review. Thanks :)

@xuanyuanking
Member

@ivoson Tengfei, please post the full stack trace of the ClassCastException.

@ivoson
Contributor Author

ivoson commented Jan 12, 2018

@xuanyuanking ok, here is the stack trace of the exception.

java.lang.ClassCastException: org.apache.spark.rdd.CheckpointRDDPartition cannot be cast to org.apache.spark.streaming.rdd.MapWithStateRDDPartition
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:152)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)

@AmplabJenkins

Can one of the admins verify this patch?

Contributor

@squito squito left a comment

Thanks a lot for finding & working on this @ivoson. Overall this fix looks great, I just have some small comments for clarity.

Two other small asks:

  1. Both in the PR description and the JIRA, can you add that this error occurs because rdd.doCheckpoint occurs in the same thread that called sc.runJob, while the task serialization occurs in the DAGScheduler's event loop?

  2. I saw you put the stack trace in a PR comment, can you also put it on the JIRA? I think that is easier to find for most users.

JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
var taskBinaryBytes: Array[Byte] = null
// Add synchronized block to avoid rdd deserialized from taskBinaryBytes has diff checkpoint
// status with the rdd when create ShuffleMapTask or ResultTask.
Contributor

I'd reword this a bit:

taskBinaryBytes and partitions are both affected by the checkpoint status. We need this synchronization in case another concurrent job is checkpointing this RDD, so we get a consistent view of both variables.

Contributor Author

Thanks for the advice.

// set checkpointDir.
val tempDir = Utils.createTempDir()
val checkpointDir = File.createTempFile("temp", "", tempDir)
checkpointDir.delete()
Contributor

why do you make a tempfile for the checkpoint dir and then delete it? why not just checkpointDir = new File(tempDir, "checkpointing")? Or even just checkpointDir = Utils.createTempDir()?

(CheckpointSuite does this so it can call sc.setCheckpointDir, but you're not doing that here)

Contributor Author

Checked the code again, and yes, checkpointDir = Utils.createTempDir() is enough for this case; will fix this.
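With that, the setup collapses to a single call (sketch; Utils here is org.apache.spark.util.Utils):

```scala
// One temp dir, created by the test and removed again when it finishes.
val checkpointDir = Utils.createTempDir()
```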


val checkpointRunnable = new Runnable {
override def run() = {
// Simply simulate what RDD.doCheckpoint() do here.
Contributor

I'd remove "simply" here and elsewhere in comments. Also "do" -> "does"

Contributor Author

will fix this.

// serialization can start.
semaphore1.release()
// Wait until taskBinary serialization finished in submitMissingTasksThread.
semaphore2.acquire()
Contributor

this would be a bit easier to follow if you rename your semaphores a bit.

semaphore1 -> doCheckpointStarted
semaphore2 -> taskBinaryBytesFinished

Contributor Author

Thanks for the advice, will fix this.

checkpointData.cpState = CheckpointState.Checkpointed
rdd.markCheckpointed()
}
semaphore1.release()
Contributor

and then this would be another semaphore checkpointStateUpdated

Contributor Author

Thanks for the advice.
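With the suggested names, the handshake between the two test threads would look roughly like this (a sketch; thread bodies are reduced to the coordination points, everything else is elided):

```scala
import java.util.concurrent.Semaphore

// Sketch of the suggested renaming; only the coordination is shown.
val doCheckpointStarted = new Semaphore(0)      // was semaphore1 (first use)
val taskBinaryBytesFinished = new Semaphore(0)  // was semaphore2
val checkpointStateUpdated = new Semaphore(0)   // was semaphore1 (second use)

val checkpointThread = new Thread {
  override def run(): Unit = {
    doCheckpointStarted.release()       // let taskBinary serialization start
    taskBinaryBytesFinished.acquire()   // wait until taskBinaryBytes is captured
    // ... mark the RDD's checkpoint state as Checkpointed here ...
    checkpointStateUpdated.release()    // partitions may now be calculated
  }
}

val submitMissingTasksThread = new Thread {
  override def run(): Unit = {
    doCheckpointStarted.acquire()       // wait for the simulated checkpoint to start
    // ... serialize taskBinaryBytes here (pre-checkpoint view of the RDD) ...
    taskBinaryBytesFinished.release()
    checkpointStateUpdated.acquire()    // wait for the checkpoint state to flip
    // ... read rdd.partitions here (post-checkpoint view) ...
  }
}
```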

// Wait until checkpoint status changed to Checkpointed in checkpointThread.
semaphore1.acquire()

// Part calculated with rdd checkpoint already finished.
Contributor

I'd add a comment above this:

Now we're done simulating the interleaving that might happen within the scheduler -- we'll check to make sure the final state is OK by simulating a couple steps that normally happen on the executor.

Contributor Author

Thanks for the advice, it is really helpful for understanding; will update this.

val taskContext = mock(classOf[TaskContext])
doNothing().when(taskContext).killTaskIfInterrupted()

// ClassCastException is expected with errPart.
Contributor

I think this is a bit easier to follow if you say

Make sure our test case is set up correctly -- we expect a ClassCastException here if we use the rdd.partitions after checkpointing was done, but our binary bytes are from before it finished.

Contributor Author

Thanks for the advice, it is really helpful for understanding; will update this.

submitMissingTasksThread.start()
submitMissingTasksThread.join()

Utils.deleteRecursively(tempDir)
Contributor

this should be done in a finally.

Contributor Author

will fix this.
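For instance, the cleanup could move into a finally block so the temp dir is removed even when the interleaving or the assertions throw (sketch; the test body is elided):

```scala
// Sketch: create the temp dir first, then guarantee cleanup on any exit path.
val tempDir = Utils.createTempDir()
try {
  // ... build the RDD, define and start the threads, join them, assert ...
} finally {
  Utils.deleteRecursively(tempDir)
}
```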

Change-Id: Id49321f8761d03159fd48b6f607084f2cc6fa4ff
@ivoson
Contributor Author

ivoson commented Feb 6, 2018

@squito Hi Rashid, thanks for your review and advice. The PR description and JIRA have been updated, and the stack trace has also been posted on the JIRA.
The last commit addresses the comments you left. Thanks again for the advice.

* With this test case, we want to show that task serialization and partition calculation
* in submitMissingTasks should be done under the same RDD checkpoint status.
*/
test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
Contributor

hi @ivoson -- I'm really sorry but I only just realized that this "test" is really just a repro, and it passes both before and after the actual code changes, since you've replicated the internal logic we're fixing. As such, I don't think it's actually useful as a test case -- perhaps it should get added to the jira as a repro.

I appreciate the work that went into writing this as it helped make the issue clear to me. I am not sure if there is a good way to test this. If we can't come up with anything, we should just commit your actual fix, but give me a day or two to think about it ...

Contributor Author

@squito thanks for the reply. I understand; technically it may not be a unit test case, it just simulates the scenario that causes the exception. I also wonder if there is a good way to test this.

Contributor

hi @ivoson -- I haven't come up with a better way to test this, so I think for now you should

(1) change the PR to only include the changes to the DAGScheduler (also undo the protected[spark] changes elsewhere)
(2) put this repro on the jira, as it's pretty good for showing what's going on.

if we come up with a way to test it, we can always do that later on.

thanks and sorry for the back and forth

Contributor Author

hi @squito, it's fine. The PR and JIRA have been updated. Thanks for your patience and review.

Change-Id: I6c308d5953a243e30dae87e8109e25d5df5a3f91
Contributor

@squito squito left a comment

lgtm

asfgit pushed a commit that referenced this pull request Feb 13, 2018
…late in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?

When concurrent jobs use the same RDD that is marked for checkpointing, one job may finish and start RDD.doCheckpoint while another job is being submitted, which calls submitStage and submitMissingTasks. In [submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961), taskBinaryBytes is serialized and the task partitions are calculated, and both are affected by the checkpoint status. If the former is computed before doCheckpoint finishes and the latter after it finishes, then when the task runs, rdd.compute is called with a mismatched partition. RDDs with a particular partition type, such as [UnionRDD](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala), cast the partition to that type and get a ClassCastException because the partition passed in is actually a CheckpointRDDPartition.
This error occurs because rdd.doCheckpoint runs in the same thread that called sc.runJob, while the task serialization happens in the DAGScheduler's event loop.

## How was this patch tested?

Existing unit tests, plus a new test case in DAGSchedulerSuite that demonstrates the exception.

Author: huangtengfei <[email protected]>

Closes #20244 from ivoson/branch-taskpart-mistype.

(cherry picked from commit 091a000)
Signed-off-by: Imran Rashid <[email protected]>
asfgit pushed a commit that referenced this pull request Feb 13, 2018
…late in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

@squito
Contributor

squito commented Feb 13, 2018

merged to master / 2.3 / 2.2

I hit a merge conflict trying to merge to 2.1 -- feel free to open another PR for that version.

@asfgit asfgit closed this in 091a000 Feb 13, 2018
ivoson pushed a commit to ivoson/spark that referenced this pull request Feb 18, 2018
…late in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

Author: huangtengfei <[email protected]>

Closes apache#20244 from ivoson/branch-taskpart-mistype.

Change-Id: I634009d51ae40336e9d0717d061213ff7e36e71f
@ivoson
Contributor Author

ivoson commented Feb 18, 2018

thank you for reviewing this @squito

asfgit pushed a commit that referenced this pull request Feb 21, 2018
…itions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?
This PR backports [#20244](#20244)

## How was this patch tested?
Existing tests.

Author: huangtengfei <[email protected]>

Closes #20635 from ivoson/branch-2.1-23053.
@ivoson ivoson deleted the branch-taskpart-mistype branch June 4, 2018 03:16
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
…late in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status
