Fail pending blocks before writing when number of IOExceptions have exceeded threshold
otterc committed Dec 9, 2020
commit 0bce63198e2e5304dd31aa39602f16d9d7e90af4
@@ -526,21 +526,17 @@ private boolean isDuplicateBlock() {
* This is only invoked when the stream is able to write. The stream first writes any deferred
* block parts buffered in memory.
*/
private void writeAnyDeferredBufs() throws IOException {
if (deferredBufs != null && !deferredBufs.isEmpty()) {
for (ByteBuffer deferredBuf : deferredBufs) {
writeBuf(deferredBuf);
}
deferredBufs = null;
private void writeDeferredBufs() throws IOException {
for (ByteBuffer deferredBuf : deferredBufs) {
writeBuf(deferredBuf);
}
deferredBufs = null;
}

/**
* This throws RuntimeException which will abort the merge of a particular shuffle partition.
* This throws RuntimeException if the number of IOExceptions have exceeded threshold.
*/
private void incrementIOExceptionsAndAbortIfNecessary() {
// Update the count of IOExceptions
partitionInfo.incrementIOExceptions();
private void abortIfNecessary() {
if (partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) {
deferredBufs = null;
throw new RuntimeException(String.format("%s when merging %s",
@@ -549,6 +545,16 @@ private void incrementIOExceptionsAndAbortIfNecessary() {
}
}

/**
* This increments the number of IOExceptions and throws RuntimeException if it exceeds the
* threshold which will abort the merge of a particular shuffle partition.
*/
private void incrementIOExceptionsAndAbortIfNecessary() {
// Update the count of IOExceptions
partitionInfo.incrementIOExceptions();
abortIfNecessary();
}

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
// When handling the block data using StreamInterceptor, it can help to reduce the amount
@@ -586,6 +592,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
deferredBufs = null;
return;
}
abortIfNecessary();
Contributor:
I don't think we should invoke this here.
Invoking inside onComplete should be sufficient.

Contributor (@Victsm, Dec 9, 2020):
Purpose is to only do this check once per block instead of once per buf and avoid throwing unnecessary exceptions from onData which leads to channel close.

Contributor (author):
@Victsm This change is to address this comment:
#30433 (comment)

The scenario is that there can be pending streams waiting on the lock for the partitionInfo while the exception threshold has already been met. When such a pending stream acquires the lock, it will attempt to write to the data file even though the exception threshold has been reached.

I have added the UT testPendingBlockIsAbortedImmediately to verify this.

> Purpose is to only do this check once per block instead of once per buf and avoid throwing unnecessary exceptions from onData which leads to channel close.

We already throw IOExceptions when a write to the data file fails. I don't see how also throwing the "IOExceptions exceeded the threshold" exception makes it any worse.
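
For illustration, here is a minimal, self-contained sketch of the race being described; the class and method names are hypothetical stand-ins, not the actual RemoteBlockPushResolver code. Two streams contend for the per-partition lock, and without an up-front threshold check the stream that was blocked would still attempt its write after the threshold has already been crossed.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the per-partition state; not Spark's AppShufflePartitionInfo.
class PartitionSketch {
  private int numIOExceptions = 0;
  private final int threshold;

  PartitionSketch(int threshold) {
    this.threshold = threshold;
  }

  synchronized void writeBlockPart(ByteBuffer buf) {
    // Early check: a stream that was waiting on this lock while other streams
    // pushed the count past the threshold is rejected before touching the file.
    if (numIOExceptions > threshold) {
      throw new RuntimeException("IOExceptions exceeded the threshold");
    }
    try {
      appendToMergedFile(buf); // stand-in for the actual data-file write
    } catch (RuntimeException e) {
      numIOExceptions++;
      throw e;
    }
  }

  private void appendToMergedFile(ByteBuffer buf) {
    // Placeholder for the real write; assume it may fail with an I/O error.
  }
}
```

Without the check at the top of writeBlockPart, a caller that had been blocked on the monitor would go on to write even after the partition had effectively been aborted, which is what the added abortIfNecessary() call in onData guards against.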

Contributor:
The way I see the issue mentioned in that comment is that we are not preventing new blocks from being merged once the IOException threshold is reached.
To do that, we only need to invoke abortIfNecessary inside onComplete, regardless of whether we still have any deferredBuf to write at that point.
This way, for the normal case without IOExceptions, we are only invoking abortIfNecessary once per block.
By invoking it here, we would invoke it once per buf in the normal case.

Of course, if we only check inside onComplete, we delay rejection of these pending blocks until we reach their stream's end.
I think this is a reasonable tradeoff to make, considering that the majority of the time the code is executing the normal case rather than the exception case.
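
A rough sketch of the two placements being weighed here, assuming the simplified StreamCallback shape quoted later in this thread (onData runs once per received buffer, onComplete once per block); it only shows where the cost of the check lands, not the actual resolver code:

```java
import java.nio.ByteBuffer;

abstract class PlacementSketch {
  // Option A (as in this diff): check on every received buffer. The pending
  // stream is rejected as soon as its next buffer arrives, at the cost of one
  // check (and a possible exception from onData) per buf.
  void onData(String streamId, ByteBuffer buf) {
    abortIfNecessary();
    // ... defer or write buf ...
  }

  // Option B (proposed above): check only once per block, when the stream
  // completes. Cheaper in the normal case, but a pending stream keeps sending
  // data until it reaches its end before being rejected.
  void onComplete(String streamId) {
    abortIfNecessary();
    // ... write any deferred buffers and finalize the block ...
  }

  // Throws RuntimeException once the per-partition IOException threshold is hit.
  abstract void abortIfNecessary();
}
```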

Contributor (@otterc, author, Dec 9, 2020):
hmmm. I think the intention is to not have the server attempt writing if the threshold is reached for the partition. Probably this check here makes this behavior more accurate.

However, I don't have a strong opinion on this, since the assumption is that once the number of IOExceptions has reached the threshold, any further writes will result in an IOException as well. With that assumption, the write in onData after the threshold is met will very likely throw an IOException too, and since the threshold is already met, the server will instead throw the "IOExceptions exceeded the threshold" exception.

I can remove it from onData. @Ngone51 What do you think?

Contributor:
@Ngone51 @mridulm, by throwing the exception inside onComplete, we are still effectively preventing any new blocks from being successfully appended to the file.

Contributor:
Had an offline discussion with @mridulm and @otterc: the concern about closing channels by throwing an exception mid-stream seems negligible, since it only happens after reaching the max IOException count.
Calling abortIfNecessary inside onData should also have negligible performance implications.
With that, I think it should be fine to keep this part of the code as is.

Member:
I'm wondering whether the server closes the channel, or whether the client would stop streaming the remaining data, when we throw the exception in onData. Otherwise, we'd keep receiving the remaining data from the client and throw the exception multiple times.

(I didn't find anywhere that we close the channel or where the client would have a chance to stop streaming. I may have missed it somewhere.)

Contributor:
@Ngone51 I think that's existing behavior in Spark, given how it uses StreamInterceptor to stream data from an RPC message that is outside the frame.
This is also documented with SPARK-6237 in #21346:

* An error while reading data from the stream
* ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
* will fail the entire channel. A failure in "post-processing" the stream in
* {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
* rpcFailure, but the channel will remain active.

It is the server that closes the channel if an exception is thrown from onData.
Once an exception is thrown from onData while StreamInterceptor hasn't finished processing all the out-of-frame bytes for a given RPC message, the TransportFrameDecoder will no longer be able to successfully decode subsequent RPC messages from this channel.
Thus, the server needs to close the channel at this point.
Once the channel is closed, the client will no longer be able to transfer any more data to the server over the same channel.
The connection needs to be reestablished at that point, resetting state on the client side.
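
As a rough mental model of the behavior quoted above (an assumed shape of the server-side handling, not the actual TransportRequestHandler/StreamInterceptor code): a failure in onData has to take the whole channel down, while a failure in onComplete only fails that one RPC.

```java
import java.nio.ByteBuffer;

class StreamHandlingSketch {
  interface StreamCallback {
    void onData(String streamId, ByteBuffer buf) throws Exception;
    void onComplete(String streamId) throws Exception;
  }

  void onBytesReceived(StreamCallback cb, String streamId, ByteBuffer buf) {
    try {
      cb.onData(streamId, buf);
    } catch (Exception e) {
      // Part of the current frame has already been consumed, so the decoder
      // cannot re-sync on the remaining bytes: close the channel. The client
      // must reconnect, which resets its state.
      closeChannel(e);
    }
  }

  void onStreamEnd(StreamCallback cb, String streamId) {
    try {
      cb.onComplete(streamId);
    } catch (Exception e) {
      // "Post-processing" failure: respond with an RPC failure, but the
      // channel stays open for subsequent messages.
      respondWithFailure(e);
    }
  }

  private void closeChannel(Exception cause) { /* tear down the connection */ }
  private void respondWithFailure(Exception cause) { /* send an error response for this RPC */ }
}
```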

Contributor (author):
The client does receive the exception thrown from onData. I simulated an exception from onData at the server for a particular push block.
Below are the logs of the client.
Note: I am running an older version of magnet which still uses ShuffleBlockId for shuffle push and some other classes are old.

20/12/11 19:14:48 INFO TransportClientFactory: Successfully created connection to ltx1-hcl3213.grid.linkedin.com/10.150.24.33:7337 after 12 ms (5 ms spent in bootstraps)
20/12/11 19:14:56 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_1_7_7, and will not retry (1 retries)
org.apache.spark.network.shuffle.BlockPushException: ^H^@^@^@^_application_1602506816280_53624^@^@^@
shuffle_1_7_7^@^@^@^Gjava.io.IOException: Destination failed while reading stream
        at org.apache.spark.network.server.TransportRequestHandler$3.onFailure(TransportRequestHandler.java:244)
        at org.apache.spark.network.client.StreamInterceptor.exceptionCaught(StreamInterceptor.java:58)
        at org.apache.spark.network.util.TransportFrameDecoder.exceptionCaught(TransportFrameDecoder.java:188)
        at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:850)
        at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:364)
        at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: FAILING this stream

After this, the client logs show that the server has terminated the connection.

20/12/11 19:14:57 WARN TransportChannelHandler: Exception in connection from ltx1-hcl3412.grid.linkedin.com/10.150.55.78:7337
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
20/12/11 19:14:57 ERROR TransportResponseHandler: Still have 38 requests outstanding when connection from ltx1-hcl3412.grid.linkedin.com/10.150.55.78:7337 is closed

logger.trace("{} shuffleId {} reduceId {} onData writable",
partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
partitionInfo.reduceId);
@@ -596,7 +603,9 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
// If we got here, it's safe to write the block data to the merged shuffle file. We
// first write any deferred block.
try {
writeAnyDeferredBufs();
if (deferredBufs != null && !deferredBufs.isEmpty()) {
writeDeferredBufs();
}
writeBuf(buf);
} catch (IOException ioe) {
incrementIOExceptionsAndAbortIfNecessary();
@@ -674,7 +683,10 @@ public void onComplete(String streamId) throws IOException {
}
if (partitionInfo.getCurrentMapIndex() < 0) {
try {
writeAnyDeferredBufs();
if (deferredBufs != null && !deferredBufs.isEmpty()) {
abortIfNecessary();
Contributor:
Why does this need to be inside the if block, instead of keeping writeDeferredBufs as is and just calling abortIfNecessary before writeDeferredBufs?

Contributor (@otterc, author, Dec 9, 2020):
I was thinking that if a stream is completing but doesn't need to write any deferred bufs then we should let it complete without failures.
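
A minimal sketch of the two placements being discussed for the onComplete path; the surrounding class is a hypothetical stand-in, and only the position of the guard differs.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

class OnCompletePlacementSketch {
  private List<ByteBuffer> deferredBufs;

  // As committed: a stream that completes with nothing left to write is allowed
  // to finish, even if the partition has already crossed the IOException threshold.
  void completeAsCommitted() throws IOException {
    if (deferredBufs != null && !deferredBufs.isEmpty()) {
      abortIfNecessary();
      writeDeferredBufs();
    }
  }

  // Alternative raised in the review: check unconditionally, rejecting even an
  // "empty-handed" completion once the threshold has been reached.
  void completeWithUnconditionalCheck() throws IOException {
    abortIfNecessary();
    if (deferredBufs != null && !deferredBufs.isEmpty()) {
      writeDeferredBufs();
    }
  }

  private void abortIfNecessary() { /* throw RuntimeException once the threshold is exceeded */ }

  private void writeDeferredBufs() throws IOException { /* flush buffered block parts */ }
}
```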

writeDeferredBufs();
}
} catch (IOException ioe) {
incrementIOExceptionsAndAbortIfNecessary();
// If the above doesn't throw a RuntimeException, then we propagate the IOException
@@ -618,23 +618,102 @@ public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOException {
}

@Test (expected = RuntimeException.class)
public void testRequestForAbortedShufflePartitionThrowsException() throws IOException {
public void testRequestForAbortedShufflePartitionThrowsException() {
try {
testIOExceptionsDuringMetaUpdateIncreasesExceptionCount();
} catch (Throwable t) {
// No more blocks can be merged to this partition.
}
try {
RemoteBlockPushResolver.PushBlockStreamCallback callback =
(RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, 10, 0, 0));
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, 10, 0, 0));
} catch (Throwable t) {
assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_10_0",
t.getMessage());
throw t;
}
}

@Test (expected = RuntimeException.class)
public void testPendingBlockIsAbortedImmediately() throws IOException {
useTestFiles(true, false);
RemoteBlockPushResolver.PushBlockStreamCallback callback =
(RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, 0, 0, 0));
RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
testIndexFile.close();
for (int i = 1; i < 6; i++) {
RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
(RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, i, 0, 0));
try {
callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
// This will complete without any exceptions but the exception count is increased.
callback1.onComplete(callback1.getID());
} catch (Throwable t) {
callback1.onFailure(callback1.getID(), t);
}
}
assertEquals(5, partitionInfo.getNumIOExceptions());
// The server will respond with IOExceptions exceeded threshold for any additional attempts
// to write.
try {
callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
} catch (Throwable t) {
assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0",
t.getMessage());
throw t;
}
}

@Test (expected = RuntimeException.class)
public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IOException {
useTestFiles(true, false);
RemoteBlockPushResolver.PushBlockStreamCallback callback =
(RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, 0, 0, 0));
RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
testIndexFile.close();
for (int i = 1; i < 5; i++) {
RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
(RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, i, 0, 0));
try {
callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
// This will complete without any exceptions but the exception count is increased.
callback1.onComplete(callback1.getID());
} catch (Throwable t) {
callback1.onFailure(callback1.getID(), t);
}
}
assertEquals(4, partitionInfo.getNumIOExceptions());
RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
(RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, 0, 5, 0, 0));
callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
// This is deferred
callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
// Callback2 completes which will throw another exception.
try {
callback2.onComplete(callback2.getID());
} catch (Throwable t) {
callback2.onFailure(callback2.getID(), t);
}
assertEquals(5, partitionInfo.getNumIOExceptions());
// Restore index file so that any further writes to it are successful and any exceptions are
// due to IOExceptions exceeding threshold.
testIndexFile.restore();
try {
callback.onComplete(callback.getID());
} catch (Throwable t) {
assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0",
t.getMessage());
throw t;
}
}

@Test
public void testFailureWhileTruncatingFiles() throws IOException {
useTestFiles(true, false);