Commits (20 total; the diff below shows changes from 1 commit):
735eca6  Split MemoryEntry into two separate classes (serialized and deseriali… (JoshRosen, Mar 15, 2016)
8f08289  Add ChunkedByteBuffer and use it in storage layer. (JoshRosen, Mar 15, 2016)
79b1a6a  Add test cases and fix bug in ChunkedByteBuffer.toInputStream() (JoshRosen, Mar 15, 2016)
7dbcd5a  WIP towards understanding destruction. (JoshRosen, Mar 15, 2016)
3fbec21  Small fixes to dispose behavior. (JoshRosen, Mar 15, 2016)
e5e663f  Modify BlockManager.dataSerialize to write ChunkedByteBuffers. (JoshRosen, Mar 15, 2016)
de62f0d  Merge remote-tracking branch 'origin/master' into chunked-block-seria… (JoshRosen, Mar 16, 2016)
0a347fd  Fix test compilation in streaming. (JoshRosen, Mar 16, 2016)
6852c48  Merge remote-tracking branch 'origin/master' into chunked-block-seria… (JoshRosen, Mar 16, 2016)
43f8fa6  Allow ChunkedByteBuffer to contain no chunks. (JoshRosen, Mar 16, 2016)
25e6884  Document toByteBuffer() and toArray() size limitations. (JoshRosen, Mar 16, 2016)
325c83d  Move dispose() from BlockManager to StorageUtils. (JoshRosen, Mar 16, 2016)
4f5074e  Better documentation for dispose() methods. (JoshRosen, Mar 16, 2016)
b6ddf3e  Rename limit to size. (JoshRosen, Mar 16, 2016)
719ad3c  Implement missing InputStream methods. (JoshRosen, Mar 16, 2016)
2300607  More comments. (JoshRosen, Mar 16, 2016)
3fc0b66  Fix confusing getChunks().head (JoshRosen, Mar 16, 2016)
c747c85  Merge remote-tracking branch 'origin/master' into chunked-block-seria… (JoshRosen, Mar 17, 2016)
cb9311b  Fix logging import. (JoshRosen, Mar 17, 2016)
2970932  Clean up dispose logic to address review comments. (JoshRosen, Mar 18, 2016)
Move dispose() from BlockManager to StorageUtils.
It was a static method before, but its location was confusing.
JoshRosen committed Mar 16, 2016
commit 325c83d8909472428ae65620033fff4887c36e06
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (2 additions, 19 deletions)

@@ -18,16 +18,14 @@
 package org.apache.spark.storage

 import java.io._
-import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.nio.ByteBuffer

 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.Random
 import scala.util.control.NonFatal

-import sun.nio.ch.DirectBuffer
-
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.io.CompressionCodec

@@ -1333,24 +1331,9 @@ private[spark] class BlockManager(
 }


-private[spark] object BlockManager extends Logging {
+private[spark] object BlockManager {
   private val ID_GENERATOR = new IdGenerator

-  /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
-   */
-  def dispose(buffer: ByteBuffer): Unit = {
-    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
-      }
-    }
-  }
-
   def blockIdsToHosts(
       blockIds: Array[BlockId],
       env: SparkEnv,
core/src/main/scala/org/apache/spark/storage/StorageUtils.scala (21 additions, 1 deletion)

@@ -17,10 +17,15 @@

 package org.apache.spark.storage

+import java.nio.{ByteBuffer, MappedByteBuffer}
+
 import scala.collection.Map
 import scala.collection.mutable

+import sun.nio.ch.DirectBuffer
+
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging

 /**
  * :: DeveloperApi ::

@@ -222,7 +227,22 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 }

 /** Helper methods for storage-related objects. */
-private[spark] object StorageUtils {
+private[spark] object StorageUtils extends Logging {

+  /**
+   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
+   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
+   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
+   * unfortunately no standard API to do this.
+   */
+  def dispose(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      logTrace(s"Unmapping $buffer")
+      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
+        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      }
+    }
+  }
+
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.
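For context, a minimal sketch of how the relocated helper is used, assuming code that lives inside Spark itself (StorageUtils is private[spark]) and a made-up file path: memory-map a file, read from it, then unmap it eagerly instead of waiting for the GC to release the underlying file handle.

```scala
import java.io.RandomAccessFile
import java.nio.channels.FileChannel.MapMode

import org.apache.spark.storage.StorageUtils

// Hypothetical on-disk block; any file works for illustration.
val file = new RandomAccessFile("/tmp/example-block", "r")
try {
  val mapped = file.getChannel.map(MapMode.READ_ONLY, 0, file.length())
  val firstByte = mapped.get(0)  // read through the mapping
  // Eagerly unmap the buffer; touching `mapped` after this call is unsafe.
  StorageUtils.dispose(mapped)
} finally {
  file.close()
}
```

Because dispose() silently ignores null and non-mapped buffers, callers can pass any ByteBuffer without checking its type first.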
core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala

@@ -20,10 +20,10 @@ package org.apache.spark.util
 import java.io.InputStream
 import java.nio.ByteBuffer

-import org.apache.spark.storage.BlockManager
+import org.apache.spark.storage.StorageUtils

 /**
- * Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose()
+ * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
  * at the end of the stream (e.g. to close a memory-mapped file).
  */
 private[spark]

@@ -68,12 +68,12 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f
   }

   /**
-   * Clean up the buffer, and potentially dispose of it using BlockManager.dispose().
+   * Clean up the buffer, and potentially dispose of it using StorageUtils.dispose().
    */
   private def cleanUp() {
     if (buffer != null) {
       if (dispose) {
-        BlockManager.dispose(buffer)
+        StorageUtils.dispose(buffer)
       }
       buffer = null
     }
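A usage sketch for the updated dependency, again assuming Spark-internal code (ByteBufferInputStream is private[spark]) and a hypothetical path: constructing the stream with dispose = true means that once the buffer is drained, cleanUp() hands it to StorageUtils.dispose() so a memory-mapped file is released promptly.

```scala
import java.io.RandomAccessFile
import java.nio.channels.FileChannel.MapMode

import org.apache.spark.util.ByteBufferInputStream

val raf = new RandomAccessFile("/tmp/example-block", "r")  // hypothetical path
try {
  val mapped = raf.getChannel.map(MapMode.READ_ONLY, 0, raf.length())
  // dispose = true: at the end of the stream, cleanUp() calls
  // StorageUtils.dispose(mapped), unmapping the file-backed buffer.
  val in = new ByteBufferInputStream(mapped, dispose = true)
  var b = in.read()
  while (b != -1) {
    b = in.read()
  }
} finally {
  raf.close()
}
```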
ChunkedByteBuffer.scala

@@ -25,7 +25,7 @@ import com.google.common.primitives.UnsignedBytes
 import io.netty.buffer.{ByteBuf, Unpooled}

 import org.apache.spark.network.util.ByteArrayWritableChannel
-import org.apache.spark.storage.BlockManager
+import org.apache.spark.storage.StorageUtils

 private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks != null, "chunks must not be null")

@@ -105,12 +105,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    * unfortunately no standard API to do this.
    */
   def dispose(): Unit = {
-    chunks.foreach(BlockManager.dispose)
+    chunks.foreach(StorageUtils.dispose)
   }
 }

 /**
- * Reads data from a ChunkedByteBuffer, and optionally cleans it up using BlockManager.dispose()
+ * Reads data from a ChunkedByteBuffer, and optionally cleans it up using StorageUtils.dispose()
  * at the end of the stream (e.g. to close a memory-mapped file).
  */
 private class ChunkedByteBufferInputStream(

@@ -129,7 +129,7 @@ private class ChunkedByteBufferInputStream(

   override def read(): Int = {
     if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) {
-      BlockManager.dispose(currentChunk)
+      StorageUtils.dispose(currentChunk)
       currentChunk = chunks.next()
     }
     if (currentChunk != null && currentChunk.hasRemaining) {
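To make the loop in read() concrete, here is a standalone sketch of the same chunk-advancing pattern; ChunkedReadSketch, disposeChunk, and the sample data are hypothetical, with disposeChunk standing in for the Spark-internal StorageUtils.dispose. Each chunk is disposed as soon as it is drained, before the stream moves on to the next one.

```scala
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer

object ChunkedReadSketch {
  // Stand-in for StorageUtils.dispose, which is private[spark].
  def disposeChunk(buf: ByteBuffer): Unit = ()

  def main(args: Array[String]): Unit = {
    val chunks = Iterator(
      ByteBuffer.wrap(Array[Byte](1, 2, 3)),
      ByteBuffer.wrap(Array[Byte](4, 5)))

    var currentChunk: ByteBuffer = if (chunks.hasNext) chunks.next() else null
    val out = ArrayBuffer[Byte]()

    var done = false
    while (!done) {
      // Once the current chunk is drained, dispose of it and advance.
      if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) {
        disposeChunk(currentChunk)
        currentChunk = chunks.next()
      }
      if (currentChunk != null && currentChunk.hasRemaining) {
        out += currentChunk.get()
      } else {
        done = true
      }
    }
    println(out.mkString(", "))  // prints: 1, 2, 3, 4, 5
  }
}
```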