
Conversation


@mccheah mccheah commented Apr 15, 2019

This solves the consistency and resource leakage concerns with the first iteration of this API, where it would not be obvious that the streamable resources created by ShufflePartitionWriter needed to be closed by ShufflePartitionWriter#close as opposed to closing the resources directly.

This introduces the following adjustments:

  • Channel-based writes are separated out to their own module, SupportsTransferTo. This allows the transfer-to APIs to be modified independently, and users that only provide output streams can ignore the NIO APIs entirely. This also allows us to mark the base ShufflePartitionWriter as a stable API eventually while keeping the NIO APIs marked as experimental or developer-api.

  • We add APIs that explicitly encode the notion of transferring bytes from one source to another. The partition writer returns an instance of TransferrableWritableByteChannel, which has APIs for accepting a TransferrableReadableByteChannel and can tell the readable byte channel to transfer its bytes out to some destination sink.

  • The resources returned by ShufflePartitionWriter are always closed. Internally, DefaultMapOutputWriter keeps resources open until commitAllPartitions() is called.

  • ShufflePartitionWriter no longer extends Closeable, clarifying the fact that the resources returned by the partition writer will be closed by the callers.

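Schematically, the pieces described above fit together roughly like this. This is a hedged sketch: the interface names come from the description, but the exact signatures are assumptions rather than the PR's code, and each interface would live in its own file.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;

// Base writer: callers close the streams it returns. Note that it
// deliberately does NOT extend Closeable.
interface ShufflePartitionWriter {
  OutputStream openStream() throws IOException;
  long getCount() throws IOException;
}

// A readable source that can push its bytes to a destination sink.
// (Its methods are elided here; see the discussion below.)
interface TransferrableReadableByteChannel extends Closeable {
}

// A writable sink that accepts bytes from a readable source.
interface TransferrableWritableByteChannel extends Closeable {
  void transferFrom(TransferrableReadableByteChannel source, long numBytesToTransfer)
      throws IOException;
}

// Optional NIO mix-in, kept separate so the transfer-to APIs can evolve
// independently of the stable base API.
interface SupportsTransferTo extends ShufflePartitionWriter {
  TransferrableWritableByteChannel openTransferrableChannel() throws IOException;
}
```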
@mccheah mccheah changed the title Propose a new NIO transfer API for partition writing. [SPARK-25299] Propose a new NIO transfer API for partition writing. Apr 15, 2019

@ifilonenko ifilonenko left a comment


A few questions

partitionInputStream = new LimitedInputStream(spillInputStreams[i],
    partitionLengthInSpill, false);
partitionInputStream = blockManager.serializerManager().wrapForEncryption(
partitionOutput = writer.openStream();


I see that you are not wrapping this around a CloseShieldOutputStream. How are you now shielding the close of the compressor?

Author

You aren't, that's the point. The shielding of the close of the underlying file is handled by DefaultShuffleMapOutputWriter.
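For illustration, a hedged sketch of how that shielding could look inside the default map output writer; the class name and details here are assumptions, not the PR's exact code:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical close-shielding stream: the map output writer hands one of these
// to each partition's caller. Callers may close it freely; only
// commitAllPartitions() closes the shared underlying file stream.
final class ShieldedPartitionStream extends FilterOutputStream {
  private boolean closed = false;

  ShieldedPartitionStream(OutputStream sharedFileStream) {
    super(sharedFileStream);
  }

  @Override
  public void write(int b) throws IOException {
    if (closed) {
      throw new IOException("Partition stream is already closed");
    }
    out.write(b);
  }

  @Override
  public void close() throws IOException {
    // Flush this partition's bytes, but deliberately leave the shared file
    // stream open; the map output writer owns its lifecycle.
    out.flush();
    closed = true;
  }
}
```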

}

public long getCount() throws IOException {
long writtenPosition = outputFileChannel.position();


Wrap this in a try {} in case the outputFileChannel can't calculate the position of the file channel?

Author

This throws IOException which should propagate up and cause the failure in the right place.

partitionWriter.close()
}
}
mapOutputWriter.getNextPartitionWriter


Why no close here? What if it's an empty partition?

Author

You aren't closing partition writers anymore, only any streams that they create.

// close objOut to flush the compressed bytes to the partition writer stream, but we don't want
// to close the partition output stream in the process.
partitionStream = new CloseShieldOutputStream(partitionWriter.toStream)
partitionStream = partitionWriter.openStream


I think it would be wise here to add a comment about the contract of the stream

Author

There isn't really a contract aside from the fact that we'll close it - which is expected because OutputStream extends Closeable.


@ifilonenko ifilonenko left a comment


I prefer this reorganization as it removes the need for the CloseShield


mccheah commented Apr 29, 2019

@vanzin @squito - was curious what you thought about this change.

For context, if we don't do it this way, we run into a weird inconsistency where we can't close the returned channel (since it has to be an instance of FileChannel) but we can, and probably should, close partition output streams.


@yifeih yifeih left a comment


I like the new API, but I left a few comments

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

public class DefaultTransferrableWritableByteChannel implements TransferrableWritableByteChannel {


Move this out of the api package and into shuffle.sort along with the other implementation classes.

* Returns the number of bytes written either by this writer's output stream opened by
* {@link #openStream()} or the byte channel opened by {@link #openTransferrableChannel()}.
*/
@Override


Why is this getting overridden? Is it for the Javadoc?

Author

Yes, it's explicitly for the Javadoc - we have to specifically say that the count has to take the channel into account if it was opened.
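In other words, a pattern like the following schematic example (not the PR's code, and reusing the sketched ShufflePartitionWriter from the description above): the override adds no behavior, it exists only to refine the inherited documentation.

```java
import java.io.IOException;

// Re-declaring an inherited method solely to tighten its Javadoc.
abstract class ChannelCapablePartitionWriter implements ShufflePartitionWriter {

  /**
   * Returns the number of bytes written, whether they went through the stream
   * opened by {@link #openStream()} or the channel opened by
   * {@code openTransferrableChannel()}.
   */
  @Override
  public abstract long getCount() throws IOException;
}
```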

Closeables.close(in, copyThrewException);
}
ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
if (file.exists()) {


I think you're missing the case where file.exists() is false; you still need to set copyThrewException to false.

Author

The value of copyThrewException isn't used if the file does not exist.


/**
* Opens and returns a {@link TransferrableWritableByteChannel} for transferring bytes from
* partial input byte channels to the underlying shuffle data store.


what does "partial" mean in this situation?

Author

Partial meaning the spill file chunks. How can we clarify this without exposing information about the internals of sort-based shuffle?


Hmm, I think the use of "partial" is unclear. Also, none of the other Javadocs say anything about spill file merging, so I think it's better to omit the word "partial".

WritableByteChannel channel = writer.toChannel();
partitionChannel = writer instanceof SupportsTransferTo ?
    ((SupportsTransferTo) writer).openTransferrableChannel()
    : new DefaultTransferrableWritableByteChannel(


Is it weird that we create a DefaultTransferrableWritableByteChannel here if transferToEnabled is true and the writer is an instance of SupportsTransferTo, but in BypassMergeSortShuffleWriter we don't, and both transferToEnabled has to be true and the writer must be of type SupportsTransferTo in order for us to take that codepath?

Author

I don't mind the inconsistency too much.

Author

But it's easily fixed, so we can align these.

outputChannel = ((SupportsTransferTo) writer).openTransferrableChannel();
TransferrableReadableByteChannel inputTransferable =
    new FileTransferrableReadableByteChannel(inputChannel, 0L);
outputChannel.transferFrom(inputTransferable, inputChannel.size());


I don't understand the need for all the layers of indirection here -- both TransferrableReadableByteChannel and TransferrableWritableByteChannel seem unnecessary. Even with a non-default writer, inputChannel is always a FileChannel; that's not something that changes. The plugin just needs to provide a WritableByteChannel. Then you'd do something like:

WritableByteChannel outputChannel = ((SupportsTransferTo) writer).openWritableChannel();
Utils.copyFileStreamNIO(inputChannel, outputChannel, 0, inputChannel.size());

@mccheah mccheah May 1, 2019

The problem is the asymmetry of how to handle closing the channel vs. closing the output stream.

If we call outputChannel.close() here, that closes the underlying output file, but that's not what we want to do in the default writer implementation.

If we don't, we risk leaking channels in non-default implementations.

We should close the output stream returned when toStream is called, but we can't close the output stream and not close the channel - they should be symmetrical.

If we have the default writer return a channel that isn't a FileChannel, say, some implementation of a channel that shields from closing, then input.transferTo(output) won't do the right thing - it won't do the zero-copy, since implementations of FileChannel check that the output channel is specifically an instanceof FileChannel.

The whole premise of this refactor is to make it possible to close the output channel, but to have the implementations be able to decide whether or not that closes the output resource.
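To illustrate the zero-copy constraint described here: FileChannel#transferTo only takes its fast sendfile()-style path when it recognizes the concrete type of the target channel, so even a thin wrapper forfeits that optimization. The CloseShieldingChannel below is a hypothetical stand-in for such a wrapper, not a class from this PR.

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

public final class TransferToDemo {

  // Hypothetical wrapper: delegates writes but ignores close().
  private static final class CloseShieldingChannel implements WritableByteChannel {
    private final WritableByteChannel delegate;

    CloseShieldingChannel(WritableByteChannel delegate) {
      this.delegate = delegate;
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
      return delegate.write(src);
    }

    @Override
    public boolean isOpen() {
      return delegate.isOpen();
    }

    @Override
    public void close() {
      // Deliberately a no-op: closing is handled elsewhere.
    }
  }

  public static void main(String[] args) throws IOException {
    try (FileChannel in = new FileInputStream(args[0]).getChannel();
         FileChannel out = new FileOutputStream(args[1]).getChannel()) {

      // Fast path: the JDK recognizes the target as a FileChannel and can use
      // a kernel-level zero-copy transfer.
      in.transferTo(0, in.size(), out);

      // Still correct, but no longer zero-copy: the wrapper hides the
      // FileChannel type, so transferTo falls back to copying the bytes
      // through an intermediate buffer.
      WritableByteChannel shielded = new CloseShieldingChannel(out);
      in.transferTo(0, in.size(), shielded);
    }
  }
}
```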


Ah, ok, sorry, I had not really grasped that part before -- though I still might be missing something.

  1. So even given that, it still seems like my comments apply to TransferrableReadableByteChannel?
  2. Then shouldn't your implementation of DefaultTransferrableWritableByteChannel be a no-op? The whole point is you don't want to close that channel. And wouldn't you still want that close to be in an outer finally which closes the entire writer, since it's the thing which owns the outputChannel which spans everything? Seems like that would go for streams or channels, and most of the storage plugins we've discussed would want that.

I have a feeling I'm not getting something fundamental here ... I will go back and take a closer look at the code before this change.

Author

The idea is that we don't want to make the DefaultShufflePartitionWriter extend Closeable, because that decouples the lifecycle of closing the writer from the lifecycle of closing the channel or stream. It seems dubious to have the writer return a Closeable object that we do not close but we expect to be closed via other means.

For TransferrableReadableByteChannel, I created that to hide the fact that we're transferring from a file - it's more of an abstraction for removing the knowledge that bytes are transferred from files to the output.

Author

The main problem being that passing in a FileChannel directly can expose operations like writing to the file? Basically it seems ideal to create an abstraction that strictly limits what the plugin can do with it.


ok lemme step back a bit and make sure I understand the issue correctly.

I guess there are two styles of shuffle storage plugins we're concerned about here. Type (a) will store all the map output together (perhaps in a local file, like Spark's current shuffle storage, or maybe in some distributed storage, but still it's all written to one socket / channel / output stream). Type (b) will write each reduce-partition output, from this one map task, to a separate destination.

Type (a) is fine with the implementation you have before this change.

But type (b) had a problem, because the output streams don't get closed when we're done with them. They should get closed when the entire output of this mapper is done, as long as the user handles that properly in writer.close(). But in addition to being a cumbersome API, it also leaves those output streams / channels open far too long.

It's worth noting that type (b) is not like the old hash shuffle manager, which intentionally kept an output stream open for each reduce partition, because that just wrote records immediately as it received them. This is doing a local sort first, which should cut down the number of open output streams (which is where the hash shuffle manager would fall over, and why you really want to make sure you're closing them as soon as possible).

Before I attempt to propose another solution -- have I at least described the problem correctly?

Author

I think that's correct - we want to provide an API that allows for closing each partition stream individually, but the default implementation (and perhaps others) can internally shield the close of the returned streams.
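For illustration, a sketch of such a "type (b)" plugin, where closing each partition stream promptly actually releases a per-partition resource. RemoteShuffleClient and every other name here is hypothetical, and ShufflePartitionWriter is the sketched interface from the description above:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical remote storage client with one upload per reduce partition.
interface RemoteShuffleClient {
  OutputStream openPartitionUpload(int mapId, int reducePartitionId) throws IOException;
}

// A "type (b)" writer: each partition's bytes go to a separate destination, so
// the stream the caller closes maps one-to-one onto a remote connection that
// should be released as soon as that partition is done.
final class RemotePerPartitionWriter implements ShufflePartitionWriter {
  private final RemoteShuffleClient client;
  private final int mapId;
  private final int reducePartitionId;
  private long count = 0L;

  RemotePerPartitionWriter(RemoteShuffleClient client, int mapId, int reducePartitionId) {
    this.client = client;
    this.mapId = mapId;
    this.reducePartitionId = reducePartitionId;
  }

  @Override
  public OutputStream openStream() throws IOException {
    OutputStream upload = client.openPartitionUpload(mapId, reducePartitionId);
    // Count bytes as they pass through (byte-at-a-time for simplicity).
    // Closing this stream closes the upload, which is exactly what a
    // per-partition-destination plugin wants.
    return new FilterOutputStream(upload) {
      @Override
      public void write(int b) throws IOException {
        out.write(b);
        count++;
      }
    };
  }

  @Override
  public long getCount() {
    return count;
  }
}
```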


Ok, I think I finally get what you are doing. Makes sense; I would like more comments in the code explaining the "why" a little bit.

* partial input byte channels to the underlying shuffle data store.
*/
default TransferrableWritableByteChannel openTransferrableChannel() throws IOException {
return new DefaultTransferrableWritableByteChannel(Channels.newChannel(openStream()));


Why provide a default implementation here? Don't we actually want to discourage implementations from even declaring they provide this interface, unless they provide a smarter implementation?

Channels.newChannel(writer.openStream()));
}
TransferrableReadableByteChannel inputTransferable =
new FileTransferrableReadableByteChannel(inputChannel, 0L);


Separating out another thread for the need for this interface:

The main problem being that passing in a FileChannel directly can expose operations like writing to the file? Basically it seems ideal to create an abstraction that strictly limits what the plugin can do with it.

I guess I'm not too concerned about that. I mean, the plugin could also ignore the data in the file entirely, or shift all the bits left, or other random things. This is low-level enough that whoever is touching this needs to have some basic understanding of what they should do with it. IMO more indirection makes it harder to understand. In particular, you are trying to signal to a plugin author that they can leverage the transferTo which is only available on FileChannel.

if (writer instanceof SupportsTransferTo) {
  outputChannel = ((SupportsTransferTo) writer).openTransferrableChannel();
} else {
  outputChannel = new DefaultTransferrableWritableByteChannel(


Any point in having this else branch here? Shouldn't the if (transferToEnabled) above also check SupportsTransferTo? I don't think you'll get any performance benefit from a channel wrapping an output stream.

Author

This is to remain consistent with UnsafeShuffleWriter. See also #535 (comment)


mccheah commented May 7, 2019

Addressed comments either in follow-up discussion or in code in the latest patch, @squito.


mccheah commented May 24, 2019

@squito I think I addressed the comments; I'm going to merge, and we can address feedback in a follow-up.

@mccheah mccheah merged commit 62c2664 into spark-25299 May 24, 2019
@mccheah mccheah deleted the proposed-new-transferto-api branch May 24, 2019 19:15

@svc-spark-25299 svc-spark-25299 left a comment


================================================================================================
BlockStoreShuffleReader reader
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
no aggregation or sorting:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                       11089          11137          19          0.9        1108.9       1.0X
remote rpc fetch                                  11093          11159          43          0.9        1109.3       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with aggregation:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                       29309          29564         229          0.1       14654.7       1.0X
remote rpc fetch                                  29184          29525         339          0.1       14592.1       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with sorting:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                       30166          30529         230          0.1       15082.8       1.0X
remote rpc fetch                                  30361          30693         303          0.1       15180.3       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with seek:                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
seek to last record                                   1              1           1       2779.2           0.4       1.0X

================================================================================================
BypassMergeSortShuffleWriter write
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
BypassMergeSortShuffleWrite without spill:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without disk spill                      2              4           3          0.5        2018.8       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
BypassMergeSortShuffleWrite with spill:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
without transferTo                                 7215           7348          88          0.9        1075.2       1.0X
with transferTo                                    7325           7384          63          0.9        1091.5       1.0X

================================================================================================
SortShuffleWriter writer
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
SortShuffleWriter without spills:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without spills                         10             16           4          0.1        9934.6       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
SortShuffleWriter with spills:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
no map side combine                               14324          14456          94          0.5        2134.4       1.0X
with map side aggregation                         14282          14414          93          0.5        2128.2       1.0X
with map side sort                                14289          14387         101          0.5        2129.2       1.0X

================================================================================================
UnsafeShuffleWriter write
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
UnsafeShuffleWriter without spills:       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without spills                         22             27          15          0.0       21697.4       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
UnsafeShuffleWriter with spills:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
without transferTo                                15908          16027         114          0.8        1185.3       1.0X
with transferTo                                   15885          16005         105          0.8        1183.6       1.0X



yifeih pushed a commit that referenced this pull request May 30, 2019
…535)

* Propose a new NIO transfer API for partition writing.

* Migrate unsafe shuffle writer to use new byte channel API.

* More sane implementation for unsafe

* Fix style

* Address comments

* Fix imports

* Fix build

* Fix more build problems

* Address comments.
mccheah added a commit that referenced this pull request Jun 27, 2019
…535)

* Propose a new NIO transfer API for partition writing.
