Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
afaf7bd
[SPARK-33084][CORE][SQL]Add jar support ivy path
AngersZhuuuu Oct 7, 2020
51daf9a
Merge branch 'master' into support-add-jar-ivy
AngersZhuuuu Oct 7, 2020
3579de0
Update SparkContextSuite.scala
AngersZhuuuu Oct 7, 2020
d6e8caf
Merge branch 'support-add-jar-ivy' of https://github.com/AngersZhuuuu…
AngersZhuuuu Oct 7, 2020
169e1f8
Update Utils.scala
AngersZhuuuu Oct 8, 2020
0e589ec
Merge branch 'master' into support-add-jar-ivy
AngersZhuuuu Nov 21, 2020
b3e3211
resolve conflict
AngersZhuuuu Nov 23, 2020
9161340
Update SessionState.scala
AngersZhuuuu Nov 23, 2020
0e3c1ec
Update sql-ref-syntax-aux-resource-mgmt-add-jar.md
AngersZhuuuu Nov 24, 2020
300ca56
follow comment
AngersZhuuuu Nov 24, 2020
63e877b
https://github.com/apache/spark/pull/29966#discussion_r529242208
AngersZhuuuu Nov 24, 2020
733e62c
follow comment
AngersZhuuuu Nov 24, 2020
b60ba1e
Update sql-ref-syntax-aux-resource-mgmt-add-jar.md
AngersZhuuuu Nov 24, 2020
883b9d3
fix uri re-use
AngersZhuuuu Nov 28, 2020
208afc2
follow comment
AngersZhuuuu Nov 28, 2020
ba9ea29
add warn message when multiple transitive
AngersZhuuuu Nov 28, 2020
10b3737
move DependencyUtils
AngersZhuuuu Nov 28, 2020
7f878c2
add ut
AngersZhuuuu Nov 28, 2020
d2c1950
Update SessionState.scala
AngersZhuuuu Nov 28, 2020
2200076
Update SessionState.scala
AngersZhuuuu Nov 28, 2020
5a9cc30
Update DependencyUtils.scala
AngersZhuuuu Nov 29, 2020
875d8a7
Add end to end test
AngersZhuuuu Nov 29, 2020
e921245
Update SQLQuerySuite.scala
AngersZhuuuu Nov 29, 2020
614a865
follow comment
AngersZhuuuu Nov 30, 2020
8c5cb7c
Update SparkContext.scala
AngersZhuuuu Nov 30, 2020
f460974
fix local path with comma
AngersZhuuuu Nov 30, 2020
1f7dc01
Merge branch 'support-add-jar-ivy' of https://github.com/AngersZhuuuu…
AngersZhuuuu Dec 1, 2020
050c410
follow comment
AngersZhuuuu Dec 1, 2020
ff611a6
Update SessionState.scala
AngersZhuuuu Dec 1, 2020
03aca3b
split UT and use simply dependency ivy path
AngersZhuuuu Dec 1, 2020
653b919
Update DependencyUtils.scala
AngersZhuuuu Dec 1, 2020
6e48275
Update SparkContext.scala
AngersZhuuuu Dec 1, 2020
bdc5035
follow comment
AngersZhuuuu Dec 2, 2020
9c22882
follow comment
AngersZhuuuu Dec 2, 2020
9c88f8d
follow comment
AngersZhuuuu Dec 2, 2020
8220e5a
Update SparkContextSuite.scala
AngersZhuuuu Dec 2, 2020
49ac62c
follow comment
AngersZhuuuu Dec 2, 2020
b69a62e
Update DependencyUtils.scala
AngersZhuuuu Dec 2, 2020
273a5ac
Follow comment
AngersZhuuuu Dec 3, 2020
ebe1c9c
Update DependencyUtils.scala
AngersZhuuuu Dec 4, 2020
6034fb2
Update sql-ref-syntax-aux-resource-mgmt-add-jar.md
AngersZhuuuu Dec 5, 2020
e22e398
Update SparkContext.scala
AngersZhuuuu Dec 7, 2020
afea73f
Update SparkContext.scala
AngersZhuuuu Dec 7, 2020
13000f2
Merge branch 'master' into support-add-jar-ivy
AngersZhuuuu Dec 14, 2020
bce3d40
Update SparkContext.scala
AngersZhuuuu Dec 14, 2020
d53f302
Merge branch 'support-add-jar-ivy' of https://github.com/AngersZhuuuu…
AngersZhuuuu Dec 14, 2020
57c351d
Update HiveQuerySuite.scala
AngersZhuuuu Dec 15, 2020
8c53b83
follow comment
AngersZhuuuu Dec 22, 2020
4048c5b
https://github.com/apache/spark/pull/29966#discussion_r547040115
AngersZhuuuu Dec 22, 2020
aa53482
Merge branch 'master' into support-add-jar-ivy
AngersZhuuuu Dec 22, 2020
2ffb431
Update SQLQuerySuite.scala
AngersZhuuuu Dec 22, 2020
8c18cdf
Update SparkContext.scala
AngersZhuuuu Dec 23, 2020
6bd41cd
Update SparkSubmit.scala
AngersZhuuuu Dec 23, 2020
fbc236c
follow comment
AngersZhuuuu Dec 23, 2020
90491d5
Update DependencyUtils.scala
AngersZhuuuu Dec 23, 2020
75ff3ce
Update SparkContextSuite.scala
AngersZhuuuu Dec 23, 2020
4c44dae
follow comment remove default value
AngersZhuuuu Dec 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
follow comment
  • Loading branch information
