Skip to content
Prev Previous commit
Next Next commit
remove initialMemoryThreshold
  • Loading branch information
lianhuiwang committed Jun 30, 2015
commit 5280ef05df50eb72b2df0f90f2066c5a366c9ef3
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,8 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{
// Memory manager that can be used to acquire/release memory
protected val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager

// Initial threshold for the size of a collection before we start tracking its memory usage
// Exposed for testing
protected val initialMemoryThreshold: Long =
SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 0L)

// Threshold for this collection's size in bytes before we start tracking its memory usage
// To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
protected var myMemoryThreshold = initialMemoryThreshold
protected var myMemoryThreshold = 0L

// Number of elements read from input since last spill
protected var _elementsRead = 0L
Expand Down Expand Up @@ -103,8 +97,8 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{
*/
private def releaseMemoryForThisThread(): Unit = {
// The amount we requested does not include the initial memory tracking threshold
shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
myMemoryThreshold = initialMemoryThreshold
shuffleMemoryManager.release(myMemoryThreshold)
myMemoryThreshold = 0L
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ class ExternalAppendOnlyMap[K, V, C](

_elementsRead = 0
_memoryBytesSpilled += currentMap.estimateSize()
freeMemory = myMemoryThreshold - initialMemoryThreshold
myMemoryThreshold = initialMemoryThreshold
freeMemory = myMemoryThreshold
myMemoryThreshold = 0L
}

freeMemory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,8 +689,8 @@ private[spark] class ExternalSorter[K, V, C](
} else {
_memoryBytesSpilled += buffer.estimateSize()
}
freeMemory = myMemoryThreshold - initialMemoryThreshold
myMemoryThreshold = initialMemoryThreshold
freeMemory = myMemoryThreshold
myMemoryThreshold = 0L
}
freeMemory
}
Expand Down