
Commit 59d87d2

brkyvz authored and zsxwing committed
[SPARK-17650] malformed url's throw exceptions before bricking Executors
## What changes were proposed in this pull request?

When a malformed URL was sent to Executors through `sc.addJar` or `sc.addFile`, the Executors became unusable: they constantly threw `MalformedURLException`s and could never acknowledge that the file or jar was simply bad input. This PR fixes that problem by ensuring malformed URLs can never be submitted through `sc.addJar` and `sc.addFile` in the first place. An alternative solution would be to blacklist bad files and jars on the Executors: fail on the first attempt, then ignore subsequent attempts (but print a warning message).

## How was this patch tested?

Unit tests in SparkContextSuite.

Author: Burak Yavuz <[email protected]>

Closes #15224 from brkyvz/SPARK-17650.
1 parent de333d1 commit 59d87d2

3 files changed: 51 additions & 7 deletions
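For context on the root cause: `java.net.URI` happily parses strings that `java.net.URL` rejects, so a bad URL could sail past the driver and blow up only on the Executors, over and over. A minimal standalone sketch (not part of this patch) showing the mismatch, using the same bad URL as the new test:

```scala
import java.net.{MalformedURLException, URI}

object BadUrlDemo {
  def main(args: Array[String]): Unit = {
    // "user:pwd" parses as a registry-based authority, so this is a valid URI...
    val uri = new URI("http://user:pwd/path")

    // ...but "pwd" is not a valid port, so converting it to a URL fails.
    // Before this patch, that failure happened repeatedly on each Executor
    // instead of once, up front, on the driver.
    try {
      uri.toURL
    } catch {
      case e: MalformedURLException =>
        println(s"rejected up front: ${e.getMessage}")
    }
  }
}
```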

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 9 additions & 7 deletions
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io._
 import java.lang.reflect.Constructor
-import java.net.URI
+import java.net.{MalformedURLException, URI}
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -36,18 +36,15 @@ import com.google.common.collect.MapMaker
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-  TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat,
-  WholeTextFileInputFormat}
+import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
@@ -1452,6 +1449,9 @@ class SparkContext(config: SparkConf) extends Logging {
         throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
           "turned on.")
       }
+    } else {
+      // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
+      Utils.validateURL(uri)
     }
 
     val key = if (!isLocal && scheme == "file") {
@@ -1711,6 +1711,8 @@ class SparkContext(config: SparkConf) extends Logging {
       key = env.rpcEnv.fileServer.addJar(new File(path))
     } else {
       val uri = new URI(path)
+      // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
+      Utils.validateURL(uri)
       key = uri.getScheme match {
         // A JAR file which exists only on the driver node
         case null | "file" =>

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 20 additions & 0 deletions
@@ -697,6 +697,26 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Validate that a given URI is actually a valid URL as well.
+   * @param uri The URI to validate
+   */
+  @throws[MalformedURLException]("when the URI is an invalid URL")
+  def validateURL(uri: URI): Unit = {
+    Option(uri.getScheme).getOrElse("file") match {
+      case "http" | "https" | "ftp" =>
+        try {
+          uri.toURL
+        } catch {
+          case e: MalformedURLException =>
+            val ex = new MalformedURLException(s"URI (${uri.toString}) is not a valid URL.")
+            ex.initCause(e)
+            throw ex
+        }
+      case _ => // will not be turned into a URL anyway
+    }
+  }
+
   /**
    * Get the path of a temporary directory. Spark's local directories can be configured through
    * multiple settings, which are used with the following precedence:
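The helper's contract, sketched below. Note that `Utils` is `private[spark]`, so these calls only compile from within Spark's own packages, and the paths are made up for illustration:

```scala
import java.net.URI
import org.apache.spark.util.Utils

// Schemes that are never turned into URLs pass through untouched.
Utils.validateURL(new URI("hdfs://namenode:8020/jars/app.jar")) // no-op
Utils.validateURL(new URI("/local/path/app.jar"))               // null scheme defaults to "file": no-op

// http/https/ftp URIs must also be valid URLs.
Utils.validateURL(new URI("http://user:pwd/path"))              // throws MalformedURLException
```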

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 22 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.File
+import java.net.MalformedURLException
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
@@ -173,6 +174,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("SPARK-17650: malformed url's throw exceptions before bricking Executors") {
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      Seq("http", "https", "ftp").foreach { scheme =>
+        val badURL = s"$scheme://user:pwd/path"
+        val e1 = intercept[MalformedURLException] {
+          sc.addFile(badURL)
+        }
+        assert(e1.getMessage.contains(badURL))
+        val e2 = intercept[MalformedURLException] {
+          sc.addJar(badURL)
+        }
+        assert(e2.getMessage.contains(badURL))
+        assert(sc.addedFiles.isEmpty)
+        assert(sc.addedJars.isEmpty)
+      }
+    } finally {
+      sc.stop()
+    }
+  }
+
   test("addFile recursive works") {
     val pluto = Utils.createTempDir()
     val neptune = Utils.createTempDir(pluto.getAbsolutePath)
