-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24678][Spark-Streaming] Give priority in use of 'PROCESS_LOCAL' for spark-streaming #21658
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Would you please add a UT for it. |
|
ok to test. |
| val blockManagers = new HashMap[BlockId, Seq[String]] | ||
| for (i <- 0 until blockIds.length) { | ||
| blockManagers(blockIds(i)) = blockLocations(i).map(_.host) | ||
| blockManagers(blockIds(i)) = blockLocations(i).map(b => s"executor_${b.host}_${b.executorId}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name of this method should be updated, blockIdsToHosts seems doesn't reflect your change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
blockIdsToLocations ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also you'd better using ExecutorCacheTaskLocation#toString here instead of manually writing the location hint, which will be more robust.
|
Test build #92634 has finished for PR 21658 at commit
|
|
Please add the UTs as I mentioned before. |
| val blockManagers = new HashMap[BlockId, Seq[String]] | ||
| for (i <- 0 until blockIds.length) { | ||
| blockManagers(blockIds(i)) = blockLocations(i).map(_.host) | ||
| blockManagers(blockIds(i)) = blockLocations(i).map( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
blockLoations(i).map { loc =>
xxx
}
|
Test build #92679 has finished for PR 21658 at commit
|
|
Test build #92727 has finished for PR 21658 at commit
|
|
Hi @sharkdtu , did you also verify this in your cluster, to see if the locality is correct or not? |
|
@jerryshao Yeah, I hava verified it in our cluster, and the locality is 'PROCESS_LOCAL'. |
jerryshao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
|
Test build #92793 has finished for PR 21658 at commit
|
|
Jenkins, retest this please. |
|
Test build #92798 has finished for PR 21658 at commit
|
|
LGTM, merging to master branch. |
|
a late LGTM |
What changes were proposed in this pull request?
Currently,
BlockRDD.getPreferredLocationsonly get hosts info of blocks, which results in subsequent schedule level is not better than 'NODE_LOCAL'. We can just make a small changes, the schedule level can be improved to 'PROCESS_LOCAL'How was this patch tested?
manual test