Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix to issue of closing channel
  • Loading branch information
ifilonenko committed Mar 25, 2019
commit 9e3f05cff7fe4f50185fd41fc34458e3c62ac4da
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;

import org.apache.spark.api.shuffle.ShufflePartitionWriter;
Expand Down Expand Up @@ -203,15 +204,14 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro
boolean copyThrewException = true;
ShufflePartitionWriter writer = mapOutputWriter.getNextPartitionWriter();
if (transferToEnabled) {
try (FileChannel outputChannel = writer.openChannel()) {
if (file.exists()) {
FileInputStream in = new FileInputStream(file);
try (FileChannel inputChannel = in.getChannel()){
Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size());
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
WritableByteChannel outputChannel = writer.openChannel();
if (file.exists()) {
FileInputStream in = new FileInputStream(file);
try (FileChannel inputChannel = in.getChannel()){
Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size());
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
}
} else {
Expand Down
36 changes: 18 additions & 18 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.lang.reflect.InvocationTargetException
import java.math.{MathContext, RoundingMode}
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.{Channels, FileChannel}
import java.nio.channels.{Channels, FileChannel, WritableByteChannel}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.SecureRandom
Expand Down Expand Up @@ -337,10 +337,10 @@ private[spark] object Utils extends Logging {

def copyFileStreamNIO(
input: FileChannel,
output: FileChannel,
output: WritableByteChannel,
startPosition: Long,
bytesToCopy: Long): Unit = {
val initialPos = output.position()
// val initialPos = output.position()
var count = 0L
// In case transferTo method transferred less data than we have required.
while (count < bytesToCopy) {
Expand All @@ -349,21 +349,21 @@ private[spark] object Utils extends Logging {
assert(count == bytesToCopy,
s"request to copy $bytesToCopy bytes, but actually copied $count bytes.")

// Check the position after transferTo loop to see if it is in the right position and
// give user information if not.
// Position will not be increased to the expected length after calling transferTo in
// kernel version 2.6.32, this issue can be seen in
// https://bugs.openjdk.java.net/browse/JDK-7052359
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
val finalPos = output.position()
val expectedPos = initialPos + bytesToCopy
assert(finalPos == expectedPos,
s"""
|Current position $finalPos do not equal to expected position $expectedPos
|after transferTo, please check your kernel version to see if it is 2.6.32,
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|You can set spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read the links, and it seems to be a kernel bug. Is this safe to delete?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no position() on WritableByteChannel :/ so... I don't know.

Copy link

@mccheah mccheah Mar 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep this check, I would guess it should be in the implementation of ShuffleMapOutputWriter. Maybe in close / abort / commit, etc.?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I'm missing something, but how do you get the expected position in the ShuffleMapOutputWriter for the assert? If that's not easy to get, I would say it's better to keep this here with an if-statement that checks whether it's an instance of FileChannel, to maintain the same behavior as before.

// // Check the position after transferTo loop to see if it is in the right position and
// // give user information if not.
// // Position will not be increased to the expected length after calling transferTo in
// // kernel version 2.6.32, this issue can be seen in
// // https://bugs.openjdk.java.net/browse/JDK-7052359
// // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
// val finalPos = output.position()
// val expectedPos = initialPos + bytesToCopy
// assert(finalPos == expectedPos,
// s"""
// |Current position $finalPos do not equal to expected position $expectedPos
// |after transferTo, please check your kernel version to see if it is 2.6.32,
// |this is a kernel bug which will lead to unexpected behavior when using transferTo.
// |You can set spark.file.transferTo = false to disable this NIO feature.
// """.stripMargin)
}

/**
Expand Down