Closed
Changes from 1 commit
Commits
83 commits
9a6658c
[SPARK-17477]: SparkSQL cannot handle schema evolution from Int -> Lo…
wgtmac Sep 9, 2016
d978d4b
[SPARK-17354] [SQL] Partitioning by dates/timestamps should work with…
HyukjinKwon Sep 9, 2016
965d966
[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` inf…
tejasapatil Sep 10, 2016
a30257b
[SPARK-11496][GRAPHX] Parallel implementation of personalized pagerank
Sep 10, 2016
bca8f30
[SPARK-15509][FOLLOW-UP][ML][SPARKR] R MLlib algorithms should suppor…
yanboliang Sep 10, 2016
705fbdc
[SPARK-17396][CORE] Share the task support between UnionRDD instances.
rdblue Sep 10, 2016
9e6680e
[SPARK-16445][MLLIB][SPARKR] Fix @return description for sparkR mlp s…
keypointt Sep 10, 2016
71d6291
[SPARK-17389][ML][MLLIB] KMeans speedup with better choice of k-means…
srowen Sep 11, 2016
eb10a06
[SPARK-17439][SQL] Fixing compression issues with approximate quantil…
thunterdb Sep 11, 2016
8ce0d5e
[SPARK-17330][SPARK UT] Clean up spark-warehouse in UT
Sep 11, 2016
1d4165f
[SPARK-17336][PYSPARK] Fix appending multiple times to PYTHONPATH fro…
BryanCutler Sep 11, 2016
7d309ff
[SPARK-17389][FOLLOW-UP][ML] Change KMeans k-means|| default init ste…
yanboliang Sep 11, 2016
ed112bd
[SPARK-17415][SQL] Better error message for driver-side broadcast joi…
sameeragarwal Sep 11, 2016
7afab77
[SPARK-17486] Remove unused TaskMetricsUIData.updatedBlockStatuses field
JoshRosen Sep 12, 2016
d608f99
[SPARK-17171][WEB UI] DAG will list all partitions in the graph
cenyuhai Sep 12, 2016
10b1feb
[SPARK-17447] Performance improvement in Partitioner.defaultPartition…
codlife Sep 12, 2016
28f6379
[SPARK-16992][PYSPARK] use map comprehension in doc
gsemet Sep 12, 2016
7037d03
[SPARK CORE][MINOR] fix "default partitioner cannot partition array k…
WeichenXu123 Sep 12, 2016
cb6e59c
[SPARK-17503][CORE] Fix memory leak in Memory store when unable to ca…
clockfly Sep 12, 2016
1984289
[SPARK-17483] Refactoring in BlockManager status reporting and block …
JoshRosen Sep 12, 2016
1c60199
[SPARK-14818] Post-2.0 MiMa exclusion and build changes
JoshRosen Sep 12, 2016
671d8ec
[SPARK-17485] Prevent failed remote reads of cached blocks from faili…
JoshRosen Sep 12, 2016
77808e0
[SPARK-17474] [SQL] fix python udf in TakeOrderedAndProjectExec
Sep 12, 2016
67b0a5e
[SPARK-17515] CollectLimit.execute() should perform per-partition limits
JoshRosen Sep 13, 2016
2dda3bb
[SPARK-17142][SQL] Complex query triggers binding error in HashAggreg…
jiangxb1987 Sep 13, 2016
33fd9a7
[SPARK-17531] Don't initialize Hive Listeners for the Execution Client
brkyvz Sep 13, 2016
d020ea3
[SPARK-17530][SQL] Add Statistics into DESCRIBE FORMATTED
gatorsmile Sep 13, 2016
9b48546
[SPARK-17317][SPARKR] Add SparkR vignette
junyangq Sep 14, 2016
8cbdf79
[SPARK-17449][DOCUMENTATION] Relation between heartbeatInterval and…
jagadeesanas2 Sep 14, 2016
7cd75c0
[SPARK-17525][PYTHON] Remove SparkContext.clearFiles() from the PySpa…
sjakthol Sep 14, 2016
c37fda6
[CORE][DOC] remove redundant comment
wangmiao1981 Sep 14, 2016
84dced4
[SPARK-17480][SQL] Improve performance by removing or caching List.le…
seyfe Sep 14, 2016
25ca2de
[SPARK-17445][DOCS] Reference an ASF page as the main place to find t…
srowen Sep 14, 2016
2125957
[SPARK-17409][SQL] Do Not Optimize Query in CTAS More Than Once
gatorsmile Sep 14, 2016
f9f2c6b
[SPARK-17514] df.take(1) and df.limit(1).collect() should perform the…
JoshRosen Sep 14, 2016
08b9c75
[MINOR][SQL] Add missing functions for some options in SQLConf and us…
HyukjinKwon Sep 14, 2016
a1d246e
[SPARK-10747][SQL] Support NULLS FIRST|LAST clause in ORDER BY
xwu0226 Sep 14, 2016
a63556f
[SPARK-17511] Yarn Dynamic Allocation: Avoid marking released contain…
kishorvpatil Sep 14, 2016
e5814cc
[SPARK-17463][CORE] Make CollectionAccumulator and SetAccumulator's v…
zsxwing Sep 14, 2016
3b80d1f
[SPARK-17472] [PYSPARK] Better error message for serialization failur…
ericl Sep 14, 2016
795d83e
[SPARK-17465][SPARK CORE] Inappropriate memory management in `org.apa…
Sep 14, 2016
f1a0223
[SPARK-17440][SPARK-17441] Fixed Multiple Bugs in ALTER TABLE
gatorsmile Sep 15, 2016
de885a8
[SPARK-17507][ML][MLLIB] check weight vector size in ANN
WeichenXu123 Sep 15, 2016
5ee52aa
[SPARK-17524][TESTS] Use specified spark.buffer.pageSize
a-roberts Sep 15, 2016
037565f
[SPARK-17521] Error when I use sparkContext.makeRDD(Seq())
codlife Sep 15, 2016
6f0e760
[SPARK-17406][WEB UI] limit timeline executor events
cenyuhai Sep 15, 2016
74bf9a2
[SPARK-17536][SQL] Minor performance improvement to JDBC batch inserts
Sep 15, 2016
396a6ce
[SPARK-17406][BUILD][HOTFIX] MiMa excludes fix
srowen Sep 15, 2016
4b29340
[SPARK-17451][CORE] CoarseGrainedExecutorBackend should inform driver…
tejasapatil Sep 15, 2016
1430e3b
[SPARK-17379][BUILD] Upgrade netty-all to 4.0.41 final for bug fixes
a-roberts Sep 15, 2016
129e87b
[SPARK-17547] Ensure temp shuffle data file is cleaned up after error
JoshRosen Sep 15, 2016
be726e2
[SPARK-17114][SQL] Fix aggregates grouped by literals with empty input
hvanhovell Sep 15, 2016
17f7ce3
[SPARK-17429][SQL] use ImplicitCastInputTypes with function Length
cenyuhai Sep 15, 2016
609e6ce
[SPARK-17364][SQL] Antlr lexer wrongly treats full qualified identifi…
clockfly Sep 15, 2016
2dc869f
[SPARK-17484] Prevent invalid block locations from being reported aft…
JoshRosen Sep 15, 2016
2bb1a19
[SPARK-17458][SQL] Alias specified for aggregates in a pivot are not …
aray Sep 15, 2016
8098b29
[SPARK-17543] Missing log4j config file for tests in common/network-…
jagadeesanas2 Sep 16, 2016
fdb9154
[SPARK-17534][TESTS] Increase timeouts for DirectKafkaStreamSuite tests
a-roberts Sep 16, 2016
0ef4313
[SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid OOM when conve…
clockfly Sep 16, 2016
ef7fa83
[SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 to 2.7.3
rxin Sep 16, 2016
ccff86d
[SPARK-17561][DOCS] DataFrameWriter documentation formatting problems
srowen Sep 16, 2016
60c287f
[SPARK-17549][SQL] Only collect table size stat in driver for cached …
Sep 16, 2016
19f894e
Correct fetchsize property name in docs
darabos Sep 17, 2016
ae3b76b
[SPARK-17567][DOCS] Use valid url to Spark RDD paper
keypointt Sep 17, 2016
b7d1923
[SPARK-17548][MLLIB] Word2VecModel.findSynonyms no longer spuriously …
willb Sep 17, 2016
06684bb
[SPARK-17529][CORE] Implement BitSet.clearUntil and use it during mer…
Sep 17, 2016
19132d5
[SPARK-17575][DOCS] Remove extra table tags in configuration document
phalodi Sep 17, 2016
defe9aa
[SPARK-17480][SQL][FOLLOWUP] Fix more instances which calls List.leng…
HyukjinKwon Sep 17, 2016
21035a6
[SPARK-17491] Close serialization stream to fix wrong answer bug in p…
JoshRosen Sep 17, 2016
d6cbf8d
[SPARK-17518][SQL] Block Users to Specify the Internal Data Source Pr…
gatorsmile Sep 18, 2016
3357bc7
[SPARK-17541][SQL] fix some DDL bugs about table management when same…
cloud-fan Sep 18, 2016
7d24523
[SPARK-17506][SQL] Improve the check double values equality rule.
jiangxb1987 Sep 18, 2016
e460dcd
[SPARK-17546][DEPLOY] start-* scripts should use hostname -f
srowen Sep 18, 2016
032335e
[SPARK-17586][BUILD] Do not call static member via instance reference
HyukjinKwon Sep 18, 2016
5930e0b
[SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV cast null value…
lw-lin Sep 18, 2016
98dfd17
[SPARK-17571][SQL] AssertOnQuery.condition should always return Boole…
petermaxlee Sep 18, 2016
3b18214
[SPARK-17297][DOCS] Clarify window/slide duration as absolute time, n…
srowen Sep 19, 2016
2b7ebff
[SPARK-17473][SQL] fixing docker integration tests error due to diffe…
sureshthalamati Sep 19, 2016
e6a9a72
[SPARK-17438][WEBUI] Show Application.executorLimit in the applicatio…
zsxwing Sep 19, 2016
f6aeaf2
revert change
wgtmac Sep 19, 2016
9fc18a4
[SPARK-17477][SQL] SparkSQL cannot handle schema evolution from Int -…
wgtmac Sep 19, 2016
8efd4dd
[SPARK-16439] [SQL] bring back the separator in SQL UI
Sep 19, 2016
a1bdea0
[SPARK-17100] [SQL] fix Python udf in filter on top of outer join
Sep 19, 2016
[SPARK-17491] Close serialization stream to fix wrong answer bug in putIteratorAsBytes()

## What changes were proposed in this pull request?

`MemoryStore.putIteratorAsBytes()` may silently lose values when used with `KryoSerializer` because it does not close the serialization stream before attempting to deserialize the already-serialized values, so values still sitting in Kryo's internal buffers may never be read.

This is the root cause behind a user-reported "wrong answer" bug in PySpark caching, reported by bennoleslie on the Spark user mailing list in a thread titled "pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK". Due to Spark 2.0's automatic use of KryoSerializer for "safe" types (such as byte arrays and primitives), this misuse of serializers manifested as silent data corruption rather than a StreamCorrupted error (which you might get from JavaSerializer).

The minimal fix, implemented here, is to close the serialization stream before attempting to deserialize written values. In addition, this patch adds several additional assertions / precondition checks to prevent misuse of `PartiallySerializedBlock` and `ChunkedByteBufferOutputStream`.
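
As an aside, here is a minimal, self-contained sketch of the buffering hazard. It is not Spark's actual code path: it uses `java.io.ObjectOutputStream` over a `BufferedOutputStream` as a stand-in for Kryo's internally buffered output, and the `BufferedStreamHazard` object and buffer size are made up for illustration. The point is that bytes read from the underlying sink before the serialization stream is closed can be incomplete:

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, ObjectOutputStream}

object BufferedStreamHazard {
  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()
    // A buffering serialization stream (Kryo's Output, ObjectOutputStream over a
    // BufferedOutputStream, ...) may hold written records until flush()/close().
    val out = new ObjectOutputStream(new BufferedOutputStream(sink, 64 * 1024))
    (1 to 1000).foreach(i => out.writeObject(Integer.valueOf(i)))

    // Reading the sink *before* closing the stream sees only what has been flushed
    // so far; the tail of the data is still sitting in the buffer.
    val bytesBeforeClose = sink.size()

    out.close() // pushes everything still buffered through to the sink
    val bytesAfterClose = sink.size()

    println(s"visible before close: $bytesBeforeClose bytes, after close: $bytesAfterClose bytes")
  }
}
```

Closing the serialization stream before reading the serialized bytes back is exactly the minimal fix applied to `putIteratorAsBytes()`.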

## How was this patch tested?

The original bug was masked by an invalid assert in the memory store test cases: the old assert compared the two results record-by-record with `zip` but never first checked that the two collections had the same length, so missing records went unnoticed. The updated test case now reproduces this bug.
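
For illustration, a short sketch (hypothetical values, not the real test code) of why a `zip`-based comparison alone cannot detect lost records, and why the new `assertSameContents` helper in `MemoryStoreSuite` checks lengths first:

```scala
val expected = Seq(1, 2, 3, 4)
val actual   = Seq(1, 2) // two records were silently lost

// The old-style check passes: zip truncates to the shorter collection.
expected.zip(actual).foreach { case (e, a) => assert(e == a) }

// The added length check is what actually catches the truncation.
assert(actual.length == expected.length, "wrong number of values returned")
```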

In addition, I added a new `PartiallySerializedBlockSuite` to unit test that component.

Author: Josh Rosen <[email protected]>

Closes #15043 from JoshRosen/partially-serialized-block-values-iterator-bugfix.
JoshRosen authored and wgtmac committed Sep 19, 2016
commit 21035a66b3ada119efb0934d67ede8095682a4af
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -230,6 +230,7 @@ private[spark] object Task {
dataOut.flush()
val taskBytes = serializer.serialize(task)
Utils.writeByteBuffer(taskBytes, out)
out.close()
out.toByteBuffer
}

core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -33,7 +33,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

@@ -277,6 +277,7 @@ private[spark] class MemoryStore(
"released too much unroll memory")
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator,
rest = Iterator.empty))
@@ -285,7 +286,11 @@
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Left(new PartiallyUnrolledIterator(
this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values))
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = vector.iterator,
rest = values))
}
}

@@ -394,7 +399,7 @@ private[spark] class MemoryStore(
redirectableStream,
unrollMemoryUsedByThisBlock,
memoryMode,
bbos.toChunkedByteBuffer,
bbos,
values,
classTag))
}
@@ -655,20 +660,22 @@ private[spark] class MemoryStore(
* The result of a failed [[MemoryStore.putIteratorAsValues()]] call.
*
* @param memoryStore the memoryStore, used for freeing memory.
* @param memoryMode the memory mode (on- or off-heap).
* @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
* @param unrolled an iterator for the partially-unrolled values.
* @param rest the rest of the original iterator passed to
* [[MemoryStore.putIteratorAsValues()]].
*/
private[storage] class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
memoryMode: MemoryMode,
unrollMemory: Long,
private[this] var unrolled: Iterator[T],
rest: Iterator[T])
extends Iterator[T] {

private def releaseUnrollMemory(): Unit = {
memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
// SPARK-17503: Garbage collects the unrolling memory before the life end of
// PartiallyUnrolledIterator.
unrolled = null
@@ -706,7 +713,7 @@ private[storage] class PartiallyUnrolledIterator[T](
/**
* A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
*/
private class RedirectableOutputStream extends OutputStream {
private[storage] class RedirectableOutputStream extends OutputStream {
private[this] var os: OutputStream = _
def setOutputStream(s: OutputStream): Unit = { os = s }
override def write(b: Int): Unit = os.write(b)
@@ -726,7 +733,8 @@ private class RedirectableOutputStream extends OutputStream {
* @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
* @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
* @param memoryMode whether the unroll memory is on- or off-heap
* @param unrolled a byte buffer containing the partially-serialized values.
* @param bbos byte buffer output stream containing the partially-serialized values.
* [[redirectableOutputStream]] initially points to this output stream.
* @param rest the rest of the original iterator passed to
* [[MemoryStore.putIteratorAsValues()]].
* @param classTag the [[ClassTag]] for the block.
@@ -735,14 +743,19 @@ private[storage] class PartiallySerializedBlock[T](
memoryStore: MemoryStore,
serializerManager: SerializerManager,
blockId: BlockId,
serializationStream: SerializationStream,
redirectableOutputStream: RedirectableOutputStream,
unrollMemory: Long,
private val serializationStream: SerializationStream,
private val redirectableOutputStream: RedirectableOutputStream,
val unrollMemory: Long,
memoryMode: MemoryMode,
unrolled: ChunkedByteBuffer,
bbos: ChunkedByteBufferOutputStream,
rest: Iterator[T],
classTag: ClassTag[T]) {

private lazy val unrolledBuffer: ChunkedByteBuffer = {
bbos.close()
bbos.toChunkedByteBuffer
}

// If the task does not fully consume `valuesIterator` or otherwise fails to consume or dispose of
// this PartiallySerializedBlock then we risk leaking of direct buffers, so we use a task
// completion listener here in order to ensure that `unrolled.dispose()` is called at least once.
@@ -751,23 +764,42 @@ private[storage] class PartiallySerializedBlock[T](
taskContext.addTaskCompletionListener { _ =>
// When a task completes, its unroll memory will automatically be freed. Thus we do not call
// releaseUnrollMemoryForThisTask() here because we want to avoid double-freeing.
unrolled.dispose()
unrolledBuffer.dispose()
}
}

// Exposed for testing
private[storage] def getUnrolledChunkedByteBuffer: ChunkedByteBuffer = unrolledBuffer

private[this] var discarded = false
private[this] var consumed = false

private def verifyNotConsumedAndNotDiscarded(): Unit = {
if (consumed) {
throw new IllegalStateException(
"Can only call one of finishWritingToStream() or valuesIterator() and can only call once.")
}
if (discarded) {
throw new IllegalStateException("Cannot call methods on a discarded PartiallySerializedBlock")
}
}

/**
* Called to dispose of this block and free its memory.
*/
def discard(): Unit = {
try {
// We want to close the output stream in order to free any resources associated with the
// serializer itself (such as Kryo's internal buffers). close() might cause data to be
// written, so redirect the output stream to discard that data.
redirectableOutputStream.setOutputStream(ByteStreams.nullOutputStream())
serializationStream.close()
} finally {
unrolled.dispose()
memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
if (!discarded) {
try {
// We want to close the output stream in order to free any resources associated with the
// serializer itself (such as Kryo's internal buffers). close() might cause data to be
// written, so redirect the output stream to discard that data.
redirectableOutputStream.setOutputStream(ByteStreams.nullOutputStream())
serializationStream.close()
} finally {
discarded = true
unrolledBuffer.dispose()
memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
}
}
}

@@ -776,8 +808,10 @@ private[storage] class PartiallySerializedBlock[T](
* and then serializing the values from the original input iterator.
*/
def finishWritingToStream(os: OutputStream): Unit = {
verifyNotConsumedAndNotDiscarded()
consumed = true
// `unrolled`'s underlying buffers will be freed once this input stream is fully read:
ByteStreams.copy(unrolled.toInputStream(dispose = true), os)
ByteStreams.copy(unrolledBuffer.toInputStream(dispose = true), os)
memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
redirectableOutputStream.setOutputStream(os)
while (rest.hasNext) {
@@ -794,13 +828,22 @@ private[storage] class PartiallySerializedBlock[T](
* `close()` on it to free its resources.
*/
def valuesIterator: PartiallyUnrolledIterator[T] = {
verifyNotConsumedAndNotDiscarded()
consumed = true
// Close the serialization stream so that the serializer's internal buffers are freed and any
// "end-of-stream" markers can be written out so that `unrolled` is a valid serialized stream.
serializationStream.close()
// `unrolled`'s underlying buffers will be freed once this input stream is fully read:
val unrolledIter = serializerManager.dataDeserializeStream(
blockId, unrolled.toInputStream(dispose = true))(classTag)
blockId, unrolledBuffer.toInputStream(dispose = true))(classTag)
// The unroll memory will be freed once `unrolledIter` is fully consumed in
// PartiallyUnrolledIterator. If the iterator is not consumed by the end of the task then any
// extra unroll memory will automatically be freed by a `finally` block in `Task`.
new PartiallyUnrolledIterator(
memoryStore,
memoryMode,
unrollMemory,
unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
unrolled = unrolledIter,
rest = rest)
}
}
core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -29,7 +29,32 @@ private[spark] class ByteBufferOutputStream(capacity: Int) extends ByteArrayOutp

def getCount(): Int = count

private[this] var closed: Boolean = false

override def write(b: Int): Unit = {
require(!closed, "cannot write to a closed ByteBufferOutputStream")
super.write(b)
}

override def write(b: Array[Byte], off: Int, len: Int): Unit = {
require(!closed, "cannot write to a closed ByteBufferOutputStream")
super.write(b, off, len)
}

override def reset(): Unit = {
require(!closed, "cannot reset a closed ByteBufferOutputStream")
super.reset()
}

override def close(): Unit = {
if (!closed) {
super.close()
closed = true
}
}

def toByteBuffer: ByteBuffer = {
return ByteBuffer.wrap(buf, 0, count)
require(closed, "can only call toByteBuffer() after ByteBufferOutputStream has been closed")
ByteBuffer.wrap(buf, 0, count)
}
}
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
@@ -49,17 +49,27 @@ private[spark] class ChunkedByteBufferOutputStream(
*/
private[this] var position = chunkSize
private[this] var _size = 0
private[this] var closed: Boolean = false

def size: Long = _size

override def close(): Unit = {
if (!closed) {
super.close()
closed = true
}
}

override def write(b: Int): Unit = {
require(!closed, "cannot write to a closed ChunkedByteBufferOutputStream")
allocateNewChunkIfNeeded()
chunks(lastChunkIndex).put(b.toByte)
position += 1
_size += 1
}

override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
require(!closed, "cannot write to a closed ChunkedByteBufferOutputStream")
var written = 0
while (written < len) {
allocateNewChunkIfNeeded()
@@ -73,7 +83,6 @@

@inline
private def allocateNewChunkIfNeeded(): Unit = {
require(!toChunkedByteBufferWasCalled, "cannot write after toChunkedByteBuffer() is called")
if (position == chunkSize) {
chunks += allocator(chunkSize)
lastChunkIndex += 1
@@ -82,6 +91,7 @@
}

def toChunkedByteBuffer: ChunkedByteBuffer = {
require(closed, "cannot call toChunkedByteBuffer() unless close() has been called")
require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be called once")
toChunkedByteBufferWasCalled = true
if (lastChunkIndex == -1) {
34 changes: 16 additions & 18 deletions core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -79,6 +79,13 @@ class MemoryStoreSuite
(memoryStore, blockInfoManager)
}

private def assertSameContents[T](expected: Seq[T], actual: Seq[T], hint: String): Unit = {
assert(actual.length === expected.length, s"wrong number of values returned in $hint")
expected.iterator.zip(actual.iterator).foreach { case (e, a) =>
assert(e === a, s"$hint did not return original values!")
}
}

test("reserve/release unroll memory") {
val (memoryStore, _) = makeMemoryStore(12000)
assert(memoryStore.currentUnrollMemory === 0)
@@ -137,9 +144,7 @@
var putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
assert(e === a, "getValues() did not return original values!")
}
assertSameContents(smallList, memoryStore.getValues("unroll").get.toSeq, "getValues")
blockInfoManager.lockForWriting("unroll")
assert(memoryStore.remove("unroll"))
blockInfoManager.removeBlock("unroll")
@@ -152,9 +157,7 @@
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
assert(memoryStore.contains("someBlock2"))
assert(!memoryStore.contains("someBlock1"))
smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
assert(e === a, "getValues() did not return original values!")
}
assertSameContents(smallList, memoryStore.getValues("unroll").get.toSeq, "getValues")
blockInfoManager.lockForWriting("unroll")
assert(memoryStore.remove("unroll"))
blockInfoManager.removeBlock("unroll")
@@ -167,9 +170,7 @@
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
assert(!memoryStore.contains("someBlock2"))
assert(putResult.isLeft)
bigList.iterator.zip(putResult.left.get).foreach { case (e, a) =>
assert(e === a, "putIterator() did not return original values!")
}
assertSameContents(bigList, putResult.left.get.toSeq, "putIterator")
// The unroll memory was freed once the iterator returned by putIterator() was fully traversed.
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
@@ -316,9 +317,8 @@
assert(res.isLeft)
assert(memoryStore.currentUnrollMemoryForThisTask > 0)
val valuesReturnedFromFailedPut = res.left.get.valuesIterator.toSeq // force materialization
valuesReturnedFromFailedPut.zip(bigList).foreach { case (e, a) =>
assert(e === a, "PartiallySerializedBlock.valuesIterator() did not return original values!")
}
assertSameContents(
bigList, valuesReturnedFromFailedPut, "PartiallySerializedBlock.valuesIterator()")
// The unroll memory was freed once the iterator was fully traversed.
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
@@ -340,12 +340,10 @@
res.left.get.finishWritingToStream(bos)
// The unroll memory was freed once the block was fully written.
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
val deserializationStream = serializerManager.dataDeserializeStream[Any](
"b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any)
deserializationStream.zip(bigList.iterator).foreach { case (e, a) =>
assert(e === a,
"PartiallySerializedBlock.finishWritingtoStream() did not write original values!")
}
val deserializedValues = serializerManager.dataDeserializeStream[Any](
"b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any).toSeq
assertSameContents(
bigList, deserializedValues, "PartiallySerializedBlock.finishWritingToStream()")
}

test("multiple unrolls by the same thread") {