Commits
23 commits
51ca7bd
Improve building with maven docs
Mar 6, 2014
cda381f
SPARK-1184: Update the distribution tar.gz to include spark-assembly jar
markgrover Mar 6, 2014
3eb009f
SPARK-1156: allow user to login into a cluster without slaves
CodingCat Mar 6, 2014
3d3acef
SPARK-1187, Added missing Python APIs
Mar 6, 2014
40566e1
SPARK-942: Do not materialize partitions when DISK_ONLY storage level…
kellrott Mar 6, 2014
7edbea4
SPARK-1189: Add Security to Spark - Akka, Http, ConnectionManager, UI…
tgravescs Mar 7, 2014
328c73d
SPARK-1197. Change yarn-standalone to yarn-cluster and fix up running…
sryza Mar 7, 2014
9ae919c
Example for cassandra CQL read/write from spark
anitatailor Mar 7, 2014
33baf14
Small clean-up to flatmap tests
pwendell Mar 7, 2014
dabeb6f
SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
aarondav Mar 7, 2014
b7cd9e9
SPARK-1195: set map_input_file environment variable in PipedRDD
tgravescs Mar 7, 2014
6e730ed
Spark 1165 rdd.intersection in python and java
ScrapCodes Mar 8, 2014
a99fb37
SPARK-1193. Fix indentation in pom.xmls
sryza Mar 8, 2014
8ad486a
Allow sbt to use more than 1G of heap.
rxin Mar 8, 2014
0b7b7fd
[SPARK-1194] Fix the same-RDD rule for cache replacement
liancheng Mar 8, 2014
c2834ec
Update junitxml plugin to the latest version to avoid recompilation i…
rxin Mar 8, 2014
e59a3b6
SPARK-1190: Do not initialize log4j if slf4j log4j backend is not bei…
pwendell Mar 9, 2014
52834d7
SPARK-929: Fully deprecate usage of SPARK_MEM
aarondav Mar 9, 2014
f6f9d02
Add timeout for fetch file
guojc Mar 9, 2014
faf4cad
Fix markup errors introduced in #33 (SPARK-1189)
pwendell Mar 9, 2014
b9be160
SPARK-782 Clean up for ASM dependency.
pwendell Mar 9, 2014
5d98cfc
maintain arbitrary state data for each key
CrazyJvm Mar 10, 2014
32ad348
[SPARK-1186] : Enrich the Spark Shell to support additional arguments.
berngp Mar 10, 2014
SPARK-942: Do not materialize partitions when DISK_ONLY storage level is used

This is a port of a pull request originally targeted at incubator-spark: https://github.com/apache/incubator-spark/pull/180

Essentially, if a user returns a generative iterator (for example, from a flatMap operation), then when persisting the data Spark would first unroll the iterator into an ArrayBuffer and only then try to figure out whether it could store the data. When the user-provided iterator generated more data than the available memory, this would cause a crash. With this patch, if the user requests a persist with 'StorageLevel.DISK_ONLY', the iterator is unrolled as it is fed into the serializer.
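
To make the memory behavior concrete, here is a minimal, standalone Scala sketch of the two strategies. It is illustrative only, not Spark's actual CacheManager/BlockManager code; `bigIterator` and the output path are made up:

```scala
import java.io.{FileOutputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

object IteratorSpillSketch {
  // A generative iterator: elements are produced lazily and never all exist at once.
  def bigIterator: Iterator[Array[Byte]] = Iterator.fill(100000)(new Array[Byte](1024))

  // Pre-patch behavior (simplified): unroll the iterator into an ArrayBuffer first,
  // so the entire dataset must fit in memory before the store ever sees it.
  def materializeThenStore(): ArrayBuffer[Any] = {
    val elements = new ArrayBuffer[Any]
    elements ++= bigIterator // can exhaust the heap for large generative iterators
    elements
  }

  // Post-patch behavior (simplified): stream the iterator straight into a serializer
  // writing to disk, so only one element needs to be resident at a time.
  def streamToDisk(path: String): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try bigIterator.foreach(out.writeObject) finally out.close()
  }
}
```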

To do this, two changes were made:
1) The type of the 'values' argument in the putValues method of the BlockStore interface was changed from ArrayBuffer to Iterator (and all code interfacing with this method was modified accordingly).
2) The JavaSerializer now calls the ObjectOutputStream 'reset' method every 1000 objects. This was done because ObjectOutputStream caches the objects it writes (thus preventing them from being GC'd) in order to produce more compact serialized output. If reset is never called, memory eventually fills up; if it is called too often, the serialization streams become much larger because of redundant class descriptions.
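
A minimal sketch of the periodic-reset pattern described in 2), in plain Scala with a hardcoded interval (the class name is made up; the patch itself reads the interval from SparkConf via 'spark.serializer.objectStreamReset'):

```scala
import java.io.{ObjectOutputStream, OutputStream}

// Writes objects through an ObjectOutputStream, calling reset() every
// `resetInterval` objects so the stream's handle table (which otherwise keeps
// a strong reference to every object written) is periodically cleared.
class PeriodicResetWriter(out: OutputStream, resetInterval: Int = 10000) {
  private val objOut = new ObjectOutputStream(out)
  private var counter = 0

  def write(obj: AnyRef): this.type = {
    objOut.writeObject(obj)
    if (resetInterval > 0 && counter >= resetInterval) {
      objOut.reset() // class descriptors are re-sent afterwards, but cached objects are freed
      counter = 0
    } else {
      counter += 1
    }
    this
  }

  def flush(): Unit = objOut.flush()
  def close(): Unit = objOut.close()
}
```

Each reset forces class descriptors to be re-written for subsequently serialized objects, so the interval trades stream size against memory. In the diff below the default is 10000, and it can be overridden with, for example, new SparkConf().set("spark.serializer.objectStreamReset", "1000").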

Author: Kyle Ellrott <[email protected]>

Closes #50 from kellrott/iterator-to-disk and squashes the following commits:

9ef7cb8 [Kyle Ellrott] Fixing formatting issues.
60e0c57 [Kyle Ellrott] Fixing issues (formatting, variable names, etc.) from review comments
8aa31cd [Kyle Ellrott] Merge ../incubator-spark into iterator-to-disk
33ac390 [Kyle Ellrott] Merge branch 'iterator-to-disk' of github.com:kellrott/incubator-spark into iterator-to-disk
2f684ea [Kyle Ellrott] Refactoring the BlockManager to replace the Either[Either[A,B]] usage. Now using trait 'Values'. Also modified BlockStore.putBytes call to return PutResult, so that it behaves like putValues.
f70d069 [Kyle Ellrott] Adding docs for spark.serializer.objectStreamReset configuration
7ccc74b [Kyle Ellrott] Moving the 'LargeIteratorSuite' to simply test persistence of iterators. It doesn't try to trigger an OOM error any more
16a4cea [Kyle Ellrott] Streamlined the LargeIteratorSuite unit test. It should now run in ~25 seconds. Confirmed that it still crashes an unpatched copy of Spark.
c2fb430 [Kyle Ellrott] Removing more un-needed array-buffer to iterator conversions
627a8b7 [Kyle Ellrott] Wrapping a few long lines
0f28ec7 [Kyle Ellrott] Adding second putValues to BlockStore interface that accepts an ArrayBuffer (rather than an Iterator). This will allow BlockStores to have slightly different behaviors depending on whether they get an Iterator or ArrayBuffer. In the case of the MemoryStore, it needs to duplicate and cache an Iterator into an ArrayBuffer, but if handed an ArrayBuffer, it can skip the duplication.
656c33e [Kyle Ellrott] Fixing the JavaSerializer to read from the SparkConf rather than the System property.
8644ee8 [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
00c98e0 [Kyle Ellrott] Making the Java ObjectStreamSerializer reset rate configurable by the system variable 'spark.serializer.objectStreamReset', default is now 10000.
40fe1d7 [Kyle Ellrott] Removing rogue space
31fe08e [Kyle Ellrott] Removing un-needed semi-colons
9df0276 [Kyle Ellrott] Added check to make sure that streamed-to-disk RDD actually returns good data in the LargeIteratorSuite
a6424ba [Kyle Ellrott] Wrapping long line
2eeda75 [Kyle Ellrott] Fixing dumb mistake ("||" instead of "&&")
0e6f808 [Kyle Ellrott] Deleting temp output directory when done
95c7f67 [Kyle Ellrott] Simplifying StorageLevel checks
56f71cd [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
44ec35a [Kyle Ellrott] Adding some comments.
5eb2b7e [Kyle Ellrott] Changing the JavaSerializer reset to occur every 1000 objects.
f403826 [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
81d670c [Kyle Ellrott] Adding unit test for straight to disk iterator methods.
d32992f [Kyle Ellrott] Merge remote-tracking branch 'origin/master' into iterator-to-disk
cac1fad [Kyle Ellrott] Fixing MemoryStore, so that it converts incoming iterators to ArrayBuffer objects. This was previously done higher up the stack.
efe1102 [Kyle Ellrott] Changing CacheManager and BlockManager to pass iterators directly to the serializer when a 'DISK_ONLY' persist is called. This is in response to SPARK-942.
kellrott authored and pwendell committed Mar 6, 2014
commit 40566e10aae4b21ffc71ea72702b8df118ac5c8e
28 changes: 24 additions & 4 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -71,10 +71,30 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
if (storageLevel.useDisk && !storageLevel.useMemory) {
// In the case that this RDD is to be persisted using DISK_ONLY
// the iterator will be passed directly to the blockManager (rather then
// caching it to an ArrayBuffer first), then the resulting block data iterator
// will be passed back to the user. If the iterator generates a lot of data,
// this means that it doesn't all have to be held in memory at one time.
// This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
// blocks aren't dropped by the block store before enabling that.
blockManager.put(key, computedValues, storageLevel, tellMaster = true)
return blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted valued")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
// if we're dealing with a 'one-time' iterator
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
return elements.iterator.asInstanceOf[Iterator[T]]
}
} finally {
loading.synchronized {
loading.remove(key)
@@ -23,9 +23,28 @@ import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.util.ByteBufferInputStream

private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
private[spark] class JavaSerializationStream(out: OutputStream, conf: SparkConf)
extends SerializationStream {
val objOut = new ObjectOutputStream(out)
def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
var counter = 0
val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000)

/**
* Calling reset to avoid memory leak:
* http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
* But only call it every 10,000th time to avoid bloated serialization streams (when
* the stream 'resets' object class descriptions have to be re-written)
*/
def writeObject[T](t: T): SerializationStream = {
objOut.writeObject(t)
if (counterReset > 0 && counter >= counterReset) {
objOut.reset()
counter = 0
} else {
counter += 1
}
this
}
def flush() { objOut.flush() }
def close() { objOut.close() }
}
@@ -41,7 +60,7 @@ extends DeserializationStream {
def close() { objIn.close() }
}

private[spark] class JavaSerializerInstance extends SerializerInstance {
private[spark] class JavaSerializerInstance(conf: SparkConf) extends SerializerInstance {
def serialize[T](t: T): ByteBuffer = {
val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
@@ -63,7 +82,7 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
}

def serializeStream(s: OutputStream): SerializationStream = {
new JavaSerializationStream(s)
new JavaSerializationStream(s, conf)
}

def deserializeStream(s: InputStream): DeserializationStream = {
@@ -79,5 +98,5 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
* A Spark serializer that uses Java's built-in serialization.
*/
class JavaSerializer(conf: SparkConf) extends Serializer {
def newInstance(): SerializerInstance = new JavaSerializerInstance
def newInstance(): SerializerInstance = new JavaSerializerInstance(conf)
}
87 changes: 51 additions & 36 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,6 +35,12 @@ import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._

sealed trait Values

case class ByteBufferValues(buffer: ByteBuffer) extends Values
case class IteratorValues(iterator: Iterator[Any]) extends Values
case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values

private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
@@ -455,9 +461,7 @@ private[spark] class BlockManager(

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
: Long = {
val elements = new ArrayBuffer[Any]
elements ++= values
put(blockId, elements, level, tellMaster)
doPut(blockId, IteratorValues(values), level, tellMaster)
}

/**
@@ -479,7 +483,7 @@
def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
tellMaster: Boolean = true) : Long = {
require(values != null, "Values is null")
doPut(blockId, Left(values), level, tellMaster)
doPut(blockId, ArrayBufferValues(values), level, tellMaster)
}

/**
@@ -488,10 +492,11 @@
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
tellMaster: Boolean = true) {
require(bytes != null, "Bytes is null")
doPut(blockId, Right(bytes), level, tellMaster)
doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
}

private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer],
private def doPut(blockId: BlockId,
data: Values,
level: StorageLevel, tellMaster: Boolean = true): Long = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -534,8 +539,9 @@

// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = if (data.isRight && level.replication > 1) {
val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
// Duplicate doesn't copy the bytes, just creates a wrapper
val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
Future {
replicate(blockId, bufferView, level)
}
@@ -549,34 +555,43 @@

var marked = false
try {
data match {
case Left(values) => {
if (level.useMemory) {
// Save it just to memory first, even if it also has useDisk set to true; we will
// drop it to disk later if the memory store can't hold it.
val res = memoryStore.putValues(blockId, values, level, true)
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
val askForBytes = level.replication > 1
val res = diskStore.putValues(blockId, values, level, askForBytes)
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
if (level.useMemory) {
// Save it just to memory first, even if it also has useDisk set to true; we will
// drop it to disk later if the memory store can't hold it.
val res = data match {
case IteratorValues(iterator) =>
memoryStore.putValues(blockId, iterator, level, true)
case ArrayBufferValues(array) =>
memoryStore.putValues(blockId, array, level, true)
case ByteBufferValues(bytes) => {
bytes.rewind();
memoryStore.putBytes(blockId, bytes, level)
}
}
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
val askForBytes = level.replication > 1

val res = data match {
case IteratorValues(iterator) =>
diskStore.putValues(blockId, iterator, level, askForBytes)
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
case ByteBufferValues(bytes) => {
bytes.rewind();
diskStore.putBytes(blockId, bytes, level)
}
}
case Right(bytes) => {
bytes.rewind()
// Store it only in memory at first, even if useDisk is also set to true
(if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level)
size = bytes.limit
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
}

@@ -605,8 +620,8 @@
// values and need to serialize and replicate them now:
if (level.replication > 1) {
data match {
case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
case Left(values) => {
case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
case _ => {
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
@@ -28,7 +28,7 @@ import org.apache.spark.Logging
*/
private[spark]
abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel)
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) : PutResult

/**
* Put in a block and, possibly, also return its content as either bytes or another Iterator.
@@ -37,6 +37,9 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
returnValues: Boolean) : PutResult

def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
returnValues: Boolean) : PutResult

14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -37,7 +37,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
diskManager.getBlockLocation(blockId).length
}

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
// So that we do not modify the input offsets !
// duplicate does not copy buffer, so inexpensive
val bytes = _bytes.duplicate()
@@ -52,20 +52,30 @@
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
return PutResult(bytes.limit(), Right(bytes.duplicate()))
}

override def putValues(
blockId: BlockId,
values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
return putValues(blockId, values.toIterator, level, returnValues)
}

override def putValues(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {

logDebug("Attempting to write values for block " + blockId)
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
blockManager.dataSerializeStream(blockId, outputStream, values)
val length = file.length

val timeTaken = System.currentTimeMillis - startTime
31 changes: 26 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
@@ -59,8 +59,10 @@
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true)
PutResult(sizeEstimate, Left(values.toIterator))
} else {
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}

@@ -69,14 +71,33 @@
values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {

: PutResult = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
tryToPut(blockId, values, sizeEstimate, true)
PutResult(sizeEstimate, Left(values.iterator))
PutResult(sizeEstimate, Left(values.toIterator))
} else {
val bytes = blockManager.dataSerialize(blockId, values.toIterator)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}

override def putValues(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {

if (level.deserialized) {
val valueEntries = new ArrayBuffer[Any]()
valueEntries ++= values
val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef])
tryToPut(blockId, valueEntries, sizeEstimate, true)
PutResult(sizeEstimate, Left(valueEntries.toIterator))
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
val bytes = blockManager.dataSerialize(blockId, values)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}