AppendOnlyMap.scala

@@ -32,12 +32,18 @@ import org.apache.spark.annotation.DeveloperApi
* size, which is guaranteed to explore all spaces for each key (see
* http://en.wikipedia.org/wiki/Quadratic_probing).
*
* The map can support up to 536870912 elements.
Contributor: can you add a note on how you arrived at this number?

*
* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
@DeveloperApi
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
extends Iterable[(K, V)] with Serializable {
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")

private val MAXIMUM_CAPACITY = (1 << 29)
Contributor: why not just put this in the object like you did in PartitionedPairBuffer? In general I think it's good to keep constants static.

require(initialCapacity <= MAXIMUM_CAPACITY,
s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
require(initialCapacity >= 1, "Invalid initial capacity")

private val LOAD_FACTOR = 0.7
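
Regarding the reviewer's question above about where 536870912 comes from: the map keeps its data in a single Array[AnyRef] of twice the capacity (keys and values share the array), the probing uses a mask of capacity - 1 so capacity stays a power of two, and JVM array lengths are bounded by Int.MaxValue, so the largest usable capacity is 1 << 29. A minimal standalone sketch of that arithmetic, not part of the patch:

object AppendOnlyMapCapacityBound extends App {
  // Assumes, as growTable's comment notes, a backing array of 2 * capacity
  // AnyRef slots and a power-of-two capacity.
  val maxArrayLength = Int.MaxValue.toLong      // JVM arrays are indexed by Int
  val maxCapacity = 1 << 29                     // 536870912
  assert(2L * maxCapacity <= maxArrayLength)    // an array of 2 * capacity still fits
  assert(2L * maxCapacity * 2 > maxArrayLength) // the next power of two would not
  println(s"max elements: $maxCapacity")        // 536870912
}
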
@@ -193,8 +199,11 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)

/** Increase table size by 1, rehashing if necessary */
private def incrementSize() {
if (curSize == MAXIMUM_CAPACITY) {
throw new IllegalStateException(s"Can't put more than ${MAXIMUM_CAPACITY} elements")
}
curSize += 1
if (curSize > growThreshold) {
if (curSize > growThreshold && capacity < MAXIMUM_CAPACITY) {
growTable()
}
}
@@ -206,12 +215,8 @@

/** Double the table's size and re-hash everything */
protected def growTable() {
val newCapacity = capacity * 2
if (newCapacity >= (1 << 30)) {
// We can't make the table this big because we want an array of 2x
// that size for our data, but array sizes are at most Int.MaxValue
throw new Exception("Can't make capacity bigger than 2^29 elements")
}
// capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
val newCapacity = (capacity * 2) min MAXIMUM_CAPACITY
Contributor: I think we prefer min(capacity * 2, MAXIMUM_CAPACITY) over this kind of Scala syntax.

Member (author): I think you mean (capacity * 2).min(MAXIMUM_CAPACITY), right? Updated.

val newData = new Array[AnyRef](2 * newCapacity)
val newMask = newCapacity - 1
// Insert all our old values into the new array. Note that because our old keys are
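
As a side note on the min discussion above, the spellings below are equivalent Scala and yield the same value; this is only an illustrative sketch, not code from the patch:

val capacity = 1 << 20
val MAXIMUM_CAPACITY = 1 << 29
val a = (capacity * 2).min(MAXIMUM_CAPACITY)     // explicit method call, the form adopted after review
val b = math.min(capacity * 2, MAXIMUM_CAPACITY) // static helper
val c = (capacity * 2) min MAXIMUM_CAPACITY      // infix form, as originally written
assert(a == b && b == c)
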
PartitionedPairBuffer.scala

@@ -25,11 +25,16 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
/**
* Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
* of its estimated size in bytes.
*
* The buffer can support up to 1073741823 elements.
*/
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
extends WritablePartitionedPairCollection[K, V] with SizeTracker
{
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
import PartitionedPairBuffer._

require(initialCapacity <= MAXIMUM_CAPACITY,
s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
require(initialCapacity >= 1, "Invalid initial capacity")

// Basic growable array data structure. We use a single array of AnyRef to hold both the keys
@@ -51,11 +56,15 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)

/** Double the size of the array because we've reached capacity */
private def growArray(): Unit = {
if (capacity == (1 << 29)) {
// Doubling the capacity would create an array bigger than Int.MaxValue, so don't
throw new Exception("Can't grow buffer beyond 2^29 elements")
if (capacity >= MAXIMUM_CAPACITY) {
throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
}
val newCapacity = capacity * 2
val newCapacity =
if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
MAXIMUM_CAPACITY
} else {
capacity * 2
}
val newArray = new Array[AnyRef](2 * newCapacity)
System.arraycopy(data, 0, newArray, 0, 2 * capacity)
data = newArray
@@ -90,3 +99,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
}
}
}

private[spark] object PartitionedPairBuffer {
Contributor: This could actually be strictly private. Not a big deal at all if you decide to keep this as is.

val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
}
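
For reference, a small sketch (not part of the diff) of how the PartitionedPairBuffer limit works out: each element occupies two AnyRef slots in the backing array (key and value), array lengths are bounded by Int.MaxValue, so capacity can reach Int.MaxValue / 2 = 1073741823, i.e. 2^30 - 1. The last assertion also shows the Int wrap-around that growArray's capacity * 2 < 0 guard defends against:

object PartitionedPairBufferBound extends App {
  val maxCapacity = Int.MaxValue / 2       // 1073741823 == (1 << 30) - 1
  assert(maxCapacity == (1 << 30) - 1)
  // The backing array holds 2 * capacity AnyRef slots, which must fit in an Int-sized array.
  assert(2L * maxCapacity <= Int.MaxValue)
  // Doubling a capacity past 2^30 overflows Int and wraps to a negative value.
  assert((1 << 30) * 2 < 0)
}
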
PartitionedSerializedPairBuffer.scala

@@ -48,6 +48,8 @@ import org.apache.spark.util.collection.PartitionedSerializedPairBuffer._
* |         keyStart         | keyValLen  | partitionId |
* +-------------+------------+------------+-------------+
*
* The buffer can support up to 536870911 records.
*
* @param metaInitialRecords The initial number of entries in the metadata buffer.
* @param kvBlockSize The size of each byte buffer in the ChainedBuffer used to store the records.
* @param serializerInstance the serializer used for serializing inserted records.
@@ -63,6 +65,8 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
" Java-serialized objects.")
}

require(metaInitialRecords <= MAXIMUM_RECORDS,
s"Can't make capacity bigger than ${MAXIMUM_RECORDS} records")
private var metaBuffer = IntBuffer.allocate(metaInitialRecords * RECORD_SIZE)

private val kvBuffer: ChainedBuffer = new ChainedBuffer(kvBlockSize)
@@ -89,11 +93,17 @@

/** Double the size of the array because we've reached capacity */
private def growMetaBuffer(): Unit = {
if (metaBuffer.capacity.toLong * 2 > Int.MaxValue) {
// Doubling the capacity would create an array bigger than Int.MaxValue, so don't
throw new Exception(s"Can't grow buffer beyond ${Int.MaxValue} bytes")
if (metaBuffer.capacity >= MAXIMUM_META_BUFFER_CAPACITY) {
throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_RECORDS} records")
}
val newMetaBuffer = IntBuffer.allocate(metaBuffer.capacity * 2)
val newCapacity =
Contributor: If we have too many records, we'll end up failing when we try to do a put, right? Can we make this fail with a more explicit message?

Member (author): Updated. How about now?

if (metaBuffer.capacity * 2 < 0 || metaBuffer.capacity * 2 > MAXIMUM_META_BUFFER_CAPACITY) {
// Overflow
MAXIMUM_META_BUFFER_CAPACITY
} else {
metaBuffer.capacity * 2
}
val newMetaBuffer = IntBuffer.allocate(newCapacity)
newMetaBuffer.put(metaBuffer.array)
metaBuffer = newMetaBuffer
}
@@ -257,6 +267,9 @@ private[spark] object PartitionedSerializedPairBuffer {
val PARTITION = 3
val RECORD_SIZE = PARTITION + 1 // num ints of metadata

val MAXIMUM_RECORDS = Int.MaxValue / RECORD_SIZE // (2 ^ 29) - 1
val MAXIMUM_META_BUFFER_CAPACITY = MAXIMUM_RECORDS * RECORD_SIZE // (2 ^ 31) - 4

def getKeyStartPos(metaBuffer: IntBuffer, metaBufferPos: Int): Long = {
val lower32 = metaBuffer.get(metaBufferPos + KEY_START)
val upper32 = metaBuffer.get(metaBufferPos + KEY_START + 1)
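
Finally, a sketch (not part of the diff) of the record arithmetic behind the new PartitionedSerializedPairBuffer constants: each record occupies RECORD_SIZE = 4 ints of metadata (two for keyStart, one for keyValLen, one for partitionId), so a metadata IntBuffer of at most Int.MaxValue ints holds Int.MaxValue / 4 = 536870911 records, and the largest usable buffer capacity is 536870911 * 4 = 2147483644 ints, i.e. 2^31 - 4:

object SerializedPairBufferBound extends App {
  val RECORD_SIZE = 4                                              // ints per record: keyStart (2), keyValLen, partitionId
  val MAXIMUM_RECORDS = Int.MaxValue / RECORD_SIZE                 // 536870911 == (1 << 29) - 1
  val MAXIMUM_META_BUFFER_CAPACITY = MAXIMUM_RECORDS * RECORD_SIZE // 2147483644 == (2^31) - 4
  assert(MAXIMUM_RECORDS == (1 << 29) - 1)
  assert(MAXIMUM_META_BUFFER_CAPACITY == Int.MaxValue - 3)
  // One more record would push the buffer past the maximum Int-indexed size.
  assert(MAXIMUM_META_BUFFER_CAPACITY.toLong + RECORD_SIZE > Int.MaxValue)
}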