[SPARK-1103] [WIP] Automatic garbage collection of RDD, shuffle and broadcast data #126
Closed
Changes from 1 commit (of 51 commits)
1e752f1  Added unpersist method to Broadcast.
80dd977  Fix for Broadcast unpersist patch.
e427a9e  Added ContextCleaner to automatically clean RDDs and shuffles when th… (tdas)
8512612  Changed TimeStampedHashMap to use WrappedJavaHashMap. (tdas)
a24fefc  Merge remote-tracking branch 'apache/master' into state-cleanup (tdas)
cb0a5a6  Fixed docs and styles. (tdas)
ae9da88  Removed unncessary TimeStampedHashMap from DAGScheduler, added try-ca… (tdas)
e61daa0  Modifications based on the comments on PR 126. (tdas)
a7260d3  Added try-catch in context cleaner and null value cleaning in TimeSta… (tdas)
892b952  Removed use of BoundedHashMap, and made BlockManagerSlaveActor cleanu… (tdas)
e1fba5f  Style fix (tdas)
f2881fd  Changed ContextCleaner to use ReferenceQueue instead of finalizer (tdas)
620eca3  Changes based on PR comments. (tdas)
a007307  Merge remote-tracking branch 'apache/master' into state-cleanup (tdas)
d2f8b97  Removed duplicate unpersistRDD. (tdas)
6c9dcf6  Added missing Apache license (tdas)
c7ccef1  Merge branch 'bc-unpersist-merge' of github.com:ignatich/incubator-sp… (andrewor14)
ba52e00  Refactor broadcast classes (andrewor14)
d0edef3  Add framework for broadcast cleanup (andrewor14)
544ac86  Clean up broadcast blocks through BlockManager* (andrewor14)
e95479c  Add tests for unpersisting broadcast (andrewor14)
f201a8d  Test broadcast cleanup in ContextCleanerSuite + remove BoundedHashMap (andrewor14)
c92e4d9  Merge github.com:apache/spark into cleanup (andrewor14)
0d17060  Import, comments, and style fixes (minor) (andrewor14)
34f436f  Generalize BroadcastBlockId to remove BroadcastHelperBlockId (andrewor14)
fbfeec8  Add functionality to query executors for their local BlockStatuses (andrewor14)
88904a3  Make TimeStampedWeakValueHashMap a wrapper of TimeStampedHashMap (andrewor14)
e442246  Merge github.com:apache/spark into cleanup (andrewor14)
8557c12  Merge github.com:apache/spark into cleanup (andrewor14)
7edbc98  Merge remote-tracking branch 'apache-github/master' into state-cleanup (tdas)
634a097  Merge branch 'state-cleanup' of github.com:tdas/spark into cleanup (andrewor14)
7ed72fb  Fix style test fail + remove verbose test message regarding broadcast (andrewor14)
5016375  Address TD's comments (andrewor14)
f0aabb1  Correct semantics for TimeStampedWeakValueHashMap + add tests
762a4d8  Merge pull request #1 from andrewor14/cleanup (tdas)
a6460d4  Merge github.com:apache/spark into cleanup (andrewor14)
c5b1d98  Address Patrick's comments (andrewor14)
a2cc8bc  Merge remote-tracking branch 'apache/master' into state-cleanup (tdas)
ada45f0  Merge branch 'state-cleanup' of github.com:tdas/spark into cleanup (andrewor14)
cd72d19  Make automatic cleanup configurable (not documented) (andrewor14)
b27f8e8  Merge pull request #3 from andrewor14/cleanup (tdas)
a430f06  Fixed compilation errors. (tdas)
104a89a  Fixed failing BroadcastSuite unit tests by introducing blocking for r… (tdas)
6222697  Fixed bug and adding unit test for removeBroadcast in BlockManagerSuite. (tdas)
41c9ece  Added more unit tests for BlockManager, DiskBlockManager, and Context… (tdas)
2b95b5e  Added more documentation on Broadcast implementations, specially whic… (tdas)
4d05314  Scala style fix. (tdas)
cff023c  Fixed issues based on Andrew's comments. (tdas)
d25a86e  Fixed stupid typo. (tdas)
f489fdc  Merge remote-tracking branch 'apache/master' into state-cleanup (tdas)
61b8d6e  Fixed issue with Tachyon + new BlockManager methods. (tdas)
Commit 88904a3659fe4a81bdfb2a6b615894d926af3fe1
Make TimeStampedWeakValueHashMap a wrapper of TimeStampedHashMap
This allows us to get rid of WrappedJavaHashMap without much duplicate code.
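The idea behind this data structure can be sketched outside Spark: a map whose values are held through `java.lang.ref.WeakReference`, with an insertion timestamp per entry, so values can be reclaimed by the GC while stale keys are pruned by age. The following is a minimal illustrative sketch, not Spark's API; all names (`WeakValueMapSketch`, `simulateGc`, the `"rdd-1"` key) are invented for the example:

```java
import java.lang.ref.WeakReference;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in (NOT Spark's API) for a timestamped, weak-valued map:
// values can be reclaimed by the GC, while keys linger until pruned by age.
public class WeakValueMapSketch<K, V> {
    static final class Entry<V> {
        final long timestamp;        // insertion time, used for age-based pruning
        final WeakReference<V> ref;  // allows the GC to reclaim the value

        Entry(long timestamp, V value) {
            this.timestamp = timestamp;
            this.ref = new WeakReference<>(value);
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> map = new ConcurrentHashMap<>();

    public void put(K key, V value) {
        map.put(key, new Entry<>(System.currentTimeMillis(), value));
    }

    // Returns empty once the weak reference has been cleared, even though
    // the key itself has not been removed yet.
    public Optional<V> get(K key) {
        Entry<V> e = map.get(key);
        return e == null ? Optional.empty() : Optional.ofNullable(e.ref.get());
    }

    public int size() {
        return map.size();
    }

    // Test hook: clears the weak reference deterministically, standing in for
    // a real GC cycle (System.gc() gives no such guarantee).
    public void simulateGc(K key) {
        map.get(key).ref.clear();
    }

    public static void main(String[] args) {
        WeakValueMapSketch<String, String> m = new WeakValueMapSketch<>();
        m.put("rdd-1", "metadata");
        System.out.println(m.get("rdd-1").isPresent()); // true: strongly referenced
        m.simulateGc("rdd-1");
        System.out.println(m.get("rdd-1").isPresent()); // false: value reclaimed
        System.out.println(m.size());                   // 1: key still present
    }
}
```

Note the asymmetry the doc comment of the real class describes: after the value is reclaimed, `get` behaves as if the entry were absent, but `size` still counts the key until a separate cleanup pass removes it.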
The diff (hunk `@@ -18,113 +18,115 @@`, package org.apache.spark.util) replaces the WrappedJavaHashMap-based implementation with a thin wrapper around TimeStampedHashMap.

Before (removed):

```scala
package org.apache.spark.util

import java.lang.ref.WeakReference
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConversions

import org.apache.spark.Logging

private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: WeakReference[T]) {
  def this(timestamp: Long, value: T) = this(timestamp, new WeakReference[T](value))
}

/**
 * A map that stores the timestamp of when a key was inserted along with the value,
 * while ensuring that the values are weakly referenced. If the value is garbage collected and
 * the weak reference is null, get() operation returns the key be non-existent. However,
 * the key is actually not removed in the current implementation. Key-value pairs whose
 * timestamps are older than a particular threshold time can then be removed using the
 * clearOldValues method. It exposes a scala.collection.mutable.Map interface to allow it to be a
 * drop-in replacement for Scala HashMaps.
 *
 * Internally, it uses a Java ConcurrentHashMap, so all operations on this HashMap are thread-safe.
 */
private[spark] class TimeStampedWeakValueHashMap[A, B]()
  extends WrappedJavaHashMap[A, B, A, TimeStampedWeakValue[B]] with Logging {

  /** Number of inserts after which keys whose weak ref values are null will be cleaned */
  private val CLEANUP_INTERVAL = 1000

  /** Counter for counting the number of inserts */
  private val insertCounts = new AtomicInteger(0)

  private[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = {
    new ConcurrentHashMap[A, TimeStampedWeakValue[B]]()
  }

  private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
    new TimeStampedWeakValueHashMap[K1, V1]()
  }

  override def +=(kv: (A, B)): this.type = {
    // Cleanup null value at certain intervals
    if (insertCounts.incrementAndGet() % CLEANUP_INTERVAL == 0) {
      cleanNullValues()
    }
    super.+=(kv)
  }

  override def get(key: A): Option[B] = {
    Option(internalJavaMap.get(key)).flatMap { weakValue =>
      val value = weakValue.weakValue.get
      if (value == null) {
        internalJavaMap.remove(key)
      }
      Option(value)
    }
  }

  @inline override protected def externalValueToInternalValue(v: B): TimeStampedWeakValue[B] = {
    new TimeStampedWeakValue(currentTime, v)
  }

  @inline override protected def internalValueToExternalValue(iv: TimeStampedWeakValue[B]): B = {
    iv.weakValue.get
  }

  override def iterator: Iterator[(A, B)] = {
    val iterator = internalJavaMap.entrySet().iterator()
    JavaConversions.asScalaIterator(iterator).flatMap(kv => {
      val (key, value) = (kv.getKey, kv.getValue.weakValue.get)
      if (value != null) Seq((key, value)) else Seq.empty
    })
  }

  /**
   * Removes old key-value pairs that have timestamp earlier than `threshTime`,
   * calling the supplied function on each such entry before removing.
   */
  def clearOldValues(threshTime: Long, f: (A, B) => Unit = null) {
    val iterator = internalJavaMap.entrySet().iterator()
    while (iterator.hasNext) {
      val entry = iterator.next()
      if (entry.getValue.timestamp < threshTime) {
        val value = entry.getValue.weakValue.get
        if (f != null && value != null) {
          f(entry.getKey, value)
        }
        logDebug("Removing key " + entry.getKey)
        iterator.remove()
      }
    }
  }

  /**
   * Removes keys whose weak referenced values have become null.
   */
  private def cleanNullValues() {
    val iterator = internalJavaMap.entrySet().iterator()
    while (iterator.hasNext) {
      val entry = iterator.next()
      if (entry.getValue.weakValue.get == null) {
        logDebug("Removing key " + entry.getKey)
        iterator.remove()
      }
    }
  }

  private def currentTime = System.currentTimeMillis()
}
```

After (added):

```scala
package org.apache.spark.util

import java.lang.ref.WeakReference

import scala.collection.{immutable, mutable}

/**
 * A wrapper of TimeStampedHashMap that ensures the values are weakly referenced and timestamped.
 *
 * If the value is garbage collected and the weak reference is null, get() operation returns
 * a non-existent value. However, the corresponding key is actually not removed in the current
 * implementation. Key-value pairs whose timestamps are older than a particular threshold time
 * can then be removed using the clearOldValues method. It exposes a scala.collection.mutable.Map
 * interface to allow it to be a drop-in replacement for Scala HashMaps.
 *
 * Internally, it uses a Java ConcurrentHashMap, so all operations on this HashMap are thread-safe.
 *
 * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed.
 */
private[spark] class TimeStampedWeakValueHashMap[A, B](updateTimeStampOnGet: Boolean = false)
  extends mutable.Map[A, B]() {

  import TimeStampedWeakValueHashMap._

  private val internalMap = new TimeStampedHashMap[A, WeakReference[B]](updateTimeStampOnGet)

  def get(key: A): Option[B] = internalMap.get(key)

  def iterator: Iterator[(A, B)] = internalMap.iterator

  override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = {
    val newMap = new TimeStampedWeakValueHashMap[A, B1]
    newMap.internalMap += kv
    newMap
  }

  override def - (key: A): mutable.Map[A, B] = {
    val newMap = new TimeStampedWeakValueHashMap[A, B]
    newMap.internalMap -= key
    newMap
  }

  override def += (kv: (A, B)): this.type = {
    internalMap += kv
    this
  }

  override def -= (key: A): this.type = {
    internalMap -= key
    this
  }

  override def update(key: A, value: B) = this += ((key, value))

  override def apply(key: A): B = internalMap.apply(key)

  override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = internalMap.filter(p)

  override def empty: mutable.Map[A, B] = new TimeStampedWeakValueHashMap[A, B]()

  override def size: Int = internalMap.size

  override def foreach[U](f: ((A, B)) => U) = internalMap.foreach(f)

  def putIfAbsent(key: A, value: B): Option[B] = internalMap.putIfAbsent(key, value)

  def toMap: immutable.Map[A, B] = iterator.toMap

  /**
   * Remove old key-value pairs that have timestamp earlier than `threshTime`.
   */
  def clearOldValues(threshTime: Long) = internalMap.clearOldValues(threshTime)
}

/**
 * Helper methods for converting to and from WeakReferences.
 */
private[spark] object TimeStampedWeakValueHashMap {

  /* Implicit conversion methods to WeakReferences */

  implicit def toWeakReference[V](v: V): WeakReference[V] = new WeakReference[V](v)

  implicit def toWeakReferenceTuple[K, V](kv: (K, V)): (K, WeakReference[V]) = {
    kv match { case (k, v) => (k, toWeakReference(v)) }
  }

  implicit def toWeakReferenceFunction[K, V, R](p: ((K, V)) => R): ((K, WeakReference[V])) => R = {
    (kv: (K, WeakReference[V])) => p(kv)
  }

  /* Implicit conversion methods from WeakReferences */

  implicit def fromWeakReference[V](ref: WeakReference[V]): V = ref.get

  implicit def fromWeakReferenceOption[V](v: Option[WeakReference[V]]): Option[V] = {
    v.map(fromWeakReference)
  }

  implicit def fromWeakReferenceTuple[K, V](kv: (K, WeakReference[V])): (K, V) = {
    kv match { case (k, v) => (k, fromWeakReference(v)) }
  }

  implicit def fromWeakReferenceIterator[K, V](
      it: Iterator[(K, WeakReference[V])]): Iterator[(K, V)] = {
    it.map(fromWeakReferenceTuple)
  }

  implicit def fromWeakReferenceMap[K, V](
      map: mutable.Map[K, WeakReference[V]]): mutable.Map[K, V] = {
    mutable.Map(map.mapValues(fromWeakReference).toSeq: _*)
  }
}
```
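The clearOldValues contract in this class (drop entries whose timestamp predates a threshold, optionally notifying a callback before removal) can be sketched generically. The sketch below is illustrative, not Spark's code; `ClearOldValuesSketch` and `Timestamped` are invented names:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative age-based pruning in the style of clearOldValues(threshTime, f):
// entries older than the threshold are removed, after an optional callback sees them.
public class ClearOldValuesSketch {
    static final class Timestamped<V> {
        final long timestamp;
        final V value;

        Timestamped(long timestamp, V value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Remove entries with timestamp < threshTime, calling f (if non-null) on each
    // entry before removal. Returns the number of entries removed.
    static <K, V> int clearOldValues(Map<K, Timestamped<V>> map, long threshTime,
                                     BiConsumer<K, V> f) {
        int removed = 0;
        Iterator<Map.Entry<K, Timestamped<V>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Timestamped<V>> e = it.next();
            if (e.getValue().timestamp < threshTime) {
                if (f != null) {
                    f.accept(e.getKey(), e.getValue().value);
                }
                it.remove();  // remove via the iterator to avoid ConcurrentModificationException
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Timestamped<String>> map = new LinkedHashMap<>();
        map.put("old", new Timestamped<>(100L, "stale"));
        map.put("new", new Timestamped<>(200L, "fresh"));
        int removed = clearOldValues(map, 150L, (k, v) -> System.out.println("dropping " + k));
        System.out.println(removed);     // 1
        System.out.println(map.size());  // 1: only "new" remains
    }
}
```

Pruning through the entry-set iterator, as both versions of the Scala class do, is what makes in-place removal safe during traversal.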
Review comment: Is this a question still? Seems like you chose `Option`.

Reply: Removed.