ShufflePartitionWriter.java
@@ -17,13 +17,10 @@

 package org.apache.spark.api.shuffle;

-import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;

-import org.apache.http.annotation.Experimental;
+import org.apache.spark.annotation.Experimental;

 /**
  * :: Experimental ::
@@ -32,43 +29,16 @@
  * @since 3.0.0
  */
 @Experimental
-public interface ShufflePartitionWriter extends Closeable {
+public interface ShufflePartitionWriter {

   /**
-   * Returns an underlying {@link OutputStream} that can write bytes to the underlying data store.
-   * <p>
-   * Note that this stream itself is not closed by the caller; close the stream in the
-   * implementation of this interface's {@link #close()}.
+   * Opens and returns an underlying {@link OutputStream} that can write bytes to the underlying
+   * data store.
    */
-  OutputStream toStream() throws IOException;
+  OutputStream openStream() throws IOException;

-  /**
-   * Returns an underlying {@link WritableByteChannel} that can write bytes to the underlying data
-   * store.
-   * <p>
-   * Note that this channel itself is not closed by the caller; close the channel in the
-   * implementation of this interface's {@link #close()}.
-   */
-  default WritableByteChannel toChannel() throws IOException {
-    return Channels.newChannel(toStream());
-  }
-
   /**
-   * Get the number of bytes written by this writer's stream returned by {@link #toStream()} or
-   * the channel returned by {@link #toChannel()}.
+   * Get the number of bytes written by this writer's stream returned by {@link #openStream()}.
    */
   long getNumBytesWritten();
-
-  /**
-   * Close all resources created by this ShufflePartitionWriter, via calls to {@link #toStream()}
-   * or {@link #toChannel()}.
-   * <p>
-   * This must always close any stream returned by {@link #toStream()}.
-   * <p>
-   * Note that the default version of {@link #toChannel()} returns a {@link WritableByteChannel}
-   * that does not itself need to be closed up front; only the underlying output stream given by
-   * {@link #toStream()} must be closed.
-   */
-  @Override
-  void close() throws IOException;
 }
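
To make the new, narrower contract concrete, here is a minimal plugin-side sketch against this interface. The class name and the local-file backing store are illustrative assumptions, not part of this diff; note that under the new API the caller, not the writer, is responsible for closing the stream returned by openStream().

import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.spark.api.shuffle.ShufflePartitionWriter;

// Hypothetical example: writes one partition to a local file and counts the bytes.
public class LocalFilePartitionWriter implements ShufflePartitionWriter {

  private final File partitionFile;
  private long numBytesWritten = 0L;

  public LocalFilePartitionWriter(File partitionFile) {
    this.partitionFile = partitionFile;
  }

  @Override
  public OutputStream openStream() throws IOException {
    // Count every byte that passes through; the caller closes the returned stream.
    return new FilterOutputStream(new FileOutputStream(partitionFile)) {
      @Override
      public void write(int b) throws IOException {
        out.write(b);
        numBytesWritten++;
      }

      @Override
      public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        numBytesWritten += len;
      }
    };
  }

  @Override
  public long getNumBytesWritten() {
    return numBytesWritten;
  }
}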

(file name not shown)
@@ -19,7 +19,7 @@

 import java.io.IOException;

-import org.apache.http.annotation.Experimental;
+import org.apache.spark.annotation.Experimental;

 /**
  * :: Experimental ::

SupportsTransferTo.java (new file)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.IOException;

import org.apache.spark.annotation.Experimental;

/**
 * :: Experimental ::
 * Indicates that partition writers can transfer bytes directly from input byte channels to
 * output channels that stream data to the underlying shuffle partition storage medium.
 * <p>
 * This API is separated out for advanced users because it only needs to be used for
 * specific low-level optimizations. The idea is that the returned channel can transfer bytes
 * from the input file channel out to the backing storage system without copying data into
 * memory.
 * <p>
 * Most shuffle plugin implementations should use {@link ShufflePartitionWriter} instead.
 *
 * @since 3.0.0
 */
@Experimental
public interface SupportsTransferTo extends ShufflePartitionWriter {

  /**
   * Opens and returns a {@link TransferrableWritableByteChannel} for transferring bytes from
   * input byte channels to the underlying shuffle data store.
   */
  TransferrableWritableByteChannel openTransferrableChannel() throws IOException;

  /**
   * Returns the number of bytes written either by this writer's output stream opened by
   * {@link #openStream()} or the byte channel opened by {@link #openTransferrableChannel()}.
   */
  @Override

  > Reviewer: Why is this getting overridden? Is it for the javadoc?
  > Author: Yes, it's explicitly for the JavaDoc - we have to specifically say that the count has to take into account the channel if it was opened.

  long getNumBytesWritten();
}
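
A writer that opts into the zero-copy path implements this interface on top of the base one. The sketch below is illustrative only (the class name, the file-backed channel, and the byte accounting are assumptions); it loops because FileChannel.transferTo may transfer fewer bytes than requested.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;

import org.apache.spark.api.shuffle.SupportsTransferTo;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;

// Hypothetical example of a local-file writer that supports zero-copy transfers.
public class LocalFileTransferWriter implements SupportsTransferTo {

  private final File partitionFile;
  private long numBytesWritten = 0L;

  public LocalFileTransferWriter(File partitionFile) {
    this.partitionFile = partitionFile;
  }

  @Override
  public OutputStream openStream() throws IOException {
    // Simplified for the sketch; a real writer would also count bytes written here.
    return new FileOutputStream(partitionFile);
  }

  @Override
  public TransferrableWritableByteChannel openTransferrableChannel() throws IOException {
    FileChannel out = new FileOutputStream(partitionFile).getChannel();
    return new TransferrableWritableByteChannel() {
      @Override
      public void transferFrom(FileChannel source, long transferStartPosition,
          long numBytesToTransfer) throws IOException {
        long written = 0L;
        // transferTo may move fewer bytes than asked for, so keep going until done.
        while (written < numBytesToTransfer) {
          written += source.transferTo(
              transferStartPosition + written, numBytesToTransfer - written, out);
        }
        numBytesWritten += numBytesToTransfer;
      }

      @Override
      public void close() throws IOException {
        out.close();
      }
    };
  }

  @Override
  public long getNumBytesWritten() {
    return numBytesWritten;
  }
}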

TransferrableWritableByteChannel.java (new file)
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.annotation.Experimental;

/**
 * :: Experimental ::
 * Represents an output byte channel that can copy bytes from input file channels to some
 * arbitrary storage system.
 * <p>
 * This API is provided for advanced users who can transfer bytes from a file channel to
 * some output sink without copying data into memory. Most users should not need to use
 * this functionality; this is primarily provided for the built-in shuffle storage backends
 * that persist shuffle files on local disk.
 * <p>
 * For a simpler alternative, see {@link ShufflePartitionWriter}.
 *
 * @since 3.0.0
 */
@Experimental
public interface TransferrableWritableByteChannel extends Closeable {

  /**
   * Copies {@code numBytesToTransfer} bytes from the given source file channel, starting at
   * {@code transferStartPosition}, into this byte channel.
   *
   * @param source File channel to transfer bytes from. Do not call anything on this channel
   *               other than {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
   * @param transferStartPosition Start position of the input file to transfer from.
   * @param numBytesToTransfer Number of bytes to transfer from the given source.
   */
  void transferFrom(FileChannel source, long transferStartPosition, long numBytesToTransfer)
      throws IOException;
}
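
From the caller's side, the intended use is to hand a spilled file's channel to transferFrom, as the bypass-merge writer below does. A condensed, hypothetical helper showing that pattern:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

import org.apache.spark.api.shuffle.SupportsTransferTo;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;

final class TransferToUsageExample {

  // Copies an entire spill file into the writer's channel without buffering it in memory.
  static void copySpillFile(File spillFile, SupportsTransferTo writer) throws IOException {
    try (FileInputStream in = new FileInputStream(spillFile);
        FileChannel inputChannel = in.getChannel();
        TransferrableWritableByteChannel outputChannel = writer.openTransferrableChannel()) {
      outputChannel.transferFrom(inputChannel, 0L, inputChannel.size());
    }
  }
}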

BypassMergeSortShuffleWriter.java
@@ -21,12 +21,10 @@

 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
 import javax.annotation.Nullable;

-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.shuffle.MapShuffleLocations;
 import scala.None$;
 import scala.Option;
 import scala.Product2;
@@ -38,19 +36,22 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.spark.Partitioner;
-import org.apache.spark.ShuffleDependency;
-import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.shuffle.MapShuffleLocations;
+import org.apache.spark.api.shuffle.SupportsTransferTo;
 import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
 import org.apache.spark.api.shuffle.ShufflePartitionWriter;
 import org.apache.spark.api.shuffle.ShuffleWriteSupport;
+import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
 import org.apache.spark.internal.config.package$;
+import org.apache.spark.Partitioner;
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
 import org.apache.spark.scheduler.MapStatus;
 import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
-import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.storage.*;
 import org.apache.spark.util.Utils;
@@ -90,7 +91,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final int mapId;
   private final Serializer serializer;
   private final ShuffleWriteSupport shuffleWriteSupport;
-  private final IndexShuffleBlockResolver shuffleBlockResolver;

   /** Array of file writers, one for each partition */
   private DiskBlockObjectWriter[] partitionWriters;
@@ -107,7 +107,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {

   BypassMergeSortShuffleWriter(
       BlockManager blockManager,
-      IndexShuffleBlockResolver shuffleBlockResolver,
       BypassMergeSortShuffleHandle<K, V> handle,
       int mapId,
       SparkConf conf,
@@ -124,7 +123,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.numPartitions = partitioner.numPartitions();
     this.writeMetrics = writeMetrics;
     this.serializer = dep.serializer();
-    this.shuffleBlockResolver = shuffleBlockResolver;
     this.shuffleWriteSupport = shuffleWriteSupport;
   }

@@ -209,40 +207,43 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
-        boolean copyThrewException = true;
-        ShufflePartitionWriter writer = null;
-        try {
-          writer = mapOutputWriter.getPartitionWriter(i);
-          if (!file.exists()) {
-            copyThrewException = false;
-          } else {
-            if (transferToEnabled) {
-              WritableByteChannel outputChannel = writer.toChannel();
-              FileInputStream in = new FileInputStream(file);
-              try (FileChannel inputChannel = in.getChannel()) {
-                Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size());
-                copyThrewException = false;
-              } finally {
-                Closeables.close(in, copyThrewException);
-              }
-            } else {
-              OutputStream tempOutputStream = writer.toStream();
-              FileInputStream in = new FileInputStream(file);
-              try {
-                Utils.copyStream(in, tempOutputStream, false, false);
-                copyThrewException = false;
-              } finally {
-                Closeables.close(in, copyThrewException);
-              }
-            }
-            if (!file.delete()) {
-              logger.error("Unable to delete file for partition {}", i);
-            }
-          }
-        } finally {
-          Closeables.close(writer, copyThrewException);
-        }
+        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
+        if (file.exists()) {

  > Reviewer: I think you're missing the case for when file.exists() is false; you need to still set copyThrewException to false.
  > Author: The value of copyThrewException isn't used if the file does not exist.

+          boolean copyThrewException = true;
+          if (transferToEnabled) {
+            FileInputStream in = new FileInputStream(file);
+            TransferrableWritableByteChannel outputChannel = null;
+            try (FileChannel inputChannel = in.getChannel()) {
+              if (writer instanceof SupportsTransferTo) {
+                outputChannel = ((SupportsTransferTo) writer).openTransferrableChannel();
+              } else {
+                // Use default transferrable writable channel anyway in order to have parity
+                // with UnsafeShuffleWriter.
+                outputChannel = new DefaultTransferrableWritableByteChannel(

  > Reviewer: Any point in having this else branch here? Shouldn't the if (transferToEnabled) above also check SupportsTransferTo? I don't think you'll get any performance benefits from this with a channel wrapping an output stream.
  > Author: This is to remain consistent with UnsafeShuffleWriter. See also #535 (comment)

+                    Channels.newChannel(writer.openStream()));
+              }
+              outputChannel.transferFrom(inputChannel, 0L, inputChannel.size());
+              copyThrewException = false;
+            } finally {
+              Closeables.close(in, copyThrewException);
+              Closeables.close(outputChannel, copyThrewException);
+            }
+          } else {
+            FileInputStream in = new FileInputStream(file);
+            OutputStream outputStream = null;
+            try {
+              outputStream = writer.openStream();
+              Utils.copyStream(in, outputStream, false, false);
+              copyThrewException = false;
+            } finally {
+              Closeables.close(in, copyThrewException);
+              Closeables.close(outputStream, copyThrewException);
+            }
+          }
+          if (!file.delete()) {
+            logger.error("Unable to delete file for partition {}", i);
+          }
+        }

         lengths[i] = writer.getNumBytesWritten();
       }
     } finally {

DefaultTransferrableWritableByteChannel.java (new file)
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
import org.apache.spark.util.Utils;

/**
 * This is used when transferTo is enabled but the shuffle plugin hasn't implemented
 * {@link org.apache.spark.api.shuffle.SupportsTransferTo}.
 * <p>
 * This default implementation exists as a convenience to the unsafe shuffle writer and
 * the bypass merge sort shuffle writer.
 */
public class DefaultTransferrableWritableByteChannel implements TransferrableWritableByteChannel {

  private final WritableByteChannel delegate;

  public DefaultTransferrableWritableByteChannel(WritableByteChannel delegate) {
    this.delegate = delegate;
  }

  @Override
  public void transferFrom(
      FileChannel source, long transferStartPosition, long numBytesToTransfer) {
    Utils.copyFileStreamNIO(source, delegate, transferStartPosition, numBytesToTransfer);
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}
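
To summarize the dispatch pattern used in BypassMergeSortShuffleWriter above, a helper along these lines picks the zero-copy channel when the plugin offers one and otherwise wraps the plain stream. The class and method names are illustrative, and it is assumed to live in the same org.apache.spark.shuffle.sort package as the default channel.

package org.apache.spark.shuffle.sort;

import java.io.IOException;
import java.nio.channels.Channels;

import org.apache.spark.api.shuffle.ShufflePartitionWriter;
import org.apache.spark.api.shuffle.SupportsTransferTo;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;

final class TransferChannels {

  private TransferChannels() {}

  // Prefer the plugin's zero-copy channel; otherwise wrap its plain output stream.
  static TransferrableWritableByteChannel openChannelFor(ShufflePartitionWriter writer)
      throws IOException {
    if (writer instanceof SupportsTransferTo) {
      return ((SupportsTransferTo) writer).openTransferrableChannel();
    }
    // Fallback: no zero-copy benefit, but keeps one code path for both writer kinds.
    return new DefaultTransferrableWritableByteChannel(
        Channels.newChannel(writer.openStream()));
  }
}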