new file: org/apache/spark/api/shuffle/DefaultTransferrableWritableByteChannel.java
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

/**
 * A default {@link TransferrableWritableByteChannel} that wraps an arbitrary
 * {@link WritableByteChannel} and copies bytes through the generic (non-zero-copy) path.
 */
public class DefaultTransferrableWritableByteChannel implements TransferrableWritableByteChannel {
Reviewer: move this out of the api package and into shuffle.sort along with the other implementation classes


  private final WritableByteChannel delegate;

  public DefaultTransferrableWritableByteChannel(WritableByteChannel delegate) {
    this.delegate = delegate;
  }

  @Override
  public void transferFrom(
      TransferrableReadableByteChannel source, long numBytesToTransfer) throws IOException {
    // Inverts the transfer: the readable side drives the copy into our delegate channel.
    source.transferTo(delegate, numBytesToTransfer);
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}
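
A note on intended use: a writer that only knows how to expose an OutputStream or a plain WritableByteChannel can be adapted into the transferrable API with this class; bytes then flow through the generic copy path rather than a FileChannel-to-FileChannel transfer. A minimal sketch, where writer is a hypothetical ShufflePartitionWriter:

  TransferrableWritableByteChannel out =
      new DefaultTransferrableWritableByteChannel(Channels.newChannel(writer.openStream()));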
modified file: org/apache/spark/api/shuffle/ShufflePartitionWriter.java
@@ -17,13 +17,10 @@

package org.apache.spark.api.shuffle;

-import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;

-import org.apache.http.annotation.Experimental;
+import org.apache.spark.annotation.Experimental;
/**
 * :: Experimental ::
@@ -32,43 +29,16 @@
 * @since 3.0.0
 */
@Experimental
-public interface ShufflePartitionWriter extends Closeable {
+public interface ShufflePartitionWriter {

  /**
-  * Returns an underlying {@link OutputStream} that can write bytes to the underlying data store.
-  * <p>
-  * Note that this stream itself is not closed by the caller; close the stream in the
-  * implementation of this interface's {@link #close()}.
+  * Opens and returns an underlying {@link OutputStream} that can write bytes to the underlying
+  * data store.
   */
- OutputStream toStream() throws IOException;
+ OutputStream openStream() throws IOException;

- /**
-  * Returns an underlying {@link WritableByteChannel} that can write bytes to the underlying data
-  * store.
-  * <p>
-  * Note that this channel itself is not closed by the caller; close the channel in the
-  * implementation of this interface's {@link #close()}.
-  */
- default WritableByteChannel toChannel() throws IOException {
-   return Channels.newChannel(toStream());
- }

  /**
-  * Get the number of bytes written by this writer's stream returned by {@link #toStream()} or
-  * the channel returned by {@link #toChannel()}.
+  * Get the number of bytes written by this writer's stream returned by {@link #openStream()}.
   */
  long getNumBytesWritten();

- /**
-  * Close all resources created by this ShufflePartitionWriter, via calls to {@link #toStream()}
-  * or {@link #toChannel()}.
-  * <p>
-  * This must always close any stream returned by {@link #toStream()}.
-  * <p>
-  * Note that the default version of {@link #toChannel()} returns a {@link WritableByteChannel}
-  * that does not itself need to be closed up front; only the underlying output stream given by
-  * {@link #toStream()} must be closed.
-  */
- @Override
- void close() throws IOException;
}
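
To make the openStream contract concrete, here is a minimal sketch of a plugin-side implementation backed by a local file. All names here are illustrative, not part of this PR; the byte count uses Guava's CountingOutputStream:

  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.io.OutputStream;

  import com.google.common.io.CountingOutputStream;

  // Hypothetical plugin writer: one partition's bytes go to one local file.
  public class LocalFilePartitionWriter implements ShufflePartitionWriter {
    private final File partitionFile;
    private CountingOutputStream openedStream;

    public LocalFilePartitionWriter(File partitionFile) {
      this.partitionFile = partitionFile;
    }

    @Override
    public OutputStream openStream() throws IOException {
      // Under the new contract, the caller closes this stream once the
      // partition has been fully written.
      openedStream = new CountingOutputStream(new FileOutputStream(partitionFile));
      return openedStream;
    }

    @Override
    public long getNumBytesWritten() {
      return openedStream == null ? 0L : openedStream.getCount();
    }
  }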
modified file in org.apache.spark.api.shuffle
@@ -19,7 +19,7 @@

import java.io.IOException;

-import org.apache.http.annotation.Experimental;
+import org.apache.spark.annotation.Experimental;

/**
* :: Experimental ::
new file: org/apache/spark/api/shuffle/SupportsTransferTo.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.IOException;
import java.nio.channels.Channels;

import org.apache.spark.annotation.Experimental;

/**
* :: Experimental ::
* Indicates that partition writers can transfer bytes directly from input byte channels to
* output channels that stream data to the underlying shuffle partition storage medium.
* <p>
 * This API is separated out from ShufflePartitionWriter because it only needs to be used for
* specific low-level optimizations.
*
* @since 3.0.0
*/
@Experimental
public interface SupportsTransferTo extends ShufflePartitionWriter {

  /**
   * Opens and returns a {@link TransferrableWritableByteChannel} for transferring bytes from
   * partial input byte channels to the underlying shuffle data store.
Reviewer: what does "partial" mean in this situation?

Author: Partial meaning the spill file chunks. How can we clarify this without exposing information about the internals of sort-based shuffle?

Reviewer: Hmm, I think the use of "partial" is unclear. Also, none of the other javadocs say anything about spill file merging, so I think it's better to omit the word "partial".

   */
  default TransferrableWritableByteChannel openTransferrableChannel() throws IOException {
    return new DefaultTransferrableWritableByteChannel(Channels.newChannel(openStream()));
Reviewer: why provide a default implementation here? Don't we actually want to discourage implementations from even declaring they provide this interface, unless they provide a smarter implementation here?

  }

  /**
   * Returns the number of bytes written either by this writer's output stream opened by
   * {@link #openStream()} or the byte channel opened by {@link #openTransferrableChannel()}.
   */
  @Override
Reviewer: Why is this getting overridden? Is it for the javadoc?

Author: Yes, it's explicitly for the javadoc - we have to specifically say that the count has to take into account the channel if it was opened.

  long getNumBytesWritten();
}
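
For contrast with the default above, a sketch of a writer for which declaring SupportsTransferTo actually pays off: storage that is itself file-backed can expose a real FileChannel, keeping NIO transferTo on the zero-copy path. Everything below is illustrative, not from this PR (byte counting on the stream path is elided):

  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.io.OutputStream;
  import java.nio.channels.FileChannel;

  // Hypothetical file-backed plugin writer.
  public class FileBackedPartitionWriter implements SupportsTransferTo {
    private final File target;
    private long bytesWritten = 0L;

    public FileBackedPartitionWriter(File target) {
      this.target = target;
    }

    @Override
    public OutputStream openStream() throws IOException {
      return new FileOutputStream(target);
    }

    @Override
    public TransferrableWritableByteChannel openTransferrableChannel() throws IOException {
      FileChannel channel = new FileOutputStream(target).getChannel();
      return new TransferrableWritableByteChannel() {
        @Override
        public void transferFrom(TransferrableReadableByteChannel source, long numBytesToTransfer)
            throws IOException {
          // The source wraps a FileChannel and the delegate here is a FileChannel,
          // so the underlying transferTo can stay zero-copy.
          source.transferTo(channel, numBytesToTransfer);
          bytesWritten += numBytesToTransfer;
        }

        @Override
        public void close() throws IOException {
          // This writer owns a per-partition file, so really closing here is safe.
          channel.close();
        }
      };
    }

    @Override
    public long getNumBytesWritten() {
      return bytesWritten;
    }
  }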
new file: org/apache/spark/api/shuffle/TransferrableReadableByteChannel.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import org.apache.spark.annotation.Experimental;

/**
* :: Experimental ::
* Represents a readable byte channel, where its bytes can be transferred directly to some
* {@link WritableByteChannel} instance.
* <p>
* Shuffle plugin implementations should not need to implement this, but an instance of this is
* passed in for transferring bytes to output byte channels in
* {@link TransferrableWritableByteChannel}.
*
* @since 3.0.0
*/
@Experimental
public interface TransferrableReadableByteChannel {

  /**
   * Transfer bytes from this input channel to the given {@link WritableByteChannel}.
   */
  void transferTo(WritableByteChannel outputChannel, long numBytesToTransfer) throws IOException;
}
new file: org/apache/spark/api/shuffle/TransferrableWritableByteChannel.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.Closeable;
import java.io.IOException;

import org.apache.spark.annotation.Experimental;

/**
* :: Experimental ::
* Represents an output byte channel that can copy bytes from readable byte channels to some
* arbitrary storage system.
*
* @since 3.0.0
*/
@Experimental
public interface TransferrableWritableByteChannel extends Closeable {

  /**
   * Copy {@code numBytesToTransfer} bytes from the source readable byte channel into this
   * byte channel.
   */
  void transferFrom(
      TransferrableReadableByteChannel source, long numBytesToTransfer) throws IOException;
}
modified file: org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -22,9 +22,11 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;

+import org.apache.spark.api.shuffle.SupportsTransferTo;
+import org.apache.spark.api.shuffle.TransferrableReadableByteChannel;
+import org.apache.spark.api.shuffle.TransferrableWritableByteChannel;
import scala.None$;
import scala.Option;
import scala.Product2;
@@ -202,39 +204,37 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
      for (int i = 0; i < numPartitions; i++) {
        final File file = partitionWriterSegments[i].file();
        boolean copyThrewException = true;
-        ShufflePartitionWriter writer = null;
-        try {
-          writer = mapOutputWriter.getNextPartitionWriter();
-          if (!file.exists()) {
-            copyThrewException = false;
-          } else {
-            if (transferToEnabled) {
-              WritableByteChannel outputChannel = writer.toChannel();
-              FileInputStream in = new FileInputStream(file);
-              try (FileChannel inputChannel = in.getChannel()) {
-                Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size());
-                copyThrewException = false;
-              } finally {
-                Closeables.close(in, copyThrewException);
-              }
-            } else {
-              OutputStream tempOutputStream = writer.toStream();
-              FileInputStream in = new FileInputStream(file);
-              try {
-                Utils.copyStream(in, tempOutputStream, false, false);
-                copyThrewException = false;
-              } finally {
-                Closeables.close(in, copyThrewException);
-              }
+        ShufflePartitionWriter writer = mapOutputWriter.getNextPartitionWriter();
+        if (file.exists()) {
Reviewer: I think you're missing the case where file.exists() is false; you need to still set copyThrewException to false.

Author: The value of copyThrewException isn't used if the file does not exist.

+          if (transferToEnabled && writer instanceof SupportsTransferTo) {
+            FileInputStream in = new FileInputStream(file);
+            TransferrableWritableByteChannel outputChannel = null;
+            try (FileChannel inputChannel = in.getChannel()) {
+              outputChannel = ((SupportsTransferTo) writer).openTransferrableChannel();
+              TransferrableReadableByteChannel inputTransferable =
+                new FileTransferrableReadableByteChannel(inputChannel, 0L);
Reviewer: separating out into another thread the need for this interface. Quoting the author:

> The main problem being that passing in FileChannel directly can expose information like writing to the file? Basically seems ideal to create an abstraction that strictly limits the options that the plugin can do with it.

I guess I'm not too concerned about that. I mean, the plugin could also ignore the data in the file entirely, or shift all the bits left, or other random things. This is low-level enough that whoever is touching this needs to have some basic understanding of what they should do with it. IMO more indirection makes it harder to understand. In particular, you are trying to signal to a plugin author that they can leverage the transferTo which is only available on FileChannel.

+              outputChannel.transferFrom(inputTransferable, inputChannel.size());
Reviewer: I don't understand the need for all the layers of indirection here -- both TransferrableReadableByteChannel and TransferrableWritableByteChannel seem unnecessary. Even with a non-default writer, inputChannel is always a FileChannel; that's not something that changes. The plugin just needs to provide a WritableByteChannel. Then you'd do something like:

  WritableByteChannel outputChannel = ((SupportsTransferTo) writer).openWritableChannel();
  Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size());

Author (@mccheah, May 1, 2019): The problem is the asymmetry of how to handle closing the channel vs. closing the output stream.

If we call outputChannel.close() here, that closes the underlying output file, but that's not what we want to do in the default writer implementation.

If we don't, we risk leaking channels in non-default implementations.

We should close the output stream as returned if toStream is called, but we can't close the output stream and not close the channel - they should be symmetrical.

If we have the default writer return a channel that isn't a FileChannel, say, some implementation of a channel that shields from closing, then input.transferTo(output) won't do the right thing - it won't do the zero-copy, since implementations of FileChannel check that the output channel is specifically an instanceof FileChannel.

The whole premise of this refactor is to make it possible to close the output channel, but to have the implementations be able to decide whether or not that closes the output resource.

Reviewer: ah, ok, sorry I had not really grasped that part before -- though I still might be missing something

  1. so even given that, still seems like my comments apply to TransferrableReadableByteChannel?
  2. then shouldn't your implementation of DefaultTransferrableWritableByteChannel be a no-op? The whole point is you don't want to close that channel? And wouldn't you still want that close to be in the outer finally which closes the entire writer, since it's the thing which owns the outputChannel which spans everything? Seems like that would go for streams or channels, and most of the storage plugins we've discussed would want that.

I have a feeling I'm not getting something fundamental here ... I will go back and take a closer look at the code before this change.

Author: The idea is that we don't want to make the DefaultShufflePartitionWriter extend Closeable, because that decouples the lifecycle of closing the writer from the lifecycle of closing the channel or stream. It seems dubious to have the writer return a Closeable object that we do not close but we expect to be closed via other means.

For TransferrableReadableByteChannel, I created that to hide the fact that we're transferring from a file - thus it's more so an abstraction for removing knowledge that bytes are transferred from files to the output.

Author: The main problem being that passing in FileChannel directly can expose information like writing to the file? Basically seems ideal to create an abstraction that strictly limits the options that the plugin can do with it.

Reviewer: ok, let me step back a bit and make sure I understand the issue correctly.

I guess there are two styles of shuffle storage plugins we're concerned about here. Type (a) will store all the map output together (perhaps in a local file, like Spark's current shuffle storage, or maybe in some distributed storage, but still it's all written to one socket / channel / output stream). Type (b) will write each reduce-partition output, from this one map task, to a separate destination.

Type (a) is fine with the implementation you have before this change.

But type (b) had a problem, because the output streams don't get closed when we're done with them. They should get closed when the entire output of this mapper is done, as long as the user handles that properly in writer.close(). But in addition to being a cumbersome api, it also leaves those output streams / channels open far too long.

It's worth noting that type (b) is not like the old hash shuffle manager, which intentionally kept an output stream open for each reduce partition, because that just wrote records immediately as it received them. This is doing a local sort first, which should cut down the number of open output streams (which is where the hash shuffle manager would fall over, and why you really want to make sure you're closing them as soon as possible).

Before attempting to propose another solution -- have I at least described the problem correctly?

Author: I think that's correct - we want to give the API that allows for closing each partition stream individually, but the default implementation (and perhaps others) can internally shield the close of the returned streams.

Reviewer: ok I think I finally get it and what you are doing. Makes sense; I would like more comments in the code explaining the "why" a little bit.
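
To make the lifecycle discussion above concrete: the "shielding" idea is a wrapper whose close() deliberately does nothing, so a shared map-output channel can outlive each per-partition writer. A sketch under those assumptions (the class name is illustrative, not from this PR), together with the caveat the author raises: such a wrapper is no longer a FileChannel, so handing it straight to FileChannel.transferTo forfeits the zero-copy fast path - which is exactly why the transfer is inverted through TransferrableWritableByteChannel.transferFrom instead.

  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.nio.channels.WritableByteChannel;

  // Illustrative close-shielding wrapper for a shared output channel.
  public class CloseShieldedWritableByteChannel implements WritableByteChannel {
    private final WritableByteChannel delegate;

    public CloseShieldedWritableByteChannel(WritableByteChannel delegate) {
      this.delegate = delegate;
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
      return delegate.write(src);
    }

    @Override
    public boolean isOpen() {
      return delegate.isOpen();
    }

    @Override
    public void close() {
      // Deliberately a no-op: whoever owns the delegate closes it once,
      // after all partitions have been written.
    }
  }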

+              copyThrewException = false;
+            } finally {
+              Closeables.close(in, copyThrewException);
+              Closeables.close(outputChannel, copyThrewException);
+            }
-            if (!file.delete()) {
-              logger.error("Unable to delete file for partition {}", i);
+          } else {
+            FileInputStream in = new FileInputStream(file);
+            OutputStream outputStream = null;
+            try {
+              outputStream = writer.openStream();
+              Utils.copyStream(in, outputStream, false, false);
+              copyThrewException = false;
+            } finally {
+              Closeables.close(in, copyThrewException);
+              Closeables.close(outputStream, copyThrewException);
+            }
          }
-        } finally {
-          Closeables.close(writer, copyThrewException);
+          if (!file.delete()) {
+            logger.error("Unable to delete file for partition {}", i);
+          }
        }

        lengths[i] = writer.getNumBytesWritten();
      }
    } finally {
new file: org/apache/spark/shuffle/sort/FileTransferrableReadableByteChannel.java
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.spark.api.shuffle.TransferrableReadableByteChannel;
import org.apache.spark.util.Utils;

/**
 * A {@link TransferrableReadableByteChannel} backed by a {@link FileChannel}, used on the
 * framework side to hand spill-file bytes to a partition writer without exposing the file
 * itself.
 */
public class FileTransferrableReadableByteChannel implements TransferrableReadableByteChannel {

  private final FileChannel input;
  private final long transferStartPosition;

  public FileTransferrableReadableByteChannel(FileChannel input, long transferStartPosition) {
    this.input = input;
    this.transferStartPosition = transferStartPosition;
  }

  @Override
  public void transferTo(WritableByteChannel outputChannel, long numBytesToTransfer) {
    // Delegates to Spark's NIO copy helper, which uses FileChannel.transferTo under the hood.
    Utils.copyFileStreamNIO(input, outputChannel, transferStartPosition, numBytesToTransfer);
  }
}
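
Taken together with the BypassMergeSortShuffleWriter change above, the intended pairing looks like this condensed sketch (fileChannel and writer stand in for the locals of writePartitionedData):

  // Framework side: wrap the spill file's channel and let the plugin pull from it.
  TransferrableWritableByteChannel out = ((SupportsTransferTo) writer).openTransferrableChannel();
  TransferrableReadableByteChannel in =
      new FileTransferrableReadableByteChannel(fileChannel, 0L);
  out.transferFrom(in, fileChannel.size());
  out.close();  // each implementation decides whether this closes the real resource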