Skip to content

Commit c3cc277

Browse files
committed
Move TaskMetrics block status update into helper function.
1 parent 41daee8 commit c3cc277

File tree

1 file changed

+8
-10
lines changed

1 file changed

+8
-10
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -804,9 +804,7 @@ private[spark] class BlockManager(
804804
if (tellMaster && info.tellMaster) {
805805
reportBlockStatus(blockId, putBlockStatus)
806806
}
807-
Option(TaskContext.get()).foreach { c =>
808-
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
809-
}
807+
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
810808
}
811809
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
812810
if (level.replication > 1) {
@@ -960,9 +958,7 @@ private[spark] class BlockManager(
960958
if (tellMaster && info.tellMaster) {
961959
reportBlockStatus(blockId, putBlockStatus)
962960
}
963-
Option(TaskContext.get()).foreach { c =>
964-
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
965-
}
961+
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
966962
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
967963
if (level.replication > 1) {
968964
val remoteStartTime = System.currentTimeMillis
@@ -1267,9 +1263,7 @@ private[spark] class BlockManager(
12671263
reportBlockStatus(blockId, status, droppedMemorySize)
12681264
}
12691265
if (blockIsUpdated) {
1270-
Option(TaskContext.get()).foreach { c =>
1271-
c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
1272-
}
1266+
addUpdatedBlockStatusToTaskMetrics(blockId, status)
12731267
}
12741268
status.storageLevel
12751269
}
@@ -1310,6 +1304,7 @@ private[spark] class BlockManager(
13101304
logWarning(s"Asked to remove block $blockId, which does not exist")
13111305
case Some(info) =>
13121306
removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
1307+
addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
13131308
}
13141309
}
13151310

@@ -1328,8 +1323,11 @@ private[spark] class BlockManager(
13281323
if (tellMaster) {
13291324
reportBlockStatus(blockId, BlockStatus.empty)
13301325
}
1326+
}
1327+
1328+
private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
13311329
Option(TaskContext.get()).foreach { c =>
1332-
c.taskMetrics().incUpdatedBlockStatuses(blockId -> BlockStatus.empty)
1330+
c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
13331331
}
13341332
}
13351333

0 commit comments

Comments
 (0)