Skip to content
Prev Previous commit
Next Next commit
fix file's leak
  • Loading branch information
lianhuiwang committed Jul 1, 2015
commit f4737bbafc495f56fef2246f41c964c7295a7eb2
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,8 @@ private[spark] class ExternalSorter[K, V, C](

/**
* Spill in-memory inMemory to a temporary file on disk.
* Return on-disk iterator over a temporary file.
*/
private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)])
: Iterator[((Int, K), C)] = {
private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): SpilledFile = {

val it = new WritablePartitionedIterator {
private[this] var cur = if (iterator.hasNext) iterator.next() else null
Expand All @@ -633,12 +631,7 @@ private[spark] class ExternalSorter[K, V, C](
def nextPartition(): Int = cur._1._1
}

val spillReader = new SpillReader(spillMemoryToDisk(it))

(0 until numPartitions).iterator.flatMap { p =>
val iterator = spillReader.readNextPartition()
iterator.map(cur => ((p, cur._1), cur._2))
}
spillMemoryToDisk(it)
}

/**
Expand All @@ -649,14 +642,21 @@ private[spark] class ExternalSorter[K, V, C](
extends Iterator[((Int, K), C)] {

var currentIter = memIter
var spillFile: Option[SpilledFile] = None

override def hasNext: Boolean = currentIter.hasNext

override def next(): ((Int, K), C) = currentIter.next()

private[spark] def spill() = {
if (hasNext) {
currentIter = spillMemoryToDisk(currentIter)
spillFile = Some(spillMemoryToDisk(currentIter))
val spillReader = new SpillReader(spillFile.get)

currentIter = (0 until numPartitions).iterator.flatMap { p =>
val iterator = spillReader.readNextPartition()
iterator.map(cur => ((p, cur._1), cur._2))
}
} else {
// in-memory iterator is already drained, release it by giving an empty iterator
currentIter = new Iterator[((Int, K), C)]{
Expand All @@ -666,6 +666,10 @@ private[spark] class ExternalSorter[K, V, C](
logInfo("nothing in memory inMemory, do nothing")
}
}

def cleanup(): Unit = {
spillFile.foreach(_.file.delete())
}
}

/**
Expand Down Expand Up @@ -792,6 +796,7 @@ private[spark] class ExternalSorter[K, V, C](
def stop(): Unit = {
spills.foreach(s => s.file.delete())
spills.clear()
memoryOrDiskIter.foreach(_.cleanup())
}

/**
Expand Down