Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
minor
  • Loading branch information
Earne committed Apr 1, 2016
commit 8f82519fbbd98e2c457242e91a87a591b8cae0fa
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait MemoryEntryManager[K, V] {
def containsEntry(key: K): Boolean
}

class FIFOMemoryEntryManager[K, V <: MemoryEntry[_]] extends MemoryEntryManager[K, V] {
class FIFOMemoryEntryManager[K, V] extends MemoryEntryManager[K, V] {
val entries = new util.LinkedHashMap[K, V](32, 0.75f)

override def getEntry(key: K): V = {
Expand Down Expand Up @@ -66,7 +66,11 @@ class FIFOMemoryEntryManager[K, V <: MemoryEntry[_]] extends MemoryEntryManager[
}
}

class LRUMemoryEntryManager[K, V <: MemoryEntry[_]] extends MemoryEntryManager[K, V] {
class LRUMemoryEntryManager[K, V] extends MemoryEntryManager[K, V] {
def entrySet() : util.Set[util.Map.Entry[K, V]] = {
entries.entrySet()
}

val entries = new util.LinkedHashMap[K, V](32, 0.75f, true)

override def getEntry(key: K): V = {
Expand Down Expand Up @@ -98,26 +102,4 @@ class LRUMemoryEntryManager[K, V <: MemoryEntry[_]] extends MemoryEntryManager[K
entries.containsKey(key)
}
}


def foo(freedMemory: Long, space: Long, blockIsEvictable: (K) => Boolean,
hasWriteLock: (K) => Boolean): Long = {
val selectedBlocks = new ArrayBuffer[K]
var freed = freedMemory
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (blockIsEvictable(blockId)) {
if (hasWriteLock(blockId)) {
selectedBlocks += blockId
freed += pair.getValue.size
}
}
}
}
// (selectedBlocks, freed)
freed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,26 +445,22 @@ private[spark] class MemoryStore(
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
// entries.synchronized {
// val iterator = entries.entrySet().iterator()
// while (freedMemory < space && iterator.hasNext) {
// val pair = iterator.next()
// val blockId = pair.getKey
// if (blockIsEvictable(blockId)) {
// // We don't want to evict blocks which are currently being read, so we need to obtain
// // an exclusive write lock on blocks which are candidates for eviction. We perform a
// // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
// if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
// selectedBlocks += blockId
// freedMemory += pair.getValue.size
// }
// }
// }
// }
// val foo = new FIFOMemoryEntryManager[BlockId, MemoryEntry[_]]
freedMemory = entries.foo(freedMemory, space,
(blockId: BlockId) => true,
(blockId: BlockId) => true)
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (blockIsEvictable(blockId)) {
// We don't want to evict blocks which are currently being read, so we need to obtain
// an exclusive write lock on blocks which are candidates for eviction. We perform a
// non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}
}

def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
val data = entry match {
Expand Down