added tests and resolved comments
ifilonenko committed Mar 23, 2019
commit 0737515d6ccb95b034ddafff66a0c9feee4b61b6
@@ -41,11 +41,11 @@ public DefaultShuffleExecutorComponents(SparkConf sparkConf) {
public void intitializeExecutor(String appId, String execId) {
blockManager = SparkEnv.get().blockManager();
blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
metrics = TaskContext.get().taskMetrics();
}

@Override
public ShuffleWriteSupport writes() {
metrics = TaskContext.get().taskMetrics();
if (blockResolver == null || metrics == null) {
throw new IllegalStateException(
"Executor components must be initialized before getting writers.");
@@ -119,6 +119,10 @@ private void cleanUp() throws IOException {
outputBufferedFileStream.close();

Closing the buffered output stream will close the underlying file stream. Is closing the same file output stream twice prone to throwing an error? If it doesn't throw an error, then this is fine (I'd rather be explicit about all the resources we're closing).
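
A minimal standalone sketch (not part of this diff) to sanity-check that question: java.io.Closeable documents close() as a no-op on an already-closed stream, so closing the underlying FileOutputStream again after the BufferedOutputStream has closed it should not throw.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class DoubleCloseCheck {
  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("double-close", ".bin");
    tmp.deleteOnExit();
    FileOutputStream fileStream = new FileOutputStream(tmp);
    BufferedOutputStream buffered = new BufferedOutputStream(fileStream);
    buffered.write(42);
    buffered.close();   // flushes and closes the underlying fileStream too
    fileStream.close(); // second close is a no-op per the Closeable contract
    System.out.println("closed twice without an exception");
  }
}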

}

if (outputFileChannel != null) {
outputFileChannel.close();
}

if (outputFileStream != null) {
outputFileStream.close();

Does this close the file channel as well?


Yeah, looking at the implementation of FileOutputStream.close(), calling close() on outputFileStream will also close outputFileChannel.
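
A minimal standalone sketch (not part of this diff) illustrating that behavior: the FileChannel obtained from FileOutputStream.getChannel() is tied to the stream, so closing the stream also closes the channel.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class StreamClosesChannel {
  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("stream-closes-channel", ".bin");
    tmp.deleteOnExit();
    FileOutputStream outputFileStream = new FileOutputStream(tmp);
    FileChannel outputFileChannel = outputFileStream.getChannel();
    System.out.println(outputFileChannel.isOpen());  // true
    outputFileStream.close();
    System.out.println(outputFileChannel.isOpen());  // false: closing the stream closed the channel
  }
}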

}
@@ -18,6 +18,8 @@
package org.apache.spark.shuffle.sort.io

import java.io._
import java.math.BigInteger
import java.nio.{ByteBuffer, ByteOrder}

import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.{any, anyInt}
@@ -45,6 +47,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft
private val D_LEN = 10
private val data: Array[Array[Int]] = (0 until NUM_PARTITIONS).map {
p => (1 to D_LEN).map(_ + p).toArray }.toArray
private var tempFile: File = _
private var mergedOutputFile: File = _
private var tempDir: File = _
private var partitionSizesInMergedFile: Array[Long] = _
@@ -65,6 +68,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft
MockitoAnnotations.initMocks(this)
tempDir = Utils.createTempDir(null, "test")
mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir)
tempFile = File.createTempFile("tempfile", "", tempDir)
partitionSizesInMergedFile = null
conf = new SparkConf()
.set("spark.app.id", "example.spark.app")
@@ -89,12 +93,13 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft
0, 0, NUM_PARTITIONS, metricsReporter, blockResolver, conf)
}

private def readRecordsFromFile(): Array[Array[Int]] = {
private def readRecordsFromFile(fromByte: Boolean): Array[Array[Int]] = {
var startOffset = 0L
val result = new Array[Array[Int]](NUM_PARTITIONS)
(0 until NUM_PARTITIONS).foreach { p =>
val partitionSize = partitionSizesInMergedFile(p)
val inner = new Array[Int](D_LEN)
val partitionSize = partitionSizesInMergedFile(p).toInt
lazy val inner = new Array[Int](partitionSize)
lazy val innerBytebuffer = ByteBuffer.allocate(partitionSize)
if (partitionSize > 0) {
val in = new FileInputStream(mergedOutputFile)
in.getChannel.position(startOffset)
@@ -104,21 +109,32 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft
while (nonEmpty) {
try {
val readBit = lin.read()
inner(count) = readBit
if (fromByte) {
innerBytebuffer.put(readBit.toByte)
} else {
inner(count) = readBit
}
count += 1
} catch {
case _: Exception =>
nonEmpty = false
}
}
in.close()
}
if (fromByte) {
result(p) = innerBytebuffer.array().sliding(4, 4).map { b =>
new BigInteger(b).intValue()
}.toArray
} else {
result(p) = inner
}
result(p) = inner
startOffset += partitionSize
}
result
}

test("writing to outputstream") {
test("writing to an outputstream") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val stream = writer.openStream()
@@ -133,6 +149,74 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft
val partitionLengths = (0 until NUM_PARTITIONS).map { _ => D_LEN.toDouble}.toArray
assert(partitionSizesInMergedFile === partitionLengths)
assert(mergedOutputFile.length() === partitionLengths.sum)
assert(data === readRecordsFromFile())
assert(data === readRecordsFromFile(false))
}

test("writing to a channel") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val channel = writer.openChannel()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
intBuffer.put(data(p))
assert(channel.isOpen)
channel.write(byteBuffer)
channel.close()
intercept[IllegalStateException] {
channel.write(byteBuffer)
}
assert(!channel.isOpen)
// Each Int is written as 4 bytes, hence D_LEN * 4
assert(writer.getLength == D_LEN * 4)
}
mapOutputWriter.commitAllPartitions()
val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray
assert(partitionSizesInMergedFile === partitionLengths)
assert(mergedOutputFile.length() === partitionLengths.sum)
assert(data === readRecordsFromFile(true))
}

test("copyStreams with an outputstream") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val stream = writer.openStream()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
intBuffer.put(data(p))
val in: InputStream = new ByteArrayInputStream(byteBuffer.array())
Utils.copyStream(in, stream, false, false)
in.close()
stream.close()
assert(writer.getLength == D_LEN * 4)
}
mapOutputWriter.commitAllPartitions()
val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray
assert(partitionSizesInMergedFile === partitionLengths)
assert(mergedOutputFile.length() === partitionLengths.sum)
assert(data === readRecordsFromFile(true))
}

test("copyStreamsWithNIO with a channel") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val channel = writer.openChannel()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
intBuffer.put(data(p))
val out = new FileOutputStream(tempFile)
out.write(byteBuffer.array())
out.close()
val in = new FileInputStream(tempFile)
Utils.copyFileStreamNIO(in.getChannel, channel, 0, D_LEN * 4)
in.close()
channel.close()
assert(!channel.isOpen)
assert(writer.getLength == D_LEN * 4)
}
mapOutputWriter.commitAllPartitions()
val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble}.toArray
assert(partitionSizesInMergedFile === partitionLengths)
assert(mergedOutputFile.length() === partitionLengths.sum)
assert(data === readRecordsFromFile(true))
}
}