Skip to content
Prev Previous commit
Next Next commit
Address comments
  • Loading branch information
mccheah committed May 1, 2019
commit 1faf98045ba808a6da81dfd8ce4f28efc4fad2ee
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ public interface SupportsTransferTo extends ShufflePartitionWriter {

/**
* Opens and returns a {@link TransferrableWritableByteChannel} for transferring bytes from
* partial input byte channels to the underlying shuffle data store.
* input byte channels to the underlying shuffle data store.
*/
default TransferrableWritableByteChannel openTransferrableChannel() throws IOException {
return new DefaultTransferrableWritableByteChannel(Channels.newChannel(openStream()));
}
TransferrableWritableByteChannel openTransferrableChannel() throws IOException;

/**
* Returns the number of bytes written either by this writer's output stream opened by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -92,7 +93,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final int mapId;
private final Serializer serializer;
private final ShuffleWriteSupport shuffleWriteSupport;
private final IndexShuffleBlockResolver shuffleBlockResolver;

/** Array of file writers, one for each partition */
private DiskBlockObjectWriter[] partitionWriters;
Expand All @@ -109,7 +109,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {

BypassMergeSortShuffleWriter(
BlockManager blockManager,
IndexShuffleBlockResolver shuffleBlockResolver,
BypassMergeSortShuffleHandle<K, V> handle,
int mapId,
SparkConf conf,
Expand All @@ -126,7 +125,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.numPartitions = partitioner.numPartitions();
this.writeMetrics = writeMetrics;
this.serializer = dep.serializer();
this.shuffleBlockResolver = shuffleBlockResolver;
this.shuffleWriteSupport = shuffleWriteSupport;
}

Expand Down Expand Up @@ -211,14 +209,19 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro
try {
for (int i = 0; i < numPartitions; i++) {
final File file = partitionWriterSegments[i].file();
boolean copyThrewException = true;
ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
if (file.exists()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're missing the case where file.exists() is false — you still need to set copyThrewException to false.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value of copyThrewException isn't used if the file does not exist.

if (transferToEnabled && writer instanceof SupportsTransferTo) {
boolean copyThrewException = true;
if (transferToEnabled) {
FileInputStream in = new FileInputStream(file);
TransferrableWritableByteChannel outputChannel = null;
try (FileChannel inputChannel = in.getChannel()) {
outputChannel = ((SupportsTransferTo) writer).openTransferrableChannel();
if (writer instanceof SupportsTransferTo) {
outputChannel = ((SupportsTransferTo) writer).openTransferrableChannel();
} else {
outputChannel = new DefaultTransferrableWritableByteChannel(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any point in having this else branch here? shouldn't the if (transferToEnabled) above also check SupportsTransferTo? I don't think you'll get any performance benefits from this with a channel wrapping an outputstream

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to remain consistent with UnsafeShuffleWriter. See also #535 (comment)

Channels.newChannel(writer.openStream()));
}
TransferrableReadableByteChannel inputTransferable =
new FileTransferrableReadableByteChannel(inputChannel, 0L);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separating out another thread for the need for this interface:

The main problem being that passing in FileChannel directly can expose information like writing to the file? Basically seems ideal to create an abstraction that strictly limits the options that the plugin can do with it.

I guess I'm not too concerned about that. I mean the plugin could also ignore the data in the file entirely, or shift all the bits left, or other random things. this is low-level enough that whoever is touching this needs to have some basic understanding of what they should do with it. IMO more indirection makes it harder to understand. In particular, you are trying to signal to a plugin author that they can leverage the transferTo which is only available on FileChannel.

outputChannel.transferFrom(inputTransferable, inputChannel.size());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the need for all the layers of indirection here -- both TransferrableReadableByteChannel and TransferrableWritableByteChannel seem unnecessary. Even with a non-default writer, inputChannel is always a FileChannel. that's not something that changes. The plugin just needs to provide a WritableByteChannel. then you'd do something like

WriteableByteChannel outputChannel = ((SupportsTransferTo) writer).openWritableChannel();
Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size());

Copy link
Author

@mccheah mccheah May 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is the asymmetry of how to handle closing the channel vs. closing the output stream.

If we call outputChannel.close() here, that closes the underlying output file, but that's not what we want to do in the default writer implementation.

If we don't, we risk leaking channels in non-default implementations.

We should close the output stream as returned if toStream is called, but, we can't close the output stream and not close the channel - they should be symmetrical.

If we have the default writer return a channel that isn't a FileChannel, say, some implementation of a channel that shields from closing, then input.transferTo(output) won't do the right thing - it won't do the zero-byte copy, since implementations of FileChannel check that the output channel is specifically an instanceof FileChannel.

The whole premise of this refactor is to make it possible to close the output channel, but to have the implementations be able to decide whether or not that closes the output resource.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, ok, sorry I had not really grasped that part before -- though I still might be missing something

  1. so even given that, still seems like my comments apply to TransferrableReadableByteChannel?
  2. then shouldn't your implementation of DefaultTransferrableWritableByteChannel be a no-op? The whole point is you don't want to close that channel? and wouldn't you still want that close to be in outer finally which closes the entire writer, since its the thing which owns the outputChannel which spans everything? Seems like that would go for streams or channels, and most of the storage plugins we've discussed would want that.

I have a feeling I'm not getting something fundamental here ... I will go back and take a closer look at the code before this change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that we don't want to make the DefaultShufflePartitionWriter extend Closeable, because that decouples the lifecycle of closing the writer from the lifecycle of closing the channel or stream. It seems dubious to have the writer return a Closeable object that we do not close but we expect to be closed via other means.

For TransferrableReadableByteChannel I created that to hide the fact that we're transferring from a file - thus it's more so an abstraction for removing knowledge that bytes are transferred from files to the output.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main problem being that passing in FileChannel directly can expose information like writing to the file? Basically seems ideal to create an abstraction that strictly limits the options that the plugin can do with it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok lemme step back a bit and make sure I understand the issue correctly.

I guess there are two styles of shuffle storage plugins we're concerned about here. Type (a) will store all the map output together (perhaps in a local file, like Spark's current shuffle storage, or maybe in some distributed storage, but still it's all written to one socket / channel / output stream). Type (b) will write each reduce-partition output, from this one map task, to a separate destination.

Type (a) is fine with the implementation you have before this change.

But type (b) had a problem, because the output streams don't get closed when we're done with them. They should get closed when the entire output of this mapper is done, as long as the user handles that properly in writer.close(). But in addition to being a cumbersome api, it also leaves open those outputstreams / channels far too long.

It's worth noting that type (b) is not like the old hash shuffle manager, which intentionally kept an output stream open for each reduce partition, because that just wrote records immediately as it received them. This is doing a local sort first, which should let this cut down the number of open output streams (which is where the hash shuffle manager would fall over, and why you really want to make sure you're closing them as soon as possible).

Before my attempting to propose another solution -- have I at least described the problem correctly?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's correct - we want to give the API that allows for closing each partition stream individually but the default implementation (and perhaps others) can internally shield the close of the returned streams.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I think I finally get it and what you are doing. makes sense, I would like more comments in the code explaining the "why" a little bit.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
* limitations under the License.
*/

package org.apache.spark.api.shuffle;
package org.apache.spark.shuffle.sort;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import org.apache.spark.api.shuffle.TransferrableReadableByteChannel;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;

public class DefaultTransferrableWritableByteChannel implements TransferrableWritableByteChannel {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this out of the api package and into shuffle.sort along with the other implementation classes


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.spark.*;
import org.apache.spark.annotation.Private;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.DefaultTransferrableWritableByteChannel;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.api.shuffle.DefaultTransferrableWritableByteChannel;
import org.apache.spark.shuffle.sort.DefaultTransferrableWritableByteChannel;
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShufflePartitionWriter;
import org.apache.spark.api.shuffle.SupportsTransferTo;
Expand Down