[SPARK-40640][CORE] SparkHadoopUtil to set origin of hadoop/hive config options #38084
Conversation
The options passed from the Spark conf, hive-site.xml, and AWS environment variables now all record this in the source attribute of their configuration entries. The Configuration Writable methods do not propagate this, so it is not as useful cluster-wide as it could be, but it does help with some basic troubleshooting. Change-Id: Id3c7cbf1f1d17ef1bf3cc7d8144c84260861a4e5
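For context, a minimal illustration of the mechanism this change relies on, runnable in a Scala REPL with hadoop-common on the classpath (the key and value below are made up): Hadoop's `Configuration.set` has a three-argument overload whose last parameter records the source of an entry, and `getPropertySources` reads the recorded chain back.

```scala
// Illustration only (key/value are made up): the third argument to set()
// is the provenance string this PR starts supplying.
import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration(false)
hadoopConf.set("hive.metastore.uris", "thrift://example:9083",
  "Set by Spark from hive-site.xml")
val sources = hadoopConf.getPropertySources("hive.metastore.uris")
// sources: Array("Set by Spark from hive-site.xml")
```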
as well as Attila's suggestion, i plan to have HiveClientImpl resolve and propagate the origin
Co-authored-by: Attila Zsolt Piros <[email protected]>
…te.scala Co-authored-by: Attila Zsolt Piros <[email protected]>
* update tests to match changed strings
* HiveClientImpl.newHiveConf() does some of this too. It doesn't pass on the origin from any hadoop conf passed in because that conf comes in as JIterable[JMap.Entry[String, String]]; reflection could be used *or* the params changed, which introduces risk to what is just a minor debug change.
* SparkHadoopUtil.extractOrigin() is there for anyone who ever wants to do that in future.

Change-Id: Ia961c6b084e4cdba7e55d5687b32d9ea14c73b84
remove the unused ref to SparkHadoopUtil in HiveClientImpl Change-Id: Id604178023762f8712dc727073232f6dcf5ca7cd
 * @param defVal default value
 * @return the origin of the current entry in the configuration, or the default.
 */
def extractOrigin(hadoopConf: Configuration, key: String,
This is not needed, as this doesn't depend on the state of SparkHadoopUtil.
Even in SparkHadoopUtilSuite line 157 we can just remove the new (and the parens) and then access the method on the companion object:
So instead of:
val origin = new SparkHadoopUtil().extractOrigin(hadoopConf, key, null)
We can:
val origin = SparkHadoopUtil.extractOrigin(hadoopConf, key, null)
ok; i was just going with the rest
 * @param defVal default value
 * @return the origin of the current entry in the configuration, or the default.
 */
def extractOrigin(hadoopConf: Configuration, key: String, defVal: String): String = {
I think we do not need defVal; an empty string is fine for an unknown source.
def extractOrigin(hadoopConf: Configuration, key: String, defVal: String): String = {
  val sources = hadoopConf.getPropertySources(key)
  if (sources != null && sources.length > 0) {
    sources(0)
Does the first one have a special meaning? Is it the one which wins? The getPropertySources javadoc does not really clarify this:
* returns a list of the sources of the resource. The older sources are
* the first ones in the list. So for example if a configuration is set from
* the command line, and then written out to a file that is read back in the
* first entry would indicate that it was set from the command line, while
* the second one would indicate the file that the new configuration was read
* in from.
If the first one wins, let's mention that here in a comment.
Otherwise we could give back a comma-separated String easily: sources.mkString(", "), if that fits better.
I'll do the mkString(); it can only help debugging to know the entire chain. This is what cloudstore does, incidentally. I also plan to improve the s3a credential providers to log their sources at debug as passed in, maybe even with some simple crc32 checksum (so you can use crc32 on the command line to verify your secrets are in, but not enough that someone with the log can determine a valid set of credentials).
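A rough sketch of what the mkString approach could look like (not necessarily the final code in the PR; assumes org.apache.hadoop.conf.Configuration is in scope):

```scala
// Sketch only: join the whole source chain rather than picking one entry.
def extractOrigin(hadoopConf: Configuration, key: String): String = {
  val sources = hadoopConf.getPropertySources(key)
  if (sources == null || sources.isEmpty) "" else sources.mkString(", ")
}
```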
 * @param defVal default value
 * @return the origin of the current entry in the configuration, or the default.
 */
def extractOrigin(hadoopConf: Configuration, key: String, defVal: String): String = {
I suggest using the name propertySource (or propertySources if all of them are needed, see below) to reduce the need to map between names.
you've just pointed out that I have always had my ordering of where changes come from wrong. I was of the belief that the newest came first. Now I will have to review where else I've used that...
So do we know something about the ordering?
- Which one wins if one key is set from two different sources? The last one?
- Is the last one the first entry in the getPropertySources() result?
If we have the answer we should fix the documentation (in the Hadoop code).
1. Review comments.
2. AWS env var propagation records the hostname on which these were added. This helps identify issues where configurations are being created in the cluster, rather than on the deploying host - the one with the secrets.
3. Extracted the env propagation method into its own private[deploy] method so the unit tests can validate this works.
4. Reviewed comments on other setters and updated them to explicitly state which hadoop versions have the issues which need fixing.

Change-Id: Ie58856ce621e288c4a8bb06083e43f450f4ec26d
one more idea. To help debug the origin of config files, how about we always patch the config (if unset) with some property like spark.diagnostics.config.origin set to that hostname?
Ok, but let's do that in a separate PR.
that last change seems to break a yarn test
Interesting, locally (on its own) it is passing:
Those are flaky tests. Please re-trigger the check!
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Date, Locale}
import java.util
-import java.util.{Arrays, Date, Locale}
-import java.util
+import java.util.{Arrays, Date, Locale, Map => JMap}

  hadoopConf.set("fs.s3a.session.token", sessionToken)
  }
}
val env: util.Map[String, String] = System.getenv
- val env: util.Map[String, String] = System.getenv
+ val env: JMap[String, String] = System.getenv

// Exposed for testing
private[deploy] def appendS3CredentialsFromEnvironment(
    hadoopConf: Configuration,
    env: util.Map[String, String]): Unit = {
-    hadoopConf: Configuration,
-    env: util.Map[String, String]): Unit = {
+    hadoopConf: Configuration,
+    env: JMap[String, String]): Unit = {

  }
}
val env: util.Map[String, String] = System.getenv
appendS3CredentialsFromEnvironment(hadoopConf, env)
If env is used only by appendS3CredentialsFromEnvironment, shall we invoke System.getenv inside that method instead of here?
i want it isolated for testing; we can't set env vars in a unit test, but we can instead build up a map and verify propagation there.
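A sketch of what the test side of that injection could look like at this point in the review (the values are dummies, assertConfigValue is the suite's own helper, and whether the method hangs off an instance or the companion object is a detail of the thread above; the signature later changes to explicit arguments further down):

```scala
// Sketch: env vars can't be set from a unit test, so build the map by hand
// and check what was propagated into the Hadoop configuration.
import java.util.{HashMap => JHashMap}

val env = new JHashMap[String, String]()
env.put("AWS_ACCESS_KEY_ID", "access-key")
env.put("AWS_SECRET_ACCESS_KEY", "secret-key")
env.put("AWS_SESSION_TOKEN", "session-token")
val hadoopConf = new Configuration(false)
SparkHadoopUtil.appendS3CredentialsFromEnvironment(hadoopConf, env)
assertConfigValue(hadoopConf, "fs.s3a.access.key", "access-key")
assertConfigValue(hadoopConf, "fs.s3a.secret.key", "secret-key")
assertConfigValue(hadoopConf, "fs.s3a.session.token", "session-token")
```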
  hadoopConf.set(key.substring("spark.hadoop.".length), value,
    "Set by Spark from keys starting with 'spark.hadoop'")
}
val setBySpark = "Set by Spark to default values"
Could you define these constants in object SparkHadoopUtil, please?
- "Set by Spark from hive-site.xml"
- "Set by Spark from keys starting with 'spark.hive'"
- "Set by Spark from keys starting with 'spark.hadoop'"
- "Set by Spark to default values"
happy to
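Something along these lines, presumably on object SparkHadoopUtil — SOURCE_SPARK_HADOOP and SOURCE_SPARK_HIVE are the names that show up later in the diff; the other names and the visibility are only indicative:

```scala
// Sketch of the suggested constants; exact names/visibility may differ.
private[spark] val SOURCE_HIVE_SITE = "Set by Spark from hive-site.xml"
private[spark] val SOURCE_SPARK_HIVE =
  "Set by Spark from keys starting with 'spark.hive'"
private[spark] val SOURCE_SPARK_HADOOP =
  "Set by Spark from keys starting with 'spark.hadoop'"
private[spark] val SET_TO_DEFAULT_VALUES = "Set by Spark to default values"
```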
private val setToDefaultValues = "Set by Spark to default values"
private val hadoopPropagation = "Set by Spark from keys starting with 'spark.hadoop'"
private val hivePropagation = "Set by Spark from keys starting with 'spark.hive'"
We can remove these string duplications by https://github.com/apache/spark/pull/38084/files#r989713735.
/**
 * The explicit buffer size propagation records this.
 */
test("buffer size propagation") {
-test("buffer size propagation") {
+test("SPARK-40640: buffer size propagation") {

  assertConfigMatches(hadoopConf, "io.file.buffer.size", "123", BUFFER_SIZE.key)
}

test("aws credentials from environment variables") {
-test("aws credentials from environment variables") {
+test("SPARK-40640: aws credentials from environment variables") {

  hadoopConf: Configuration,
  key: String,
  expected: String,
  expectedSource: String): Unit = {
Indentation: Add two more spaces for parameters.

private def assertConfigMatches(
    hadoopConf: Configuration,
    key: String,
    expected: String,
    expectedSource: String): Unit = {
  assertConfigValue(hadoopConf, key, expected)
will need to tweak my scala ide rules, i see
private def assertConfigSourceContains(
  hadoopConf: Configuration,
  key: String,
  expectedSource: String): Unit = {
ditto. Indentation. Two more spaces for parameters.
val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++
  sparkConf.getAll.toMap ++ extraConfig).toMap
confMap.foreach { case (k, v) => hiveConf.set(k, v) }
val fromSpark = "Set by Spark"
Although this is tricky, let's collect this into the same place (https://github.com/apache/spark/pull/38084/files#r989713735).
Gentle ping about this leftover.
Since this is a non-test code string, we should have it in the same place as the others.
yes, what is the solution there? Even when exported with private[spark] it wasn't picked up
> yes, what is the solution there? Even when exported with private[spark] it wasn't picked up
Okay, let me make a PR to you.
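For reference, the shape being aimed at is roughly this, assuming the constant lives on the SparkHadoopUtil companion object and is visible as private[spark]; the constant name below is illustrative:

```scala
// In HiveClientImpl (sketch): reuse the shared origin string instead of a
// locally defined "Set by Spark" literal when populating the HiveConf.
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK

confMap.foreach { case (k, v) => hiveConf.set(k, v, SOURCE_SPARK) }
```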
I finished the first round of review, @steveloughran. In general, the PR looks helpful.
* pull out source strings and env vars into constants; use
* add a new test case

Change-Id: I88f525745c046cc7040eab5501449447a0d5924c
for ((key, value) <- conf.getAll if key.startsWith("spark.hive.")) {
  hadoopConf.set(key.substring("spark.".length), value)
  hadoopConf.set(key.substring("spark.".length), value,
    SOURCE_SPARK_HIVE)
Can we make it into a single line?

hadoopConf.set(key.substring("spark.".length), value, SOURCE_SPARK_HIVE)

  hadoopConf.set("fs.s3a.session.token", sessionToken)
  }
}
appendS3CredentialsFromEnvironment(hadoopConf, System.getenv)
I'm still worried about the chance of a regression from getting and passing a large map of strings here. There have been some reports like this.
If you really want to hand over a map as a dependency injection, can you build a small one by reusing only the existing logic, like System.getenv("AWS_ACCESS_KEY_ID")?
ok
having it pass down separate arguments; no map at all
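Roughly this shape, i.e. the three environment values are looked up at the call site and handed in individually. This is a sketch of a method inside SparkHadoopUtil; parameter names and the source string are illustrative, not the exact PR code:

```scala
// Sketch only: explicit arguments instead of a Map, so tests can pass values
// directly without touching the process environment.
import java.net.InetAddress
import org.apache.hadoop.conf.Configuration

private[deploy] def appendS3CredentialsFromEnvironment(
    hadoopConf: Configuration,
    keyId: String,
    accessKey: String,
    sessionToken: String): Unit = {
  if (keyId != null && accessKey != null) {
    // record where these came from, including the host that injected them
    val source = "Set by Spark on " + InetAddress.getLocalHost.getHostName +
      " from AWS environment variables"
    hadoopConf.set("fs.s3a.access.key", keyId, source)
    hadoopConf.set("fs.s3a.secret.key", accessKey, source)
    if (sessionToken != null) {
      hadoopConf.set("fs.s3a.session.token", sessionToken, source)
    }
  }
}

// production call site
appendS3CredentialsFromEnvironment(hadoopConf,
  System.getenv("AWS_ACCESS_KEY_ID"),
  System.getenv("AWS_SECRET_ACCESS_KEY"),
  System.getenv("AWS_SESSION_TOKEN"))
```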
/**
 * Return a hostname without throwing an exception if the system
 * does not know its own name.
I'm wondering when this happens. Apache Spark has the following code:
core/src/main/scala/org/apache/spark/util/Utils.scala: val address = InetAddress.getLocalHost
core/src/main/scala/org/apache/spark/util/Utils.scala: logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
core/src/main/scala/org/apache/spark/util/Utils.scala: logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
that is an interesting q. Tracing the history from hadoop NetUtils and back, it came in with https://issues.apache.org/jira/browse/HADOOP-1028 and no explicit "we have seen this". I have seen machines with the wrong name, wrong IP, and even localhost not mapped to a 127. address, but not this. Will cut it and assume Nicholas just added it to save on exception propagation in Java.
inlining
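For reference, the helper being discussed (and now inlined away) amounted to this kind of wrapper — a best-effort hostname lookup that never throws; the fallback value here is illustrative:

```scala
// Roughly what the removed helper did: swallow UnknownHostException and
// fall back to a fixed name instead of propagating the exception.
import java.net.{InetAddress, UnknownHostException}

def localHostName(): String =
  try InetAddress.getLocalHost.getHostName
  catch { case _: UnknownHostException => "localhost" }
```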
assertConfigValue(hadoopConf, "fs.s3a.downgrade.syncable.exceptions", "true")
assertConfigValue(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com")
assertConfigMatches(hadoopConf, "orc.filterPushdown", "true",
  SOURCE_SPARK_HADOOP)
Do we need to split the line? Otherwise, please make it a single line.
/**
 * spark.hive.* is passed to the hadoop config as hive.*.
 */
test("spark.hive propagation") {
Please add a test case prefix, SPARK-40640: .
* move from env var map to explicit args of values; test gets leaner Change-Id: I74f76427321296583235c9600b10a70cbd42777a
dongjoon-hyun
left a comment
I made a PR to your branch, @steveloughran.
Minor changes
thanks, merged in. when i tried that xref of the constant to the hive class, things wouldn't resolve properly; clearly some local build issue.
test failure seems nowhere near this code
Ya, I agree with you.
dongjoon-hyun
left a comment
+1, LGTM. Merged to master for Apache Spark 3.4.0.
Thank you, @steveloughran and @attilapiros!
thx. still not sufficient, but it may help.
…ig options

### What changes were proposed in this pull request?
The options passed from the Spark conf, hive-site.xml, and AWS env vars now all record this in the source attribute of their configuration entries. The Configuration Writable methods do not propagate this, so it is not as useful cluster-wide as it could be. It does help with some of the basic troubleshooting.

### Why are the changes needed?
Helps when troubleshooting where options make their way down. These can be examined and logged later. For example, my cloudstore diagnostics JAR can do this in its storediag command and in an s3a AWS credential provider. I may add some of that logging at debug to the ASF hadoop implementations. https://github.com/steveloughran/cloudstore

### Does this PR introduce _any_ user-facing change?
Not *really*. It's a very low level diagnostics feature in the Hadoop configuration classes.

### How was this patch tested?
New tests added; existing tests enhanced.

Closes apache#38084 from steveloughran/SPARK-40640-spark-conf-propagation.

Lead-authored-by: Steve Loughran <[email protected]>
Co-authored-by: Steve Loughran <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
The options passed from the Spark conf, hive-site.xml, and AWS env vars now all record this in the source attribute of their configuration entries.
The Configuration Writable methods do not propagate this, so it is not as useful cluster-wide as it could be. It does help with some of the basic troubleshooting.
Why are the changes needed?
Helps when troubleshooting where options make their way down. These can be examined
and logged later.
For example, my cloudstore diagnostics JAR can do this in its storediag command
and in an s3a AWS credential provider. I may add some of that logging
at debug to the ASF hadoop implementations.
https://github.com/steveloughran/cloudstore
Does this PR introduce any user-facing change?
Not really. It's a very low level diagnostics feature in the Hadoop configuration classes.
How was this patch tested?
New tests added; existing tests enhanced.
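To make the "examined and logged later" point concrete, a downstream diagnostic could read back the recorded origin along the lines of the sketch below; the helper name and output format are made up for the example:

```scala
// Sketch: print a configuration value together with its recorded source chain.
import org.apache.hadoop.conf.Configuration

def logOptionOrigin(conf: Configuration, key: String): Unit = {
  val sources = conf.getPropertySources(key)
  val origin = if (sources == null) "(unknown)" else sources.mkString(", ")
  println(s"$key = ${conf.get(key)} [source: $origin]")
}
```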