11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -321,6 +321,17 @@ package object config {
.intConf
.createWithDefault(3)

private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
Contributor:
I'm not sure it's a good idea to do this on the reducer side, because there may be a lot of reducers fetching data from one shuffle service at the same time, and you wouldn't know that on the reducer side. cc @jinxing64

Contributor @jiangxb1987, Jul 6, 2017:
I agree this won't resolve all problems, but it is still good to add the limit to prevent fetching too many blocks from an address at a time.

Reply:
When the shuffle service hits OOM, there are always lots of (thousands of) reducers (maybe from different apps) fetching blocks. I'm not sure it will help much to limit in-flight blocks on the reducer.

Also, we already have maxReqsInFlight and maxBytesInFlight. Is it a little bit redundant to also have maxBlocksInFlightPerAddress?

Sorry for this comment.
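
A rough back-of-the-envelope illustration of the gap being discussed, using the 48m default of spark.reducer.maxSizeInFlight (seen below in BlockStoreShuffleReader) and an assumed, hypothetical average block size; it shows why the byte and request caps alone do not bound how many blocks can hit a single address:

    // Hypothetical numbers: with the default 48 MB byte cap and shuffle blocks
    // averaging ~10 KB, one reducer can keep thousands of blocks in flight against
    // a single shuffle service, and maxReqsInFlight does not help because a single
    // request may carry many blocks.
    val maxBytesInFlight = 48L * 1024 * 1024   // default of spark.reducer.maxSizeInFlight
    val avgBlockSize     = 10L * 1024          // assumed small shuffle block
    println(maxBytesInFlight / avgBlockSize)   // ~4915 blocks in flight from one reducer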

Contributor Author:
@jinxing64 After your fix for lazily loading the open blocks iterator, I am not seeing issues with the NM crashing on my end. However, in cases where a request was being made with a high no. of blocks which were under the max constraints caused increased load. This is an added layer of defense which can mitigate the issue.
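
A hedged usage sketch of the new setting; the application name is made up, and 20 is only the value floated later in this review, not a recommendation:

    // Cap the number of in-flight shuffle blocks per remote address for this job.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("shuffle-heavy-job")                          // hypothetical app name
      .set("spark.reducer.maxBlocksInFlightPerAddress", "20")   // illustrative value

    // Equivalent at submit time:
    //   spark-submit --conf spark.reducer.maxBlocksInFlightPerAddress=20 ...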

.doc("This configuration limits the number of remote blocks being fetched per reduce task" +
" from a given host port. When a large number of blocks are being requested from a given" +
" address in a single fetch or simultaneously, this could crash the serving executor or" +
" Node Manager. This is especially useful to reduce the load on the Node Manager when" +
Contributor:
Shall we say shuffle service instead of Node Manager?

Contributor Author:
If the shuffle service fails, it can take down the Node Manager, which is more severe, and hence I have used it. And in the following sentence I have mentioned external shuffle. If it is not clear, I am okay to change it.

Contributor:
I think Node Manager is for YARN only? Shuffle service is more general.

" external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
.intConf
.checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
.createWithDefault(Int.MaxValue)
Contributor:
cc @tgravescs shall we change the default value to 20 or something?

Contributor:
I'm fine leaving it at MaxValue for now to not change current behavior, just like we have done with some of these other related configs. I would like to get more runtime on this in production and then we can set it later, perhaps in 2.3. It would be nice to pull this back into branch-2.2 as well as master.


private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
.doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
@@ -51,6 +51,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

@@ -23,7 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
@@ -52,6 +52,8 @@ import org.apache.spark.util.io.ChunkedByteBufferOutputStream
* @param streamWrapper A function to wrap the returned input stream.
* @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
* @param maxReqsInFlight max number of remote requests to fetch blocks at any given point.
* @param maxBlocksInFlightPerAddress max number of shuffle blocks being fetched at any given point
* for a given remote host:port.
* @param maxReqSizeShuffleToMem max size (in bytes) of a request that can be shuffled to memory.
* @param detectCorrupt whether to detect any corruption in fetched blocks.
*/
@@ -64,6 +66,7 @@ final class ShuffleBlockFetcherIterator(
streamWrapper: (BlockId, InputStream) => InputStream,
maxBytesInFlight: Long,
maxReqsInFlight: Int,
maxBlocksInFlightPerAddress: Int,
maxReqSizeShuffleToMem: Long,
detectCorrupt: Boolean)
extends Iterator[(BlockId, InputStream)] with TempShuffleFileManager with Logging {
@@ -110,12 +113,21 @@ final class ShuffleBlockFetcherIterator(
*/
private[this] val fetchRequests = new Queue[FetchRequest]

/**
* Queue of fetch requests which could not be issued the first time they were dequeued. These
* requests are tried again when the fetch constraints are satisfied.
*/
private[this] val deferredFetchRequests = new HashMap[BlockManagerId, Queue[FetchRequest]]()

/** Current bytes in flight from our requests */
private[this] var bytesInFlight = 0L

/** Current number of requests in flight */
private[this] var reqsInFlight = 0

/** Current number of blocks in flight per host:port */
private[this] val numBlocksInFlightPerAddress = new HashMap[BlockManagerId, Int]()

/**
* The blocks that can't be decompressed successfully, it is used to guarantee that we retry
* at most once for those corrupted blocks.
@@ -248,7 +260,8 @@ final class ShuffleBlockFetcherIterator(
// smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
// nodes, rather than blocking on reading output from one node.
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)
logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize
+ ", maxBlocksInFlightPerAddress: " + maxBlocksInFlightPerAddress)

// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
// at most maxBytesInFlight in order to limit the amount of data in flight.
@@ -277,11 +290,13 @@ final class ShuffleBlockFetcherIterator(
} else if (size < 0) {
throw new BlockException(blockId, "Negative block size " + size)
}
if (curRequestSize >= targetRequestSize) {
if (curRequestSize >= targetRequestSize ||
curBlocks.size >= maxBlocksInFlightPerAddress) {
Contributor:
We may have a lot of adjacent fetch requests in the queue; shall we shuffle the request queue before fetching?
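
If memory serves, the iterator already enqueues remote requests in random order during initialization, which spreads adjacent requests across addresses; a paraphrased sketch of that idea (host names are hypothetical, and scala.util.Random.shuffle stands in for Spark's own Utils.randomize):

    import scala.collection.mutable.Queue
    import scala.util.Random

    case class FetchRequest(address: String, blocks: Seq[(String, Long)])

    val remoteRequests = Seq(
      FetchRequest("hostA:7337", Seq(("shuffle_0_0_0", 100L))),
      FetchRequest("hostA:7337", Seq(("shuffle_0_1_0", 100L))),
      FetchRequest("hostB:7337", Seq(("shuffle_0_2_0", 100L))))

    // Enqueue in random order rather than grouped by address.
    val fetchRequests = new Queue[FetchRequest]
    fetchRequests ++= Random.shuffle(remoteRequests)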

// Add this FetchRequest
remoteRequests += new FetchRequest(address, curBlocks)
logDebug(s"Creating fetch request of $curRequestSize at $address "
+ s"with ${curBlocks.size} blocks")
curBlocks = new ArrayBuffer[(BlockId, Long)]
logDebug(s"Creating fetch request of $curRequestSize at $address")
curRequestSize = 0
}
}
@@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator(
result match {
case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
if (address != blockManager.blockManagerId) {
numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
Contributor:
Can we do this earlier? E.g. right after the fetch result is enqueued to results.

Contributor Author:
That is a good point. In fact, we could also move the other bookkeeping right after the fetch result is enqueued.

I would also want to look at the initialization of the BlockFetchingListener to see the effects of this, as it would increase the size of the closure. Can we have a separate JIRA filed for this?

Contributor:
yea sure.

shuffleMetrics.incRemoteBytesRead(buf.size)
if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
@@ -443,12 +459,57 @@
}

private def fetchUpToMaxBytes(): Unit = {
// Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty &&
(bytesInFlight == 0 ||
(reqsInFlight + 1 <= maxReqsInFlight &&
bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
sendRequest(fetchRequests.dequeue())
// Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host
// immediately, defer the request until the next time it can be processed.

// Process any outstanding deferred fetch requests if possible.
if (deferredFetchRequests.nonEmpty) {
for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
Contributor:
If the traffic is heavy, it may take some time to finish iterating; do you have any idea on reducing the effort?

Contributor Author:
I didn't get you. We are just iterating to check if there are requests that can be scheduled, and they are handled asynchronously by the send calls. What effort are you referring to?

Contributor:
I was thinking maybe we can avoid adding the extra deferredFetchRequests to handle deferred fetch requests; instead we can iterate over the fetchRequests to send the requests that are not maxed out. This way we may simplify the logic. Would you like to try?

Contributor Author:
Oh yes. That was the first choice, and I gave it a try to avoid adding any extra bookkeeping. There are issues with that approach. Say you have a request which has to be deferred. You just remove it, push it at the end, and continue.

  • This is good as long as you don't meet the deferred request again.
  • Now if you meet the deferred request again, it may or may not be schedulable, depending on whether the remote finished processing the earlier request. This would lead to going around in circles (wasted effort). To avoid this we have to know when to stop. We would have to keep a marker for a request which was already deferred so that we know we have to stop. But this marker would only be for a single request, which corresponds to one remote. In the meanwhile other remotes could have finished processing their earlier requests and we could schedule requests to them. For this we can no longer stop at the first marker for a single address. We would have to check the requests again.

This makes it more complicated than scheduling everything that can be scheduled in a single shot and deferring whatever it encounters on the way. The next time around, we first try to clear any backlog from the previous run and, after doing so, proceed normally.

Contributor:
Ah, makes sense.
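
A minimal standalone sketch of the deferral decision described above, with a hypothetical host name, counts, and a cap of 20; a request is parked while it would push the per-address count over the cap, and becomes sendable once enough results for that address come back:

    import scala.collection.mutable.HashMap

    object DeferralSketch extends App {
      val maxBlocksInFlightPerAddress = 20
      val numBlocksInFlightPerAddress = HashMap("hostA:7337" -> 18)

      def isMaxedOut(address: String, blocksInRequest: Int): Boolean =
        numBlocksInFlightPerAddress.getOrElse(address, 0) + blocksInRequest >
          maxBlocksInFlightPerAddress

      println(isMaxedOut("hostA:7337", 5))             // true  -> request is deferred
      numBlocksInFlightPerAddress("hostA:7337") -= 3   // three blocks finish
      println(isMaxedOut("hostA:7337", 5))             // false -> deferred request can be sent
    }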

while (isRemoteBlockFetchable(defReqQueue) &&
!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
Contributor:
If the request.blocks.size is above the config value, then isRemoteAddressMaxedOut() will always return false, and thus we won't exit the while loop.

Contributor Author:
We check the number of blocks being added to a fetch request. If it is larger than the configured number, we create a new request.
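
A small hedged illustration of that guarantee: because the splitting loop earlier in this diff starts a new request once curBlocks.size reaches maxBlocksInFlightPerAddress, no single FetchRequest can exceed the cap on its own (dummy block ids and sizes below, and grouped() stands in for the real splitting loop):

    val maxBlocksInFlightPerAddress = 20
    val blocks = Seq.fill(50)(("blockId", 1024L))              // 50 dummy (id, size) pairs
    val requests = blocks.grouped(maxBlocksInFlightPerAddress).toList
    println(requests.map(_.size))                              // List(20, 20, 10)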

val request = defReqQueue.dequeue()
logDebug(s"Processing deferred fetch request for $remoteAddress with "
+ s"${request.blocks.length} blocks")
send(remoteAddress, request)
if (defReqQueue.isEmpty) {
deferredFetchRequests -= remoteAddress
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can leave the empty queue here, as we may still have fetch requests to put in this queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would have to unnecessarily iterate through the map for all the block manager ids for which we deferred fetch requests at an earlier point to check if they have any pending fetch requests when they don't.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see

}
}
}
}

// Process any regular fetch requests if possible.
while (isRemoteBlockFetchable(fetchRequests)) {
val request = fetchRequests.dequeue()
val remoteAddress = request.address
if (isRemoteAddressMaxedOut(remoteAddress, request)) {
logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks")
val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]())
defReqQueue.enqueue(request)
deferredFetchRequests(remoteAddress) = defReqQueue
Contributor:
The defReqQueue is mutable, so we don't need to do this.

Contributor Author:
If it is the first time that we want to defer a request, defReqQueue has to be associated with its corresponding remoteAddress.

Contributor:
You are right.
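
A tiny standalone sketch of the point settled here: getOrElse returns a fresh queue that is not yet in the map, so the explicit assignment is needed the first time an address is deferred. (getOrElseUpdate, shown in the comment, would insert it automatically, but that is only a hypothetical alternative, not what the PR does.)

    import scala.collection.mutable.{HashMap, Queue}

    val deferred = new HashMap[String, Queue[Int]]()
    val q = deferred.getOrElse("hostA", new Queue[Int]())   // fresh queue, NOT yet in the map
    q.enqueue(1)
    deferred("hostA") = q                                   // required so the queue is retained
    // deferred.getOrElseUpdate("hostA", new Queue[Int]()) would avoid the explicit put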

} else {
send(remoteAddress, request)
}
}

def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
sendRequest(request)
numBlocksInFlightPerAddress(remoteAddress) =
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size
}

def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
fetchReqQueue.nonEmpty &&
(bytesInFlight == 0 ||
(reqsInFlight + 1 <= maxReqsInFlight &&
bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
}

// Checks if sending a new fetch request will exceed the max no. of blocks being fetched from a
// given remote address.
def isRemoteAddressMaxedOut(remoteAddress: BlockManagerId, request: FetchRequest): Boolean = {
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size >
maxBlocksInFlightPerAddress
}
}

@@ -110,6 +110,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
48 * 1024 * 1024,
Int.MaxValue,
Int.MaxValue,
Int.MaxValue,
true)

// 3 local blocks fetched in initialization
@@ -187,6 +188,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
48 * 1024 * 1024,
Int.MaxValue,
Int.MaxValue,
Int.MaxValue,
true)

verify(blocks(ShuffleBlockId(0, 0, 0)), times(0)).release()
@@ -254,6 +256,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
48 * 1024 * 1024,
Int.MaxValue,
Int.MaxValue,
Int.MaxValue,
true)

// Continue only after the mock calls onBlockFetchFailure
@@ -319,6 +322,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
48 * 1024 * 1024,
Int.MaxValue,
Int.MaxValue,
Int.MaxValue,
true)

// Continue only after the mock calls onBlockFetchFailure
@@ -400,6 +404,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
48 * 1024 * 1024,
Int.MaxValue,
Int.MaxValue,
Int.MaxValue,
false)

// Continue only after the mock calls onBlockFetchFailure
@@ -457,6 +462,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
(_, in) => in,
maxBytesInFlight = Int.MaxValue,
maxReqsInFlight = Int.MaxValue,
maxBlocksInFlightPerAddress = Int.MaxValue,
maxReqSizeShuffleToMem = 200,
detectCorrupt = true)
}
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -536,6 +536,15 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
<td><code>spark.reducer.maxBlocksInFlightPerAddress</code></td>
<td>Int.MaxValue</td>
<td>
This configuration limits the number of remote blocks being fetched per reduce task from a
given host port. When a large number of blocks are being requested from a given address in a
single fetch or simultaneously, this could crash the serving executor or Node Manager. This
is especially useful to reduce the load on the Node Manager when external shuffle is enabled.
You can mitigate this issue by setting it to a lower value.
</td>
</tr>
<tr>
<td><code>spark.reducer.maxReqSizeShuffleToMem</code></td>
<td>Long.MaxValue</td>
<td>