Address comments.
mccheah committed May 7, 2019
commit f3dac6ea511cb58a218cccded5ee73c58c34a22b
@@ -26,8 +26,12 @@
* Indicates that partition writers can transfer bytes directly from input byte channels to
* output channels that stream data to the underlying shuffle partition storage medium.
* <p>
* This API is separated out from ShuffleParittionWriter because it only needs to be used for
* specific low-level optimizations.
* This API is separated out for advanced users because it only needs to be used for
* specific low-level optimizations. The idea is that the returned channel can transfer bytes
* from the input file channel out to the backing storage system without copying data into
* memory.
* <p>
* Most shuffle plugin implementations should use {@link ShufflePartitionWriter} instead.
*
* @since 3.0.0
*/
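To make the intent of this javadoc concrete, here is a minimal caller-side sketch of the pattern the interface enables, mirroring the BypassMergeSortShuffleWriter change later in this commit. The class and method names (TransferToUsageSketch, copyPartition) are illustrative only, and the checked-exception signatures are assumptions rather than part of the commit.

import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;

import org.apache.spark.api.shuffle.ShufflePartitionWriter;
import org.apache.spark.api.shuffle.SupportsTransferTo;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
import org.apache.spark.shuffle.sort.DefaultTransferrableWritableByteChannel;

final class TransferToUsageSketch {

  // Writers that opt in via SupportsTransferTo hand back a transferrable channel; all other
  // writers are wrapped in the default channel over their plain OutputStream.
  static void copyPartition(ShufflePartitionWriter writer, FileChannel input) throws IOException {
    TransferrableWritableByteChannel out;
    if (writer instanceof SupportsTransferTo) {
      out = ((SupportsTransferTo) writer).openTransferrableChannel();
    } else {
      out = new DefaultTransferrableWritableByteChannel(
          Channels.newChannel(writer.openStream()));
    }
    try {
      // Transfer the whole partition file starting at position 0, without staging the
      // bytes in user memory when the underlying channel supports it.
      out.transferFrom(input, 0L, input.size());
    } finally {
      out.close();
    }
  }
}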

This file was deleted.

@@ -20,12 +20,21 @@
import java.io.Closeable;
import java.io.IOException;

import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.spark.annotation.Experimental;

/**
* :: Experimental ::
* Represents an output byte channel that can copy bytes from readable byte channels to some
* Represents an output byte channel that can copy bytes from input file channels to some
* arbitrary storage system.
* <p>
* This API is provided for advanced users who can transfer bytes from a file channel to
* some output sink without copying data into memory. Most users should not need to use
* this functionality; this is primarily provided for the built-in shuffle storage backends
* that persist shuffle files on local disk.
* <p>
* For a simpler alternative, see {@link ShufflePartitionWriter}.
*
* @since 3.0.0
*/
@@ -34,7 +43,12 @@ public interface TransferrableWritableByteChannel extends Closeable {

/**
* Copy all bytes from the source readable byte channel into this byte channel.
*
* @param source File to transfer bytes from. Do not call anything on this channel other than
* {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
* @param transferStartPosition Start position of the input file to transfer from.
* @param numBytesToTransfer Number of bytes to transfer from the given source.
*/
void transferFrom(
TransferrableReadableByteChannel source, long numBytesToTransfer) throws IOException;
void transferFrom(FileChannel source, long transferStartPosition, long numBytesToTransfer)
throws IOException;
}
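As a rough illustration of how a shuffle plugin might implement this interface against a local-file destination, here is a hypothetical sketch. LocalFileTransferrableWritableByteChannel is an invented name, not part of this commit; it honors the contract above by calling only FileChannel#transferTo on the source channel.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;

// Hypothetical plugin-side implementation; assumes the backing store is a local file.
final class LocalFileTransferrableWritableByteChannel implements TransferrableWritableByteChannel {

  private final FileChannel destination;

  LocalFileTransferrableWritableByteChannel(File destinationFile) throws IOException {
    this.destination = new FileOutputStream(destinationFile, true).getChannel();
  }

  @Override
  public void transferFrom(FileChannel source, long transferStartPosition, long numBytesToTransfer)
      throws IOException {
    long transferred = 0L;
    while (transferred < numBytesToTransfer) {
      // transferTo may move fewer bytes than requested, so loop until everything is copied.
      long n = source.transferTo(
          transferStartPosition + transferred, numBytesToTransfer - transferred, destination);
      if (n <= 0) {
        // Stop if the source has no more bytes to give.
        break;
      }
      transferred += n;
    }
  }

  @Override
  public void close() throws IOException {
    destination.close();
  }
}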
@@ -45,7 +45,6 @@
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShufflePartitionWriter;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;
import org.apache.spark.api.shuffle.TransferrableReadableByteChannel;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
import org.apache.spark.internal.config.package$;
import org.apache.spark.scheduler.MapStatus;
@@ -218,12 +217,12 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro
if (writer instanceof SupportsTransferTo) {
outputChannel = ((SupportsTransferTo) writer).openTransferrableChannel();
} else {
// Use default transferrable writable channel anyways in order to have parity with
// UnsafeShuffleWriter.
outputChannel = new DefaultTransferrableWritableByteChannel(
Reviewer: any point in having this else branch here? shouldn't the if (transferToEnabled) above also check SupportsTransferTo? I don't think you'll get any performance benefits from this with a channel wrapping an outputstream

Author (mccheah): This is to remain consistent with UnsafeShuffleWriter. See also #535 (comment)

Channels.newChannel(writer.openStream()));
}
TransferrableReadableByteChannel inputTransferable =
new FileTransferrableReadableByteChannel(inputChannel, 0L);
outputChannel.transferFrom(inputTransferable, inputChannel.size());
outputChannel.transferFrom(inputChannel, 0L, inputChannel.size());
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
@@ -18,10 +18,18 @@
package org.apache.spark.shuffle.sort;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.spark.api.shuffle.TransferrableReadableByteChannel;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
import org.apache.spark.util.Utils;

/**
* This is used when transferTo is enabled but the shuffle plugin hasn't implemented
* {@link org.apache.spark.api.shuffle.SupportsTransferTo}.
* <p>
* This default implementation exists as a convenience to the unsafe shuffle writer and
* the bypass merge sort shuffle writers.
*/
public class DefaultTransferrableWritableByteChannel implements TransferrableWritableByteChannel {

private final WritableByteChannel delegate;
@@ -32,8 +40,8 @@ public DefaultTransferrableWritableByteChannel(WritableByteChannel delegate) {

@Override
public void transferFrom(
TransferrableReadableByteChannel source, long numBytesToTransfer) throws IOException {
source.transferTo(delegate, numBytesToTransfer);
FileChannel source, long transferStartPosition, long numBytesToTransfer) {
Utils.copyFileStreamNIO(source, delegate, transferStartPosition, numBytesToTransfer);
}

@Override
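For completeness, a small standalone usage sketch of the default channel (the class name DefaultChannelUsageSketch and the file paths are assumed, not part of the commit). As the review comment above points out, wrapping a plain OutputStream-backed channel this way does not yield a true zero-copy transfer; the point of the default implementation is that BypassMergeSortShuffleWriter keeps parity with UnsafeShuffleWriter's code path.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;

import org.apache.spark.shuffle.sort.DefaultTransferrableWritableByteChannel;

final class DefaultChannelUsageSketch {

  // Copies an entire input file into an OutputStream-backed destination using the same
  // transferFrom call shape as the shuffle writers in this commit.
  static void copyWholeFile(String inputPath, String outputPath) throws IOException {
    try (FileChannel input = new FileInputStream(inputPath).getChannel();
         DefaultTransferrableWritableByteChannel output =
             new DefaultTransferrableWritableByteChannel(
                 Channels.newChannel(new FileOutputStream(outputPath)))) {
      output.transferFrom(input, 0L, input.size());
    }
  }
}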

This file was deleted.

@@ -444,9 +444,7 @@ private long[] mergeSpillsWithTransferTo(
final FileChannel spillInputChannel = spillInputChannels[i];
final long writeStartTime = System.nanoTime();
partitionChannel.transferFrom(
new FileTransferrableReadableByteChannel(
spillInputChannel, spillInputChannelPositions[i]),
partitionLengthInSpill);
spillInputChannel, spillInputChannelPositions[i], partitionLengthInSpill);
spillInputChannelPositions[i] += partitionLengthInSpill;
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
@@ -32,12 +32,12 @@ import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.api.shuffle.{SupportsTransferTo, TransferrableReadableByteChannel}
import org.apache.spark.api.shuffle.SupportsTransferTo
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.util.LimitedInputStream
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.Utils

class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
@@ -166,11 +166,19 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
intBuffer.put(data(p))
channel.transferFrom(
new ByteBufferTransferrableReadableChannel(byteBuffer), byteBuffer.remaining())
val numBytes = byteBuffer.remaining()
val outputTempFile = File.createTempFile("channelTemp", "", tempDir)
val outputTempFileStream = new FileOutputStream(outputTempFile)
Utils.copyStream(
new ByteBufferInputStream(byteBuffer),
outputTempFileStream,
closeStreams = true)
val tempFileInput = new FileInputStream(outputTempFile)
channel.transferFrom(tempFileInput.getChannel, 0L, numBytes)
// Bytes require * 4
assert(writer.getNumBytesWritten == D_LEN * 4)
channel.close()
tempFileInput.close()
assert(writer.getNumBytesWritten == D_LEN * 4)
}
mapOutputWriter.commitAllPartitions()
val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray
@@ -210,8 +218,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft
out.write(byteBuffer.array())
out.close()
val in = new FileInputStream(tempFile)
channel.transferFrom(
new ByteBufferTransferrableReadableChannel(byteBuffer), byteBuffer.remaining())
channel.transferFrom(in.getChannel, 0L, byteBuffer.remaining())
channel.close()
assert(writer.getNumBytesWritten == D_LEN * 4)
}
@@ -221,11 +228,4 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft
assert(mergedOutputFile.length() === partitionLengths.sum)
assert(data === readRecordsFromFile(true))
}

private class ByteBufferTransferrableReadableChannel(buf: ByteBuffer)
extends TransferrableReadableByteChannel {
override def transferTo(outputChannel: WritableByteChannel, numBytesToTransfer: Long): Unit = {
outputChannel.write(buf)
}
}
}