File tree — 2 files changed: +3 −2 lines changed
main/scala/org/apache/spark/streaming/scheduler
test/scala/org/apache/spark/streaming Expand file tree Collapse file tree 2 files changed +3
-2
lines changed Original file line number Diff line number Diff line change @@ -116,8 +116,7 @@ private[streaming] class ReceivedBlockTracker(
116116 // a few thousand elements. So we explicitly allocate a collection for serialization which
117117 // we know doesn't have this issue. (See SPARK-26734).
118118 val streamIdToBlocks = streamIds.map { streamId =>
119-       (streamId,
120-         mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
119+       (streamId, mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
121120 }.toMap
122121     val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
123122     if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
Original file line number Diff line number Diff line change @@ -113,6 +113,8 @@ class ReceivedBlockTrackerSuite
113113       BatchAllocationEvent(1, AllocatedBlocks(Map(streamId -> blockInfos)))
114114 getWrittenLogData() shouldEqual expectedWrittenData1
115115 getWriteAheadLogFiles() should have size 1
116+
117+ receivedBlockTracker.stop()
116118 }
117119
118120   test("recovery with write ahead logs should remove only allocated blocks from received queue")
You can’t perform that action at this time.
0 commit comments