core/src/main/scala/org/apache/spark/storage/MemoryStore.scala: 16 additions & 6 deletions
@@ -236,13 +236,23 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
     val pair = iterator.next()
     val blockId = pair.getKey
-    if (rddToAdd.isDefined && rddToAdd == getRddId(blockId)) {
-      logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
-        "block from the same RDD")
-      return false
+    // Apply the same-RDD rule for cache replacement. Quoted from the
+    // original RDD paper:
+    //
Contributor:

Copy over this log message to L267/L277 - when there are blocks in entries and eviction won't help store this block.

Contributor Author:

Agree, thanks.

+    // When a new RDD partition is computed but there is not enough
Contributor:

Hey @liancheng, I think it's okay to remove this quote. If you look at the scaladoc, it already explains the intended policy wrt partitions in the same RDD - so I think that is sufficient. The scaladoc says "which leads to a wasteful cyclic replacement pattern for RDDs don't fit into memory that we want to avoid"

Contributor Author:

Thanks, removed :)

+    // space to store it, we evict a partition from the least recently
+    // accessed RDD, unless this is the same RDD as the one with the
+    // new partition. In that case, we keep the old partition in memory
+    // to prevent cycling partitions from the same RDD in and out.
+    //
+    // TODO implement LRU eviction
Contributor:

entries is already a LinkedHashMap, so you iterate in LRU order: you can remove the comment.

Contributor:

@mridulm, I think LinkedHashMap actually keeps insertion order, not access order? (Though I'm not clear how to track when a block is accessed by the tasks for now...)

Contributor Author:

No, @mridulm is right: entries is constructed with accessOrder set to true, which enables LRU. Thanks for pointing this out @mridulm, I wasn't aware of that before.

Contributor:

I see
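
For reference (not part of the PR), a minimal standalone sketch of the accessOrder behavior discussed above; the block IDs and sizes here are made up:

import java.util.LinkedHashMap
import scala.collection.JavaConverters._

object LruOrderDemo {
  def main(args: Array[String]): Unit = {
    // The third constructor argument (accessOrder = true) makes iteration
    // follow access order, least recently accessed first, instead of
    // insertion order.
    val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_0_0", 100L)
    entries.put("rdd_0_1", 200L)
    entries.put("rdd_1_0", 300L)

    // Touch rdd_0_0 so it becomes the most recently accessed entry.
    entries.get("rdd_0_0")

    // Iteration now starts from the least recently accessed block, which is
    // what an eviction loop over such a map relies on.
    println(entries.keySet().asScala.mkString(", "))  // rdd_0_1, rdd_1_0, rdd_0_0
  }
}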

+    rddToAdd match {
+      case Some(rddId) if rddId == getRddId(blockId) =>
Contributor Author:

Made a mistake here, rddId: Int == getRddId(blockId): Option[Int] never holds...
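
A minimal sketch (with a made-up getRddId stand-in, not the real MemoryStore helper) of why this guard never fires and one way the types could be lined up:

// getRddId returns an Option[Int]; comparing it against a bare Int is always false.
def getRddId(blockId: String): Option[Int] =
  if (blockId.startsWith("rdd_")) Some(blockId.split('_')(1).toInt) else None

val rddToAdd: Option[Int] = Some(3)
val blockId = "rdd_3_7"

// Buggy guard: Int == Option[Int] never holds, so the "same RDD" case is skipped.
rddToAdd match {
  case Some(rddId) if rddId == getRddId(blockId) => println("never reached")
  case _ => println("always falls through")
}

// One possible correction: compare Option against Option.
rddToAdd match {
  case Some(rddId) if getRddId(blockId) == Some(rddId) => println("same RDD, keep the block")
  case _ => println("different RDD, eviction candidate")
}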

+        // no-op
+      case _ =>
+        selectedBlocks += blockId
+        selectedMemory += pair.getValue.size
Contributor:

Just a suggested alternative to LRU:

To minimize the number of affected RDDs, how about evicting blocks from the RDDs occupying the most memory first? Since usually all the blocks in an RDD are necessary for the computation, this approach may minimize the chance of recomputation.
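
A hypothetical sketch of that suggestion (made-up types and names, not what this PR implements): group cached blocks by RDD, then take eviction candidates from the RDD occupying the most memory first, so fewer distinct RDDs are touched.

case class CachedBlock(blockId: String, rddId: Int, size: Long)

def selectByLargestRdd(blocks: Seq[CachedBlock], spaceNeeded: Long): Seq[CachedBlock] = {
  // Sort whole RDDs by the total memory they occupy, largest first.
  val rddsBySize = blocks.groupBy(_.rddId).values.toSeq
    .sortBy(bs => -bs.map(_.size).sum)

  // Walk RDD by RDD, collecting blocks until enough space would be freed.
  val selected = scala.collection.mutable.ArrayBuffer.empty[CachedBlock]
  var freed = 0L
  for (rddBlocks <- rddsBySize; b <- rddBlocks if freed < spaceNeeded) {
    selected += b
    freed += b.size
  }
  selected.toSeq
}

// Example: RDD 1 occupies 700 bytes, RDD 2 only 200, so candidates come from RDD 1 first.
val cached = Seq(
  CachedBlock("rdd_1_0", 1, 400L), CachedBlock("rdd_1_1", 1, 300L),
  CachedBlock("rdd_2_0", 2, 100L), CachedBlock("rdd_2_1", 2, 100L))
println(selectByLargestRdd(cached, 500L).map(_.blockId).mkString(", "))  // rdd_1_0, rdd_1_1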

     }
-    selectedBlocks += blockId
-    selectedMemory += pair.getValue.size
   }
 }
