Skip to content
Prev Previous commit
Next Next commit
pass ArrayBuffer not Seq
  • Loading branch information
yaooqinn committed Jun 25, 2021
commit f8942209f2a00ef32940cd2805474c117fd89d5e
Original file line number Diff line number Diff line change
Expand Up @@ -400,15 +400,15 @@ final class ShuffleBlockFetcherIterator(
}

private def createFetchRequest(
blocks: Seq[FetchBlockInfo],
blocks: ArrayBuffer[FetchBlockInfo],
address: BlockManagerId): FetchRequest = {
logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address "
+ s"with ${blocks.size} blocks")
FetchRequest(address, blocks)
FetchRequest(address, blocks.toSeq)
}

private def createFetchRequests(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this PR, do we really need to modify this method (other than the retBlocks = blocks.toSeq part)?

curBlocks: Seq[FetchBlockInfo],
curBlocks: ArrayBuffer[FetchBlockInfo],
address: BlockManagerId,
isLast: Boolean,
collectedRemoteRequests: ArrayBuffer[FetchRequest]): ArrayBuffer[FetchBlockInfo] = {
Expand Down Expand Up @@ -436,26 +436,25 @@ final class ShuffleBlockFetcherIterator(
address: BlockManagerId,
blockInfos: Seq[(BlockId, Long, Int)],
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit = {
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = new ArrayBuffer[FetchBlockInfo]()
val curBlocks = new ArrayBuffer[FetchBlockInfo]()

while (iterator.hasNext) {
val (blockId, size, mapIndex) = iterator.next()
for ((blockId, size, mapIndex) <- blockInfos) {
assertPositiveBlockSize(blockId, size)
curBlocks += FetchBlockInfo(blockId, size, mapIndex)
curRequestSize += size
// For batch fetch, the actual block in flight should count for merged block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
collectedRemoteRequests)
val rest = createFetchRequests(curBlocks, address, isLast = false, collectedRemoteRequests)
curBlocks.clear()
curBlocks ++= rest
Copy link
Contributor

@mridulm mridulm Jun 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of clearing it, create a new instance? If code evolves to use curBlocks in createFetchRequests or downstream (as Seq), emptying it out from under that code would result in surprises.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If code evolves to use curBlocks in createFetchRequests or downstream (as Seq)

I'm not sure it's a good idea to use an ArrayBuffer as a Seq implicitly.

Copy link
Contributor

@mridulm mridulm Jun 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR is already doing this for mergeContinuousShuffleBlockIdsIfNeeded? To be fair, it is done such that the clear should not impact that, but it is better to prevent accidental issues in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR is already doing this for mergeContinuousShuffleBlockIdsIfNeeded?
Not by this PR. mergeContinuousShuffleBlockIdsIfNeeded takes both Seq for handling local/host-local blocks and ArrayBuffer for the remotes. This PR modified it to ensure that it always returns a new copy for both doBatchFetch and !doBatchFetch. And before this PR, !doBatchFetch branch might potentially result in surprises.

Copy link
Member Author

@yaooqinn yaooqinn Jun 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR is already doing this for mergeContinuousShuffleBlockIdsIfNeeded?

Not by this PR. mergeContinuousShuffleBlockIdsIfNeeded takes both a Seq, for handling local/host-local blocks, and an ArrayBuffer, for the remote ones. This PR modified it to ensure that it always returns a new copy in ArrayBuffer form for both doBatchFetch and !doBatchFetch. Before this PR, the !doBatchFetch branch could result in surprises.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you insist, I can revert the last commit f894220. I do not have a strong opinion on it.

curRequestSize = curBlocks.map(_.size).sum
}
}
// Add in the final request
if (curBlocks.nonEmpty) {
createFetchRequests(curBlocks.toSeq, address, isLast = true, collectedRemoteRequests)
createFetchRequests(curBlocks, address, isLast = true, collectedRemoteRequests)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can we skip updating curBlocks and curRequestSize here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, we are leaving this method, so there is no need to update local variables here.

}
}

Expand Down Expand Up @@ -989,14 +988,16 @@ object ShuffleBlockFetcherIterator {
*
* @param blocks blocks to be merged if possible. May contain already merged blocks.
* @param doBatchFetch whether to merge blocks.
* @return the input blocks if doBatchFetch=false, or the merged blocks if doBatchFetch=true.
* @return a new copy of the input blocks if doBatchFetch=false,
* or the merged blocks if doBatchFetch=true.
*/
def mergeContinuousShuffleBlockIdsIfNeeded(
blocks: Seq[FetchBlockInfo],
doBatchFetch: Boolean): Seq[FetchBlockInfo] = {
val result = if (doBatchFetch) {
doBatchFetch: Boolean): ArrayBuffer[FetchBlockInfo] = {
val mergedBlockInfo = new ArrayBuffer[FetchBlockInfo]

if (doBatchFetch) {
val curBlocks = new ArrayBuffer[FetchBlockInfo]
val mergedBlockInfo = new ArrayBuffer[FetchBlockInfo]

def mergeFetchBlockInfo(toBeMerged: ArrayBuffer[FetchBlockInfo]): FetchBlockInfo = {
val startBlockId = toBeMerged.head.blockId.asInstanceOf[ShuffleBlockId]
Expand Down Expand Up @@ -1052,11 +1053,10 @@ object ShuffleBlockFetcherIterator {
if (curBlocks.nonEmpty) {
mergedBlockInfo += mergeFetchBlockInfo(curBlocks)
}
mergedBlockInfo
} else {
blocks
mergedBlockInfo ++= blocks
}
result.toSeq
mergedBlockInfo
}

/**
Expand Down