Changes from all commits (41 commits)
f5f2b47
update page
CodingCat Oct 17, 2014
13eaa23
send broadcastInfo in blockManager heartbeat
CodingCat Oct 20, 2014
4b6efc4
including broadcast variable mem usage in executor page
CodingCat Oct 21, 2014
a41e59a
separate broadcast var from nonRddBlocks
CodingCat Oct 21, 2014
6e5da21
storagePage
CodingCat Oct 22, 2014
d64f984
code clean
CodingCat Oct 22, 2014
101f886
fix compile error
CodingCat Oct 22, 2014
8dba568
fix compile error
CodingCat Oct 22, 2014
3bb21c4
remove Heartbeat piggyback mechanism and add async UpdateBlockInfo
CodingCat Oct 29, 2014
fc6c397
add broadcast info
CodingCat Oct 29, 2014
61baafb
update page
Oct 17, 2014
947c880
send broadcastInfo in blockManager heartbeat
Oct 20, 2014
d69f098
report broadcast var in UI
CodingCat Oct 17, 2014
0048743
fix compile issue
CodingCat Nov 2, 2014
35e30ad
code clean
CodingCat Nov 2, 2014
815a1a6
scala style fix
CodingCat Nov 2, 2014
68a75f8
fix test case
CodingCat Nov 2, 2014
2d356ef
fix test cases
CodingCat Nov 2, 2014
9bcaa64
fix livelistenerBus issue
CodingCat Nov 3, 2014
7167b4e
make mima happy
CodingCat Nov 3, 2014
77d836c
remove unnecessary imports
CodingCat Nov 3, 2014
91bb357
fix compile issue
CodingCat Jan 20, 2015
8e34fd1
fix compile issue
CodingCat Feb 12, 2015
98f9dbd
compilation issue
CodingCat Feb 12, 2015
044ecef
fix the bug when updating non-broadcast variable
CodingCat Feb 13, 2015
20a4996
remove unnecessary variable
CodingCat Feb 13, 2015
57bb0ca
tighten the methods implementation in StorageUtils and RDD page & Bro…
CodingCat Feb 16, 2015
4b90add
get rid of mutable variable
CodingCat Feb 16, 2015
b1d40bc
add license info
CodingCat Feb 16, 2015
aa1ca79
apply event filter
CodingCat Feb 16, 2015
761fbc8
implement serialization and deserialization of BlockUpdateEvent
CodingCat Feb 16, 2015
e47843e
addressed the failed test cases
CodingCat Feb 16, 2015
1c03ce5
address failed test cases
CodingCat Feb 16, 2015
65c6aef
address the comments except filter one
CodingCat Feb 17, 2015
b8e5827
move filter logic to EventLoggingListener
CodingCat Feb 17, 2015
2ea8ee3
fix rebase mistake
CodingCat Mar 12, 2015
223cdfb
fix rebase mistake
CodingCat Mar 12, 2015
26977c9
address comments except merging case one
CodingCat Mar 13, 2015
49ee118
display storage level of broadcast partitions
CodingCat Mar 13, 2015
9b1bccf
revise the stuffs
CodingCat Mar 14, 2015
1aec3a8
stylistic fix
CodingCat Mar 15, 2015
21 changes: 4 additions & 17 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -17,23 +17,7 @@

package org.apache.spark;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
import org.apache.spark.scheduler.*;

/**
* Java clients should extend this class instead of implementing
@@ -94,4 +78,7 @@ public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }

@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }

@Override
public void onBlockUpdate(SparkListenerBlockUpdate blockUpdated) { }
}
core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -112,4 +112,9 @@ public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
onEvent(executorRemoved);
}

@Override
public final void onBlockUpdate(SparkListenerBlockUpdate blockUpdate) {
onEvent(blockUpdate);
}
}
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -96,7 +96,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
tellMaster = false)
tellMaster = true)
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
@@ -179,7 +179,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
SparkEnv.get.blockManager.putSingle(
broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
obj
}
}
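Note: with tellMaster = true, the driver-side and executor-side copies of a broadcast block are now reported to the BlockManagerMaster, which is what lets the UI account for them. A minimal sketch of exercising this path (the app name and master URL are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative local setup; any deployment reports the same way.
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-demo").setMaster("local[2]"))
    // putSingle(..., tellMaster = true) now runs for this block on the
    // driver and, after the first fetch, on each executor.
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
    sc.parallelize(Seq("a", "b")).map(lookup.value).collect()
    sc.stop()
  }
}
```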
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -181,6 +181,13 @@ private[spark] class EventLoggingListener(
logEvent(event, flushLogger = true)
override def onExecutorRemoved(event: SparkListenerExecutorRemoved) =
logEvent(event, flushLogger = true)
override def onBlockUpdate(event: SparkListenerBlockUpdate) = {
// we only log broadcast block updates for now, as RDD blocks are already logged as part of
// the StageCompleted event
if (event.blockId.isBroadcast) {
logEvent(event, flushLogger = true)
}
}

// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -33,7 +33,7 @@ private[spark] class LiveListenerBus
with SparkListenerBus {

private val logDroppedEvent = new AtomicBoolean(false)

override def onDropEvent(event: SparkListenerEvent): Unit = {
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
@@ -42,5 +42,4 @@ private[spark] class LiveListenerBus
"the rate at which tasks are being started by the scheduler.")
}
}

}
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.storage.{BlockStatus, BlockId, BlockManagerId}
import org.apache.spark.util.{Distribution, Utils}

@DeveloperApi
@@ -68,6 +68,9 @@ case class SparkListenerJobStart(
val stageIds: Seq[Int] = stageInfos.map(_.stageId)
}

case class SparkListenerBlockUpdate(blockManagerId: BlockManagerId, blockId: BlockId,
blockStatus: BlockStatus) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerJobEnd(
jobId: Int,
@@ -210,6 +213,11 @@ trait SparkListener {
* Called when the driver removes an executor.
*/
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }

/**
* Called when the driver receives an UpdateBlockInfo message from a BlockManagerSlaveActor.
*/
def onBlockUpdate(blockUpdateEvent: SparkListenerBlockUpdate) { }
}

/**
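Downstream code can hook the new event without touching Spark internals. A hypothetical consumer (the class name is illustrative, not part of this diff):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdate}

// Hypothetical listener built on the new callback: print every broadcast
// block update the driver hears about; all other callbacks stay no-ops.
class BroadcastBlockLogger extends SparkListener {
  override def onBlockUpdate(event: SparkListenerBlockUpdate): Unit = {
    if (event.blockId.isBroadcast) {
      println(s"${event.blockId} on ${event.blockManagerId.executorId}: ${event.blockStatus}")
    }
  }
}

// Registered like any other listener, e.g. sc.addSparkListener(new BroadcastBlockLogger)
```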
core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -59,7 +59,8 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case logStart: SparkListenerLogStart => // ignore event log metadata
case blockUpdate: SparkListenerBlockUpdate =>
listener.onBlockUpdate(blockUpdate)
}
}

}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -357,7 +357,7 @@ private[spark] class TaskSchedulerImpl(
execId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId): Boolean = {

val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
taskMetrics.flatMap { case (id, metrics) =>
taskIdToTaskSetId.get(id)
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -36,6 +36,7 @@ sealed abstract class BlockId {

// convenience methods
def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
def asBroadcastId = if (isBroadcast) Some(asInstanceOf[BroadcastBlockId]) else None
def isRDD = isInstanceOf[RDDBlockId]
def isShuffle = isInstanceOf[ShuffleBlockId]
def isBroadcast = isInstanceOf[BroadcastBlockId]
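The new asBroadcastId helper mirrors asRDDId; a small illustrative use:

```scala
import org.apache.spark.storage.{BlockId, BroadcastBlockId}

// asBroadcastId returns an Option, so broadcast blocks can be handled
// without explicit isInstanceOf/asInstanceOf casts.
val id: BlockId = BroadcastBlockId(0L)
id.asBroadcastId.foreach(b => println(s"broadcast variable ${b.broadcastId}"))
```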
14 changes: 11 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -357,7 +357,14 @@
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
def ifAsync: Boolean = {
blockId.isBroadcast
}
// report broadcast blocks asynchronously and RDD blocks synchronously;
// in the future we only need to change the implementation of ifAsync when
// we deal with other types of blocks
val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize,
async = ifAsync)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
@@ -375,14 +382,15 @@
blockId: BlockId,
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
droppedMemorySize: Long = 0L,
async: Boolean = false): Boolean = {
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val inTachyonSize = status.tachyonSize
val onDiskSize = status.diskSize
master.updateBlockInfo(
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize, async)
} else {
true
}
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -41,14 +41,14 @@ class BlockManagerMaster(

/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
tell(RemoveExecutor(execId))
askDriverExpectingTrue(RemoveExecutor(execId))
logInfo("Removed " + execId + " successfully in removeExecutor")
}

/** Register the BlockManager's id with the driver. */
def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
logInfo("Trying to register BlockManager")
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
askDriverExpectingTrue(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
logInfo("Registered BlockManager")
}

Expand All @@ -58,10 +58,18 @@ class BlockManagerMaster(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
tachyonSize: Long): Boolean = {
val res = askDriverWithReply[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
logDebug(s"Updated info of block $blockId")
tachyonSize: Long,
async: Boolean): Boolean = {
val res = {
if (!async) {
askDriverWithReply[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
} else {
tell(UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
true
}
}
logInfo("Updated info of block " + blockId + " storageLevel:" + storageLevel)
res
}

@@ -200,19 +208,26 @@
/** Stop the driver actor, called only on the Spark driver node */
def stop() {
if (driverActor != null && isDriver) {
tell(StopBlockManagerMaster)
askDriverExpectingTrue(StopBlockManagerMaster)
driverActor = null
logInfo("BlockManagerMaster stopped")
}
}

/** Send a one-way message to the master actor, to which we expect it to reply with true. */
private def tell(message: Any) {
/** Send a message to the master actor, to which we expect it to reply with true. */
private def askDriverExpectingTrue(message: Any) {
if (!askDriverWithReply[Boolean](message)) {
throw new SparkException("BlockManagerMasterActor returned false, expected true.")
}
}

/**
* Send a message to the master actor in a fire-and-forget manner
*/
private def tell(message: Any): Unit = {
driverActor ! message
}

/**
* Send a message to the driver actor and get its result within a default timeout, or
* throw a SparkException if this fails.
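For readers less familiar with the actor layer: the rename separates the ask-and-verify pattern from a true one-way send. An illustrative contrast in generic Akka terms (names are stand-ins, not Spark's helpers):

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await

// Fire-and-forget: returns immediately, no reply is expected. This is
// what the async UpdateBlockInfo path for broadcast blocks relies on.
def fireAndForget(actor: ActorRef, message: Any): Unit =
  actor ! message

// Ask-and-verify: blocks until the actor replies, and fails loudly if
// the reply is not true, as askDriverExpectingTrue does above.
def askExpectingTrue(actor: ActorRef, message: Any)(implicit timeout: Timeout): Unit = {
  val ok = Await.result(actor ? message, timeout.duration).asInstanceOf[Boolean]
  require(ok, "actor returned false, expected true")
}
```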
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -336,8 +336,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
return true
}

blockManagerInfo(blockManagerId).updateBlockInfo(
val updatedBlockStatus = blockManagerInfo(blockManagerId).updateBlockInfo(
blockId, storageLevel, memSize, diskSize, tachyonSize)
if (updatedBlockStatus != null) {
listenerBus.post(SparkListenerBlockUpdate(blockManagerId, blockId, updatedBlockStatus))
}

var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
@@ -427,15 +430,14 @@ private[spark] class BlockManagerInfo(
_lastSeenMs = System.currentTimeMillis()
}

def updateBlockInfo(
private[storage] def updateBlockInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
tachyonSize: Long) {
tachyonSize: Long): BlockStatus = {

updateLastSeenMs()

if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
val blockStatus: BlockStatus = _blocks.get(blockId)
@@ -471,9 +473,10 @@ private[spark] class BlockManagerInfo(
logInfo("Added %s on tachyon on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
}
_blocks.get(blockId)
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val blockStatus: BlockStatus = _blocks.get(blockId)
val blockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
if (blockStatus.storageLevel.useMemory) {
logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
Expand All @@ -488,6 +491,9 @@ private[spark] class BlockManagerInfo(
logInfo("Removed %s on %s on tachyon (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
}
BlockStatus(storageLevel, 0, 0, 0)
} else {
null
}
}

core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -21,13 +21,14 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

trait StorageObjectInfo

@DeveloperApi
class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
var storageLevel: StorageLevel)
extends Ordered[RDDInfo] {
var storageLevel: StorageLevel) extends Ordered[RDDInfo] with StorageObjectInfo {

var numCachedPartitions = 0
var memSize = 0L
@@ -55,3 +56,33 @@ private[spark] object RDDInfo {
new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
}
}

@DeveloperApi
class BroadcastInfo(
val id: Long,
val name: String,
var storageLevel: StorageLevel) extends Ordered[BroadcastInfo] with StorageObjectInfo {

var memSize = 0L
var diskSize = 0L
var tachyonSize = 0L

override def toString = {
import Utils.bytesToString
("%s\" (%d) StorageLevel: %s; " +
"MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
name, id, storageLevel, bytesToString(memSize), bytesToString(tachyonSize),
bytesToString(diskSize))
}

override def compare(that: BroadcastInfo): Int = {
if (this.id > that.id) {
1
} else if (this.id == that.id) {
0
} else {
-1
}
}
}
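Since BroadcastInfo is Ordered by id, the storage page can sort entries with a plain .sorted; an illustrative check:

```scala
import org.apache.spark.storage.{BroadcastInfo, StorageLevel}

// compare() orders by ascending broadcast id, so .sorted yields
// Broadcast 0 before Broadcast 2 here.
val infos = Seq(
  new BroadcastInfo(2L, "Broadcast 2", StorageLevel.MEMORY_AND_DISK),
  new BroadcastInfo(0L, "Broadcast 0", StorageLevel.MEMORY_ONLY))
infos.sorted.foreach(println)
```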

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -36,7 +36,7 @@ class StorageStatusListener extends SparkListener {

/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
executorIdToStorageStatus.get(execId).foreach { storageStatus =>
executorIdToStorageStatus.get(formatExecutorId(execId)).foreach { storageStatus =>
updatedBlocks.foreach { case (blockId, updatedStatus) =>
if (updatedStatus.storageLevel == StorageLevel.NONE) {
storageStatus.removeBlock(blockId)
@@ -88,4 +88,23 @@
}
}

override def onBlockUpdate(blockUpdateEvent: SparkListenerBlockUpdate) = synchronized {
val executorId = blockUpdateEvent.blockManagerId.executorId
// we only handle broadcast block updates for now, as RDD blocks are already covered by the
// StageCompleted event
if (blockUpdateEvent.blockId.isBroadcast) {
updateStorageStatus(executorId, Seq((blockUpdateEvent.blockId,
blockUpdateEvent.blockStatus)))
}
}

/**
* In the local mode, there is a discrepancy between the executor ID according to the
* task ("localhost") and that according to SparkEnv ("<driver>"). In the UI, this
* results in duplicate rows for the same executor. Thus, in this mode, we aggregate
* these two rows and use the executor ID of "<driver>" to be consistent.
*/
def formatExecutorId(execId: String): String = {
if (execId == "localhost") "<driver>" else execId
}
}
Loading