first pass of comments
ifilonenko committed Mar 20, 2019
commit 460f0ea468576f9b47fdccb8f49685b1293db5e7
@@ -17,12 +17,10 @@

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.*;

Review comment: Don't use * import

import javax.annotation.Nullable;

import org.apache.spark.api.shuffle.ShufflePartitionWriter;
import scala.None$;
import scala.Option;
import scala.Product2;
@@ -34,6 +32,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.internal.config.package$;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
@@ -79,9 +79,11 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final BlockManager blockManager;
private final Partitioner partitioner;
private final ShuffleWriteMetricsReporter writeMetrics;
private final String appId;
private final int shuffleId;
private final int mapId;
private final Serializer serializer;
private final ShuffleExecutorComponents shuffleExecutorComponents;
private final IndexShuffleBlockResolver shuffleBlockResolver;

/** Array of file writers, one for each partition */
@@ -103,70 +105,76 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
BypassMergeSortShuffleHandle<K, V> handle,
int mapId,
SparkConf conf,
ShuffleWriteMetricsReporter writeMetrics) {
ShuffleWriteMetricsReporter writeMetrics,
ShuffleExecutorComponents shuffleExecutorComponents) {

Review comment: I think you can pass in just the ShuffleWriteSupport because you won't need the reader components here.

// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.appId = conf.getAppId();
this.mapId = mapId;
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
this.writeMetrics = writeMetrics;
this.serializer = dep.serializer();
this.shuffleBlockResolver = shuffleBlockResolver;
this.shuffleExecutorComponents = shuffleExecutorComponents;
}

@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new DiskBlockObjectWriter[numPartitions];
partitionWriterSegments = new FileSegment[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.writes()
.createMapOutputWriter(appId, shuffleId, mapId, numPartitions);

Review comment: Quick question: can the appId actually just be passed to the ShuffleMapOutputWriter through the ShuffleDataIO? It should be part of the SparkConf and shouldn't change in the executors, right?

Author reply: It can, but it doesn't add much computation besides the above call to getAppId(), so it seems pretty unintrusive.
However, the API was built so that you call:

  public ShuffleMapOutputWriter createMapOutputWriter(
      String appId,
      int shuffleId,
      int mapId,
      int numPartitions)

so I am a bit bound by that :)

Reply: Yeah, I'm just wondering whether we need that in the API or not, since some implementations, like this refactored one that we're doing, don't necessarily need it, although all remote implementations might. @mccheah thoughts?

Reply: Yeah, we can change the API; it should be passed through ShuffleDataIO - maybe ShuffleExecutorComponents#initialize?
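To make the suggestion above concrete, here is a minimal sketch of what passing the appId through ShuffleExecutorComponents#initialize could look like, so that createMapOutputWriter no longer takes an appId parameter. The initialize signature and the trimmed createMapOutputWriter are assumptions about a possible API change, not the interfaces as they exist in this PR.

import java.io.IOException;

// Hypothetical shape of the API change discussed above: the appId is delivered
// once per executor, and the per-map-task call drops it.
interface ShuffleExecutorComponents {
  // Called once on each executor before any shuffle writers are created.
  void initialize(String appId);

  ShuffleWriteSupport writes();
}

interface ShuffleWriteSupport {
  // appId removed from the per-map call; implementations that need it can
  // capture it from initialize(...) above.
  ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      int mapId,
      int numPartitions) throws IOException;
}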

try {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new DiskBlockObjectWriter[numPartitions];
partitionWriterSegments = new FileSegment[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

for (int i = 0; i < numPartitions; i++) {
try (DiskBlockObjectWriter writer = partitionWriters[i]) {
partitionWriterSegments[i] = writer.commitAndGet();
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
}

File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
try {
partitionLengths = writePartitionedFile(tmp);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
for (int i = 0; i < numPartitions; i++) {
try (DiskBlockObjectWriter writer = partitionWriters[i]) {
partitionWriterSegments[i] = writer.commitAndGet();
}
}

partitionLengths = writePartitionedData(mapOutputWriter);
mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
} catch (Exception e2) {
logger.error("Failed to abort the writer after failing to write map output.", e2);
}
throw e;
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

@VisibleForTesting
@@ -179,37 +187,38 @@ long[] getPartitionLengths() {
*
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
*/
private long[] writePartitionedFile(File outputFile) throws IOException {
private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
// Track location of the partition starts in the output file
final long[] lengths = new long[numPartitions];
if (partitionWriters == null) {
// We were passed an empty iterator
return lengths;
}

final FileOutputStream out = new FileOutputStream(outputFile, true);
final long writeStartTime = System.nanoTime();
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
final File file = partitionWriterSegments[i].file();
if (file.exists()) {
final FileInputStream in = new FileInputStream(file);
boolean copyThrewException = true;
try {
lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
if (!file.delete()) {
logger.error("Unable to delete file for partition {}", i);
boolean copyThrewException = true;
// TODO: Enable transferTo
ShufflePartitionWriter writer = mapOutputWriter.getNextPartitionWriter();
try (OutputStream tempOutputStream = writer.openStream()) {
if (file.exists()) {
FileInputStream in = new FileInputStream(file);
try {
Utils.copyStream(in, tempOutputStream, false, false);

yifeih (Mar 20, 2019): So using Utils.copyStream actually assumes that the OutputStream is a FileOutputStream in order to take advantage of the transferTo logic inside that function. The function also does its own buffering with an array of a fixed size. I wonder if we should just have the DefaultShuffleMapOutputWriter deal with generic FileOutputStreams and wrap them with buffered output writers if necessary? It would make the DefaultShuffleMapOutputWriter much cleaner too, so we're not keeping track of so many layers of output streams.

Reply: Actually, on second thought, you probably just want to fork the code paths here into a "transferTo" block and an "output streams" block to allow more flexibility in the plugins to return whatever output streams they want.
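As a rough illustration of the fork suggested above, the copy loop could branch on transferTo support using the openChannel()/openStream() methods that ShufflePartitionWriter already exposes in this PR. The helper below is a sketch only, assuming the surrounding writer's transferToEnabled flag; the method name, buffer size, and the decision to leave the channel/stream lifecycle to the plugin are assumptions, not part of the change.

// Sketch: copy one spilled partition file into the plugin-provided writer,
// preferring zero-copy transferTo when enabled.
private long copyPartitionData(File file, ShufflePartitionWriter writer) throws IOException {
  try (FileInputStream in = new FileInputStream(file)) {
    if (transferToEnabled) {
      // transferTo path: move bytes channel-to-channel without an intermediate buffer.
      FileChannel inChannel = in.getChannel();
      WritableByteChannel outChannel = writer.openChannel();
      long size = inChannel.size();
      long copied = 0L;
      while (copied < size) {
        copied += inChannel.transferTo(copied, size - copied, outChannel);
      }
      return copied;
    } else {
      // Stream path: buffered byte copy through whatever OutputStream the plugin returns.
      OutputStream out = writer.openStream();
      byte[] buffer = new byte[8192];
      long copied = 0L;
      int read;
      while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
        copied += read;
      }
      out.flush();
      return copied;
    }
  }
}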

copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
}
}
lengths[i] = writer.getLength();
if (file.exists() && !file.delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
threwException = false;
} finally {
Closeables.close(out, threwException);
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
@@ -20,7 +20,5 @@
import java.io.OutputStream;

public abstract class DefaultShuffleBlockOutputStream extends OutputStream {

Review comment: Why can't it just be a class with the implementation contents instead of an abstract class? Will there ever need to be other implementations of this?


public abstract int getCount();

public abstract int getCount();
}
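For reference, a minimal sketch of the non-abstract alternative raised in the comment above: a single concrete counting stream that wraps the underlying file stream, with no separate Impl subclass. The class name and the choice not to close the delegate are assumptions for illustration, not code from this PR.

import java.io.IOException;
import java.io.OutputStream;

// Sketch: concrete counting OutputStream instead of an abstract
// DefaultShuffleBlockOutputStream plus a private *Impl subclass.
class CountingShuffleBlockOutputStream extends OutputStream {
  private final OutputStream delegate;
  private int count = 0;
  private boolean closed = false;

  CountingShuffleBlockOutputStream(OutputStream delegate) {
    this.delegate = delegate;
  }

  // Number of bytes written through this stream so far.
  int getCount() {
    return count;
  }

  @Override
  public void write(int b) throws IOException {
    if (closed) {
      throw new IllegalStateException("Attempting to write to a closed block output stream.");
    }
    delegate.write(b);
    count++;
  }

  @Override
  public void close() {
    // Mark closed but leave the delegate open: in this sketch the map output
    // writer owns the underlying file stream across all partitions.
    closed = true;
  }
}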
@@ -54,4 +54,4 @@ public ShuffleWriteSupport writes() {
return new DefaultShuffleWriteSupport(
sparkConf, blockResolver, metrics.shuffleWriteMetrics());
}
}
}
@@ -17,10 +17,10 @@

package org.apache.spark.shuffle.sort.io;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.*;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,13 +40,12 @@ public class DefaultShuffleMapOutputWriter implements ShuffleMapOutputWriter {

private final int shuffleId;
private final int mapId;
private final int numPartitions;
private final ShuffleWriteMetricsReporter metrics;
private final IndexShuffleBlockResolver blockResolver;
private final long[] partitionLengths;
private ShufflePartitionWriter[] partitionWriters;
private final int bufferSize;
private int currPartitionId = 0;
private boolean successfulWrite = false;

private final File outputFile;
private final File outputTempFile;
@@ -63,45 +62,38 @@ public DefaultShuffleMapOutputWriter(
SparkConf sparkConf) {
this.shuffleId = shuffleId;
this.mapId = mapId;
this.numPartitions = numPartitions;
this.metrics = metrics;
this.blockResolver = blockResolver;
this.bufferSize =
(int) (long) sparkConf.get(
package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
this.partitionLengths = new long[numPartitions];
this.partitionWriters = new ShufflePartitionWriter[numPartitions];
this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
this.outputTempFile = Utils.tempFileWith(outputFile);
}

@Override
public ShufflePartitionWriter getNextPartitionWriter() throws IOException {
initStream();
ShufflePartitionWriter shufflePartitionWriter =
new DefaultShufflePartitionWriter(
new DefaultShuffleBlockOutputStreamImpl());
partitionWriters[currPartitionId++] = shufflePartitionWriter;
return shufflePartitionWriter;
return new DefaultShufflePartitionWriter(currPartitionId++);
}

@Override
public void commitAllPartitions() throws IOException {
for (int pId = 0; pId < numPartitions; pId ++) {
partitionLengths[pId] = partitionWriters[pId].getLength();
}
cleanUp();
blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile);
// TODO: Maybe monitor a little more intelligently than checking files?
if (!outputFile.getParentFile().isDirectory() && !outputFile.getParentFile().mkdirs()) {
throw new IOException(
String.format(
"Failed to create shuffle file directory at %s.",
outputFile.getParentFile().getAbsolutePath()));
}
if (!outputFile.isFile() && !outputFile.createNewFile()) {
throw new IOException(
String.format(
"Failed to create empty shuffle file at %s.", outputFile.getAbsolutePath()));
if (!successfulWrite) {

Review comment: There are a number of ways this can be done, right? For instance, can't we check that the file doesn't exist, and create it? That might be easier to read than using a boolean flag.

Author reply: That happens later? It's easier to ensure that some form of write happened by either the byte stream or byte channel as an initial filter, I would think.

Reply: The action of committing would create the file if any bytes were written. But if no bytes are written, there would be no output file, right? Would be good to verify these behaviors by running BypassMergeSortShuffleWriterSuite locally and seeing what happens.

if (!outputFile.getParentFile().isDirectory() && !outputFile.getParentFile().mkdirs()) {
throw new IOException(
String.format(
"Failed to create shuffle file directory at %s.",
outputFile.getParentFile().getAbsolutePath()));
}
if (!outputFile.isFile() && !outputFile.createNewFile()) {

Review comment: Wait, I'm confused by this block. You're checking whether the output file is there, and then trying to create it? Shouldn't you be throwing if it's not there, regardless of whether you can create it or not, because when blockResolver.writeIndexFileAndCommit() is done the file should be there? But let me know if I'm misunderstanding this.

Reply: If the committer didn't create the file, we want to try to create an empty one by default, I think. But it's strange to me that shuffles can write empty files. Was this the case before our plugin refactor?

Reply: Oh, I see... well, looking at this code, I don't think it actually writes an empty file if there are no records: https://github.com/bloomberg/apache-spark-on-k8s/pull/6/files#diff-8b6b7a5dadc0d8e97307d0f8e8378d8fL126

throw new IOException(
String.format(
"Failed to create empty shuffle file at %s.", outputFile.getAbsolutePath()));
}
}
}
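Pulling the two threads above together, one possible reading is a commitAllPartitions that keys off the data file's existence after the index commit rather than a successfulWrite flag: if the commit produced no data file because no bytes were written, create an empty one. This is a hedged sketch of that suggestion against the fields already in this class, not the code that was merged.

@Override
public void commitAllPartitions() throws IOException {
  cleanUp();
  blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile);
  // If nothing was written, the commit may not have produced a data file;
  // fall back to creating an empty one so downstream readers find a file.
  if (!outputFile.isFile()) {
    if (!outputFile.getParentFile().isDirectory() && !outputFile.getParentFile().mkdirs()) {
      throw new IOException(
          "Failed to create shuffle file directory at "
              + outputFile.getParentFile().getAbsolutePath());
    }
    if (!outputFile.createNewFile()) {
      throw new IOException(
          "Failed to create empty shuffle file at " + outputFile.getAbsolutePath());
    }
  }
}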

@@ -138,10 +130,42 @@ private void initStream() throws IOException {
}
}

private class DefaultShuffleBlockOutputStreamImpl extends DefaultShuffleBlockOutputStream {
private class DefaultShufflePartitionWriter implements ShufflePartitionWriter {

private boolean isClosed = false;
private final int partitionId;
private final DefaultShuffleBlockOutputStream stream;

private DefaultShufflePartitionWriter(int partitionId) {
this.partitionId = partitionId;
this.stream = new DefaultShuffleBlockOutputStreamImpl();
}

@Override
public OutputStream openStream() throws IOException {
return stream;
}

@Override
public long getLength() {
try {
stream.close();
} catch (Exception e) {
log.error("Error with closing stream for partition", e);
}
int length = stream.getCount();
partitionLengths[partitionId] = length;
return length;
}

@Override
public WritableByteChannel openChannel() throws IOException {
return Channels.newChannel(outputFileStream);

Review comment: This is strange - we create the newChannel around the outputFileStream, but openStream returns the DefaultShuffleBlockOutputStream. This can lead to an inconsistency in the count returned by this writer, because if we return the FileChannel here, the counter from the DefaultShuffleBlockOutputStream will not be reflected.

I think most of this can be caught by running BypassMergeSortShuffleWriterSuite locally.

}
}
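One way to address the count inconsistency flagged above would be to return a channel that does its own byte accounting instead of wrapping outputFileStream directly; DefaultShufflePartitionWriter could then read the length from whichever of the stream or the channel was actually used. The wrapper below is a hypothetical sketch, not part of this PR.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Sketch: a counting channel so that bytes written via openChannel() are
// tracked the same way DefaultShuffleBlockOutputStream tracks openStream() bytes.
class CountingWritableByteChannel implements WritableByteChannel {
  private final WritableByteChannel delegate;
  private long count = 0L;

  CountingWritableByteChannel(WritableByteChannel delegate) {
    this.delegate = delegate;
  }

  long getCount() {
    return count;
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    int written = delegate.write(src);
    count += written;
    return written;
  }

  @Override
  public boolean isOpen() {
    return delegate.isOpen();
  }

  @Override
  public void close() {
    // Leave the underlying channel open; the map output writer owns the file
    // stream across partitions in this sketch.
  }
}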

private class DefaultShuffleBlockOutputStreamImpl extends DefaultShuffleBlockOutputStream {
private int count = 0;
private boolean isClosed = false;

@Override
public int getCount() {
Expand All @@ -153,6 +177,7 @@ public void write(int b) throws IOException {
if (isClosed) {
throw new IllegalStateException("Attempting to write to a closed block byte channel.");
}
successfulWrite = true;
outputBufferedFileStream.write(b);
count++;
}