Skip to content
Prev Previous commit
Next Next commit
fix style
  • Loading branch information
lianhuiwang committed Jun 30, 2015
commit e70f79f13c39591e5d33653e4de8e51d25e079dd
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
/**
* release other Spillable's memory of current thread until freeMemory >= requestedMemory
*/
private[spark] def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long =
private[this] def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long =
synchronized {
val threadId = Thread.currentThread().getId
if (toGrant >= requestMemory || !threadReservedList.contains(threadId)){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{
protected val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager

// Threshold for this collection's size in bytes before we start tracking its memory usage
protected var myMemoryThreshold = 0L
private[this] var myMemoryThreshold = 0L

// Number of elements read from input since last spill
protected var _elementsRead = 0L
private[this] var _elementsRead = 0L

// Number of bytes spilled in total
protected var _memoryBytesSpilled = 0L
private[this] var _memoryBytesSpilled = 0L

// Number of spills
protected var _spillCount = 0
private[this] var _spillCount = 0

/**
* Spills the current in-memory collection to disk if needed. Attempts to acquire more
Expand Down Expand Up @@ -110,4 +110,18 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{
.format(threadId, org.apache.spark.util.Utils.bytesToString(size),
_spillCount, if (_spillCount > 1) "s" else ""))
}

/**
* log ForceSpill and return collection's size
*/
protected def logForceSpill(currentMemory: Long): Long = {
_spillCount += 1
logSpillage(currentMemory)

_elementsRead = 0
_memoryBytesSpilled += currentMemory
val freeMemory = myMemoryThreshold
myMemoryThreshold = 0L
freeMemory
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,9 @@ class ExternalAppendOnlyMap[K, V, C](
override def forceSpill(): Long = {
var freeMemory = 0L
if (memoryOrDiskIter.isDefined) {
_spillCount += 1
logSpillage(currentMap.estimateSize())

freeMemory = logForceSpill(currentMap.estimateSize())
memoryOrDiskIter.get.spill()

_elementsRead = 0
_memoryBytesSpilled += currentMap.estimateSize()
freeMemory = myMemoryThreshold
myMemoryThreshold = 0L
}

freeMemory
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,9 @@ private[spark] class ExternalSorter[K, V, C](
* Spill in-memory inMemory to a temporary file on disk.
* Return on-disk iterator over a temporary file.
*/
private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)])
: Iterator[((Int, K), C)] = {

val it = new WritablePartitionedIterator {
private[this] var cur = if (iterator.hasNext) iterator.next() else null

Expand All @@ -643,7 +645,8 @@ private[spark] class ExternalSorter[K, V, C](
* An iterator that read elements from in-memory iterator or disk iterator when in-memory
* iterator have spilled to disk.
*/
case class MemoryOrDiskIterator(memIter: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] {
case class MemoryOrDiskIterator(memIter: Iterator[((Int, K), C)])
extends Iterator[((Int, K), C)] {

var currentIter = memIter

Expand Down Expand Up @@ -671,26 +674,13 @@ private[spark] class ExternalSorter[K, V, C](
override def forceSpill(): Long = {
var freeMemory = 0L
if (memoryOrDiskIter.isDefined) {
// has memory buffer that can be spilled
_spillCount += 1

val shouldCombine = aggregator.isDefined
if (shouldCombine) {
logSpillage(map.estimateSize())
freeMemory = logForceSpill(map.estimateSize())
} else {
logSpillage(buffer.estimateSize())
freeMemory = logForceSpill(buffer.estimateSize())
}

memoryOrDiskIter.get.spill()

_elementsRead = 0
if (shouldCombine) {
_memoryBytesSpilled += map.estimateSize()
} else {
_memoryBytesSpilled += buffer.estimateSize()
}
freeMemory = myMemoryThreshold
myMemoryThreshold = 0L
}
freeMemory
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch
import org.apache.spark.SparkFunSuite
import org.apache.spark.Spillable

class FakeSpillable extends Spillable {
private[this] class FakeSpillable extends Spillable {

var myMemoryThreshold: Long = 0L

Expand Down