
Commit a4f771a

Commit message: merge with master
2 parents: 065af19 + 3bd8ddf

File tree

11 files changed: +228 −36 lines

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 4 deletions
@@ -874,7 +874,8 @@ private[spark] class TaskSetManager(
     // and we are not using an external shuffle server which could serve the shuffle outputs.
     // The reason is the next stage wouldn't be able to fetch the data from this dead executor
     // so we would need to rerun these tasks on other executors.
-    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
+    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
+        && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         if (successful(index)) {
@@ -906,8 +907,6 @@
    * Check for tasks to be speculated and return true if there are any. This is called periodically
    * by the TaskScheduler.
    *
-   * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
-   * we don't scan the whole task set. It might also help to make this sorted by launch time.
    */
   override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
     // Can't speculate if we only have one task, and no need to speculate if the task set is a
@@ -927,7 +926,8 @@
     // TODO: Threshold should also look at standard deviation of task durations and have a lower
     // bound based on that.
     logDebug("Task length threshold for speculation: " + threshold)
-    for ((tid, info) <- taskInfos) {
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
       val index = info.index
       if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
         !speculatableTasks.contains(index)) {
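
Taken together, the two hunks above make two changes: executorLost no longer resubmits completed shuffle-map tasks once the TaskSetManager is a zombie, and checkSpeculatableTasks scans only the set of running tasks instead of every entry in taskInfos, which is why the old TODO about keeping a list of running tasks could be dropped. A minimal, self-contained sketch of the new scan pattern follows; the names and values are toy stand-ins, not Spark's real scheduler classes.

// Sketch only: illustrates scanning runningTasksSet instead of the full taskInfos map.
object SpeculationScanSketch {
  // Toy stand-in for Spark's TaskInfo; the fields here are illustrative.
  case class TaskInfo(index: Int, executorId: String, runtimeMs: Long)

  def main(args: Array[String]): Unit = {
    val taskInfos = Map(1L -> TaskInfo(0, "execA", 120L), 2L -> TaskInfo(1, "execB", 30L))
    val runningTasksSet = Set(1L)        // only task 1 is still running
    val successful = Array(false, true)  // the task at index 1 already finished
    val copiesRunning = Array(1, 0)
    val threshold = 100L                 // ms; a made-up speculation threshold

    // New pattern: look up info only for running task ids, not for the whole map.
    val speculatable = for {
      tid <- runningTasksSet
      info = taskInfos(tid)
      if !successful(info.index) && copiesRunning(info.index) == 1 && info.runtimeMs > threshold
    } yield info.index

    println(s"Speculation candidates: $speculatable") // prints Set(0)
  }
}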

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 63 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.util.Random
+import java.util.{Properties, Random}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, never, spy, verify, when}
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
 
@@ -664,6 +665,67 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize"))
   }
 
+  test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    sched.initialize(new FakeSchedulerBackend() {
+      override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {}
+    })
+
+    // Keep track of the number of tasks that are resubmitted,
+    // so that the test can check that no tasks were resubmitted.
+    var resubmittedTasks = 0
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(
+          task: Task[_],
+          reason: TaskEndReason,
+          result: Any,
+          accumUpdates: Seq[AccumulatorV2[_, _]],
+          taskInfo: TaskInfo): Unit = {
+        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+        reason match {
+          case Resubmitted => resubmittedTasks += 1
+          case _ =>
+        }
+      }
+    }
+    sched.setDAGScheduler(dagScheduler)
+
+    val singleTask = new ShuffleMapTask(0, 0, null, new Partition {
+      override def index: Int = 0
+    }, Seq(TaskLocation("host1", "execA")), new Properties, null)
+    val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    // Offer host1, which should be accepted as a PROCESS_LOCAL location
+    // by the one task in the task set
+    val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get
+
+    // Mark the task as available for speculation, and then offer another resource,
+    // which should be used to launch a speculative copy of the task.
+    manager.speculatableTasks += singleTask.partitionId
+    val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get
+
+    assert(manager.runningTasks === 2)
+    assert(manager.isZombie === false)
+
+    val directTaskResult = new DirectTaskResult[String](null, Seq()) {
+      override def value(resultSer: SerializerInstance): String = ""
+    }
+    // Complete one copy of the task, which should result in the task set manager
+    // being marked as a zombie, because at least one copy of its only task has completed.
+    manager.handleSuccessfulTask(task1.taskId, directTaskResult)
+    assert(manager.isZombie === true)
+    assert(resubmittedTasks === 0)
+    assert(manager.runningTasks === 1)
+
+    manager.executorLost("execB", "host2", new SlaveLost())
+    assert(manager.runningTasks === 0)
+    assert(resubmittedTasks === 0)
+  }
+
   test("speculative and noPref task should be scheduled after node-local") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(

examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@
 /**
  * An example demonstrating BucketedRandomProjectionLSH.
  * Run with:
- *   bin/run-example org.apache.spark.examples.ml.JavaBucketedRandomProjectionLSHExample
+ *   bin/run-example ml.JavaBucketedRandomProjectionLSHExample
  */
 public class JavaBucketedRandomProjectionLSHExample {
   public static void main(String[] args) {

examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@
 /**
  * An example demonstrating MinHashLSH.
  * Run with:
- *   bin/run-example org.apache.spark.examples.ml.JavaMinHashLSHExample
+ *   bin/run-example ml.JavaMinHashLSHExample
  */
 public class JavaMinHashLSHExample {
   public static void main(String[] args) {

examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
 /**
  * An example demonstrating BucketedRandomProjectionLSH.
  * Run with:
- *   bin/run-example org.apache.spark.examples.ml.BucketedRandomProjectionLSHExample
+ *   bin/run-example ml.BucketedRandomProjectionLSHExample
  */
 object BucketedRandomProjectionLSHExample {
   def main(args: Array[String]): Unit = {

examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
 /**
  * An example demonstrating MinHashLSH.
  * Run with:
- *   bin/run-example org.apache.spark.examples.ml.MinHashLSHExample
+ *   bin/run-example ml.MinHashLSHExample
  */
 object MinHashLSHExample {
   def main(args: Array[String]): Unit = {
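
These four doc tweaks shorten the class name in the "Run with" hint; bin/run-example accepts example class names relative to org.apache.spark.examples, so ml.MinHashLSHExample launches the same program as the fully qualified name. For context, here is a rough sketch of what the MinHashLSH example demonstrates, written against the public spark.ml API; the sample data is made up and this is not the bundled example itself.

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object MinHashLSHSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MinHashLSHSketch").master("local[*]").getOrCreate()

    // Sparse binary vectors standing in for set membership, the input MinHashLSH expects.
    val df = spark.createDataFrame(Seq(
      (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
      (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0))))
    )).toDF("id", "features")

    val mh = new MinHashLSH()
      .setNumHashTables(3)
      .setInputCol("features")
      .setOutputCol("hashes")

    // Fit the LSH model and append the hash column used for approximate similarity search.
    val model = mh.fit(df)
    model.transform(df).show(false)

    spark.stop()
  }
}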

python/pyspark/ml/feature.py

Lines changed: 1 addition & 1 deletion
@@ -258,7 +258,7 @@ class BucketedRandomProjectionLSH(JavaEstimator, LSHParams, HasInputCol, HasOutp
     def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
                  bucketLength=None):
         """
-        __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+        __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, \
                  bucketLength=None)
         """
         super(BucketedRandomProjectionLSH, self).__init__()

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 2 additions & 2 deletions
@@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand(
       }
 
       saveDataIntoTable(
-        sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
+        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
     } else {
       assert(table.schema.isEmpty)
 
@@ -151,7 +151,7 @@ case class CreateDataSourceTableAsSelectCommand(
         table.storage.locationUri
       }
       val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, mode, tableExists = false)
+        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
       val newTable = table.copy(
         storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
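
Replacing the command's inherited mode with an explicit SaveMode makes the intent visible at each call site: append when the target table already exists, overwrite when the table is being created. The sketch below illustrates the same Append-versus-Overwrite semantics through the public DataFrameWriter API rather than the internal command; the table name t and the sample rows are hypothetical.

import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SaveModeSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // Table "t" does not exist yet: this write creates it (analogous to the Overwrite branch).
    df.write.mode(SaveMode.Overwrite).saveAsTable("t")

    // Table "t" now exists: a second write with Append adds rows instead of re-creating it
    // (analogous to the Append branch taken when tableExists = true).
    df.write.mode(SaveMode.Append).saveAsTable("t")

    println(spark.table("t").count()) // 4 rows after the append

    spark.stop()
  }
}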

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 55 additions & 11 deletions
@@ -1836,18 +1836,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert data to a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
-        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$path")
+             |OPTIONS(path "$dir")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == path)
+        assert(table.location == dir.getAbsolutePath)
 
         dir.delete
-        val tableLocFile = new File(table.location.stripPrefix("file:"))
+        val tableLocFile = new File(table.location)
         assert(!tableLocFile.exists)
         spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
         assert(tableLocFile.exists)
@@ -1878,16 +1877,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert into a data source table with no existed partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
-        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "$path"
+             |LOCATION "$dir"
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == path)
+        assert(table.location == dir.getAbsolutePath)
 
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1906,15 +1904,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("read data from a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
-        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$path")
+             |OPTIONS(path "$dir")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == path)
+        assert(table.location == dir.getAbsolutePath)
 
         dir.delete()
         checkAnswer(spark.table("t"), Nil)
@@ -1939,7 +1936,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
            |CREATE TABLE t(a int, b int, c int, d int)
            |USING parquet
            |PARTITIONED BY(a, b)
-           |LOCATION "${dir.toURI}"
+           |LOCATION "$dir"
          """.stripMargin)
       spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
       checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1952,4 +1949,51 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  Seq(true, false).foreach { shouldDelete =>
+    val tcName = if (shouldDelete) "non-existent" else "existed"
+    test(s"CTAS for external data source table with a $tcName location") {
+      withTable("t", "t1") {
+        withTempDir {
+          dir =>
+            if (shouldDelete) {
+              dir.delete()
+            }
+            spark.sql(
+              s"""
+                 |CREATE TABLE t
+                 |USING parquet
+                 |LOCATION '$dir'
+                 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+               """.stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(table.location == dir.getAbsolutePath)
+
+            checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+        }
+        // partition table
+        withTempDir {
+          dir =>
+            if (shouldDelete) {
+              dir.delete()
+            }
+            spark.sql(
+              s"""
+                 |CREATE TABLE t1
+                 |USING parquet
+                 |PARTITIONED BY(a, b)
+                 |LOCATION '$dir'
+                 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+               """.stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+            assert(table.location == dir.getAbsolutePath)
+
+            val partDir = new File(dir, "a=3")
+            assert(partDir.exists())
+
+            checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+        }
+      }
+    }
+  }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala

Lines changed: 0 additions & 13 deletions
@@ -284,19 +284,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     sql("DROP TABLE hiveTableWithStructValue")
   }
 
-  test("Reject partitioning that does not match table") {
-    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
-      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
-      val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
-        .toDF("id", "data", "part")
-
-      intercept[AnalysisException] {
-        // cannot partition by 2 fields when there is only one in the table definition
-        data.write.partitionBy("part", "data").insertInto("partitioned")
-      }
-    }
-  }
-
   test("Test partition mode = strict") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
       sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")