musings without unit tests
ifilonenko committed Mar 19, 2019
commit 7160ce3121086fc4d247195668048d13a124ab8c
DefaultShuffleBlockOutputStream.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.io;

import java.io.OutputStream;

public abstract class DefaultShuffleBlockOutputStream extends OutputStream {
Reviewer:
Why can't it just be a class with the implementation contents instead of an abstract class? Will there ever need to be other implementations of this?


  public abstract int getCount();

}
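
For reference, a minimal sketch of the reviewer's suggested alternative: one concrete class that wraps a delegate stream and counts bytes written, rather than an abstract base. The name CountingOutputStream and the delegate wiring are illustrative, not part of this PR:

import java.io.IOException;
import java.io.OutputStream;

public class CountingOutputStream extends OutputStream {

  private final OutputStream delegate;
  private int count = 0;

  public CountingOutputStream(OutputStream delegate) {
    this.delegate = delegate;
  }

  public int getCount() {
    return count;
  }

  @Override
  public void write(int b) throws IOException {
    delegate.write(b);
    count++;
  }

  @Override
  public void flush() throws IOException {
    delegate.flush();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}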
DefaultShuffleDataIO.java
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.io;

import org.apache.spark.SparkConf;
import org.apache.spark.api.shuffle.ShuffleDataIO;
import org.apache.spark.api.shuffle.ShuffleExecutorComponents;

public class DefaultShuffleDataIO implements ShuffleDataIO {

  private final SparkConf sparkConf;

  public DefaultShuffleDataIO(SparkConf sparkConf) {
    this.sparkConf = sparkConf;
  }

  @Override
  public ShuffleExecutorComponents executor() {
    return new DefaultShuffleExecutorComponents(sparkConf);
  }
}
DefaultShuffleExecutorComponents.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.io;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.storage.BlockManager;

public class DefaultShuffleExecutorComponents implements ShuffleExecutorComponents {

  private final SparkConf sparkConf;
  private BlockManager blockManager;
  private IndexShuffleBlockResolver blockResolver;
  private TaskMetrics metrics;

  public DefaultShuffleExecutorComponents(SparkConf sparkConf) {
    this.sparkConf = sparkConf;
  }

  @Override
  public void initializeExecutor(String appId, String execId) {
    blockManager = SparkEnv.get().blockManager();
    blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
    metrics = TaskContext.get().taskMetrics();
Reviewer:
Wait, I don't think you want this here actually. The TaskContext is associated with each individual shuffle task, but you're only calling initializeExecutor once per executor. You want to get the metrics from TaskContext after you have set the taskContext here (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L102) for each task.

Author:
@mccheah if so, can we pass metrics into the ShuffleMapOutputWriter so that it can be properly mocked (can't mock static methods)?

Reviewer:
There are a few ways you can do it. You can pass it in as you suggested. In this particular case, you can also call TaskContext.set(mockTaskContext) to initialize the static variable before the test. I prefer the latter, but you could make a case for either.

Reviewer:
Actually you might want to pass it through so that concurrent tests don't collide...

Reviewer:
The problem is that TaskMetrics is currently marked as DeveloperApi, which means it's questionable to pass it into a public API. We could propose some alternative metrics API that delegates to the Spark default TaskMetrics API. But I think for now we can use TaskContext#get from inside the writer and then in tests call TaskContext#setTaskContext. I took a closer look and a lot of tests use TaskContext#setTaskContext, indicating they don't anticipate tests being run in parallel in the same JVM.

Reviewer:
Just make sure we call TaskContext#unset appropriately after each test.
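
A rough sketch of the test pattern discussed in this thread, assuming JUnit and Mockito on the test classpath; TaskContext#setTaskContext and TaskContext#unset are the helpers named above (they are marked spark-private in Scala, so treat this as test-only), reached from Java via TaskContext$.MODULE$:

@Test
public void writerReadsMetricsFromTaskContext() {
  // Hypothetical test; the writer under test calls TaskContext.get() internally.
  TaskContext mockTaskContext = Mockito.mock(TaskContext.class);
  TaskContext$.MODULE$.setTaskContext(mockTaskContext);
  try {
    // ... build the map output writer and exercise it here ...
  } finally {
    // Always unset so the mock doesn't leak into other tests in this JVM.
    TaskContext$.MODULE$.unset();
  }
}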

  }

  @Override
  public ShuffleWriteSupport writes() {
    if (blockResolver == null || metrics == null) {
      throw new IllegalStateException(
          "Executor components must be initialized before getting writers.");
    }

    return new DefaultShuffleWriteSupport(
        sparkConf, blockResolver, metrics.shuffleWriteMetrics());
  }
}
DefaultShuffleMapOutputWriter.java
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.io;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShufflePartitionWriter;
import org.apache.spark.internal.config.package$;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.util.Utils;

public class DefaultShuffleMapOutputWriter implements ShuffleMapOutputWriter {

  private static final Logger log =
      LoggerFactory.getLogger(DefaultShuffleMapOutputWriter.class);

  private final int shuffleId;
  private final int mapId;
  private final int numPartitions;
  private final ShuffleWriteMetricsReporter metrics;
  private final IndexShuffleBlockResolver blockResolver;
  private final long[] partitionLengths;
  private ShufflePartitionWriter[] partitionWriters;
  private final int bufferSize;
  private int currPartitionId = 0;

  private final File outputFile;
  private final File outputTempFile;
  private FileOutputStream outputFileStream;
  private TimeTrackingOutputStream ts;
  private BufferedOutputStream outputBufferedFileStream;

  public DefaultShuffleMapOutputWriter(
      int shuffleId,
      int mapId,
      int numPartitions,
      ShuffleWriteMetricsReporter metrics,
      IndexShuffleBlockResolver blockResolver,
      SparkConf sparkConf) {
    this.shuffleId = shuffleId;
    this.mapId = mapId;
    this.numPartitions = numPartitions;
    this.metrics = metrics;
    this.blockResolver = blockResolver;
    // The config value is in KiB; convert to bytes.
    this.bufferSize =
        (int) (long) sparkConf.get(
            package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
    this.partitionLengths = new long[numPartitions];
    this.partitionWriters = new ShufflePartitionWriter[numPartitions];
    this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
    this.outputTempFile = Utils.tempFileWith(outputFile);
  }

  @Override
  public ShufflePartitionWriter getNextPartitionWriter() throws IOException {
    initStream();
    ShufflePartitionWriter shufflePartitionWriter =
        new DefaultShufflePartitionWriter(
            new DefaultShuffleBlockOutputStreamImpl());
    partitionWriters[currPartitionId++] = shufflePartitionWriter;
    return shufflePartitionWriter;
  }

  @Override
  public void commitAllPartitions() throws IOException {
    for (int pId = 0; pId < numPartitions; pId++) {
      partitionLengths[pId] = partitionWriters[pId].getLength();
    }
    blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile);
    // TODO: Maybe monitor a little more intelligently than checking files?
    if (!outputFile.getParentFile().isDirectory() && !outputFile.getParentFile().mkdirs()) {
      throw new IOException(
          String.format(
              "Failed to create shuffle file directory at %s.",
              outputFile.getParentFile().getAbsolutePath()));
    }
    if (!outputFile.isFile() && !outputFile.createNewFile()) {
      throw new IOException(
          String.format(
              "Failed to create empty shuffle file at %s.", outputFile.getAbsolutePath()));
    }
  }

  @Override
  public void abort(Throwable error) throws IOException {
    cleanUp();
Reviewer:
When aborting, we don't want cleanup to throw an exception that halts the rest of this method - we want to always be able to attempt to delete the files.
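
A shape like the following would do it (a sketch of the suggestion, not the committed code):

@Override
public void abort(Throwable error) throws IOException {
  try {
    cleanUp();
  } catch (Exception e) {
    // Log and keep going: the file deletions below must always get a chance to run.
    log.warn("Error cleaning up shuffle output streams during abort", e);
  }
  // ... then attempt to delete outputTempFile and outputFile as below ...
}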

    if (!outputTempFile.delete()) {
Reviewer:
Check both here and in the next clause that the outputTempFile exists first. No point logging a warning trying to delete a file that never existed.

Author:
Good catch
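
In other words, guard each delete roughly like this (sketch), and similarly for outputFile below:

if (outputTempFile.exists() && !outputTempFile.delete()) {
  log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath());
}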

log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath());
}
if (!outputFile.delete()) {
log.warn("Failed to delete outputshuffle file at {}", outputTempFile.getAbsolutePath());
}
}

  private void cleanUp() throws IOException {
Reviewer:
You're closing the streams, but I noticed that Channel also has a close() method. Do you need to call close() on the outputFileChannel too?

Reviewer:
Heh I am wrong... closing the fileOutputStream closes the channel :P

    if (outputBufferedFileStream != null) {
      outputBufferedFileStream.flush();
Reviewer:
Closing an underlying output stream I believe implies flushing it, but double check this.

Author:
It does, very true. close() I believe always calls flush()

      outputBufferedFileStream.close();
Reviewer:
Closing the buffered output stream will close the underlying file stream. Is closing the same file output stream twice prone to throwing an error? If it doesn't throw an error then this is fine (would rather be explicit about all the resources we're closing).
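
For reference, java.io.Closeable specifies that calling close() on an already-closed stream has no effect, so the explicit double close should be safe; a minimal check (the file name is arbitrary):

import java.io.FileOutputStream;
import java.io.IOException;

class DoubleCloseCheck {
  public static void main(String[] args) throws IOException {
    FileOutputStream fos = new FileOutputStream("tmp.bin");
    fos.close();
    fos.close(); // no-op: Closeable#close has no effect on an already-closed stream
  }
}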

    }

    if (outputFileStream != null) {
      outputFileStream.flush();
Reviewer:
hmm should closing the outputBufferedFileStream close this stream as well?

Author:
Good point

      outputFileStream.close();
Reviewer:
Does this close the file channel as well?

Reviewer:
Yea, looking at the implementation of FileOutputStream.close(), calling close on the outputFileStream will also close the outputFileChannel

    }
  }

  private void initStream() throws IOException {
    if (outputFileStream == null) {
      outputFileStream = new FileOutputStream(outputTempFile, true);
      ts = new TimeTrackingOutputStream(metrics, outputFileStream);
    }
    if (outputBufferedFileStream == null) {
      outputBufferedFileStream = new BufferedOutputStream(ts, bufferSize);
Reviewer:
Do you always want to use a BufferedOutputStream here? BypassMergeSortShuffleWriter doesn't use it at all I believe, although I'm not sure if it's only because then it can support transferTo, so this might be better

    }
  }

  private class DefaultShuffleBlockOutputStreamImpl extends DefaultShuffleBlockOutputStream {

    private boolean isClosed = false;
    private int count = 0;

    @Override
    public int getCount() {
      return count;
    }

    @Override
    public void write(int b) throws IOException {
      if (isClosed) {
        throw new IllegalStateException("Attempting to write to a closed block byte channel.");
      }
      outputBufferedFileStream.write(b);
      count++;
    }

    @Override
    public void close() throws IOException {
      flush();
      isClosed = true;
    }

    @Override
    public void flush() throws IOException {
      outputBufferedFileStream.flush();
    }
  }
}
DefaultShufflePartitionWriter.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.io;

import java.io.IOException;
import java.io.OutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.api.shuffle.ShufflePartitionWriter;

public class DefaultShufflePartitionWriter implements ShufflePartitionWriter {
Reviewer:
This is rare, but in this case I think it makes more sense for this to be a static inner class of DefaultShuffleMapOutputWriter - the interactions between the length stored here and its usage in DefaultShuffleMapOutputWriter#commitAllPartitions would be clearer this way. But I'd like a second opinion from @yifeih.

Author:
Yeah, I am okay with either. I thought this was more isolated.

Reviewer:
I don't have a strong preference either way, although I did find myself needing to go to the other class to understand this one, so placing them together could help.


  private static final Logger log =
      LoggerFactory.getLogger(DefaultShufflePartitionWriter.class);

  private final DefaultShuffleBlockOutputStream stream;

  public DefaultShufflePartitionWriter(
      DefaultShuffleBlockOutputStream stream) {
    this.stream = stream;
  }

  @Override
  public OutputStream openStream() throws IOException {
    return stream;
Reviewer:
Hmm, looking at this more carefully I don't think this is going to have the transferTo behavior that we expect. The reason for this is that Channels.newChannel knows how to make a FileChannel from a FileOutputStream specifically - it checks instanceOf. But here, Channels.newChannel would be passed a DefaultShuffleBlockOutputStream, which is not a FileOutputStream.

Author:
True, in its current form, it would return a WritableByteChannelImpl
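
To make that concrete, a small illustration of the JDK behavior being described (the class and file name are made up for the demo):

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

class ChannelDemo {
  public static void main(String[] args) throws IOException {
    FileOutputStream fos = new FileOutputStream("demo.bin");
    // Handed the FileOutputStream itself, newChannel returns the real
    // FileChannel, which a source FileChannel#transferTo can exploit.
    WritableByteChannel direct = Channels.newChannel(fos);
    // Handed any wrapper (here a BufferedOutputStream), newChannel falls
    // back to a generic WritableByteChannelImpl, losing the fast path.
    WritableByteChannel wrapped = Channels.newChannel(new BufferedOutputStream(fos));
    System.out.println(direct.getClass().getName());   // a FileChannel implementation
    System.out.println(wrapped.getClass().getName());  // Channels$WritableByteChannelImpl
    wrapped.close();
  }
}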

  }

  @Override
  public long getLength() {
    try {
      stream.close();
    } catch (Exception e) {
      log.error("Error with closing stream for partition", e);
    }
    return stream.getCount();
  }
}
DefaultShuffleWriteSupport.java
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.io;

import org.apache.spark.SparkConf;
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;

public class DefaultShuffleWriteSupport implements ShuffleWriteSupport {

  private final SparkConf sparkConf;
  private final IndexShuffleBlockResolver blockResolver;
  private final ShuffleWriteMetricsReporter metrics;

  public DefaultShuffleWriteSupport(
      SparkConf sparkConf,
Reviewer:
Indentation should be 4 spaces from public.

      IndexShuffleBlockResolver blockResolver,
      ShuffleWriteMetricsReporter metrics) {
    this.sparkConf = sparkConf;
    this.blockResolver = blockResolver;
    this.metrics = metrics;
  }

  @Override
  public ShuffleMapOutputWriter createMapOutputWriter(
      String appId,
      int shuffleId,
      int mapId,
      int numPartitions) {
    return new DefaultShuffleMapOutputWriter(
        shuffleId, mapId, numPartitions, metrics, blockResolver, sparkConf);
  }
}