AngersZhuuuu committed Dec 2, 2020
commit 9c88f8d90b0121d158f8b9ff1597f620ff571dba
29 changes: 10 additions & 19 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1904,16 +1904,15 @@ class SparkContext(config: SparkConf) extends Logging {
if (path == null || path.isEmpty) {
logWarning("null or empty path specified as parameter to addJar")
} else {
var schema = ""
val keys = if (path.contains("\\") && Utils.isWindows) {
val (keys, schema) = if (path.contains("\\") && Utils.isWindows) {
// For local paths with backslashes on Windows, URI throws an exception
addLocalJarFile(new File(path))
(addLocalJarFile(new File(path)), "local")
} else {
val uri = new Path(path).toUri
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
Utils.validateURL(uri)
schema = uri.getScheme
schema match {
val uriSchema = uri.getScheme
val jarPaths = uriSchema match {
// A JAR file which exists only on the driver node
case null =>
// SPARK-22585 path without schema is not url encoded
Expand All @@ -1928,28 +1927,20 @@ class SparkContext(config: SparkConf) extends Logging {
DependencyUtils.resolveMavenDependencies(URI.create(path))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if two added jars have the same dependency with different versions? e.g.,

sc.addJar("ivy://lib1:1.0?transitive=true") // --> it depends on `libX v1.0`
sc.addJar("ivy://lib2:1.0?transitive=true") // --> it depends on `libX v2.0`

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

case _ => checkRemoteJarFile(path)
}
(jarPaths, uriSchema)
}
if (keys.nonEmpty) {
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
if (added.nonEmpty) {
if (schema != "ivy") {
logInfo(s"Added JAR $path at ${added.mkString(",")} with timestamp $timestamp")
} else {
logInfo(s"Added dependency jars of ivy uri $path at ${added.mkString(",")}" +
s" with timestamp $timestamp")
}
val jarMessage = if (schema != "ivy") "JAR" else "dependency jars of ivy uri"
logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
postEnvironmentUpdate()
}
if (existed.nonEmpty) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add tests to check if this warning message is shown only once by using LogAppender?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add tests to check if this warning message is shown only once by using LogAppender?

Sure

if (schema != "ivy") {
logWarning(s"The jar $path has been added already. Overwriting of added jars " +
"is not supported in the current version.")
} else {
logWarning(s"The dependency jars of ivy URI with $path at" +
s" ${existed.mkString(",")} has been added already." +
s" Overwriting of added jars is not supported in the current version.")
}
val jarMessage = if (schema != "ivy") "JAR" else "dependency jars of ivy uri"
logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." +
s" Overwriting of added jar is not supported in the current version.")
}
}
}
Expand Down
82 changes: 38 additions & 44 deletions core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.util

import java.io.File
import java.net.{URI, URISyntaxException}
import java.net.URI

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -48,6 +48,10 @@ private[spark] object DependencyUtils extends Logging {
IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
}

/**
 * Reports whether a split entry is malformed.
 *
 * @param tokens the parts obtained by splitting a single entry on its separator
 * @return true when `tokens` does not contain exactly two elements, or when either
 *         element is blank according to `StringUtils.isBlank`; false otherwise
 */
private def checkInvalidQueryString(tokens: Array[String]): Boolean = {
  val hasTwoParts = tokens.length == 2
  // Only inspect the parts once we know there are exactly two of them.
  !hasTwoParts || tokens.exists(part => StringUtils.isBlank(part))
}

/**
* Parse URI query string's parameter value of `transitive` and `exclude`.
* Other invalid parameters will be ignored.
Expand All @@ -71,13 +75,13 @@ private[spark] object DependencyUtils extends Logging {
(false, "")
} else {
val mapTokens = uriQuery.split("&").map(_.split("="))
if (mapTokens.exists(token =>
token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
throw new URISyntaxException(uri.toString, s"Invalid query string: $uriQuery")
if (mapTokens.exists(checkInvalidQueryString)) {
throw new IllegalArgumentException(
s"Invalid query string in ivy uri ${uri.toString}: $uriQuery")
}
val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
// Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
var transitive: Boolean = false
var transitive = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be a bit more functional / Scala-idiomatic, I think we can do something like groupedParams.get("transitive").takeRight(1).headOption.getOrElse(false) instead of the foreach call

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be a bit more functional / Scala-idiomatic, I think we can do something like groupedParams.get("transitive").takeRight(1).headOption.getOrElse(false) instead of the foreach call

Wouldn't we lose the warning message this way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

val transitiveParams = groupedParams.get("transitive")
if (transitiveParams.map(_.size).getOrElse(0) > 1) {
  // log warning
}
val transitive = transitiveParams.flatMap(_.takeRight(1).headOption).getOrElse(false)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be done like this:

    val transitiveParams = groupedParams.get("transitive")
      if (transitiveParams.map(_.size).getOrElse(0) > 1) {
        logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
          " If there are multiple `transitive` parameter, we will select the last one")
      }
      val transitive = transitiveParams.flatMap(_.takeRight(1).map { case (_, value) =>
        value match {
          case "true" => true
          case _ => false
        }
      }.headOption).getOrElse(false)

which is equivalent to the current logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, forgot about having to unpack the value. To be a bit more concise you can also do:

val transitive = transitiveParams.flatMap(_.takeRight(1).map(_._2 == "true").headOption).getOrElse(false)

But I think either way looks good!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I have too few ideas on how to make code more concise.
Simpler is better, Updated!

groupedParams.get("transitive").foreach { params =>
if (params.length > 1) {
logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
Expand All @@ -94,27 +98,20 @@ private[spark] object DependencyUtils extends Logging {
val exclusionList = groupedParams.get("exclude").map { params =>
params.map(_._2).flatMap { excludeString =>
val excludes = excludeString.split(",")
if (excludes.map(_.split(":")).exists(token =>
token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
throw new URISyntaxException(uri.toString, "Invalid exclude string: " +
"expected 'org:module,org:module,..', found " + excludeString)
if (excludes.map(_.split(":")).exists(checkInvalidQueryString)) {
throw new IllegalArgumentException(
s"Invalid exclude string in ivy uri ${uri.toString}:" +
s" expected 'org:module,org:module,..', found " + excludeString)
}
excludes
}.mkString(",")
}.getOrElse("")

val invalidParams = groupedParams
.filter(entry => !Seq("transitive", "exclude").contains(entry._1))
.keys.toArray.sorted
val validParams = Set("transitive", "exclude")
val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq.sorted
if (invalidParams.nonEmpty) {
logWarning(
s"Invalid parameters `${invalidParams.mkString(",")}` found in URI query `$uriQuery`.")
}

groupedParams.foreach { case (key: String, values: Array[(String, String)]) =>
if (key != "transitive" || key != "exclude") {
logWarning("Invalid parameter")
}
logWarning(s"Invalid parameters `${invalidParams.mkString(",")}` found " +
s"in ivy URI query `$uriQuery`.")
}

(transitive, exclusionList)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if an invalid param is given in hive, e.g., invalidParam=xxxx? It is just ignored? Could you add tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we should at least warn on invalid param?

Expand All @@ -137,32 +134,29 @@ private[spark] object DependencyUtils extends Logging {
* @return Seq of jars downloaded.
*/
def resolveMavenDependencies(uri: URI): Seq[String] = {
try {
val ivyProperties = DependencyUtils.getIvyProperties()
val authority = uri.getAuthority
if (authority == null) {
throw new URISyntaxException(
uri.toString, "Invalid url: Expected 'org:module:version', found null")
}
if (authority.split(":").length != 3) {
throw new URISyntaxException(
uri.toString, "Invalid url: Expected 'org:module:version', found " + authority)
}
val ivyProperties = DependencyUtils.getIvyProperties()
val authority = uri.getAuthority
if (authority == null) {
throw new IllegalArgumentException(
s"Invalid ivy url authority in uri ${uri.toString}:" +
s" Expected 'org:module:version', found null.")
}
if (authority.split(":").length != 3) {
throw new IllegalArgumentException(
s"Invalid ivy uri authority in uri ${uri.toString}:" +
s" Expected 'org:module:version', found $authority.")
}

val (transitive, exclusionList) = parseQueryParams(uri)
val (transitive, exclusionList) = parseQueryParams(uri)

resolveMavenDependencies(
transitive,
exclusionList,
authority,
ivyProperties.repositories,
ivyProperties.ivyRepoPath,
Option(ivyProperties.ivySettingsPath)
).split(",")
} catch {
case e: URISyntaxException =>
throw new IllegalArgumentException(e.getMessage)
}
resolveMavenDependencies(
transitive,
exclusionList,
authority,
ivyProperties.repositories,
ivyProperties.ivyRepoPath,
Option(ivyProperties.ivySettingsPath)
).split(",")
}

def resolveMavenDependencies(
Expand Down