[SPARK-3913] Spark Yarn Client API change to expose Yarn Resource Capacity, Yarn Application Listener and KillApplication APIs #2786
Conversation
When working with Spark in YARN cluster mode, we have the following issues:

1) We don't know YARN's maximum capacity (memory and cores) before we specify the number of executors and the memory for the Spark driver and executors. If we set too big a number, the job can exceed the limit and get killed. It would be better to let the application know the YARN resource capacity ahead of time, so the Spark config can be adjusted dynamically.

2) Once the job has started, we would like some feedback from the YARN application. Currently, the Spark client essentially blocks the call and only returns when the job has finished, failed, or been killed. If the job runs for a few hours, we have no idea how far it has gone: its progress, resource usage, tracking URL, etc. This pull request will not completely solve issue 2, but it does expose the YARN application status, such as when the job is started, killed, or finished, the tracking URL, and some limited progress reporting (on CDH5 we found the progress only reports 0, 10, and 100%). I will open another pull request to address YARN application and Spark job communication; that is not covered here.

3) If we decide to stop the Spark job, the Spark YARN Client exposes a stop method, but in many cases the stop method does not kill the YARN application. So we need to expose the YARN client's killApplication() API to the Spark client.

The proposed change is to modify the Client constructor, changing the type of the first argument from ClientArguments to YarnResourceCapacity => ClientArguments, where YarnResourceCapacity contains YARN's maximum memory and virtual cores as well as the overheads. This allows the application to adjust its memory and core settings accordingly. Existing applications that want to ignore the YarnResourceCapacity can simply wrap their fixed arguments: def toArgs(capacity: YarnResourceCapacity) = new ClientArguments(...)
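The proposed constructor change could look roughly like the following sketch. The types here are simplified stand-ins, not the actual Spark classes, and field names such as `maxMemoryMB` are assumptions made for illustration:

```scala
// Hypothetical sketch of the proposed API change; YarnResourceCapacity and
// ClientArguments below are simplified stand-ins for the real Spark YARN classes.
case class YarnResourceCapacity(maxMemoryMB: Int, maxVirtualCores: Int, memoryOverheadMB: Int)
case class ClientArguments(executorMemoryMB: Int, executorCores: Int)

// The Client would take a YarnResourceCapacity => ClientArguments function
// instead of fixed ClientArguments, so callers size executors after seeing limits.
class Client(argsBuilder: YarnResourceCapacity => ClientArguments) {
  def resolveArgs(capacity: YarnResourceCapacity): ClientArguments = argsBuilder(capacity)
}

// A capacity-aware builder: ask for 8 GB / 4 cores, but never exceed the cluster cap.
val adaptive: YarnResourceCapacity => ClientArguments = cap =>
  ClientArguments(
    executorMemoryMB = math.min(8192, cap.maxMemoryMB - cap.memoryOverheadMB),
    executorCores    = math.min(4, cap.maxVirtualCores))

// Existing applications that ignore the capacity just wrap their fixed arguments:
def toArgs(fixed: ClientArguments): YarnResourceCapacity => ClientArguments = _ => fixed
```

With this shape, an application that previously passed `new ClientArguments(...)` directly passes `toArgs(new ClientArguments(...))` instead and behaves exactly as before.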
We also define a YarnApplicationListener interface that exposes some of the information from YarnApplicationReport. Client.addYarnApplicationListener(listener) allows callers to get callbacks at different states of the application so they can react accordingly. For example, the onApplicationInit() callback is invoked when the AppId is available but the application has not yet started; one can use this AppId to kill the application if the run is no longer desired.
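A minimal sketch of what such a listener might look like. The callback names follow the description above but are assumptions, not the merged Spark API, and `YarnClientStub` only simulates the client-side notification plumbing:

```scala
// Hypothetical sketch of the YarnApplicationListener idea described in this PR.
trait YarnApplicationListener {
  def onApplicationInit(appId: String): Unit                        // appId known, app not yet started
  def onApplicationStart(appId: String, trackingUrl: String): Unit
  def onApplicationProgress(appId: String, progress: Double): Unit  // e.g. only 0, 10, 100% on CDH5
  def onApplicationEnd(appId: String, finalState: String): Unit     // FINISHED, FAILED or KILLED
}

// The client keeps a list of listeners and notifies them as the
// ApplicationReport changes (polling loop omitted for brevity).
class YarnClientStub {
  private var listeners = List.empty[YarnApplicationListener]
  def addYarnApplicationListener(l: YarnApplicationListener): Unit =
    listeners ::= l
  // Called internally when a new report is polled (simplified):
  def fireInit(appId: String): Unit = listeners.foreach(_.onApplicationInit(appId))
  def fireEnd(appId: String, state: String): Unit =
    listeners.foreach(_.onApplicationEnd(appId, state))
}
```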
Can one of the admins verify this patch?
Cool, this is a very good improvement.
What pull request are you referring to here: "I will have another Pull Request to address the Yarn Application and Spark Job communication issue, that's not covered here"? The things you mention are all useful, but perhaps I'm not seeing the bigger picture of how you view these being used. You added an interface via addApplicationListener, but how do you see a user doing that, or how is that tied to the user?
Tom, thanks for reviewing. I am still working on the second PR, which I haven't submitted yet. The code is currently used in our application; I am pulling it out of our codebase to make a PR from it. The current code only uses Akka for the communication, and I would like to add Netty support as well before I submit that pull request, which is why I haven't submitted it yet.

The following are the use cases in our application, which show how the new APIs are used; I assume other applications will have similar use cases. Our application doesn't use the spark-submit command line to run Spark. We submit both Hadoop and Spark jobs directly from our servlet application (Jetty), deploying in YARN cluster mode, and invoke the Spark Client (in the yarn module) directly. The Client can't call System.exit, which would shut down the Jetty JVM. Our application submits and stops Spark jobs, monitors Spark job progress, gets state from the Spark jobs (for example, bad-data counters), and collects logging and exceptions. So far the communication is one-way after the job is submitted; we will move to two-way communication soon (for example, a long-running Spark context with a pipeline of short Spark actions such as distinct counts, sampling, filters, and transformations, where we need feedback on each action for visualization, etc.). In this particular pull request we only address very limited requirements; the next PR will address the rest of the communication mentioned above.
B) We display a progress bar in the UI using the callback. C) We get the YARN application ID when the Spark job is submitted, which can be used to track progress or kill the app. (With the next PR, we will be able to use the tracking URL directly to open the Spark UI page, show Spark job iterations, Spark-specific progress, etc.; currently all of the above are implemented in our application.)
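The appId capture and kill flow in item C could be sketched as follows. All names here are hypothetical; the `killApplication` body only records the id where a real client would delegate to YARN's `YarnClient.killApplication`:

```scala
// Hypothetical sketch: capture the application id from the init callback,
// then use it to kill the app on demand. Simplified stand-in, not Spark code.
class KillableClient {
  @volatile private var appId: Option[String] = None
  private var killed = List.empty[String]

  // Invoked from the listener callback once YARN assigns an id.
  def onApplicationInit(id: String): Unit = { appId = Some(id) }

  // Exposed kill API: would forward to yarnClient.killApplication(id).
  def killApplication(): Boolean = appId match {
    case Some(id) => killed ::= id; true  // real code calls into YARN here
    case None     => false                // nothing submitted yet
  }

  def killedApps: List[String] = killed
}
```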
Hope this gives you a better picture of why this PR is important to us. I will move faster with the next PR mentioned. Thanks
Thanks for the explanation. A couple of things.
So far, we haven't done this yet, as the communication is a one-way push from server to application. But we would like to do something along those lines in our next application release. My next PR would set up the communication channel to enable this possibility.
Technically, the second PR is not directly related to this PR, even though we use both changes together in our application. In a nutshell, the 2nd PR is simply the following:
I am currently working on isolating the Akka piece so that Netty can be used as the communication layer; that way, larger data sizes can be transferred. I will make it configurable so people can plug in other network protocols. Due to our own release schedule, I was not able to work as fast as I hoped, but I hope this gives you a sense of what the overall PR is about.
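A pluggable transport along those lines might be sketched like this. All names are illustrative assumptions (including the config key); nothing here comes from Spark, and `InMemoryTransport` merely stands in for an Akka- or Netty-backed implementation:

```scala
// Hypothetical sketch of a pluggable communication layer for the follow-up PR.
trait AppTransport {
  def send(message: String): Unit
  def name: String
}

// Stand-in backend; a real deployment would register Akka or Netty transports.
class InMemoryTransport extends AppTransport {
  val sent = scala.collection.mutable.ListBuffer.empty[String]
  def send(message: String): Unit = sent += message
  def name: String = "in-memory"
}

// The transport is chosen from configuration, so other protocols can plug in.
def transportFor(conf: Map[String, String]): AppTransport =
  conf.getOrElse("spark.yarn.app.transport", "in-memory") match {
    case "in-memory" => new InMemoryTransport
    case other       => sys.error(s"no transport plugin registered for $other")
  }
```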
Sorry to take so long on this; I went off to work on a Hadoop Kerberos authentication implementation, so I did not get back to this until now.
Can one of the admins verify this patch?
@chesterxgchen thanks for working on this. It seems that major changes have gone into YARN since this was last updated. Would you mind closing this patch for now, since it's unlikely to be merged? Feel free to open an updated one against the same issue later and we can move the discussion there.
No problem, thanks.