Skip to content

Commit 88904a3

Browse files
committed
Make TimeStampedWeakValueHashMap a wrapper of TimeStampedHashMap
This allows us to get rid of WrappedJavaHashMap without much duplicate code.
1 parent fbfeec8 commit 88904a3

File tree

6 files changed

+168
-479
lines changed

6 files changed

+168
-479
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
3535
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
3636
import org.apache.mesos.MesosNativeLibrary
3737

38-
import org.apache.spark.broadcast.Broadcast
3938
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
4039
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
4140
import org.apache.spark.rdd._

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ private[spark] class BlockManager(
209209
}
210210
}
211211

212-
/** Return the status of the block identified by the given ID, if it exists. */
212+
/** Get the BlockStatus for the block identified by the given ID, if it exists. */
213213
def getStatus(blockId: BlockId): Option[BlockStatus] = {
214214
blockInfo.get(blockId).map { info =>
215215
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
@@ -635,9 +635,10 @@ private[spark] class BlockManager(
635635
diskStore.putValues(blockId, iterator, level, askForBytes)
636636
case ArrayBufferValues(array) =>
637637
diskStore.putValues(blockId, array, level, askForBytes)
638-
case ByteBufferValues(bytes) =>
638+
case ByteBufferValues(bytes) => {
639639
bytes.rewind()
640640
diskStore.putBytes(blockId, bytes, level)
641+
}
641642
}
642643
size = res.size
643644
res.data match {
@@ -872,7 +873,7 @@ private[spark] class BlockManager(
872873
}
873874

874875
private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)) {
875-
val iterator = blockInfo.internalMap.entrySet().iterator()
876+
val iterator = blockInfo.getEntrySet.iterator
876877
while (iterator.hasNext) {
877878
val entry = iterator.next()
878879
val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)

core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala

Lines changed: 81 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,64 +17,108 @@
1717

1818
package org.apache.spark.util
1919

20+
import java.util.Set
21+
import java.util.Map.Entry
2022
import java.util.concurrent.ConcurrentHashMap
2123

24+
import scala.collection.{immutable, JavaConversions, mutable}
25+
2226
import org.apache.spark.Logging
2327

24-
private[util] case class TimeStampedValue[T](timestamp: Long, value: T)
28+
private[spark] case class TimeStampedValue[V](value: V, timestamp: Long)
2529

2630
/**
27-
* A map that stores the timestamp of when a key was inserted along with the value. If specified,
28-
* the timestamp of each pair can be updated every time it is accessed.
29-
* Key-value pairs whose timestamps are older than a particular
30-
* threshold time can then be removed using the clearOldValues method. It exposes a
31-
* scala.collection.mutable.Map interface to allow it to be a drop-in replacement for Scala
32-
* HashMaps.
33-
*
34-
* Internally, it uses a Java ConcurrentHashMap, so all operations on this HashMap are thread-safe.
31+
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
32+
* timestamp along with each key-value pair. If specified, the timestamp of each pair can be
33+
* updated every time it is accessed. Key-value pairs whose timestamps are older than a particular
34+
* threshold time can then be removed using the clearOldValues method. This is intended to
35+
* be a drop-in replacement for scala.collection.mutable.HashMap.
3536
*
36-
* @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
37-
* updated when it is accessed
37+
* @param updateTimeStampOnGet Whether the timestamp of a pair is updated when the pair is accessed
3838
*/
3939
private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
40-
extends WrappedJavaHashMap[A, B, A, TimeStampedValue[B]] with Logging {
40+
extends mutable.Map[A, B]() with Logging {
4141

42-
private[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
42+
private val internalMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
4343

44-
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
45-
new TimeStampedHashMap[K1, V1]()
44+
def get(key: A): Option[B] = {
45+
val value = internalMap.get(key)
46+
if (value != null && updateTimeStampOnGet) {
47+
internalMap.replace(key, value, TimeStampedValue(value.value, currentTime))
48+
}
49+
Option(value).map(_.value)
4650
}
4751

48-
def internalMap = internalJavaMap
52+
def iterator: Iterator[(A, B)] = {
53+
val jIterator = getEntrySet.iterator()
54+
JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue.value))
55+
}
4956

50-
override def get(key: A): Option[B] = {
51-
val timeStampedValue = internalMap.get(key)
52-
if (updateTimeStampOnGet && timeStampedValue != null) {
53-
internalJavaMap.replace(key, timeStampedValue,
54-
TimeStampedValue(currentTime, timeStampedValue.value))
55-
}
56-
Option(timeStampedValue).map(_.value)
57+
def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet()
58+
59+
override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = {
60+
val newMap = new TimeStampedHashMap[A, B1]
61+
val oldInternalMap = this.internalMap.asInstanceOf[ConcurrentHashMap[A, TimeStampedValue[B1]]]
62+
newMap.internalMap.putAll(oldInternalMap)
63+
kv match { case (a, b) => newMap.internalMap.put(a, TimeStampedValue(b, currentTime)) }
64+
newMap
5765
}
58-
@inline override protected def externalValueToInternalValue(v: B): TimeStampedValue[B] = {
59-
new TimeStampedValue(currentTime, v)
66+
67+
override def - (key: A): mutable.Map[A, B] = {
68+
val newMap = new TimeStampedHashMap[A, B]
69+
newMap.internalMap.putAll(this.internalMap)
70+
newMap.internalMap.remove(key)
71+
newMap
72+
}
73+
74+
override def += (kv: (A, B)): this.type = {
75+
kv match { case (a, b) => internalMap.put(a, TimeStampedValue(b, currentTime)) }
76+
this
77+
}
78+
79+
override def -= (key: A): this.type = {
80+
internalMap.remove(key)
81+
this
82+
}
83+
84+
override def update(key: A, value: B) {
85+
this += ((key, value))
6086
}
6187

62-
@inline override protected def internalValueToExternalValue(iv: TimeStampedValue[B]): B = {
63-
iv.value
88+
override def apply(key: A): B = {
89+
val value = internalMap.get(key)
90+
Option(value).map(_.value).getOrElse { throw new NoSuchElementException() }
6491
}
6592

66-
/** Atomically put if a key is absent. This exposes the existing API of ConcurrentHashMap. */
93+
override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = {
94+
JavaConversions.mapAsScalaConcurrentMap(internalMap)
95+
.map { case (k, TimeStampedValue(v, t)) => (k, v) }
96+
.filter(p)
97+
}
98+
99+
override def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]()
100+
101+
override def size: Int = internalMap.size
102+
103+
override def foreach[U](f: ((A, B)) => U) {
104+
val iterator = getEntrySet.iterator()
105+
while(iterator.hasNext) {
106+
val entry = iterator.next()
107+
val kv = (entry.getKey, entry.getValue.value)
108+
f(kv)
109+
}
110+
}
111+
112+
// TODO: Decide whether to return the previous value directly or as an Option.
67113
def putIfAbsent(key: A, value: B): Option[B] = {
68-
val prev = internalJavaMap.putIfAbsent(key, TimeStampedValue(currentTime, value))
114+
val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime))
69115
Option(prev).map(_.value)
70116
}
71117

72-
/**
73-
* Removes old key-value pairs that have timestamp earlier than `threshTime`,
74-
* calling the supplied function on each such entry before removing.
75-
*/
118+
def toMap: immutable.Map[A, B] = iterator.toMap
119+
76120
def clearOldValues(threshTime: Long, f: (A, B) => Unit) {
77-
val iterator = internalJavaMap.entrySet().iterator()
121+
val iterator = getEntrySet.iterator()
78122
while (iterator.hasNext) {
79123
val entry = iterator.next()
80124
if (entry.getValue.timestamp < threshTime) {
@@ -86,11 +130,12 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
86130
}
87131

88132
/**
89-
* Removes old key-value pairs that have timestamp earlier than `threshTime`
133+
* Removes old key-value pairs whose timestamp is earlier than `threshTime`.
90134
*/
91135
def clearOldValues(threshTime: Long) {
92136
clearOldValues(threshTime, (_, _) => ())
93137
}
94138

95-
private def currentTime: Long = System.currentTimeMillis()
139+
private def currentTime: Long = System.currentTimeMillis
140+
96141
}

core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala

Lines changed: 83 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -18,113 +18,115 @@
1818
package org.apache.spark.util
1919

2020
import java.lang.ref.WeakReference
21-
import java.util
22-
import java.util.concurrent.ConcurrentHashMap
23-
import java.util.concurrent.atomic.AtomicInteger
2421

25-
import scala.collection.JavaConversions
26-
27-
import org.apache.spark.Logging
28-
29-
private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: WeakReference[T]) {
30-
def this(timestamp: Long, value: T) = this(timestamp, new WeakReference[T](value))
31-
}
22+
import scala.collection.{immutable, mutable}
3223

3324
/**
34-
* A map that stores the timestamp of when a key was inserted along with the value,
35-
* while ensuring that the values are weakly referenced. If the value is garbage collected and
36-
* the weak reference is null, get() operation returns the key be non-existent. However,
37-
* the key is actually not removed in the current implementation. Key-value pairs whose
38-
* timestamps are older than a particular threshold time can then be removed using the
39-
* clearOldValues method. It exposes a scala.collection.mutable.Map interface to allow it to be a
40-
* drop-in replacement for Scala HashMaps.
25+
* A wrapper of TimeStampedHashMap that ensures the values are weakly referenced and timestamped.
26+
*
27+
* If the value is garbage collected and the weak reference is cleared, the get() operation returns
28+
* a non-existent value. However, the corresponding key is actually not removed in the current
29+
* implementation. Key-value pairs whose timestamps are older than a particular threshold time
30+
* can then be removed using the clearOldValues method. It exposes a scala.collection.mutable.Map
31+
* interface to allow it to be a drop-in replacement for Scala HashMaps.
4132
*
4233
* Internally, it uses a Java ConcurrentHashMap, so all operations on this HashMap are thread-safe.
34+
*
35+
* @param updateTimeStampOnGet Whether the timestamp of a pair is updated when the pair is accessed.
4336
*/
37+
private[spark] class TimeStampedWeakValueHashMap[A, B](updateTimeStampOnGet: Boolean = false)
38+
extends mutable.Map[A, B]() {
39+
40+
import TimeStampedWeakValueHashMap._
4441

45-
private[spark] class TimeStampedWeakValueHashMap[A, B]()
46-
extends WrappedJavaHashMap[A, B, A, TimeStampedWeakValue[B]] with Logging {
42+
private val internalMap = new TimeStampedHashMap[A, WeakReference[B]](updateTimeStampOnGet)
4743

48-
/** Number of inserts after which keys whose weak ref values are null will be cleaned */
49-
private val CLEANUP_INTERVAL = 1000
44+
def get(key: A): Option[B] = internalMap.get(key)
5045

51-
/** Counter for counting the number of inserts */
52-
private val insertCounts = new AtomicInteger(0)
46+
def iterator: Iterator[(A, B)] = internalMap.iterator
47+
48+
override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = {
49+
val newMap = new TimeStampedWeakValueHashMap[A, B1]
50+
newMap.internalMap += kv
51+
newMap
52+
}
5353

54-
private[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = {
55-
new ConcurrentHashMap[A, TimeStampedWeakValue[B]]()
54+
override def - (key: A): mutable.Map[A, B] = {
55+
val newMap = new TimeStampedWeakValueHashMap[A, B]
56+
newMap.internalMap -= key
57+
newMap
5658
}
5759

58-
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
59-
new TimeStampedWeakValueHashMap[K1, V1]()
60+
override def += (kv: (A, B)): this.type = {
61+
internalMap += kv
62+
this
6063
}
6164

62-
override def +=(kv: (A, B)): this.type = {
63-
// Cleanup null value at certain intervals
64-
if (insertCounts.incrementAndGet() % CLEANUP_INTERVAL == 0) {
65-
cleanNullValues()
66-
}
67-
super.+=(kv)
65+
override def -= (key: A): this.type = {
66+
internalMap -= key
67+
this
6868
}
6969

70-
override def get(key: A): Option[B] = {
71-
Option(internalJavaMap.get(key)).flatMap { weakValue =>
72-
val value = weakValue.weakValue.get
73-
if (value == null) {
74-
internalJavaMap.remove(key)
75-
}
76-
Option(value)
77-
}
70+
override def update(key: A, value: B) = this += ((key, value))
71+
72+
override def apply(key: A): B = internalMap.apply(key)
73+
74+
override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = internalMap.filter(p)
75+
76+
override def empty: mutable.Map[A, B] = new TimeStampedWeakValueHashMap[A, B]()
77+
78+
override def size: Int = internalMap.size
79+
80+
override def foreach[U](f: ((A, B)) => U) = internalMap.foreach(f)
81+
82+
def putIfAbsent(key: A, value: B): Option[B] = internalMap.putIfAbsent(key, value)
83+
84+
def toMap: immutable.Map[A, B] = iterator.toMap
85+
86+
/**
87+
* Remove old key-value pairs whose timestamp is earlier than `threshTime`.
88+
*/
89+
def clearOldValues(threshTime: Long) = internalMap.clearOldValues(threshTime)
90+
91+
}
92+
93+
/**
94+
* Helper methods for converting to and from WeakReferences.
95+
*/
96+
private[spark] object TimeStampedWeakValueHashMap {
97+
98+
/* Implicit conversion methods to WeakReferences */
99+
100+
implicit def toWeakReference[V](v: V): WeakReference[V] = new WeakReference[V](v)
101+
102+
implicit def toWeakReferenceTuple[K, V](kv: (K, V)): (K, WeakReference[V]) = {
103+
kv match { case (k, v) => (k, toWeakReference(v)) }
78104
}
79105

80-
@inline override protected def externalValueToInternalValue(v: B): TimeStampedWeakValue[B] = {
81-
new TimeStampedWeakValue(currentTime, v)
106+
implicit def toWeakReferenceFunction[K, V, R](p: ((K, V)) => R): ((K, WeakReference[V])) => R = {
107+
(kv: (K, WeakReference[V])) => p(kv)
82108
}
83109

84-
@inline override protected def internalValueToExternalValue(iv: TimeStampedWeakValue[B]): B = {
85-
iv.weakValue.get
110+
/* Implicit conversion methods from WeakReferences */
111+
112+
implicit def fromWeakReference[V](ref: WeakReference[V]): V = ref.get
113+
114+
implicit def fromWeakReferenceOption[V](v: Option[WeakReference[V]]): Option[V] = {
115+
v.map(fromWeakReference)
86116
}
87117

88-
override def iterator: Iterator[(A, B)] = {
89-
val iterator = internalJavaMap.entrySet().iterator()
90-
JavaConversions.asScalaIterator(iterator).flatMap(kv => {
91-
val (key, value) = (kv.getKey, kv.getValue.weakValue.get)
92-
if (value != null) Seq((key, value)) else Seq.empty
93-
})
118+
implicit def fromWeakReferenceTuple[K, V](kv: (K, WeakReference[V])): (K, V) = {
119+
kv match { case (k, v) => (k, fromWeakReference(v)) }
94120
}
95121

96-
/**
97-
* Removes old key-value pairs that have timestamp earlier than `threshTime`,
98-
* calling the supplied function on each such entry before removing.
99-
*/
100-
def clearOldValues(threshTime: Long, f: (A, B) => Unit = null) {
101-
val iterator = internalJavaMap.entrySet().iterator()
102-
while (iterator.hasNext) {
103-
val entry = iterator.next()
104-
if (entry.getValue.timestamp < threshTime) {
105-
val value = entry.getValue.weakValue.get
106-
if (f != null && value != null) {
107-
f(entry.getKey, value)
108-
}
109-
logDebug("Removing key " + entry.getKey)
110-
iterator.remove()
111-
}
112-
}
122+
implicit def fromWeakReferenceIterator[K, V](
123+
it: Iterator[(K, WeakReference[V])]): Iterator[(K, V)] = {
124+
it.map(fromWeakReferenceTuple)
113125
}
114126

115-
/**
116-
* Removes keys whose weak referenced values have become null.
117-
*/
118-
private def cleanNullValues() {
119-
val iterator = internalJavaMap.entrySet().iterator()
120-
while (iterator.hasNext) {
121-
val entry = iterator.next()
122-
if (entry.getValue.weakValue.get == null) {
123-
logDebug("Removing key " + entry.getKey)
124-
iterator.remove()
125-
}
126-
}
127+
implicit def fromWeakReferenceMap[K, V](
128+
map: mutable.Map[K, WeakReference[V]]) : mutable.Map[K, V] = {
129+
mutable.Map(map.mapValues(fromWeakReference).toSeq: _*)
127130
}
128131

129-
private def currentTime = System.currentTimeMillis()
130132
}

0 commit comments

Comments
 (0)