Commit 7fedc5a

Merge remote-tracking branch 'asf/master' into spark-sink-test
2 parents: abc20cb + 6a13dca


78 files changed: 1977 additions & 1262 deletions


core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 7 additions & 5 deletions
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests.
+   *
+   * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
+   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
+   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
+   * longer in scope.
    */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", false)
+    "spark.cleaner.referenceTracking.blocking", true)
 
   @volatile private var stopped = false
 
@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-
-  // Used for testing. These methods explicitly blocks until cleanup is completed
-  // to ensure that more reliable testing.
 }
 
 private object ContextCleaner {
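
Since blocking cleanup is now the default, callers who want the previous non-blocking behaviour have to opt out explicitly. A hypothetical opt-out sketch (the conf key and default come from the diff; the surrounding app setup is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object NonBlockingCleanerExample {
  def main(args: Array[String]): Unit = {
    // Revert to the pre-SPARK-3015 behaviour: the cleaning thread will not
    // block on cleanup tasks.
    val conf = new SparkConf()
      .setAppName("cleaner-demo")    // illustrative app name
      .setMaster("local[2]")         // illustrative master
      .set("spark.cleaner.referenceTracking.blocking", "false")
    val sc = new SparkContext(conf)
    sc.stop()
  }
}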

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 11 additions & 11 deletions
@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
       "MapOutputTracker",
       new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
 
+    // Let the user specify short names for shuffle managers
+    val shortShuffleMgrNames = Map(
+      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
+    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
+
+    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
 
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager, mapOutputTracker)
+      serializer, conf, securityManager, mapOutputTracker, shuffleManager)
 
     val connectionManager = blockManager.connectionManager
 
@@ -250,16 +260,6 @@
       "."
     }
 
-    // Let the user specify short names for shuffle managers
-    val shortShuffleMgrNames = Map(
-      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
-      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
-    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
-    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
-    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
-
-    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
-
     // Warn about deprecated spark.cache.class property
     if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 8 additions & 0 deletions
@@ -315,6 +315,14 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
+  def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
+    val file = new DataInputStream(new FileInputStream(filename))
+    val length = file.readInt()
+    val obj = new Array[Byte](length)
+    file.readFully(obj)
+    sc.broadcast(obj)
+  }
+
   def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
     // The right way to implement this would be to use TypeTags to get the full
     // type of T. Since I don't want to introduce breaking changes throughout the
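
readBroadcastFromFile expects a file laid out as a 4-byte big-endian length followed by exactly that many payload bytes. A hypothetical writer-side counterpart (not part of the commit) that produces this layout:

import java.io.{DataOutputStream, FileOutputStream}

object BroadcastFileWriter {
  // Write `payload` in the layout readBroadcastFromFile reads back:
  // an Int length header, then the raw bytes.
  def writeBroadcastFile(filename: String, payload: Array[Byte]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(filename))
    try {
      out.writeInt(payload.length)
      out.write(payload)
    } finally {
      out.close()
    }
  }
}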

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 14 additions & 27 deletions
@@ -72,7 +72,6 @@ private[spark] class Worker(
   val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
   val testing: Boolean = sys.props.contains("spark.testing")
-  val masterLock: Object = new Object()
   var master: ActorSelection = null
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@
   }
 
   def changeMaster(url: String, uiUrl: String) {
-    masterLock.synchronized {
-      activeMasterUrl = url
-      activeMasterWebUiUrl = uiUrl
-      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = activeMasterUrl match {
-        case Master.sparkUrlRegex(_host, _port) =>
-          Address("akka.tcp", Master.systemName, _host, _port.toInt)
-        case x =>
-          throw new SparkException("Invalid spark URL: " + x)
-      }
-      connected = true
+    activeMasterUrl = url
+    activeMasterWebUiUrl = uiUrl
+    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+    masterAddress = activeMasterUrl match {
+      case Master.sparkUrlRegex(_host, _port) =>
+        Address("akka.tcp", Master.systemName, _host, _port.toInt)
+      case x =>
+        throw new SparkException("Invalid spark URL: " + x)
     }
+    connected = true
   }
 
   def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@
     }
 
     case SendHeartbeat =>
-      masterLock.synchronized {
-        if (connected) { master ! Heartbeat(workerId) }
-      }
+      if (connected) { master ! Heartbeat(workerId) }
 
     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,27 +239,21 @@
         manager.start()
         coresUsed += cores_
         memoryUsed += memory_
-        masterLock.synchronized {
-          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
-        }
+        master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
       } catch {
        case e: Exception => {
          logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          if (executors.contains(appId + "/" + execId)) {
            executors(appId + "/" + execId).kill()
            executors -= appId + "/" + execId
          }
-          masterLock.synchronized {
-            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
-          }
+          master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
        }
      }
    }
 
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      masterLock.synchronized {
-        master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
-      }
+      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
      val fullId = appId + "/" + execId
      if (ExecutorState.isFinished(state)) {
        executors.get(fullId) match {
@@ -330,9 +319,7 @@
        case _ =>
          logDebug(s"Driver $driverId changed state to $state")
      }
-      masterLock.synchronized {
-        master ! DriverStateChanged(driverId, state, exception)
-      }
+      master ! DriverStateChanged(driverId, state, exception)
      val driver = drivers.remove(driverId).get
      finishedDrivers(driverId) = driver
      memoryUsed -= driver.driverDesc.mem
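
Dropping masterLock is consistent with how Akka actors process mail: receive handles one message at a time, so worker-local state and message sends need no extra synchronization. A standalone illustration of that property (not from the commit; the actor and its names are invented):

import akka.actor.{Actor, ActorSystem, Props}

object SingleThreadedActorDemo {
  // An actor's mutable state is only touched from receive, which Akka invokes
  // for one message at a time, so no lock is needed around `count`.
  class Counter extends Actor {
    private var count = 0
    def receive = {
      case "inc"  => count += 1
      case "show" => println(s"count = $count")
    }
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    val counter = system.actorOf(Props[Counter], "counter")
    (1 to 5).foreach(_ => counter ! "inc")
    counter ! "show"
    system.shutdown()   // Akka 2.2/2.3-era API, matching Spark of this vintage
  }
}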

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 0 deletions
@@ -99,6 +99,9 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
+  // Set the classloader for serializer
+  env.serializer.setDefaultClassLoader(urlClassLoader)
+
   // Akka's message frame size. If task result is bigger than this, we use the block manager
   // to send the result back.
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 36 additions & 10 deletions
@@ -22,6 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
 import java.net._
+import java.util.{Timer, TimerTask}
 import java.util.concurrent.atomic.AtomicInteger
 
 import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
@@ -61,17 +62,17 @@ private[spark] class ConnectionManager(
     var ackMessage: Option[Message] = None
 
     def markDone(ackMessage: Option[Message]) {
-      this.synchronized {
-        this.ackMessage = ackMessage
-        completionHandler(this)
-      }
+      this.ackMessage = ackMessage
+      completionHandler(this)
     }
   }
 
   private val selector = SelectorProvider.provider.openSelector()
+  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
 
   // default to 30 second timeout waiting for authentication
   private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
+  private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
 
   private val handleMessageExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.handler.threads.min", 20),
@@ -652,19 +653,27 @@ private[spark] class ConnectionManager(
         }
       }
       if (bufferMessage.hasAckId()) {
-        val sentMessageStatus = messageStatuses.synchronized {
+        messageStatuses.synchronized {
          messageStatuses.get(bufferMessage.ackId) match {
            case Some(status) => {
              messageStatuses -= bufferMessage.ackId
-              status
+              status.markDone(Some(message))
            }
            case None => {
-              throw new Exception("Could not find reference for received ack message " +
-                message.id)
+              /**
+               * We can fall down on this code because of following 2 cases
+               *
+               * (1) Invalid ack sent due to buggy code.
+               *
+               * (2) Late-arriving ack for a SendMessageStatus
+               *     To avoid unwilling late-arriving ack
+               *     caused by long pause like GC, you can set
+               *     larger value than default to spark.core.connection.ack.wait.timeout
+               */
+              logWarning(s"Could not find reference for received ack Message ${message.id}")
            }
          }
        }
-        sentMessageStatus.markDone(Some(message))
      } else {
        var ackMessage : Option[Message] = None
        try {
@@ -836,9 +845,23 @@
   def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
       : Future[Message] = {
     val promise = Promise[Message]()
+
+    val timeoutTask = new TimerTask {
+      override def run(): Unit = {
+        messageStatuses.synchronized {
+          messageStatuses.remove(message.id).foreach ( s => {
+            promise.failure(
+              new IOException(s"sendMessageReliably failed because ack " +
+                "was not received within ${ackTimeout} sec"))
+          })
+        }
+      }
+    }
+
     val status = new MessageStatus(message, connectionManagerId, s => {
+      timeoutTask.cancel()
       s.ackMessage match {
-        case None =>  // Indicates a failure where we either never sent or never got ACK'd
+        case None => // Indicates a failure where we either never sent or never got ACK'd
          promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
        case Some(ackMessage) =>
          if (ackMessage.hasError) {
@@ -852,6 +875,8 @@
     messageStatuses.synchronized {
       messageStatuses += ((message.id, status))
     }
+
+    ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
     sendMessage(connectionManagerId, message)
     promise.future
   }
@@ -861,6 +886,7 @@
   }
 
   def stop() {
+    ackTimeoutMonitor.cancel()
     selectorThread.interrupt()
     selectorThread.join()
     selector.close()
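
The new ack path is a race between the ack handler and a daemon Timer: the timeout task is scheduled right after the MessageStatus is registered, and whichever side runs first settles the Promise. A standalone sketch of that pattern (only the config key spark.core.connection.ack.wait.timeout, its 60-second default, and the Timer/cancel choreography come from the diff; the class and method names here are invented):

import java.io.IOException
import java.util.{Timer, TimerTask}
import scala.concurrent.{Future, Promise}

object AckTimeoutSketch {
  class AckAwaiter(ackTimeoutSec: Int = 60) {
    private val monitor = new Timer("AckTimeoutMonitor", true)  // daemon timer, as in the diff

    /** Returns the future result plus a callback to invoke when the ack arrives. */
    def awaitAck(messageId: Int): (Future[String], String => Unit) = {
      val promise = Promise[String]()
      val timeoutTask = new TimerTask {
        override def run(): Unit = promise.tryFailure(
          new IOException(s"ack for message $messageId not received within $ackTimeoutSec sec"))
      }
      monitor.schedule(timeoutTask, ackTimeoutSec * 1000L)
      val onAck: String => Unit = { ack =>
        timeoutTask.cancel()      // ack won the race; stop the timer
        promise.trySuccess(ack)
      }
      (promise.future, onAck)
    }

    def stop(): Unit = monitor.cancel()
  }
}

Per the in-code comment above, long GC pauses can delay acks past the window; raising spark.core.connection.ack.wait.timeout (in seconds) gives such messages more headroom.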

core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala

Lines changed: 11 additions & 4 deletions
@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
   def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
-    // Compute the minimum and the maxium
+    // Scala's built-in range has issues. See #SI-8782
+    def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
+      val span = max - min
+      Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
+    }
+    // Compute the minimum and the maximum
     val (max: Double, min: Double) = self.mapPartitions { items =>
       Iterator(items.foldRight(Double.NegativeInfinity,
         Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
@@ -107,9 +112,11 @@
       throw new UnsupportedOperationException(
         "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
     }
-    val increment = (max-min)/bucketCount.toDouble
-    val range = if (increment != 0) {
-      Range.Double.inclusive(min, max, increment)
+    val range = if (min != max) {
+      // Range.Double.inclusive(min, max, increment)
+      // The above code doesn't always work. See Scala bug #SI-8782.
+      // https://issues.scala-lang.org/browse/SI-8782
+      customRange(min, max, bucketCount)
     } else {
       List(min, min)
     }
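
As a quick sanity check of the replacement logic (a standalone snippet, not part of the commit), the helper yields bucketCount + 1 evenly spaced boundaries and ends exactly at max, which is the property Range.Double.inclusive could violate under SI-8782:

object CustomRangeCheck {
  // Same boundary computation as the customRange helper in the diff above.
  def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
    val span = max - min
    Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
  }

  def main(args: Array[String]): Unit = {
    // Four buckets over [0, 10] -> five boundaries, the last exactly 10.0.
    println(customRange(0.0, 10.0, 4))  // Vector(0.0, 2.5, 5.0, 7.5, 10.0)
  }
}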

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 2 additions & 0 deletions
@@ -127,6 +127,8 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   override def onApplicationEnd(event: SparkListenerApplicationEnd) =
     logEvent(event, flushLogger = true)
+  // No-op because logging every update would be overkill
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
 
   /**
    * Stop logging events.

core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala

Lines changed: 15 additions & 9 deletions
@@ -63,32 +63,35 @@ extends DeserializationStream {
   def close() { objIn.close() }
 }
 
-private[spark] class JavaSerializerInstance(counterReset: Int) extends SerializerInstance {
-  def serialize[T: ClassTag](t: T): ByteBuffer = {
+
+private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader)
+  extends SerializerInstance {
+
+  override def serialize[T: ClassTag](t: T): ByteBuffer = {
     val bos = new ByteArrayOutputStream()
     val out = serializeStream(bos)
     out.writeObject(t)
     out.close()
     ByteBuffer.wrap(bos.toByteArray)
   }
 
-  def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
+  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
     val bis = new ByteBufferInputStream(bytes)
     val in = deserializeStream(bis)
-    in.readObject().asInstanceOf[T]
+    in.readObject()
   }
 
-  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
+  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
     val bis = new ByteBufferInputStream(bytes)
     val in = deserializeStream(bis, loader)
-    in.readObject().asInstanceOf[T]
+    in.readObject()
   }
 
-  def serializeStream(s: OutputStream): SerializationStream = {
+  override def serializeStream(s: OutputStream): SerializationStream = {
     new JavaSerializationStream(s, counterReset)
   }
 
-  def deserializeStream(s: InputStream): DeserializationStream = {
+  override def deserializeStream(s: InputStream): DeserializationStream = {
     new JavaDeserializationStream(s, Utils.getContextOrSparkClassLoader)
   }
 
@@ -109,7 +112,10 @@ private[spark] class JavaSerializerInstance(counterReset: Int) extends Serialize
 class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
   private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
 
-  def newInstance(): SerializerInstance = new JavaSerializerInstance(counterReset)
+  override def newInstance(): SerializerInstance = {
+    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
+    new JavaSerializerInstance(counterReset, classLoader)
+  }
 
   override def writeExternal(out: ObjectOutput) {
     out.writeInt(counterReset)
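
Read together with the Executor change above (env.serializer.setDefaultClassLoader(urlClassLoader)), the flow appears to be: the executor registers its URL class loader as the serializer's default, and newInstance() falls back to the thread context loader when nothing was registered. A minimal sketch of that fallback contract (class and method names here are illustrative, not Spark's):

object ClassLoaderFallbackSketch {
  class LoaderAwareFactory {
    @volatile private var defaultClassLoader: Option[ClassLoader] = None

    // Counterpart of setDefaultClassLoader as called from Executor above.
    def setDefaultClassLoader(loader: ClassLoader): this.type = {
      defaultClassLoader = Some(loader)
      this
    }

    // Counterpart of the getOrElse fallback in newInstance() above.
    def resolveLoader(): ClassLoader =
      defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
  }

  def main(args: Array[String]): Unit = {
    val factory = new LoaderAwareFactory
    // With no default registered, the context class loader is used.
    assert(factory.resolveLoader() eq Thread.currentThread.getContextClassLoader)
  }
}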
