Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,4 +398,14 @@ public long mergedIndexCacheSize() {
return JavaUtils.byteStringAsBytes(
conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
}

/**
* The threshold for number of IOExceptions while merging shuffle blocks to a shuffle partition.
* When the number of IOExceptions while writing to merged shuffle data/index/meta file exceed
* this threshold then the shuffle server will respond back to client to stop pushing shuffle
* blocks for this shuffle partition.
*/
public int ioExceptionsThresholdDuringMerge() {
return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ class BlockPushErrorHandler implements ErrorHandler {
public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
"Couldn't find an opportunity to write block";

/**
* String constant used for generating exception messages indicating the server encountered
* IOExceptions multiple times, greater than the configured threshold, while trying to merged
* shuffle blocks of the same shuffle partition. When the client receives this this response,
* it will stop pushing any more blocks for the same shuffle partition.
*/
public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
"IOExceptions exceeded the threshold";

@Override
public boolean shouldRetryError(Throwable t) {
// If it is a connection time out or a connection closed exception, no need to retry.
Expand Down
Loading