Skip to content

Conversation

@ivoson
Copy link
Contributor

@ivoson ivoson commented Feb 18, 2018

What changes were proposed in this pull request?

This PR backports #20244

When we run concurrent jobs using the same rdd which is marked to do checkpoint. If one job has finished running the job, and start the process of RDD.doCheckpoint, while another job is submitted, then submitStage and submitMissingTasks will be called. In submitMissingTasks, will serialize taskBinaryBytes and calculate task partitions which are both affected by the status of checkpoint, if the former is calculated before doCheckpoint finished, while the latter is calculated after doCheckpoint finished, when run task, rdd.compute will be called, for some rdds with particular partition type such as UnionRDD who will do partition type cast, will get a ClassCastException because the part params is actually a CheckpointRDDPartition.
This error occurs because rdd.doCheckpoint occurs in the same thread that called sc.runJob, while the task serialization occurs in the DAGSchedulers event loop.

How was this patch tested?

the exist tests.

…late in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

When we run concurrent jobs using the same rdd which is marked to do checkpoint. If one job has finished running the job, and start the process of RDD.doCheckpoint, while another job is submitted, then submitStage and submitMissingTasks will be called. In [submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961), will serialize taskBinaryBytes and calculate task partitions which are both affected by the status of checkpoint, if the former is calculated before doCheckpoint finished, while the latter is calculated after doCheckpoint finished, when run task, rdd.compute will be called, for some rdds with particular partition type such as [UnionRDD](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala) who will do partition type cast, will get a ClassCastException because the part params is actually a CheckpointRDDPartition.
This error occurs  because rdd.doCheckpoint occurs in the same thread that called sc.runJob, while the task serialization occurs in the DAGSchedulers event loop.

the exist uts and also add a test case in DAGScheduerSuite to show the exception case.

Author: huangtengfei <[email protected]>

Closes apache#20244 from ivoson/branch-taskpart-mistype.

Change-Id: I634009d51ae40336e9d0717d061213ff7e36e71f
@ivoson
Copy link
Contributor Author

ivoson commented Feb 18, 2018

cc @squito

@squito
Copy link
Contributor

squito commented Feb 19, 2018

Jenkins, Ok to test

@squito
Copy link
Contributor

squito commented Feb 19, 2018

lgtm assuming tests pass

@SparkQA
Copy link

SparkQA commented Feb 21, 2018

Test build #4102 has finished for PR 20635 at commit bd88903.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Feb 21, 2018
…itions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?
This PR backports [#20244](#20244)

When we run concurrent jobs using the same rdd which is marked to do checkpoint. If one job has finished running the job, and start the process of RDD.doCheckpoint, while another job is submitted, then submitStage and submitMissingTasks will be called. In [submitMissingTasks](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L932), will serialize taskBinaryBytes and calculate task partitions which are both affected by the status of checkpoint, if the former is calculated before doCheckpoint finished, while the latter is calculated after doCheckpoint finished, when run task, rdd.compute will be called, for some rdds with particular partition type such as [UnionRDD](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala) who will do partition type cast, will get a ClassCastException because the part params is actually a CheckpointRDDPartition.
This error occurs  because rdd.doCheckpoint occurs in the same thread that called sc.runJob, while the task serialization occurs in the DAGSchedulers event loop.

## How was this patch tested?
the exist tests.

Author: huangtengfei <[email protected]>

Closes #20635 from ivoson/branch-2.1-23053.
@squito
Copy link
Contributor

squito commented Feb 21, 2018

thanks @ivoson , merged!

@ivoson
Copy link
Contributor Author

ivoson commented Feb 21, 2018

thanks for reviewing this @squito

@vanzin vanzin mentioned this pull request May 11, 2018
@asfgit asfgit closed this in 348ddfd May 12, 2018
@ivoson ivoson deleted the branch-2.1-23053 branch June 4, 2018 03:16
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
Closes apache#20458
Closes apache#20530
Closes apache#20557
Closes apache#20966
Closes apache#20857
Closes apache#19694
Closes apache#18227
Closes apache#20683
Closes apache#20881
Closes apache#20347
Closes apache#20825
Closes apache#20078

Closes apache#21281
Closes apache#19951
Closes apache#20905
Closes apache#20635

Author: Sean Owen <[email protected]>

Closes apache#21303 from srowen/ClosePRs.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants