
Commit 62010af

Merge branch 'master' into SPARK-10117

2 parents: a97ee97 + 67580f1

35 files changed (+778, -121 lines)

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 4 additions & 0 deletions
@@ -122,6 +122,10 @@ public UnsafeShuffleExternalSorter(
     this.maxRecordSizeBytes = pageSizeBytes - 4;
     this.writeMetrics = writeMetrics;
     initializeForWriting();
+
+    // preserve first page to ensure that we have at least one page to work with. Otherwise,
+    // other operators in the same task may starve this sorter (SPARK-9709).
+    acquireNewPageIfNecessary(pageSizeBytes);
   }
 
   /**

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 1 deletion
@@ -1516,8 +1516,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
+    getRDDStorageInfo(_ => true)
+  }
+
+  private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
     assertNotStopped()
-    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
     StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
     rddInfos.filter(_.isCached)
   }
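
Note: the filtered overload is private[spark], so it is only reachable from Spark's own code; the public no-arg getRDDStorageInfo keeps its old behavior by delegating with _ => true. A minimal sketch of an internal call site (the names sc and cached are illustrative, not from this commit):

  // Assumes an active SparkContext `sc` and code compiled inside the spark package,
  // since getRDDStorageInfo(filter) is private[spark].
  import org.apache.spark.storage.RDDInfo

  val cached = sc.parallelize(1 to 100).cache()
  cached.count()  // materialize the cache so the RDD appears in storage info

  // Only RDDs matching the predicate are turned into RDDInfo before the
  // cached-only filter is applied; _ => true reproduces the old behavior.
  val info: Array[RDDInfo] = sc.getRDDStorageInfo(_.id == cached.id)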

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 22 additions & 5 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.history
 
 import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
+import java.util.UUID
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -73,7 +74,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // The modification time of the newest log detected during the last scan. This is used
   // to ignore logs that are older during subsequent scans, to avoid processing data that
   // is already known.
-  private var lastModifiedTime = -1L
+  private var lastScanTime = -1L
 
   // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
   // into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -179,15 +180,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def checkForLogs(): Unit = {
     try {
+      val newLastScanTime = getNewLastScanTime()
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
-      var newLastModifiedTime = lastModifiedTime
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
           try {
             getModificationTime(entry).map { time =>
-              newLastModifiedTime = math.max(newLastModifiedTime, time)
-              time >= lastModifiedTime
+              time >= lastScanTime
             }.getOrElse(false)
           } catch {
             case e: AccessControlException =>
@@ -224,12 +224,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
       }
 
-      lastModifiedTime = newLastModifiedTime
+      lastScanTime = newLastScanTime
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }
 
+  private def getNewLastScanTime(): Long = {
+    val fileName = "." + UUID.randomUUID().toString
+    val path = new Path(logDir, fileName)
+    val fos = fs.create(path)
+
+    try {
+      fos.close()
+      fs.getFileStatus(path).getModificationTime
+    } catch {
+      case e: Exception =>
+        logError("Exception encountered when attempting to update last scan time", e)
+        lastScanTime
+    } finally {
+      fs.delete(path)
+    }
+  }
+
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],
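
Design note: getNewLastScanTime() derives the scan timestamp from the log filesystem itself, by creating a hidden throwaway file and reading back its modification time, so the time >= lastScanTime comparison stays on the filesystem's clock rather than the driver's. A standalone sketch of the same probe against Hadoop's FileSystem API (probeFsTime and the example paths are illustrative, not part of this commit):

  import java.util.UUID
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Obtain "now" as seen by the target filesystem: write a hidden temp file,
  // stat it for its modification time, then clean it up.
  def probeFsTime(fs: FileSystem, dir: String): Long = {
    val path = new Path(dir, "." + UUID.randomUUID().toString)
    val out = fs.create(path)
    try {
      out.close()
      fs.getFileStatus(path).getModificationTime
    } finally {
      fs.delete(path, false)  // non-recursive delete of the probe file
    }
  }

  // Example usage against the local filesystem (the history provider does the
  // equivalent against logDir):
  // val fs = FileSystem.get(new Configuration())
  // println(probeFsTime(fs, "/tmp"))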

core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 2 additions & 1 deletion
@@ -38,6 +38,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
  * is equivalent to one Spark-level shuffle block.
  */
 class NettyBlockRpcServer(
+    appId: String,
     serializer: Serializer,
     blockManager: BlockDataManager)
   extends RpcHandler with Logging {
@@ -55,7 +56,7 @@ class NettyBlockRpcServer(
       case openBlocks: OpenBlocks =>
         val blocks: Seq[ManagedBuffer] =
           openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
-        val streamId = streamManager.registerStream(blocks.iterator.asJava)
+        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
         logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
         responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
   private[this] var appId: String = _
 
   override def init(blockDataManager: BlockDataManager): Unit = {
-    val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+    val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
     var serverBootstrap: Option[TransportServerBootstrap] = None
     var clientBootstrap: Option[TransportClientBootstrap] = None
     if (authEnabled) {

core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
 
   // In certain join operations, prepare can be called on the same partition multiple times.
   // In this case, we need to ensure that each call to compute gets a separate prepare argument.
-  private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+  private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
 
   /**
    * Prepare a partition for a single call to compute.

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 1 addition & 1 deletion
@@ -1666,7 +1666,7 @@ abstract class RDD[T: ClassTag](
     import Utils.bytesToString
 
     val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
-    val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
+    val storageInfo = rdd.context.getRDDStorageInfo(_.id == rdd.id).map(info =>
       " CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
         info.numCachedPartitions, bytesToString(info.memSize),
         bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 12 additions & 1 deletion
@@ -1101,7 +1101,6 @@ class DAGScheduler(
             s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
             s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
         } else {
-
           // It is likely that we receive multiple FetchFailed for a single stage (because we have
           // multiple tasks running concurrently on different executors). In that case, it is
           // possible the fetch failure has already been handled by the scheduler.
@@ -1117,6 +1116,11 @@
           if (disallowStageRetryForTest) {
             abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
               None)
+          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
+            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
+              s"has failed the maximum allowable number of " +
+              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
+              s"Most recent failure reason: ${failureMessage}", None)
           } else if (failedStages.isEmpty) {
             // Don't schedule an event to resubmit failed stages if failed isn't empty, because
             // in that case the event will already have been scheduled.
@@ -1240,10 +1244,17 @@
     if (errorMessage.isEmpty) {
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+
+      // Clear failure count for this stage, now that it's succeeded.
+      // We only limit consecutive failures of stage attempts, so that if a stage is
+      // re-used many times in a long-running job, unrelated failures don't eventually cause the
+      // stage to be aborted.
+      stage.clearFailures()
     } else {
       stage.latestInfo.stageFailed(errorMessage.get)
       logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
     }
+
     outputCommitCoordinator.stageEnd(stage.id)
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     runningStages -= stage

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 29 additions & 1 deletion
@@ -46,7 +46,7 @@ import org.apache.spark.util.CallSite
  * be updated for each attempt.
  *
  */
-private[spark] abstract class Stage(
+private[scheduler] abstract class Stage(
     val id: Int,
     val rdd: RDD[_],
     val numTasks: Int,
@@ -92,6 +92,29 @@ private[spark] abstract class Stage(
    */
   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
 
+  /**
+   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
+   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
+   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
+   * multiple tasks from the same stage attempt fail (SPARK-5945).
+   */
+  private val fetchFailedAttemptIds = new HashSet[Int]
+
+  private[scheduler] def clearFailures() : Unit = {
+    fetchFailedAttemptIds.clear()
+  }
+
+  /**
+   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
+   *
+   * This method updates the running set of failed stage attempts and returns
+   * true if the number of failures exceeds the allowable number of failures.
+   */
+  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
+    fetchFailedAttemptIds.add(stageAttemptId)
+    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
+  }
+
   /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
   def makeNewStageAttempt(
       numPartitionsToCompute: Int,
@@ -110,3 +133,8 @@ private[spark] abstract class Stage(
     case _ => false
   }
 }
+
+private[scheduler] object Stage {
+  // The number of consecutive failures allowed before a stage is aborted
+  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
+}
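
Taken together with the DAGScheduler change above, a stage is aborted once MAX_CONSECUTIVE_FETCH_FAILURES distinct attempts have failed with a fetch failure, and the counter is reset whenever the stage completes successfully. A dependency-free sketch of that bookkeeping (FetchFailureTracker is an illustrative stand-in, not a Spark class):

  import scala.collection.mutable.HashSet

  class FetchFailureTracker(maxConsecutiveFailures: Int = 4) {
    private val failedAttemptIds = new HashSet[Int]

    // Record a fetch failure for the given stage attempt and report whether the
    // abort threshold has been reached. Using a set means many failed tasks from
    // the same attempt are counted only once (cf. SPARK-5945).
    def failedOnFetchAndShouldAbort(attemptId: Int): Boolean = {
      failedAttemptIds.add(attemptId)
      failedAttemptIds.size >= maxConsecutiveFailures
    }

    // Called when the stage succeeds, so only consecutive failures count.
    def clearFailures(): Unit = failedAttemptIds.clear()
  }

  // Attempts 0..2 fail without aborting; a fourth failed attempt trips the limit.
  // val tracker = new FetchFailureTracker()
  // (0 to 2).foreach(id => assert(!tracker.failedOnFetchAndShouldAbort(id)))
  // assert(tracker.failedOnFetchAndShouldAbort(3))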

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 10 additions & 8 deletions
@@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator(
     fetchRequests ++= Utils.randomize(remoteRequests)
 
     // Send out initial requests for blocks, up to our maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
-      sendRequest(fetchRequests.dequeue())
-    }
+    fetchUpToMaxBytes()
 
     val numFetches = remoteRequests.size - fetchRequests.size
     logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
@@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator(
       case _ =>
     }
     // Send fetch requests up to maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
-      sendRequest(fetchRequests.dequeue())
-    }
+    fetchUpToMaxBytes()
 
     result match {
       case FailureFetchResult(blockId, address, e) =>
@@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator(
       }
     }
 
+  private def fetchUpToMaxBytes(): Unit = {
+    // Send fetch requests up to maxBytesInFlight
+    while (fetchRequests.nonEmpty &&
+      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
+      sendRequest(fetchRequests.dequeue())
+    }
+  }
+
   private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = {
     blockId match {
       case ShuffleBlockId(shufId, mapId, reduceId) =>
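
This last change is a pure refactor: the two identical dispatch loops are folded into fetchUpToMaxBytes() with no behavior change. A toy sketch of the pattern the helper encapsulates, outside Spark (Request, BoundedDispatcher and send are illustrative names, not Spark APIs):

  import scala.collection.mutable

  case class Request(id: Int, size: Long)

  class BoundedDispatcher(maxBytesInFlight: Long, send: Request => Unit) {
    private val pending = mutable.Queue[Request]()
    private var bytesInFlight = 0L

    def enqueue(reqs: Seq[Request]): Unit = pending ++= reqs

    // Mirror of the extracted helper: always allow at least one request in
    // flight, otherwise only dequeue while the next request fits the budget.
    def dispatchUpToMaxBytes(): Unit = {
      while (pending.nonEmpty &&
          (bytesInFlight == 0 || bytesInFlight + pending.front.size <= maxBytesInFlight)) {
        val req = pending.dequeue()
        bytesInFlight += req.size
        send(req)
      }
    }

    // Call when a response for req arrives, then dispatch again to refill.
    def onComplete(req: Request): Unit = bytesInFlight -= req.size
  }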
