[SPARK-25299] Implement default version of the API for shuffle writes #6
Conversation
mccheah left a comment
Initial review. But the behavior here is tricky - I think we'd get clearer validation that we're doing things correctly if we integrated this into one of the shuffle writers and ran the unit tests.
Hmm, looking at this more carefully I don't think this is going to have the transferTo behavior that we expect. The reason for this is that Channels.newChannel knows how to make a FileChannel from a FileOutputStream specifically - it checks instanceOf. But here, Channels.newChannel would be passed a DefaultShuffleBlockOutputStream, which is not a FileOutputStream.
True, in its current form, it would return a WritableByteChannelImpl
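For reference, a tiny standalone sketch of the JDK behavior being discussed (the BufferedOutputStream below just stands in for DefaultShuffleBlockOutputStream, and the exact special-casing inside Channels.newChannel can vary by JDK version):

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;

public class NewChannelDemo {
  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("shuffle", ".data");
    // A raw FileOutputStream: newChannel hands back the stream's own FileChannel,
    // which is what lets transferTo take its fast path.
    try (FileOutputStream fos = new FileOutputStream(f)) {
      System.out.println(Channels.newChannel(fos) instanceof FileChannel); // true
    }
    // Any wrapping stream (standing in for DefaultShuffleBlockOutputStream):
    // newChannel can only return a generic WritableByteChannel implementation.
    try (OutputStream wrapped = new BufferedOutputStream(new FileOutputStream(f))) {
      System.out.println(Channels.newChannel(wrapped) instanceof FileChannel); // false
    }
  }
}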
Indentation should be 4 spaces from public.
This is rare, but in this case I think it makes more sense for this to be a static inner class of DefaultMapOutputWriter - the interactions between the length stored here and usage in DefaultMapOutputWriter#commitAllPartitions would be clearer this way. But I'd like a second opinion from @yifeih.
Yeah, I am okay with either. I thought this was more isolated.
I don't have a strong preference either way, although I did find myself needing to go to the other class to understand this one, so placing it together could help.
core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java
Closing an underlying output stream I believe implies flushing it, but double check this.
It does, very true. close() I believe always calls flush()
Check both here and in the next clause that the outputTempFile exists first. No point logging a warning trying to delete a file that never existed.
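A minimal sketch of the suggested check (field names follow the surrounding diff):

// Only attempt the delete - and only warn - when the temp file was actually created.
if (outputTempFile != null && outputTempFile.exists() && !outputTempFile.delete()) {
  log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath());
}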
Good catch
When aborting, we don't want cleanup to throw an exception that halts the rest of this method - we want to always be able to attempt to delete the files.
Why can't it just be a class with the implementation contents instead of an abstract class? Will there ever need to be other implementations of this?
Do you always want to use a BufferedOutputStream here? I believe BypassMergeSortShuffleWriter doesn't use one at all, though I'm not sure if that's only so it can support transferTo - in which case this might be better.
Hmm, should closing the outputBufferedFileStream close this stream as well?
Good point
core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java
core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java
Force-pushed from c977121 to 460f0ea.
  }
})
when(shuffleExecutorComponents.writes()).thenReturn(writeSupport)
when(writeSupport.createMapOutputWriter(
Why not just use a real DefaultShuffleWriteSupport? This is an integration test so we can feel free to use real objects if they're on the critical path.
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.*;
Don't use * import
public void commitAllPartitions() throws IOException {
  cleanUp();
  blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile);
  if (!successfulWrite) {
There's a number of ways this can be done, right? For instance, can't we check that the file doesn't exist, and create it? That might be easier to read than using a boolean flag.
That happens later? It's easier to ensure that some form of write happened via either the byte stream or the byte channel as an initial filter, I would think.
The action of committing would create the file if any bytes were written. But if no bytes are written, there would be no output file, right? Would be good to verify these behaviors by running BypassMergeSortShuffleWriterSuite locally and seeing what happens.
@Override
public WritableByteChannel openChannel() throws IOException {
  return Channels.newChannel(outputFileStream);
This is strange - we create the newChannel around the outputFileStream, but openStream returns the DefaultShuffleBlockOutputStream. This can lead to an inconsistency in the count returned by this writer, because if we return the FileChannel here, the counter from the DefaultShuffleBlockOutputStream will not be reflected.
I think most of this can be caught by running BypassMergeSortShuffleWriterSuite locally.
SparkConf conf,
ShuffleWriteMetricsReporter writeMetrics) {
ShuffleWriteMetricsReporter writeMetrics,
ShuffleExecutorComponents shuffleExecutorComponents) {
I think you can pass in just the ShuffleWriteSupport because you won't need the reader components here.
if (file.exists()) {
  FileInputStream in = new FileInputStream(file);
  try {
    Utils.copyStream(in, tempOutputStream, false, false);
So using Utils.copyStream actually assumes that the outputStream is a FileOutputStream in order to take advantage of the transferTo logic inside that function. The function also does its own buffering with an array of a fixed size. I wonder if we should have the DefaultShuffleMapOutputWriter just deal with generic FileOutputStreams and wrap them with buffered output writers if necessary? It would make the DefaultShuffleMapOutputWriter much cleaner too, since we wouldn't be keeping track of so many layers of output streams.
Actually, on second thought, you probably just want to fork the code paths here into a "transferTo" block and an "output streams" block, to give the plugins more flexibility to return whatever output streams they want.
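A rough sketch of what that fork could look like; names such as partitionWriter and file follow the surrounding discussion and are illustrative, not the final code:

try (FileInputStream in = new FileInputStream(file)) {
  WritableByteChannel out = partitionWriter.openChannel();
  if (out instanceof FileChannel) {
    // transferTo path: only worthwhile when the plugin really hands back a FileChannel.
    FileChannel inChannel = in.getChannel();
    long pos = 0L;
    long size = inChannel.size();
    while (pos < size) {
      pos += inChannel.transferTo(pos, size - pos, out);
    }
  } else {
    // Generic path: a plain stream copy for plugins that return arbitrary channels.
    Utils.copyStream(in, Channels.newOutputStream(out), false, false);
  }
}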
mapId: Int,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  initializeExecutor()
You don't want to do this at every getWriter call, but only once per executor. Since there's only one SortShuffleManager per executor, I think you can do it as a private variable in this class. You can get rid of the initialized flag then too.
Agreed
core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java
private class DefaultShufflePartitionWriter implements ShufflePartitionWriter {

  private final int partitionId;
  private DefaultShuffleBlockOutputStream stream = null;
We might be able to rename these to something a bit clearer for their purpose - maybe partitionWriterStream and partitionWriterChannel. While it's more verbose, it makes it clear that these are only for writing the given partition - given that there's outside context that writes to the map output file. I got confused in https://github.com/bloomberg/apache-spark-on-k8s/pull/6/files#r268301953 because of the naming of these vars.
try {
  channel.close();
} catch (Exception e) {
  log.error("Error with closing channel for partition", e);
Shouldn't we throw if we fail to close here?
try {
  stream.close();
} catch (Exception e) {
  log.error("Error with closing stream for partition", e);
Similarly might want to consider throwing here (need to make sure that interrupting the control flow retains correctness though).
}

if (outputFileStream != null) {
  outputFileStream.close();
Does this close the file channel as well?
Yea, looking at the implementation of FileOutputStream.close(), calling close on the outputFileStream will also close the outputFileChannel
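A quick standalone way to convince ourselves (not part of the patch):

FileOutputStream fos = new FileOutputStream(File.createTempFile("shuffle", ".data"));
FileChannel channel = fos.getChannel();
fos.close();
// The channel obtained from the stream is closed along with it.
System.out.println(channel.isOpen()); // false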
if (!outputFile.delete() && outputFile.exists()) {
  log.warn("Failed to delete outputshuffle file at {}", outputFile.getAbsolutePath());
}
cleanUp();
cleanUp will close the streams, but the files will already have been deleted - will that throw an exception? I think what we want is something like this:

try {
  cleanup();
} catch (IOException e) {
  logger.error(...., e);
}
deleteFiles();
private void cleanUp() throws IOException {
  if (outputBufferedFileStream != null) {
    outputBufferedFileStream.close();
Closing the buffered output stream will close the underlying file stream. Is closing the same file output stream twice prone to throwing an error? If it doesn't throw an error then this is fine (would rather be explicit about all the resources we're closing).
yifeih left a comment
Overall, looks great! Left a few comments and questions.
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport
  .createMapOutputWriter(appId, shuffleId, mapId, numPartitions);
Quick question: can the appId actually just be passed to the ShuffleMapOutputWriter through the ShuffleDataIO? It should be part of the SparkConf and shouldn't change in the executors, right?
It can, but it doesn't add much computation besides the above call to getAppId(), so it seems pretty unintrusive.
However, the API was built so that you call:

public ShuffleMapOutputWriter createMapOutputWriter(
    String appId,
    int shuffleId,
    int mapId,
    int numPartitions)

so I am a bit bound by that :)
Yea, I'm just wondering whether we need that in the API or not, since some implementations, like this refactored one that we're doing, don't necessarily need it, although all remote implementations might. @mccheah thoughts?
Yeah we can change the API, it should be passed through ShuffleDataIO - maybe ShuffleExecutorComponents#initialize?
| "Failed to create shuffle file directory at %s.", | ||
| outputFile.getParentFile().getAbsolutePath())); | ||
| } | ||
| if (!outputFile.isFile() && !outputFile.createNewFile()) { |
Wait, I'm confused by this block. You're checking whether the output file is there, and then trying to create it? Shouldn't you be throwing if it's not there, regardless of whether you can create it or not, because when blockResolver.writeIndexFileAndCommit() is done, the file should be there? But let me know if I'm misunderstanding this.
If the committer didn't create the file, we want to try to create an empty one by default, I think. But it's strange to me that shuffles can write empty files. Was this the case before our plugin refactor?
Oh I see... well, looking at this code, I don't think it actually writes an empty file if there are no records: https://github.com/bloomberg/apache-spark-on-k8s/pull/6/files#diff-8b6b7a5dadc0d8e97307d0f8e8378d8fL126
}
}

private void cleanUp() throws IOException {
You're closing the streams, but I noticed that Channel also has a close() method. Do you need to call close() on the outputFileChannel too?
Heh I am wrong... closing the fileOutputStream closes the channel :P
}

@Override
public void close() {
I wonder if we need some equivalent of flush() here, especially since the method we use to write to the WritableByteChannel is called Utils.copyFileStreamNIO, which is non-blocking, so I'm not sure we're guaranteed to have written everything by the time getCount() is called. I was trying to do some research on byteChannels and the closest thing I found was force()
@mccheah any ideas here, I am a bit worried about this being non-blocking as well
I looked at Utils.copyFileStreamNIO which doesn't have any indication that it's non-blocking, any idea why we think it is?
I assumed NIO stood for non-blocking IO. The javadoc for the underlying method that it's using, FileChannel.transferTo() also implies that the underlying channel could be non-blocking. https://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#transferTo(long,%20long,%20java.nio.channels.WritableByteChannel)
@mccheah and I just discussed some of this offline.
The FileChannel.transferTo() implementation checks whether the WritableByteChannel is an instance of FileChannelImpl and behaves differently if it isn't. Because we're not passing in a FileChannelImpl, this might actually behave in ways that we don't expect and deviate from the current code path. Therefore, we shouldn't be wrapping the FileChannel with another WritableByteChannel as we're currently doing.
This limits our choices in how we can get the lengths to return in DefaultShufflePartitionWriter.getLengths(). The simplest way seems to be to use FileChannel.position() calls to get the positions when instantiating the byte channel and when closing the byte channel in DefaultShufflePartitionWriter.getLengths().
Also, looking at the DefaultShufflePartitionWriter.getLengths() method, since we want to ensure that the shuffle partition writers are all closed after getting the lengths, perhaps we should rename this method to close() so that it's clear what should be happening (i.e. you can only get the lengths once everything is written).
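A minimal sketch of that position-based approach; field names like outputFileChannel and startPosition are illustrative:

private long startPosition;

FileChannel openChannel() throws IOException {
  // Remember where this partition begins in the shared map output file.
  startPosition = outputFileChannel.position();
  return outputFileChannel;
}

long closeAndGetLength() throws IOException {
  // The bytes this partition wrote are just the delta in the file position.
  return outputFileChannel.position() - startPosition;
}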
|after transferTo, please check your kernel version to see if it is 2.6.32,
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|You can set spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
Read the links, and it seems to be a kernel bug? Is this safe to delete?
There is no position in writeableByteChannels :/ soo... idk ... meep
If we keep this check it should be in the implementation of ShuffleMapOutputWriter I would guess. Maybe in close / abort / commit etc?
Perhaps I'm missing something, but how do you get the expected position in the ShuffleMapOutputWriter for the assert? If that's not easy to get, I would say it's better to keep this here with an if-statement that checks whether it's an instance of FileChannel, to maintain the same behavior as before.
public void intitializeExecutor(String appId, String execId) {
  blockManager = SparkEnv.get().blockManager();
  blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
  metrics = TaskContext.get().taskMetrics();
Wait, I don't think you want this here actually. The TaskContext is associated with each individual shuffle task, but you're only calling initializeExecutor once per executor. You want to get the metrics from TaskContext after you have set the taskContext here (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L102) for each task.
@mccheah if so, can we pass metrics into the ShuffleMapOutputWriter so that it can be properly mocked (can't mock static methods)?
There's a few ways you can do it. You can pass it in as you suggested. In this particular case, you can also call TaskContext.set(mockTaskContext) to initialize the static variable before the test. I prefer the latter way, but you could make a case for either.
Actually you might want to pass it through so that concurrent tests don't collide...
The problem is that TaskMetrics is currently marked as DeveloperApi, which means it's questionable to pass it in to a public API. We could propose some alternative metrics API that delegates to the Spark default TaskMetrics API. But I think for now we can use TaskContext#get from inside the writer and then in tests call TaskContext#setTaskContext. I took a closer look and a lot of tests use TaskContext#setTaskContext, indicating they don't anticipate tests to be run in parallel in the same JVM.
Just make sure we call TaskContext#unset appropriately after each test.
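A sketch of that test pattern, assuming a Mockito-mocked TaskContext and a taskMetrics instance prepared by the suite:

// Setup: point the static thread-local at a mock so the writer's internal
// TaskContext.get() call resolves to it.
TaskContext mockContext = Mockito.mock(TaskContext.class);
Mockito.when(mockContext.taskMetrics()).thenReturn(taskMetrics);
TaskContext.setTaskContext(mockContext);
try {
  // ... exercise the shuffle writer under test ...
} finally {
  // Teardown: always unset so other tests in the same JVM aren't affected.
  TaskContext.unset();
}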
core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java
default WritableByteChannel openChannel() throws IOException {
  return Channels.newChannel(openStream());
}
FileChannel openChannel() throws IOException;
This should still return WritableByteChannel, but the implementation returns FileChannel in DefaultShufflePartitionWriter.
The code in Utils#transferTo will eventually check instanceOf this WritableByteChannel and optimize on FileChannel accordingly.
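A small sketch of that shape - the interface stays general while the default implementation narrows the return type (Java allows covariant returns), so downstream instanceof FileChannel checks still get the fast path; outputFileChannel is illustrative:

// In the ShufflePartitionWriter interface: keep the general type so plugins
// can return any writable channel.
WritableByteChannel openChannel() throws IOException;

// In DefaultShufflePartitionWriter: narrow the return to the concrete FileChannel.
@Override
public FileChannel openChannel() throws IOException {
  return outputFileChannel;
}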
Proposes the following changes to the API:
- closeAndGetLength() is split into separate close() and getNumBytesWritten() operations.
- openChannel and openStream renamed to toChannel and toStream

Proposes the following changes to the implementation:
- close() in the default implementation now persists the length in the partitionLengths array
- getNumBytesWritten() doesn't necessitate the writer's resources to be closed ahead of it
- Don't close the stream in BypassMergeSortShuffleWriter - only close it in DefaultShufflePartitionWriter#close (for consistency with how we treat channels)
@yifeih for +1 :)
yifeih left a comment
Looks great! Just had a few small thoughts that might or might not be valid.
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
| "Failed to create shuffle file directory at %s.", | ||
| outputFile.getParentFile().getAbsolutePath())); | ||
| } | ||
| if (!outputFile.isFile() && !outputFile.createNewFile()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh i see... well, looking at this code, I don't think it actually writes an empty file if there are no records: https://github.com/bloomberg/apache-spark-on-k8s/pull/6/files#diff-8b6b7a5dadc0d8e97307d0f8e8378d8fL126
* <p>
* Note that the default version of {@link #toChannel()} returns a {@link WritableByteChannel}
* that does not itself need to be closed up front; only the underlying output stream given by
* {@link #toStream()} must be closed.
I'm looking at the Channels.newChannel() implementation, and it uses WritableByteChannelImpl, which seems to do nothing more than keep an in-memory buffer; it'll get garbage collected when the WritableByteChannel falls out of scope, so I think it's fine not to close that channel for now. However, if the Channels.newChannel() implementation changes to require additional resources, then we're out of luck - but that might not even be a valid concern?
We can cross that bridge when we get there - might be something to check for future versions of Java.
@ifilonenko I dug into the making of the temp file. If you look at https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L126, it's actually calling in with the [...]. To preserve functionality as closely as possible (in case there are other side effects down that code path), it's probably better to set [...].
} catch (Exception e) {
  log.error("Unable to close appropriate underlying file stream", e);
}
if (!outputTempFile.delete() && outputTempFile.exists()) {
Check for null
+1 from me after #6 (comment)
if (!outputTempFile.delete() && outputTempFile.exists()) {
  log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath());
}
if (!outputFile.delete() && outputFile.exists()) {
Actually I don't think we want to delete here - a concurrent attempt shouldn't have its results removed if this attempt fails. I think this lines up with the behavior from the original shuffle writer.
Looks good from my end! Thanks for working on this!
| log.error("Unable to close appropriate underlying file stream", e); | ||
| } | ||
| if (outputTempFile != null) { | ||
| if (!outputTempFile.delete() && outputTempFile.exists()) { |
Switch the ordering of the statements in this boolean condition.
…#6) Implement default version of the API that would be used across all shuffle writers, writes to local disk. Shuffle Writes [2/6] [3/6] [5/6]
…524) Implements the shuffle writer API by writing shuffle files to local disk and using the index block resolver to commit data and write index files. The logic in `BypassMergeSortShuffleWriter` has been refactored to use the base implementation of the plugin instead. APIs have been slightly renamed to clarify semantics after considering nuances in how these are to be implemented by other developers. Follow-up commits are to come for `SortShuffleWriter` and `UnsafeShuffleWriter`. Ported from bloomberg#6, credits to @ifilonenko.
What changes were proposed in this pull request?
Implement the default version of the API that would be used across all shuffle writers; it writes to local disk.
Shuffle Writes [2/6] [3/6] [5/6]
How was this patch tested?
Compiled and ran unit tests.