From 1c3b06e48dc6a0e6ab01b4ff84a43c42aa140d0e Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sun, 12 Apr 2015 03:27:16 +0800
Subject: [PATCH 1/4] Clean up accumulators used in InMemoryRelation when it is uncached.

---
 .../org/apache/spark/sql/CacheManager.scala   |  1 +
 .../columnar/InMemoryColumnarTableScan.scala  | 43 +++++++++++++++++--
 .../apache/spark/sql/CachedTableSuite.scala   | 18 ++++++++
 3 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index ca4a127120b3..acdacdaf0161 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -113,6 +113,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
     cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+    cachedData(dataIndex).cachedRepresentation.removeAccumulators()
     cachedData.remove(dataIndex)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 6eee0c86d6a1..787ea08bace6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.columnar
 
 import java.nio.ByteBuffer
 
-import org.apache.spark.Accumulator
+import org.apache.spark.{Accumulable, Accumulator, Accumulators}
 import org.apache.spark.sql.catalyst.expressions
 
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
@@ -41,6 +43,29 @@ private[sql] object InMemoryRelation {
       child: SparkPlan,
       tableName: Option[String]): InMemoryRelation =
     new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
+
+  private val accumulators = HashMap[String, ArrayBuffer[Accumulable[_, _]]]()
+
+  private[sql] def addAccumulator(relation: InMemoryRelation, acc: Accumulable[_, _]): Unit = {
+    val index = relation.tableName.map { n =>
+      s"In-memory table $n"
+    }.getOrElse(relation.child.toString)
+
+    if (!accumulators.contains(index)) {
+      accumulators.put(index, ArrayBuffer[Accumulable[_, _]]())
+    }
+    accumulators.get(index).get += acc
+  }
+
+  private[sql] def removeAccumulators(relation: InMemoryRelation): Unit = {
+    val index = relation.tableName.map { n =>
+      s"In-memory table $n"
+    }.getOrElse(relation.child.toString)
+
+    if (accumulators.contains(index)) {
+      accumulators.get(index).get.foreach(acc => Accumulators.remove(acc.id))
+    }
+  }
 }
 
 private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
@@ -59,6 +84,19 @@ private[sql] case class InMemoryRelation(
   private val batchStats =
     child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[Row])
 
+  InMemoryRelation.addAccumulator(this, batchStats)
+
+  private[sql] def applyScanAccumulators(sc: SparkContext): (Accumulator[Int], Accumulator[Int]) = {
+    val accs = (sc.accumulator(0), sc.accumulator(0))
+    InMemoryRelation.addAccumulator(this, accs._1)
+    InMemoryRelation.addAccumulator(this, accs._2)
+    accs
+  }
+
+  private[sql] def removeAccumulators(): Unit = {
+    InMemoryRelation.removeAccumulators(this)
+  }
+
   val partitionStatistics = new PartitionStatistics(output)
 
   private def computeSizeInBytes = {
@@ -245,8 +283,7 @@ private[sql] case class InMemoryColumnarTableScan(
   }
 
   // Accumulators used for testing purposes
-  val readPartitions: Accumulator[Int] = sparkContext.accumulator(0)
-  val readBatches: Accumulator[Int] = sparkContext.accumulator(0)
+  val (readPartitions, readBatches) = relation.applyScanAccumulators(sparkContext)
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index f7b5f08beb92..01e3b8671071 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -22,6 +22,7 @@ import scala.language.{implicitConversions, postfixOps}
 
 import org.scalatest.concurrent.Eventually._
 
+import org.apache.spark.Accumulators
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.columnar._
 import org.apache.spark.sql.test.TestSQLContext._
@@ -297,4 +298,21 @@ class CachedTableSuite extends QueryTest {
     sql("Clear CACHE")
     assert(cacheManager.isEmpty)
   }
+
+  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+    val accsSize = Accumulators.originals.size
+
+    sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+    sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+    cacheTable("t1")
+    cacheTable("t2")
+    sql("SELECT * FROM t1").count()
+    sql("SELECT * FROM t2").count()
+    sql("SELECT * FROM t1").count()
+    sql("SELECT * FROM t2").count()
+    uncacheTable("t1")
+    uncacheTable("t2")
+
+    assert(accsSize >= Accumulators.originals.size)
+  }
 }

From 26c9bb61b70391f2408bbb0846a8e486baf3ec4d Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sun, 12 Apr 2015 15:26:55 +0800
Subject: [PATCH 2/4] Add configuration to enable in-memory table scan accumulators.
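
The readPartitions and readBatches accumulators on
InMemoryColumnarTableScan are only consulted by tests, so this patch
stops registering them unconditionally and gates them behind a new SQL
configuration key, spark.sql.inMemoryTableScanStatistics.enable, which
defaults to false.

A minimal sketch of how a caller would opt in, using only helpers that
already appear elsewhere in this series (the temp table name "t" is
made up for illustration):

    import org.apache.spark.sql.test.TestSQLContext._

    // Turn the scan accumulators on before caching and scanning.
    setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
    sql("SELECT key FROM testData").registerTempTable("t")
    cacheTable("t")
    sql("SELECT * FROM t").count()
    // The scan's readPartitions/readBatches accumulators now update.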
---
 .../columnar/InMemoryColumnarTableScan.scala  | 22 ++++++++++++++-----
 .../columnar/PartitionBatchPruningSuite.scala |  2 ++
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 787ea08bace6..ddc3a45aa199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -282,14 +282,24 @@ private[sql] case class InMemoryColumnarTableScan(
     }
   }
 
+  lazy val enableAccumulators: Boolean =
+    sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean
+
   // Accumulators used for testing purposes
-  val (readPartitions, readBatches) = relation.applyScanAccumulators(sparkContext)
+  lazy val (readPartitions, readBatches) =
+    if (enableAccumulators) {
+      relation.applyScanAccumulators(sparkContext)
+    } else {
+      (null, null)
+    }
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
   override def execute(): RDD[Row] = {
-    readPartitions.setValue(0)
-    readBatches.setValue(0)
+    if (enableAccumulators) {
+      readPartitions.setValue(0)
+      readBatches.setValue(0)
+    }
 
     relation.cachedColumnBuffers.mapPartitions { cachedBatchIterator =>
       val partitionFilter = newPredicate(
@@ -339,7 +349,7 @@ private[sql] case class InMemoryColumnarTableScan(
         }
       }
 
-      if (rows.hasNext) {
+      if (rows.hasNext && enableAccumulators) {
         readPartitions += 1
       }
 
@@ -358,7 +368,9 @@ private[sql] case class InMemoryColumnarTableScan(
           logInfo(s"Skipping partition based on stats $statsString")
           false
         } else {
-          readBatches += 1
+          if (enableAccumulators) {
+            readBatches += 1
+          }
           true
         }
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index e57bb06e7263..2a0b701cad7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -39,6 +39,8 @@ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with Be
 
     // Enable in-memory partition pruning
     setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, "true")
+    // Enable in-memory table scan accumulators
+    setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
   }
 
   override protected def afterAll(): Unit = {

From dc1d5d52493cd7317bf54580315d2a96e3814aeb Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 15 Apr 2015 09:56:43 +0800
Subject: [PATCH 3/4] Address review comments.
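
Per review, drop the driver-side HashMap of accumulators in the
InMemoryRelation companion object. The batchStats accumulator is now
threaded through the relation's constructor, so the copies produced by
withOutput and newInstance share a single accumulator, and a new
uncache(blocking) method on InMemoryRelation both unregisters that
accumulator and unpersists the cached buffers. The CacheManager call
site collapses to one line (quoted from the diff below):

    cachedData(dataIndex).cachedRepresentation.uncache(blocking)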
---
 .../org/apache/spark/sql/CacheManager.scala   |  3 +-
 .../columnar/InMemoryColumnarTableScan.scala  | 67 ++++++-------------
 2 files changed, 20 insertions(+), 50 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index acdacdaf0161..18584c2dcf79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -112,8 +112,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
-    cachedData(dataIndex).cachedRepresentation.removeAccumulators()
+    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
     cachedData.remove(dataIndex)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index ddc3a45aa199..6993e2ea941a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -43,29 +43,6 @@ private[sql] object InMemoryRelation {
       child: SparkPlan,
       tableName: Option[String]): InMemoryRelation =
     new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
-
-  private val accumulators = HashMap[String, ArrayBuffer[Accumulable[_, _]]]()
-
-  private[sql] def addAccumulator(relation: InMemoryRelation, acc: Accumulable[_, _]): Unit = {
-    val index = relation.tableName.map { n =>
-      s"In-memory table $n"
-    }.getOrElse(relation.child.toString)
-
-    if (!accumulators.contains(index)) {
-      accumulators.put(index, ArrayBuffer[Accumulable[_, _]]())
-    }
-    accumulators.get(index).get += acc
-  }
-
-  private[sql] def removeAccumulators(relation: InMemoryRelation): Unit = {
-    val index = relation.tableName.map { n =>
-      s"In-memory table $n"
-    }.getOrElse(relation.child.toString)
-
-    if (accumulators.contains(index)) {
-      accumulators.get(index).get.foreach(acc => Accumulators.remove(acc.id))
-    }
-  }
 }
 
 private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
@@ -78,24 +55,15 @@ private[sql] case class InMemoryRelation(
     child: SparkPlan,
     tableName: Option[String])(
     private var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    private var _statistics: Statistics = null)
+    private var _statistics: Statistics = null,
+    private var _batchStats: Accumulable[ArrayBuffer[Row], Row] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
-  private val batchStats =
-    child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[Row])
-
-  InMemoryRelation.addAccumulator(this, batchStats)
-
-  private[sql] def applyScanAccumulators(sc: SparkContext): (Accumulator[Int], Accumulator[Int]) = {
-    val accs = (sc.accumulator(0), sc.accumulator(0))
-    InMemoryRelation.addAccumulator(this, accs._1)
-    InMemoryRelation.addAccumulator(this, accs._2)
-    accs
-  }
-
-  private[sql] def removeAccumulators(): Unit = {
-    InMemoryRelation.removeAccumulators(this)
-  }
+  private val batchStats: Accumulable[ArrayBuffer[Row], Row] =
+    if (_batchStats == null)
+      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[Row])
+    else
+      _batchStats
 
   val partitionStatistics = new PartitionStatistics(output)
 
@@ -199,7 +167,7 @@ private[sql] case class InMemoryRelation(
   def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
     InMemoryRelation(
       newOutput, useCompression, batchSize, storageLevel, child, tableName)(
-      _cachedColumnBuffers, statisticsToBePropagated)
+      _cachedColumnBuffers, statisticsToBePropagated, batchStats)
   }
 
   override def children: Seq[LogicalPlan] = Seq.empty
@@ -213,13 +181,20 @@ private[sql] case class InMemoryRelation(
       child,
       tableName)(
       _cachedColumnBuffers,
-      statisticsToBePropagated).asInstanceOf[this.type]
+      statisticsToBePropagated,
+      batchStats).asInstanceOf[this.type]
   }
 
   def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
 
   override protected def otherCopyArgs: Seq[AnyRef] =
-    Seq(_cachedColumnBuffers, statisticsToBePropagated)
+    Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
+
+  private[sql] def uncache(blocking: Boolean): Unit = {
+    Accumulators.remove(batchStats.id)
+    cachedColumnBuffers.unpersist(blocking)
+    _cachedColumnBuffers = null
+  }
 }
 
 private[sql] case class InMemoryColumnarTableScan(
@@ -286,12 +261,8 @@ private[sql] case class InMemoryColumnarTableScan(
     sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean
 
   // Accumulators used for testing purposes
-  lazy val (readPartitions, readBatches) =
-    if (enableAccumulators) {
-      relation.applyScanAccumulators(sparkContext)
-    } else {
-      (null, null)
-    }
+  lazy val readPartitions: Accumulator[Int] = sparkContext.accumulator(0)
+  lazy val readBatches: Accumulator[Int] = sparkContext.accumulator(0)
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 

From 0b41235b4becf1cd7becd399796bc29d41f5f66e Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 15 Apr 2015 10:06:42 +0800
Subject: [PATCH 4/4] Fix style.

---
 .../spark/sql/columnar/InMemoryColumnarTableScan.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 6993e2ea941a..d9b6fb43ab83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -60,10 +60,11 @@ private[sql] case class InMemoryRelation(
   extends LogicalPlan with MultiInstanceRelation {
 
   private val batchStats: Accumulable[ArrayBuffer[Row], Row] =
-    if (_batchStats == null)
+    if (_batchStats == null) {
       child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[Row])
-    else
+    } else {
       _batchStats
+    }
 
   val partitionStatistics = new PartitionStatistics(output)
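
Note on the series as a whole: after these four patches, uncaching a
table should release every accumulator the cache created for it. A
condensed sketch of the expected end-to-end behavior, mirroring the
regression test added in PATCH 1/4 (imports and helpers as used in that
test; the temp table name "t" is made up for illustration):

    import org.apache.spark.Accumulators
    import org.apache.spark.sql.test.TestSQLContext._

    val before = Accumulators.originals.size  // driver-side registry size
    sql("SELECT key FROM testData").registerTempTable("t")
    cacheTable("t")
    sql("SELECT * FROM t").count()    // materialize the cached buffers
    uncacheTable("t")                 // triggers InMemoryRelation.uncache
    assert(Accumulators.originals.size <= before)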