Merge remote-tracking branch 'palantir/spark-25299' into proposed-new-transferto-api
mccheah committed Apr 30, 2019
commit e98661e649d123d59553d9606be3329d2406aa1c
@@ -24,6 +24,8 @@
import java.nio.channels.FileChannel;
import javax.annotation.Nullable;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.api.shuffle.SupportsTransferTo;
import org.apache.spark.api.shuffle.TransferrableReadableByteChannel;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
@@ -210,7 +212,7 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro
for (int i = 0; i < numPartitions; i++) {
final File file = partitionWriterSegments[i].file();
boolean copyThrewException = true;
ShufflePartitionWriter writer = mapOutputWriter.getNextPartitionWriter();
ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
if (file.exists()) {
I think you're missing the case where file.exists() is false; you still need to set copyThrewException to false.

Author:
The value of copyThrewException isn't used if the file does not exist.
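
For illustration, a minimal standalone sketch of the control flow the author is describing; it is simplified and not the actual Spark code, and the class and method names are invented. The flag is only read in the finally block inside the file.exists() branch, so its initial value never matters when the file is absent.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Simplified sketch of the flag's scope (not the real Spark code).
final class PartitionCopySketch {
  static void copyPartition(File file, OutputStream partitionOutput) throws IOException {
    boolean copyThrewException = true;
    if (file.exists()) {
      InputStream in = new FileInputStream(file);
      try {
        in.transferTo(partitionOutput);  // copy the spilled partition data
        copyThrewException = false;      // reached only if the copy succeeded
      } finally {
        // Swallow the close-time exception if the copy itself already failed,
        // so the original error is the one that propagates.
        try {
          in.close();
        } catch (IOException e) {
          if (!copyThrewException) {
            throw e;
          }
        }
      }
    }
    // If the file does not exist, nothing ever reads copyThrewException.
  }
}
```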

if (transferToEnabled && writer instanceof SupportsTransferTo) {
FileInputStream in = new FileInputStream(file);
@@ -23,9 +23,6 @@
import java.nio.channels.FileChannel;
import java.util.Iterator;

import org.apache.spark.api.shuffle.DefaultTransferrableWritableByteChannel;
import org.apache.spark.api.shuffle.SupportsTransferTo;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
import scala.Option;
import scala.Product2;
import scala.collection.JavaConverters;
@@ -40,9 +37,14 @@

import org.apache.spark.*;
import org.apache.spark.annotation.Private;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.DefaultTransferrableWritableByteChannel;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShufflePartitionWriter;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;
import org.apache.spark.api.shuffle.SupportsTransferTo;
import org.apache.spark.internal.config.package$;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
@@ -286,11 +288,6 @@ private long[] mergeSpills(SpillInfo[] spills,
long[] partitionLengths = new long[numPartitions];
try {
if (spills.length == 0) {
// The contract we are working under states that we will open a partition writer for
// each partition, regardless of number of spills
for (int i = 0; i < numPartitions; i++) {
mapWriter.getNextPartitionWriter();
}
return partitionLengths;
} else {
// There are multiple spills to merge, so none of these spill files' lengths were counted
@@ -364,7 +361,7 @@ private long[] mergeSpillsWithFileStream(
}
for (int partition = 0; partition < numPartitions; partition++) {
boolean copyThrewExecption = true;
ShufflePartitionWriter writer = mapWriter.getNextPartitionWriter();
ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
OutputStream partitionOutput = null;
try {
partitionOutput = writer.openStream();

I see that you are not wrapping this in a CloseShieldOutputStream. How are you now shielding the close of the compressor?

Author:
You aren't; that's the point. The shielding of the close of the underlying file is handled by DefaultShuffleMapOutputWriter.
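
A rough sketch of the arrangement described here, with the class name and structure assumed for illustration rather than taken from DefaultShuffleMapOutputWriter: the stream handed to each partition forwards writes to the shared file stream, but its close() never closes that underlying stream, so the compressor is free to close the stream it was given.

```java
import java.io.IOException;
import java.io.OutputStream;

// Assumed structure, for illustration only.
final class PartitionScopedOutputStream extends OutputStream {
  private final OutputStream sharedFileOut;  // owned by the map output writer
  private boolean closed = false;

  PartitionScopedOutputStream(OutputStream sharedFileOut) {
    this.sharedFileOut = sharedFileOut;
  }

  @Override
  public void write(int b) throws IOException {
    if (closed) {
      throw new IOException("Partition stream already closed");
    }
    sharedFileOut.write(b);
  }

  @Override
  public void close() {
    // Deliberately does NOT close sharedFileOut; the map output writer closes
    // the underlying file once all partitions have been committed.
    closed = true;
  }
}
```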

@@ -435,7 +432,7 @@ private long[] mergeSpillsWithTransferTo(
}
for (int partition = 0; partition < numPartitions; partition++) {
boolean copyThrewExecption = true;
ShufflePartitionWriter writer = mapWriter.getNextPartitionWriter();
ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
TransferrableWritableByteChannel partitionChannel = null;
try {
partitionChannel = writer instanceof SupportsTransferTo ?
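An illustrative sketch of the fallback pattern in the ternary above; apart from the idea of checking for transferTo support and falling back to openStream(), the interface and method names here are assumptions, not the real plugin API. When the writer supports transferTo, the spill file's channel feeds it directly; otherwise the bytes go through an ordinary stream copy.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

// Sketch only: the "Like" interfaces stand in for the real shuffle plugin API.
final class TransferToFallbackSketch {
  interface PartitionWriterLike {
    OutputStream openStream() throws IOException;
  }

  interface SupportsTransferToLike extends PartitionWriterLike {
    WritableByteChannel openChannel() throws IOException;
  }

  static void copySpillPartition(FileInputStream in, long length, PartitionWriterLike writer)
      throws IOException {
    FileChannel inputChannel = in.getChannel();
    if (writer instanceof SupportsTransferToLike) {
      // Channel path: let the file channel push bytes straight to the writer's channel.
      try (WritableByteChannel out = ((SupportsTransferToLike) writer).openChannel()) {
        long transferred = 0L;
        while (transferred < length) {
          transferred += inputChannel.transferTo(transferred, length - transferred, out);
        }
      }
    } else {
      // Stream path: fall back to a plain copy through the writer's OutputStream.
      try (OutputStream out = writer.openStream()) {
        Channels.newInputStream(inputChannel).transferTo(out);
      }
    }
  }
}
```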
@@ -100,8 +100,7 @@ public ShufflePartitionWriter getPartitionWriter(int partitionId) throws IOExcep
} else {
currChannelPosition = 0L;
}

return new DefaultShufflePartitionWriter(currPartitionId++);
return new DefaultShufflePartitionWriter(partitionId);
}

@Override
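The hunk above is the heart of the API change: the default writer now keys each partition writer off the caller-supplied partitionId rather than an internal currPartitionId cursor. A minimal usage sketch under that new shape, with all interface bodies assumed for illustration, shows why the dummy getNextPartitionWriter() calls for empty partitions (removed above and in ExternalSorter below) are no longer needed:

```java
import java.io.IOException;
import java.io.OutputStream;

// Minimal sketch (assumed interfaces, illustration only) of requesting partition
// writers by id: empty partitions are simply skipped, with no need to "advance"
// a sequential writer past them.
final class PartitionWriterLookupSketch {
  interface ShufflePartitionWriterLike {
    OutputStream openStream() throws IOException;
  }

  interface ShuffleMapOutputWriterLike {
    ShufflePartitionWriterLike getPartitionWriter(int partitionId) throws IOException;
  }

  static void writeAll(ShuffleMapOutputWriterLike mapWriter, byte[][] dataByPartition)
      throws IOException {
    for (int p = 0; p < dataByPartition.length; p++) {
      if (dataByPartition[p].length == 0) {
        continue;  // empty partition: nothing to request or write under the new API
      }
      ShufflePartitionWriterLike writer = mapWriter.getPartitionWriter(p);
      try (OutputStream out = writer.openStream()) {
        out.write(dataByPartition[p]);
      }
    }
  }
}
```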
@@ -721,10 +721,6 @@ private[spark] class ExternalSorter[K, V, C](
lengths
}

private def writeEmptyPartition(mapOutputWriter: ShuffleMapOutputWriter): Unit = {
mapOutputWriter.getNextPartitionWriter
}

/**
* Write all the data added into this ExternalSorter into a map output writer that pushes bytes
* to some arbitrary backing store. This is called by the SortShuffleWriter.
@@ -36,7 +36,9 @@ import org.apache.spark.api.shuffle.{SupportsTransferTo, TransferrableReadableBy
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.util.LimitedInputStream
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.Utils

class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAfterEach {

@@ -141,7 +143,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft

test("writing to an outputstream") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val writer = mapOutputWriter.getPartitionWriter(p)
val stream = writer.openStream()
data(p).foreach { i => stream.write(i)}
stream.close()
@@ -159,7 +161,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft

test("writing to a channel") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val writer = mapOutputWriter.getPartitionWriter(p)
val channel = writer.asInstanceOf[SupportsTransferTo].openTransferrableChannel()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
@@ -179,7 +181,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft

test("copyStreams with an outputstream") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val writer = mapOutputWriter.getPartitionWriter(p)
val stream = writer.openStream()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
@@ -199,7 +201,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft

test("copyStreamsWithNIO with a channel") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val writer = mapOutputWriter.getPartitionWriter(p)
val channel = writer.asInstanceOf[SupportsTransferTo].openTransferrableChannel()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
You are viewing a condensed version of this merge commit.