Closed
Changes from 1 commit
Commits (20)
735eca6
Split MemoryEntry into two separate classes (serialized and deseriali…
JoshRosen Mar 15, 2016
8f08289
Add ChunkedByteBuffer and use it in storage layer.
JoshRosen Mar 15, 2016
79b1a6a
Add test cases and fix bug in ChunkedByteBuffer.toInputStream()
JoshRosen Mar 15, 2016
7dbcd5a
WIP towards understanding destruction.
JoshRosen Mar 15, 2016
3fbec21
Small fixes to dispose behavior.
JoshRosen Mar 15, 2016
e5e663f
Modify BlockManager.dataSerialize to write ChunkedByteBuffers.
JoshRosen Mar 15, 2016
de62f0d
Merge remote-tracking branch 'origin/master' into chunked-block-seria…
JoshRosen Mar 16, 2016
0a347fd
Fix test compilation in streaming.
JoshRosen Mar 16, 2016
6852c48
Merge remote-tracking branch 'origin/master' into chunked-block-seria…
JoshRosen Mar 16, 2016
43f8fa6
Allow ChunkedByteBuffer to contain no chunks.
JoshRosen Mar 16, 2016
25e6884
Document toByteBuffer() and toArray() size limitations.
JoshRosen Mar 16, 2016
325c83d
Move dispose() from BlockManager to StorageUtils.
JoshRosen Mar 16, 2016
4f5074e
Better documentation for dispose() methods.
JoshRosen Mar 16, 2016
b6ddf3e
Rename limit to size.
JoshRosen Mar 16, 2016
719ad3c
Implement missing InputStream methods.
JoshRosen Mar 16, 2016
2300607
More comments.
JoshRosen Mar 16, 2016
3fc0b66
Fix confusing getChunks().head
JoshRosen Mar 16, 2016
c747c85
Merge remote-tracking branch 'origin/master' into chunked-block-seria…
JoshRosen Mar 17, 2016
cb9311b
Fix logging import.
JoshRosen Mar 17, 2016
2970932
Clean up dispose logic to address review comments.
JoshRosen Mar 18, 2016
Rename limit to size.
JoshRosen committed Mar 16, 2016
commit b6ddf3ed40cc90ec94b7e4917808f8a726b597ee
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -503,7 +503,7 @@ private[spark] class BlockManager(
*/
def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
getRemoteBytes(blockId).map { data =>
new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit)
new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
}
}

@@ -742,7 +742,7 @@ private[spark] class BlockManager(
null
}

val size = bytes.limit
val size = bytes.size

if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
@@ -950,7 +950,7 @@ private[spark] class BlockManager(
diskBytes.dispose()
memoryStore.getBytes(blockId).get
} else {
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit, () => {
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we
// cannot put it into MemoryStore, copyForMemory should not be created. That's why
@@ -1079,15 +1079,15 @@ private[spark] class BlockManager(
case Some(peer) =>
try {
val onePeerStartTime = System.currentTimeMillis
logTrace(s"Trying to replicate $blockId of ${data.limit} bytes to $peer")
logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host,
peer.port,
peer.executorId,
blockId,
new NettyManagedBuffer(data.toNetty),
tLevel)
logTrace(s"Replicated $blockId of ${data.limit} bytes to $peer in %s ms"
logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
peersForReplication -= peer
@@ -1110,7 +1110,7 @@
}
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logDebug(s"Replicating $blockId of ${data.limit} bytes to " +
logDebug(s"Replicating $blockId of ${data.size} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
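Aside: every call site in this file now reads the block's total byte count from the chunked buffer rather than from a single ByteBuffer's limit. A minimal sketch of that computation, using only the JDK; the helper name totalSize is hypothetical, not Spark API:

import java.nio.ByteBuffer

object SizeSketch {
  // Total size of a multi-chunk buffer: the sum of the chunks' limits,
  // which is what putBytes, the memory-store check, and the replication
  // log messages consume after the rename.
  def totalSize(chunks: Array[ByteBuffer]): Long =
    chunks.map(_.limit().toLong).sum

  def main(args: Array[String]): Unit = {
    val chunks = Array(ByteBuffer.allocate(256), ByteBuffer.allocate(128))
    println(s"Trying to replicate block of ${totalSize(chunks)} bytes") // 384 bytes
  }
}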
MemoryStore.scala
@@ -99,7 +99,7 @@ private[spark] class MemoryStore(
if (memoryManager.acquireStorageMemory(blockId, size)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.limit == size)
assert(bytes.size == size)
val entry = new SerializedMemoryEntry(bytes, size)
entries.synchronized {
entries.put(blockId, entry)
@@ -189,7 +189,7 @@ private[spark] class MemoryStore(
new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues))
} else {
val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
new SerializedMemoryEntry(bytes, bytes.limit)
new SerializedMemoryEntry(bytes, bytes.size)
}
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
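Aside: a minimal sketch of the pattern in this hunk, where a serialized entry records its buffer's size up front so later memory accounting never has to re-sum the chunks. SimpleChunkedBuffer and SimpleSerializedEntry are hypothetical stand-ins, not the Spark classes:

import java.nio.ByteBuffer

final class SimpleChunkedBuffer(val chunks: Array[ByteBuffer]) {
  // Same definition of size as in ChunkedByteBuffer: sum of the chunks' limits.
  val size: Long = chunks.map(_.limit().toLong).sum
}

final case class SimpleSerializedEntry(buffer: SimpleChunkedBuffer, size: Long)

object EntrySketch {
  def main(args: Array[String]): Unit = {
    val bytes = new SimpleChunkedBuffer(Array(ByteBuffer.allocate(64)))
    val entry = SimpleSerializedEntry(bytes, bytes.size) // mirrors SerializedMemoryEntry(bytes, bytes.size)
    assert(entry.size == 64L)
  }
}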
ChunkedByteBuffer.scala
@@ -27,12 +27,13 @@ import io.netty.buffer.{ByteBuf, Unpooled}
import org.apache.spark.network.util.ByteArrayWritableChannel
import org.apache.spark.storage.StorageUtils


private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
require(chunks != null, "chunks must not be null")
require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
require(chunks.forall(_.position() == 0), "chunks' positions must be 0")

val limit: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum

def this(byteBuffer: ByteBuffer) = {
this(Array(byteBuffer))
@@ -56,11 +57,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
* @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size.
*/
def toArray: Array[Byte] = {
Contributor (review comment): should document this throws exceptions if size doesn't fit.

Contributor (review comment): same for toByteBuffer

if (limit >= Integer.MAX_VALUE) {
if (size >= Integer.MAX_VALUE) {
throw new UnsupportedOperationException(
s"cannot call toArray because buffer size ($limit bytes) exceeds maximum array size")
s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
}
val byteChannel = new ByteArrayWritableChannel(limit.toInt)
val byteChannel = new ByteArrayWritableChannel(size.toInt)
writeFully(byteChannel)
byteChannel.close()
byteChannel.getData
@@ -81,6 +82,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {

/**
* Creates an input stream to read data from this ChunkedByteBuffer.
*
* @param dispose if true, [[dispose()]] will be called at the end of the stream
* in order to close any memory-mapped files which back this buffer.
*/
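Aside: a hedged sketch (not the Spark implementation) of the guard the review comments above ask to document. JVM arrays are Int-indexed, so a buffer whose total size does not fit in an Int cannot be flattened into a single Array[Byte]:

import java.nio.ByteBuffer

final class GuardedBuffer(chunks: Array[ByteBuffer]) {
  val size: Long = chunks.map(_.limit().toLong).sum

  def toArray: Array[Byte] = {
    if (size >= Integer.MAX_VALUE) {
      throw new UnsupportedOperationException(
        s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
    }
    val out = new Array[Byte](size.toInt)
    var offset = 0
    for (chunk <- chunks) {
      val dup = chunk.duplicate() // duplicate so the original chunk's position stays at 0
      val len = dup.remaining()
      dup.get(out, offset, len)
      offset += len
    }
    out
  }
}

object GuardSketch {
  def main(args: Array[String]): Unit = {
    val buf = new GuardedBuffer(Array(ByteBuffer.wrap(Array[Byte](1, 2, 3))))
    assert(buf.toArray.sameElements(Array[Byte](1, 2, 3)))
  }
}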
ChunkedByteBufferSuite.scala
@@ -29,7 +29,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {

test("no chunks") {
val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer])
assert(emptyChunkedByteBuffer.limit === 0)
assert(emptyChunkedByteBuffer.size === 0)
assert(emptyChunkedByteBuffer.getChunks().isEmpty)
assert(emptyChunkedByteBuffer.toArray === Array.empty)
assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0)
@@ -58,7 +58,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {

test("writeFully() does not affect original buffer's position") {
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.limit.toInt))
chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
assert(chunkedByteBuffer.getChunks().head.position() === 0)
}

@@ -72,7 +72,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4)
fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity())
val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer))
assert(chunkedByteBuffer.limit === (1024L * 1024L * 1024L * 4L))
assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L))
intercept[UnsupportedOperationException] {
chunkedByteBuffer.toArray
}
@@ -82,10 +82,10 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
assert(chunkedByteBuffer.limit === bytes1.limit() + bytes2.limit())
assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())

val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
val bytesFromStream = new Array[Byte](chunkedByteBuffer.limit.toInt)
val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt)
ByteStreams.readFully(inputStream, bytesFromStream)
assert(bytesFromStream === bytes1.array() ++ bytes2.array())
assert(chunkedByteBuffer.getChunks().head.position() === 0)
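Aside: a minimal usage sketch of the read path the last test exercises, using only JDK classes; a SequenceInputStream chained over per-chunk streams stands in for ChunkedByteBuffer.toInputStream(dispose = false):

import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.nio.ByteBuffer
import java.util.Collections

object InputStreamSketch {
  def main(args: Array[String]): Unit = {
    val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
    val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
    val chunks = Array(bytes1, bytes2)
    val size = chunks.map(_.limit().toLong).sum

    // One stream per chunk, chained together; from the caller's point of view
    // this behaves like reading the whole chunked buffer front to back.
    val streams: java.util.List[InputStream] =
      java.util.Arrays.asList(chunks.map(c => new ByteArrayInputStream(c.array()): InputStream): _*)
    val in = new SequenceInputStream(Collections.enumeration(streams))

    // Equivalent of ByteStreams.readFully: keep reading until the array is full.
    val out = new Array[Byte](size.toInt)
    var read = 0
    while (read < out.length) {
      val n = in.read(out, read, out.length - read)
      require(n >= 0, "unexpected end of stream")
      read += n
    }
    assert(out.sameElements(bytes1.array() ++ bytes2.array()))
  }
}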