Commit d1c511c

Merge remote-tracking branch 'origin/master' into avro-ignore-extension
2 parents: 565e599 + 8b7d4f8

File tree: 50 files changed, +1349 −494 lines


bin/docker-image-tool.sh

Lines changed: 8 additions & 2 deletions
@@ -49,6 +49,7 @@ function build {
     # Set image build arguments accordingly if this is a source repo and not a distribution archive.
     IMG_PATH=resource-managers/kubernetes/docker/src/main/dockerfiles
     BUILD_ARGS=(
+      ${BUILD_PARAMS}
       --build-arg
       img_path=$IMG_PATH
       --build-arg
@@ -57,13 +58,14 @@ function build {
   else
     # Not passed as an argument to docker, but used to validate the Spark directory.
     IMG_PATH="kubernetes/dockerfiles"
-    BUILD_ARGS=()
+    BUILD_ARGS=(${BUILD_PARAMS})
   fi

   if [ ! -d "$IMG_PATH" ]; then
     error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
   fi
   local BINDING_BUILD_ARGS=(
+    ${BUILD_PARAMS}
     --build-arg
     base_img=$(image_ref spark)
   )
@@ -101,6 +103,8 @@ Options:
   -t tag  Tag to apply to the built image, or to identify the image to be pushed.
   -m      Use minikube's Docker daemon.
   -n      Build docker image with --no-cache
+  -b arg  Build arg to build or push the image. For multiple build args, this option needs to
+          be used separately for each build arg.

 Using minikube when building images will do so directly into minikube's Docker daemon.
 There is no need to push the images into minikube in that case, they'll be automatically
@@ -130,7 +134,8 @@ TAG=
 BASEDOCKERFILE=
 PYDOCKERFILE=
 NOCACHEARG=
-while getopts f:mr:t:n option
+BUILD_PARAMS=
+while getopts f:mr:t:n:b: option
 do
   case "${option}"
   in
@@ -139,6 +144,7 @@ do
     r) REPO=${OPTARG};;
     t) TAG=${OPTARG};;
     n) NOCACHEARG="--no-cache";;
+    b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};;
     m)
       if ! which minikube 1>/dev/null; then
         error "Cannot find minikube."

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 5 additions & 2 deletions
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 5 additions & 2 deletions
@@ -758,7 +758,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed. This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -769,9 +769,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }

-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
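
The two hunks above, together with the TaskSchedulerImpl change before them, keep the count of successful tasks and the recorded task durations in sync when a partition is completed by another attempt of the same stage. A minimal, self-contained sketch of that invariant follows; DurationStats and its method names are illustrative stand-ins, not Spark's MedianHeap or TaskSetManager API.

object SpeculationDurationSketch {
  // Toy stand-in for the TaskSetManager bookkeeping: a count of successful
  // tasks plus the durations recorded for them.
  final case class DurationStats(durations: Vector[Long] = Vector.empty, succeeded: Int = 0) {
    // Record a completed task: bump the count AND remember its duration,
    // which is what the patch now does for partitions completed in other attempts.
    def recordSuccess(duration: Long): DurationStats =
      copy(durations = durations :+ duration, succeeded = succeeded + 1)

    // Median of the recorded durations; fails on an empty collection. In Spark
    // this is the SPARK-24677 scenario: the count says "enough tasks finished"
    // for a speculation check, but no durations were ever inserted.
    def medianDuration: Long = {
      val sorted = durations.sorted
      sorted(sorted.length / 2)
    }
  }

  def main(args: Array[String]): Unit = {
    val inSync = DurationStats().recordSuccess(80L).recordSuccess(120L).recordSuccess(95L)
    println(inSync.medianDuration)               // 95

    val outOfSync = DurationStats(succeeded = 3) // count bumped, no durations recorded
    println(outOfSync.durations.isEmpty)         // true; taking the median here would throw
  }
}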

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 23 additions & 1 deletion
@@ -443,7 +443,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
       map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
   }

-  test("coalesced RDDs with partial locality") {
+  test("coalesced RDDs with partial locality") {
     // Make an RDD that has some locality preferences and some without. This can happen
     // with UnionRDD
     val data = sc.makeRDD((1 to 9).map(i => {
@@ -846,6 +846,28 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(partitions(1) === Seq((1, 3), (3, 8), (3, 8)))
   }

+  test("cartesian on empty RDD") {
+    val a = sc.emptyRDD[Int]
+    val b = sc.parallelize(1 to 3)
+    val cartesian_result = Array.empty[(Int, Int)]
+    assert(a.cartesian(a).collect().toList === cartesian_result)
+    assert(a.cartesian(b).collect().toList === cartesian_result)
+    assert(b.cartesian(a).collect().toList === cartesian_result)
+  }
+
+  test("cartesian on non-empty RDDs") {
+    val a = sc.parallelize(1 to 3)
+    val b = sc.parallelize(2 to 4)
+    val c = sc.parallelize(1 to 1)
+    val a_cartesian_b =
+      Array((1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4))
+    val a_cartesian_c = Array((1, 1), (2, 1), (3, 1))
+    val c_cartesian_a = Array((1, 1), (1, 2), (1, 3))
+    assert(a.cartesian[Int](b).collect().toList.sorted === a_cartesian_b)
+    assert(a.cartesian[Int](c).collect().toList.sorted === a_cartesian_c)
+    assert(c.cartesian[Int](a).collect().toList.sorted === c_cartesian_a)
+  }
+
   test("intersection") {
     val all = sc.parallelize(1 to 10)
     val evens = sc.parallelize(2 to 10 by 2)
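
For comparison with the expected values in the new tests, here is a plain-collections analogue of RDD.cartesian; the helper below is hypothetical and only mirrors the semantics the tests assert, it is not the RDD API.

object CartesianSketch {
  // Every pair (x, y) with x drawn from the left input and y from the right;
  // an empty input on either side yields an empty result, matching the
  // "cartesian on empty RDD" test above.
  def cartesian[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
    for (x <- left; y <- right) yield (x, y)

  def main(args: Array[String]): Unit = {
    println(cartesian(1 to 3, 2 to 4))
    // Vector((1,2), (1,3), (1,4), (2,2), (2,3), (2,4), (3,2), (3,3), (3,4))
    println(cartesian(Seq.empty[Int], 1 to 3)) // List()
  }
}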

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 49 additions & 0 deletions
@@ -1365,6 +1365,55 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }

+  test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation.quantile", "0.1")
+    sc.conf.set("spark.speculation", "true")
+
+    sched = new FakeTaskScheduler(sc)
+    sched.initialize(new FakeSchedulerBackend())
+
+    val dagScheduler = new FakeDAGScheduler(sc, sched)
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet1 = FakeTask.createTaskSet(10)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet1.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    sched.submitTasks(taskSet1)
+    sched.resourceOffers(
+      (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    val taskSetManager1 = sched.taskSetManagerForAttempt(0, 0).get
+
+    // fail fetch
+    taskSetManager1.handleFailedTask(
+      taskSetManager1.taskAttempts.head.head.taskId, TaskState.FAILED,
+      FetchFailed(null, 0, 0, 0, "fetch failed"))
+
+    assert(taskSetManager1.isZombie)
+    assert(taskSetManager1.runningTasks === 9)
+
+    val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
+    sched.submitTasks(taskSet2)
+    sched.resourceOffers(
+      (11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    // Complete two tasks and leave 8 tasks running
+    for (id <- Set(0, 1)) {
+      taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    val taskSetManager2 = sched.taskSetManagerForAttempt(0, 1).get
+    assert(!taskSetManager2.successfulTaskDurations.isEmpty())
+    taskSetManager2.checkSpeculatableTasks(0)
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {

dev/deps/spark-deps-hadoop-2.6

Lines changed: 4 additions & 3 deletions
@@ -2,7 +2,7 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-aircompressor-0.8.jar
+aircompressor-0.10.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.jar
@@ -157,8 +157,9 @@ objenesis-2.1.jar
 okhttp-3.8.1.jar
 okio-1.13.0.jar
 opencsv-2.3.jar
-orc-core-1.4.4-nohive.jar
-orc-mapreduce-1.4.4-nohive.jar
+orc-core-1.5.2-nohive.jar
+orc-mapreduce-1.5.2-nohive.jar
+orc-shims-1.5.2.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar

dev/deps/spark-deps-hadoop-2.7

Lines changed: 4 additions & 3 deletions
@@ -2,7 +2,7 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-aircompressor-0.8.jar
+aircompressor-0.10.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.jar
@@ -158,8 +158,9 @@ objenesis-2.1.jar
 okhttp-3.8.1.jar
 okio-1.13.0.jar
 opencsv-2.3.jar
-orc-core-1.4.4-nohive.jar
-orc-mapreduce-1.4.4-nohive.jar
+orc-core-1.5.2-nohive.jar
+orc-mapreduce-1.5.2-nohive.jar
+orc-shims-1.5.2.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar

dev/deps/spark-deps-hadoop-3.1

Lines changed: 4 additions & 3 deletions
@@ -4,7 +4,7 @@ RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 accessors-smart-1.2.jar
 activation-1.1.1.jar
-aircompressor-0.8.jar
+aircompressor-0.10.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.jar
@@ -176,8 +176,9 @@ okhttp-2.7.5.jar
 okhttp-3.8.1.jar
 okio-1.13.0.jar
 opencsv-2.3.jar
-orc-core-1.4.4-nohive.jar
-orc-mapreduce-1.4.4-nohive.jar
+orc-core-1.5.2-nohive.jar
+orc-mapreduce-1.5.2-nohive.jar
+orc-shims-1.5.2.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar

docs/mllib-data-types.md

Lines changed: 2 additions & 2 deletions
@@ -317,7 +317,7 @@ Refer to the [`Matrix` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.
 from pyspark.mllib.linalg import Matrix, Matrices

 # Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
-dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
+dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])

 # Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
 sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
@@ -624,7 +624,7 @@ from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

 # Create an RDD of coordinate entries.
 #   - This can be done explicitly with the MatrixEntry class:
-entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(6, 1, 3.7)])
+entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
 #   - or using (long, long, float) tuples:
 entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])
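
The first doc fix above is about value ordering: Matrices.dense reads its values column by column, so the matrix ((1,2),(3,4),(5,6)) must be passed as [1, 3, 5, 2, 4, 6]. A small, self-contained Scala sketch of that column-major mapping (a hypothetical helper, not the MLlib API):

object ColumnMajorSketch {
  // Rebuild a numRows x numCols matrix from a flat array stored column-major:
  // entry (i, j) lives at index j * numRows + i.
  def denseColumnMajor(numRows: Int, numCols: Int, values: Array[Double]): Array[Array[Double]] = {
    require(values.length == numRows * numCols, "values must contain numRows * numCols elements")
    Array.tabulate(numRows, numCols)((i, j) => values(j * numRows + i))
  }

  def main(args: Array[String]): Unit = {
    // Same values as the corrected doc example: column-major [1, 3, 5, 2, 4, 6].
    val m = denseColumnMajor(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    m.foreach(row => println(row.mkString(" ")))
    // 1.0 2.0
    // 3.0 4.0
    // 5.0 6.0
  }
}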

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 6 additions & 7 deletions
@@ -58,6 +58,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
+    val parsedOptions = new AvroOptions(options)

     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
@@ -74,7 +75,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
     }

     // User can specify an optional avro json schema.
-    val avroSchema = options.get(AvroFileFormat.AvroSchema)
+    val avroSchema = parsedOptions.schema
       .map(new Schema.Parser().parse)
       .getOrElse {
         val in = new FsInput(sampleFile.getPath, conf)
@@ -112,10 +113,9 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val recordName = options.getOrElse("recordName", "topLevelRecord")
-    val recordNamespace = options.getOrElse("recordNamespace", "")
+    val parsedOptions = new AvroOptions(options)
     val outputAvroSchema = SchemaConverters.toAvroType(
-      dataSchema, nullable = false, recordName, recordNamespace)
+      dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)

     AvroJob.setOutputKeySchema(job, outputAvroSchema)
     val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
@@ -158,11 +158,12 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {

     val broadcastedConf =
       spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
+    val parsedOptions = new AvroOptions(options)

     (file: PartitionedFile) => {
       val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
       val conf = broadcastedConf.value.value
-      val userProvidedSchema = options.get(AvroFileFormat.AvroSchema).map(new Schema.Parser().parse)
+      val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)

       // TODO Removes this check once `FileFormat` gets a general file filtering interface method.
       // Doing input file filtering is improper because we may generate empty tasks that process no
@@ -233,8 +234,6 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 private[avro] object AvroFileFormat {
   val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"

-  val AvroSchema = "avroSchema"
-
   class SerializableConfiguration(@transient var value: Configuration)
       extends Serializable with KryoSerializable {
     @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
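
The recurring change in this file is replacing ad-hoc lookups on the raw options map with a single parsed-options object. Below is a minimal sketch of that wrapper pattern; the key names and defaults ("avroSchema", "topLevelRecord", the empty namespace) are the ones visible in the removed lines, but the class body is illustrative, not Spark's actual AvroOptions.

class AvroOptionsSketch(parameters: Map[String, String]) {
  // Optional user-provided Avro JSON schema, previously read via the
  // AvroFileFormat.AvroSchema key at each call site.
  val schema: Option[String] = parameters.get("avroSchema")

  // Top-level record name/namespace used when converting a Catalyst schema to Avro.
  val recordName: String = parameters.getOrElse("recordName", "topLevelRecord")
  val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
}

object AvroOptionsSketch {
  def main(args: Array[String]): Unit = {
    val parsed = new AvroOptionsSketch(Map("recordName" -> "event"))
    println(parsed.schema)          // None
    println(parsed.recordName)      // event
    println(parsed.recordNamespace) // (empty string)
  }
}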
