Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,10 @@ final class ShuffleBlockFetcherIterator(
hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum
} else {
remoteBlockBytes += blockInfos.map(_._2).sum
collectFetchRequests(address, blockInfos, collectedRemoteRequests)
val (_, timeCost) = Utils.timeTakenMs {
collectFetchRequests(address, blockInfos, collectedRemoteRequests)
}
logDebug(s"Collected remote fetch requests for $address in $timeCost ms")
}
}
val numRemoteBlocks = collectedRemoteRequests.map(_.blocks.size).sum
Expand Down Expand Up @@ -435,26 +438,26 @@ final class ShuffleBlockFetcherIterator(
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit = {
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = Seq.empty[FetchBlockInfo]
val curBlocks = new ArrayBuffer[FetchBlockInfo]()

while (iterator.hasNext) {
val (blockId, size, mapIndex) = iterator.next()
assertPositiveBlockSize(blockId, size)
curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex))
curBlocks += FetchBlockInfo(blockId, size, mapIndex)
curRequestSize += size
// For batch fetch, the actual block in flight should count for merged block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
curBlocks = createFetchRequests(curBlocks, address, isLast = false,
collectedRemoteRequests)
val retBlocks =
createFetchRequests(curBlocks, address, isLast = false, collectedRemoteRequests)
curBlocks.clear()
curBlocks ++= retBlocks
curRequestSize = curBlocks.map(_.size).sum
}
}
// Add in the final request
if (curBlocks.nonEmpty) {
curBlocks = createFetchRequests(curBlocks, address, isLast = true,
collectedRemoteRequests)
curRequestSize = curBlocks.map(_.size).sum
createFetchRequests(curBlocks, address, isLast = true, collectedRemoteRequests)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can we skip updating curBlocks and curRequestSize here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, we are leaving this method and no need to update local variables here.

}
}

Expand Down Expand Up @@ -994,7 +997,7 @@ object ShuffleBlockFetcherIterator {
blocks: Seq[FetchBlockInfo],
doBatchFetch: Boolean): Seq[FetchBlockInfo] = {
val result = if (doBatchFetch) {
var curBlocks = new ArrayBuffer[FetchBlockInfo]
val curBlocks = new ArrayBuffer[FetchBlockInfo]
val mergedBlockInfo = new ArrayBuffer[FetchBlockInfo]

def mergeFetchBlockInfo(toBeMerged: ArrayBuffer[FetchBlockInfo]): FetchBlockInfo = {
Expand Down