-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21922] Fix duration always updating when task failed but status is still RUN… #19132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Though this may fix the problem it is 100% the wrong way to do it for multiple reasons:
Without a deeper look at the issue I would instead look at why the problem is occurring and fixing that rather than putting a bandaid on the UI like this. (Note: I've done this before and got the same response, nothing personally :) I would recommend closing this and opening a new PR once you figure out a new solution. |
|
Thanks for your recommendation @ajbozarth .Could you put a link for your pr?For the problem you mentioned,i have thought about them. |
…till RUNNING" This reverts commit 03a33a9.
|
@ajbozarth I have updated the implementation which only access FS in FSHistoryServerProvider |
ajbozarth
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the update, this is a much better implementation, just one nit otherwise LGTM
| attempt = uiData.taskInfo.attemptNumber, | ||
| launchTime = new Date(uiData.taskInfo.launchTime), | ||
| duration = uiData.taskDuration, | ||
| duration = uiData.taskDuration(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is unrelated and unnecessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since i have changed taskDuration below,if do not add () here a compiler error will occur.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
odd, I thought for sure it'd be fine, then this LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ajbozarth
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here what if we call the REST API on history server to get stage info? Looks like we may still have this issue since we don't have last update time here, what do you think @ajbozarth ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, @jerryshao .IIUC, the ui in AllStagesResource.scala is passed from ApiRootResource which also create sparkUI by FSHistoryProvider.So we can also get lastUpdateTime from this ui in AllStagesResource and pass to the taskDuration interface.I think it is another problem for REST?Should we fix here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if it is not a big change I think it should be fixed here. Because currently with this fix UI and REST API are inconsistent.
|
Ping @kiszk @jerryshao can you help reivew this? |
|
Is this a problem only in History UI, or it also has issues in Live UI? From my understanding you only pass a last update time for history UI, so is it intended? Also you mentioned "When executor failed and task metrics have not send to driver,the status will always be 'RUNNING'", is this a bug in scheduler? |
|
It just appear on history ui since Live ui will use heartbeat metric result @jerryshao(From my understanding) .It appears when Driver exit unnormally,then status of taskinfo can not be updated correctlly. |
|
So your PR description is quite confusing, would you please elaborate your problem in detail and describe how to reproduce the issue. |
|
Done @jerryshao |
|
Still I have a question about history server, is you event log an incomplete event log or completed when you met such issue? |
|
@jerryshao Thanks for your time. IIUC, event log is completed since driver has not dropped any event of executor which has problem described above.See below,driver only drop two events after shutting down: And below showing an executor has this problem: Case here is a job running today of our cluster. |
|
ok to test. |
jerryshao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please make sure everything is OK in Live UI?
| val operationGraphListener: RDDOperationGraphListener, | ||
| var appName: String, | ||
| val basePath: String, | ||
| val lastUpdateTime: Long = -1L, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to user Option[Long] = None as default value to reflect there's no update time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated @jerryshao Thanks for your time.
|
Looks like I don't have the Jenkins permission to trigger UT 😞 . Let me ping @srowen to trigger the test. |
|
Yes i have confirmed.Below is a testing job: @jerryshao |
|
ok to test |
|
@jerryshao, for triggering tests on Jenkins, I think this should be added by its admin manually as well if I understood correctly. In my case, I asked this to Josh Rosen before via email privately. I am quite sure you are facing the same issue I (and Holden, Felix and Takuya) met before if I understood correctly. |
|
Thanks @HyukjinKwon , I will ping Josh about this thing 😄 . |
|
Test build #81678 has finished for PR 19132 at commit
|
|
@jerryshao @HyukjinKwon Jenkins done.Thanks too much. |
|
Overall LGTM, @ajbozarth can you please review again? |
|
I've been following, still LGTM |
| } yield { | ||
| AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false) | ||
| AllStagesResource.stageUiToStageData( | ||
| status, stageInfo, stageUiData, includeDetails = false, Some(ui)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a good design to pass in SparkUI to only get lastUpdate, the API looks weird to add this SparkUI argument, the fix here only just make it work. It is better to add one more field in StageUIData or TaskUIData if possible.
|
Test build #81709 has finished for PR 19132 at commit
|
|
Test build #81710 has finished for PR 19132 at commit
|
| var memoryBytesSpilled: Long = _ | ||
| var diskBytesSpilled: Long = _ | ||
| var isBlacklisted: Int = _ | ||
| var jobLastUpdateTime: Option[Long] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it better to rename to stageLastUpdateTime or just lastUpdateTime? Since this structure is unrelated to job, would be better to not involve "job".
| @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { | ||
| withStageAttempt(stageId, stageAttemptId) { stage => | ||
| val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq | ||
| val tasks = stage.ui.taskData.values.map{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The style should be changed to map { AllStagesResource.convertTaskData(_, ui.lastUpdateTime) }, requires whitespace between { and }. You could check other similar codes about the style.
| val taskData = if (includeDetails) { | ||
| Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) | ||
| Some(stageUiData.taskData.map { case (k, v) => | ||
| k -> convertTaskData(v, stageUiData.lastUpdateTime) } ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I may not stated clearly, it should be:
Some(stageUiData.taskData.map { case (k, v) =>
k -> convertTaskData(v, stageUiData.lastUpdateTime) })
No whitespace after "}".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your patient.Actually i did not check carefully.
|
Test build #81764 has finished for PR 19132 at commit
|
|
Test build #81768 has finished for PR 19132 at commit
|
|
LGTM, merging to master, if possible to 2.2. |
|
Cannot cleanly merge to 2.2, so this will only land to master branch. |
|
@jerryshao Thanks. |





…NING
What changes were proposed in this pull request?
When driver quit abnormally which cause executor shutdown and task metrics can not be sent to driver for updating.In this case the status will always be 'RUNNING' and the duration on history UI will be 'CurrentTime - launchTime' which increase infinitely.
We can fix this time by modify time of event log since this time has gotten when
FSHistoryProviderfetch event log from File System.And the result picture is uploaded in SPARK-21922.
How to reproduce?
(1) Submit a job to spark on yarn
(2) Mock an oom(or other case can make driver quit abnormally) senario for driver
(3) Make sure executor is running task when driver quitting
(4) Open the history server and checkout result
It is not a corner case since there are many such jobs in our current cluster.
How was this patch tested?
Deploy historyserver and open a job has this problem.