Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Merge branch 'master' into refactorLCS
  • Loading branch information
Earne committed Apr 5, 2016
commit ee017741f403d4b7016aae3ce3fe17d0e01519fa
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import java.util.LinkedHashMap
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.google.common.io.ByteStreams

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, BlockManager, StorageLevel}
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
Expand Down Expand Up @@ -143,7 +146,7 @@ private[spark] class MemoryStore(
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]])
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.putEntry(blockId, entry)
logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Expand Down Expand Up @@ -462,9 +465,9 @@ private[spark] class MemoryStore(
memoryManager.synchronized {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
var selectedBlocks = new ArrayBuffer[BlockId]
def blockIsEvictable(blockId: BlockId): Boolean = {
rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
val selectedBlocks = new ArrayBuffer[BlockId]
def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.