resolved more comments
ifilonenko committed Mar 22, 2019
commit 996e9038be2487bea25977a467fb9736c21c6d63
@@ -46,7 +46,6 @@ public class DefaultShuffleMapOutputWriter implements ShuffleMapOutputWriter {
private final long[] partitionLengths;
private final int bufferSize;
private int currPartitionId = 0;
private boolean successfulWrite = false;

private final File outputFile;
private final File outputTempFile;
@@ -85,7 +84,7 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException {
public void commitAllPartitions() throws IOException {
cleanUp();
blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile);
if (!successfulWrite) {
if (!outputFile.exists()) {
if (!outputFile.getParentFile().isDirectory() && !outputFile.getParentFile().mkdirs()) {
throw new IOException(
String.format(
@@ -102,13 +101,17 @@ public void commitAllPartitions() throws IOException {

@Override
public void abort(Throwable error) throws IOException {
try {
cleanUp();
} catch (Exception e) {
log.error("Unable to close appropriate underlying file stream", e);
}
if (!outputTempFile.delete() && outputTempFile.exists()) {
Check for null

log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath());
}
if (!outputFile.delete() && outputFile.exists()) {
log.warn("Failed to delete outputshuffle file at {}", outputFile.getAbsolutePath());
}
cleanUp();
}
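A minimal sketch of the "Check for null" suggestion left on the deletion above. The deleteQuietly helper and its wiring are hypothetical, not part of this PR; it assumes the enclosing class's log field and java.io.File, and only illustrates guarding the File reference before calling delete():

```java
// Hypothetical helper illustrating the reviewer's "check for null" suggestion;
// the name deleteQuietly does not appear in the PR.
private void deleteQuietly(File file, String description) {
  if (file == null) {
    return; // nothing was ever created, so there is nothing to delete
  }
  if (!file.delete() && file.exists()) {
    log.warn("Failed to delete {} shuffle file at {}", description, file.getAbsolutePath());
  }
}

// abort(...) could then call:
//   deleteQuietly(outputTempFile, "temporary");
//   deleteQuietly(outputFile, "output");
```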

private void cleanUp() throws IOException {
You're closing the streams, but I noticed that Channel also has a close() method. Do you need to call close() on the outputFileChannel too?

Heh I am wrong... closing the fileOutputStream closes the channel :P
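The exchange above is easy to verify: the FileChannel returned by FileOutputStream.getChannel() is tied to the stream, and closing the stream also closes the channel (documented behaviour of FileOutputStream.close()). A standalone sketch, independent of the PR code:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

// Demonstrates that closing a FileOutputStream also closes the channel obtained
// from it, so cleanUp() does not need a separate outputFileChannel.close().
public class StreamClosesChannelDemo {
  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("shuffle-demo", ".tmp");
    try (FileOutputStream out = new FileOutputStream(tmp)) {
      FileChannel channel = out.getChannel();
      out.close();                          // closing the stream...
      System.out.println(channel.isOpen()); // ...prints false: the channel is closed too
    } finally {
      tmp.delete();
    }
  }
}
```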

@@ -143,16 +146,16 @@ private void initChannel() throws IOException {
private class DefaultShufflePartitionWriter implements ShufflePartitionWriter {

private final int partitionId;
private DefaultShuffleBlockOutputStream stream = null;
private DefaultShuffleBlockByteChannel channel = null;
private PartitionWriterStream stream = null;
private PartitionWriterChannel channel = null;

private DefaultShufflePartitionWriter(int partitionId) {
this.partitionId = partitionId;
}

@Override
public OutputStream openStream() throws IOException {
stream = new DefaultShuffleBlockOutputStream();
stream = new PartitionWriterStream();
return stream;
}

@@ -162,7 +165,7 @@ public long getLength() {
try {
channel.close();
} catch (Exception e) {
log.error("Error with closing channel for partition", e);
throw new IllegalStateException("Attempting to close byte channel", e);
}
int length = channel.getCount();
partitionLengths[partitionId] = length;
@@ -171,7 +174,7 @@ public long getLength() {
try {
stream.close();
} catch (Exception e) {
log.error("Error with closing stream for partition", e);
throw new IllegalStateException("Attempting to close output stream", e);
}
int length = stream.getCount();
partitionLengths[partitionId] = length;
@@ -181,12 +184,12 @@ public long getLength() {

@Override
public WritableByteChannel openChannel() throws IOException {
channel = new DefaultShuffleBlockByteChannel();
channel = new PartitionWriterChannel();
return channel;
}
}

private class DefaultShuffleBlockOutputStream extends OutputStream {
private class PartitionWriterStream extends OutputStream {
private int count = 0;
private boolean isClosed = false;

@@ -199,7 +202,6 @@ public void write(int b) throws IOException {
if (isClosed) {
throw new IllegalStateException("Attempting to write to a closed block output stream.");
}
successfulWrite = true;
outputBufferedFileStream.write(b);
count++;
}
@@ -216,7 +218,7 @@ public void flush() throws IOException {
}
}

private class DefaultShuffleBlockByteChannel implements WritableByteChannel {
private class PartitionWriterChannel implements WritableByteChannel {

private int count = 0;
private boolean isClosed = false;
@@ -230,7 +232,6 @@ public int write(ByteBuffer src) throws IOException {
if (isClosed) {
throw new IllegalStateException("Attempting to write to a closed block byte channel.");
}
successfulWrite = true;
int written = outputFileChannel.write(src);
count += written;
return written;
@@ -19,8 +19,7 @@ package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.shuffle.sort.io.{DefaultShuffleExecutorComponents, DefaultShuffleWriteSupport}
import org.apache.spark.util.Utils
import org.apache.spark.shuffle.sort.io.{DefaultShuffleWriteSupport}

/**
* Benchmark to measure performance for aggregate primitives.
@@ -32,11 +32,11 @@ import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterEach

import org.apache.spark._
import org.apache.spark.api.shuffle.{ShuffleMapOutputWriter, ShuffleWriteSupport}
import org.apache.spark.api.shuffle.ShuffleWriteSupport
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.shuffle.sort.io.{DefaultShuffleMapOutputWriter, DefaultShuffleWriteSupport}
import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport
import org.apache.spark.storage._
import org.apache.spark.util.Utils

@@ -47,11 +47,11 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
@Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _
@Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _
@Mock(answer = RETURNS_SMART_NULLS) private var dependency: ShuffleDependency[Int, Int, Int] = _
@Mock(answer = RETURNS_SMART_NULLS) private var writeSupport: ShuffleWriteSupport = _

private var taskMetrics: TaskMetrics = _
private var tempDir: File = _
private var outputFile: File = _
private var writeSupport: ShuffleWriteSupport = _
private val conf: SparkConf = new SparkConf(loadDefaults = false)
.set("spark.app.id", "sampleApp")
private val temporaryFilesCreated: mutable.Buffer[File] = new ArrayBuffer[File]()
@@ -122,25 +122,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
blockIdToFileMap.get(invocation.getArguments.head.asInstanceOf[BlockId]).get
}
})
when(writeSupport.createMapOutputWriter(
anyString(),
anyInt(),
anyInt(),
anyInt()))
.thenAnswer(new Answer[ShuffleMapOutputWriter] {
override def answer(invocationOnMock: InvocationOnMock): ShuffleMapOutputWriter = {
val shuffleId = invocationOnMock.getArgument(1).asInstanceOf[Int]
val mapId = invocationOnMock.getArgument(2).asInstanceOf[Int]
val numPartitions = invocationOnMock.getArgument(3).asInstanceOf[Int]
new DefaultShuffleMapOutputWriter(
shuffleId,
mapId,
numPartitions,
taskContext.taskMetrics().shuffleWriteMetrics,
blockResolver,
conf)
}
})
writeSupport = new DefaultShuffleWriteSupport(
conf, blockResolver, taskContext.taskMetrics().shuffleWriteMetrics)
}

override def afterEach(): Unit = {
@@ -177,14 +160,41 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
}

test("write with some empty partitions") {
val transferConf = conf.clone.set("spark.file.transferTo", "false")
def records: Iterator[(Int, Int)] =
Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
val writer = new BypassMergeSortShuffleWriter[Int, Int](
blockManager,
blockResolver,
shuffleHandle,
0, // MapId
conf,
transferConf,
taskContext.taskMetrics().shuffleWriteMetrics,
writeSupport
)
writer.write(records)
writer.stop( /* success = */ true)
assert(temporaryFilesCreated.nonEmpty)
assert(writer.getPartitionLengths.sum === outputFile.length())
assert(writer.getPartitionLengths.count(_ == 0L) === 4) // should be 4 zero length files
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
assert(shuffleWriteMetrics.recordsWritten === records.length)
assert(taskMetrics.diskBytesSpilled === 0)
assert(taskMetrics.memoryBytesSpilled === 0)
}

test("write with some empty partitions with transferTo") {
val transferConf = conf.clone.set("spark.file.transferTo", "true")
def records: Iterator[(Int, Int)] =
Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
val writer = new BypassMergeSortShuffleWriter[Int, Int](
blockManager,
blockResolver,
shuffleHandle,
0, // MapId
transferConf,
taskContext.taskMetrics().shuffleWriteMetrics,
writeSupport
)
@@ -259,5 +269,4 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
writer.stop( /* success = */ false)
assert(temporaryFilesCreated.count(_.exists()) === 0)
}

}