SPARK-8309: Support for more than 12M items in OpenHashMap
SlavikBaranov committed Jun 11, 2015
commit 39206563dc84e0d200c605717b69ff7485ea9c54
@@ -223,6 +223,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   */
  private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
    val newCapacity = _capacity * 2
    require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
    allocateFunc(newCapacity)
    val newBitset = new BitSet(newCapacity)
    val newData = new Array[T](newCapacity)
@@ -278,7 +279,7 @@ object OpenHashSet {

  val INVALID_POS = -1
  val NONEXISTENCE_MASK = 0x80000000
  val POSITION_MASK = 0xEFFFFFF
  val POSITION_MASK = 0x1FFFFFFF
Member
I think you're right that this is a subtle but important bug, but it looks like the intent is to use all but the top bit. That's 0x7FFFFFFF, not 0x1FFFFFFF. Therefore the max position and size are 2^31 - 1, not 2^29, and that's already the max value of an Int, so I don't think the check is needed. You could check for a negative value, though. Basically it's reusing the sign bit, which would never otherwise be used, since position and size must be positive.
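To make the masking arithmetic concrete, here is a small standalone sketch (not part of the patch; the object and constant names are made up for this demo, only the hex values are quoted from the diff and the comment above). It shows why the original 0xEFFFFFF mask corrupts positions once bit 24 comes into play, and how both the 0x1FFFFFFF value from this commit and the 0x7FFFFFFF value suggested above recover the position intact:

```scala
// Standalone illustration of how the different position masks behave.
object PositionMaskDemo {
  val NONEXISTENCE_MASK = 0x80000000
  val OLD_POSITION_MASK = 0xEFFFFFF   // pre-patch constant: bit 24 is not set in the mask
  val NEW_POSITION_MASK = 0x1FFFFFFF  // value introduced by this commit: the low 29 bits
  val SIGN_BIT_MASK = 0x7FFFFFFF      // suggested alternative: every bit except the sign bit

  def main(args: Array[String]): Unit = {
    // A slot index above 2^24, reachable once the table has grown past ~16M slots.
    val pos = 20000000
    // A position with the "nonexistence" bit set, mimicking how a newly inserted slot is flagged.
    val tagged = pos | NONEXISTENCE_MASK

    println(tagged & OLD_POSITION_MASK) // 3222784  -- bit 24 is dropped, the position is wrong
    println(tagged & NEW_POSITION_MASK) // 20000000 -- recovered correctly
    println(tagged & SIGN_BIT_MASK)     // 20000000 -- recovered correctly
  }
}
```

Assuming the default 0.7 load factor, roughly 12M insertions push a 2^24-slot table past its growth threshold and into a 2^25-slot table, at which point positions with bit 24 set start to appear; that is presumably why the regression test below uses 12M items.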

Contributor Author
It's easy to make it support 2^30 capacity, but supporting 2^31 would require some hacks. In JDK 8 the maximum array size is 2^31 - 1, so we'd need to store the item with hashCode 2^31 - 1 somewhere else, and that additional check would probably affect performance.
As I remember, in JDK 6 the max array size is either 2^31 - 4 or 2^31 - 5, so JDK 6 support would require some additional work.

I see the following possibilities:

  1. Leave the fix as is
  2. Update it to support capacity 2^30
  3. Make it support 2^31 with some hacks
  4. Make it support even larger capacity by splitting value storage into several arrays.

IMO, the second option is the most reasonable, since a 1B max capacity is definitely better than 500M. :)
On the other hand, options 3 & 4 look like overkill: due to the distributed nature of Spark, it's usually not necessary to collect more than a billion items on a single machine, even when working with multi-billion-item datasets.
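For illustration only, here is a rough sketch of what option 2 might look like (this is not the change that was eventually merged; the object, MAX_CAPACITY, and the grow helper are hypothetical names, and the guard is just one way to express the limit):

```scala
// Hypothetical sketch of "option 2": cap the power-of-two capacity at 2^30 and reuse the
// sign bit for NONEXISTENCE_MASK, as suggested in the review comment above.
object CapacityLimitSketch {
  val MAX_CAPACITY = 1 << 30          // largest power-of-two capacity an Int-indexed array allows
  val NONEXISTENCE_MASK = 0x80000000
  val POSITION_MASK = 0x7FFFFFFF      // everything except the reused sign bit

  // Doubling helper with an overflow-safe guard: capacity * 2 overflows to a negative
  // number once capacity reaches 2^30, which the `> 0` check catches.
  def grow(capacity: Int): Int = {
    val newCapacity = capacity * 2
    require(newCapacity > 0 && newCapacity <= MAX_CAPACITY,
      s"Can't grow the hash table past $MAX_CAPACITY slots")
    newCapacity
  }

  def main(args: Array[String]): Unit = {
    println(grow(1 << 29))            // prints 1073741824: growing to 2^30 is still allowed
    // grow(1 << 30)                  // would fail the require: doubling overflows Int
  }
}
```

Capping the capacity at 2^30 keeps doubling within the positive Int range and clear of the JVM array-size limit, while a 0x7FFFFFFF mask leaves the sign bit free for NONEXISTENCE_MASK.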


  /**
   * A set of specialized hash function implementation to avoid boxing hash code computation
@@ -186,4 +186,14 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
    map(null) = 0
    assert(map.contains(null))
  }

  test("support for more than 12M items") {
    val cnt = 12000000 // 12M
    val map = new OpenHashMap[Int, Int](cnt)
    for (i <- 0 until cnt) {
      map(i) = 1
    }
    val numInvalidValues = map.iterator.count(_._2 == 0)
    assertResult(0)(numInvalidValues)
  }
}