diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d2a1af080091..d003365db4de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,8 +21,8 @@ import java.util.Date
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -161,7 +161,7 @@ case class CatalogTable(
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
     properties: Map[String, String] = Map.empty,
-    stats: Option[Statistics] = None,
+    stats: Option[CatalogStatistics] = None,
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
     comment: Option[String] = None,
@@ -237,6 +237,34 @@ case class CatalogTable(
 }
 
 
+/**
+ * This class of statistics is used in [[CatalogTable]] to interact with metastore.
+ * We define this new class instead of directly using [[Statistics]] here because there are no
+ * concepts of attributes or broadcast hint in catalog.
+ */
+case class CatalogStatistics(
+    sizeInBytes: BigInt,
+    rowCount: Option[BigInt] = None,
+    colStats: Map[String, ColumnStat] = Map.empty) {
+
+  /**
+   * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
+   * on column names.
+   */
+  def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
+    val matched = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
+    Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
+      attributeStats = AttributeMap(matched))
+  }
+
+  /** Readable string representation for the CatalogStatistics. */
+  def simpleString: String = {
+    val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else ""
+    s"$sizeInBytes bytes$rowCountString"
+  }
+}
+
+
 case class CatalogTableType private(name: String)
 object CatalogTableType {
   val EXTERNAL = new CatalogTableType("EXTERNAL")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 465fbab5716a..91404d4bb81b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -41,13 +41,13 @@ import org.apache.spark.sql.types._
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
- * @param colStats Column-level statistics.
+ * @param attributeStats Statistics for Attributes.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
-    colStats: Map[String, ColumnStat] = Map.empty,
+    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
     isBroadcastable: Boolean = false) {
 
   override def toString: String = "Statistics(" + simpleString + ")"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 9dffe3614a87..1340c9bece38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -64,7 +64,7 @@ case class AnalyzeColumnCommand(
       AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
 
     // We also update table-level stats in order to keep them consistent with column-level stats.
-    val statistics = Statistics(
+    val statistics = CatalogStatistics(
       sizeInBytes = sizeInBytes,
       rowCount = Some(rowCount),
       // Newly computed column stats should override the existing ones.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 52a8fc88c56c..4a994e34aff8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -25,8 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SessionState
 
@@ -62,9 +61,9 @@ case class AnalyzeTableCommand(
     def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
       val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
       val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-      var newStats: Option[Statistics] = None
+      var newStats: Option[CatalogStatistics] = None
       if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-        newStats = Some(Statistics(sizeInBytes = newTotalSize))
+        newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
       }
       // We only set rowCount when noscan is false, because otherwise:
       // 1. when total size is not changed, we don't need to alter the table;
@@ -76,7 +75,8 @@ case class AnalyzeTableCommand(
           newStats = if (newStats.isDefined) {
             newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
           } else {
-            Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+            Some(CatalogStatistics(
+              sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
           }
         }
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 7c28d48f2641..3fd40384d28e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -73,7 +73,7 @@ case class LogicalRelation(
   override lazy val cleanArgs: Seq[Any] = Seq(relation)
 
   @transient override lazy val statistics: Statistics = {
-    catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+    catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
       Statistics(sizeInBytes = relation.sizeInBytes))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index c663b31351b5..18abb18587ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.StaticSQLConf
@@ -39,7 +40,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
   import testImplicits._
 
   private def checkTableStats(tableName: String, expectedRowCount: Option[Int])
-    : Option[Statistics] = {
+    : Option[CatalogStatistics] = {
     val df = spark.table(tableName)
     val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
       assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
@@ -260,4 +261,46 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  // This test will be run twice: with and without Hive support
+  test("conversion from CatalogStatistics to Statistics") {
+    withTable("ds_tbl", "hive_tbl") {
+      // Test data source table
+      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
+      // Test hive serde table
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
+      }
+    }
+  }
+
+  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
+    // Create an empty table and run analyze command on it.
+    val createTableSql = if (isDatasourceTable) {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
+    } else {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
+    }
+    sql(createTableSql)
+    // Analyze only one column.
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1") + val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect { + case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable) + case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get) + }.head + val emptyColStat = ColumnStat(0, None, None, 0, 4, 4) + // Check catalog statistics + assert(catalogTable.stats.isDefined) + assert(catalogTable.stats.get.sizeInBytes == 0) + assert(catalogTable.stats.get.rowCount == Some(0)) + assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat)) + + // Check relation statistics + assert(relation.statistics.sizeInBytes == 0) + assert(relation.statistics.rowCount == Some(0)) + assert(relation.statistics.attributeStats.size == 1) + val (attribute, colStat) = relation.statistics.attributeStats.head + assert(attribute.name == "c1") + assert(colStat == emptyColStat) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 544f277cdf97..1f9a667490fb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.ColumnStat import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils @@ -656,7 +656,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } table = table.copy( - stats = Some(Statistics( + stats = Some(CatalogStatistics( sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)), rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)), colStats = colStats.toMap))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 3bbac05a79c2..2e60cba09d52 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -113,7 +113,7 @@ private[hive] case class MetastoreRelation( } @transient override lazy val statistics: Statistics = { - catalogTable.stats.getOrElse(Statistics( + catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics( sizeInBytes = { val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 5ae202fdc98d..8803ea36de95 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -23,7 +23,7 @@ import scala.reflect.ClassTag import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import 
org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.joins._ @@ -152,7 +152,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } private def checkTableStats( - stats: Option[Statistics], + stats: Option[CatalogStatistics], hasSizeInBytes: Boolean, expectedRowCounts: Option[Int]): Unit = { if (hasSizeInBytes || expectedRowCounts.nonEmpty) { @@ -168,7 +168,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto tableName: String, isDataSourceTable: Boolean, hasSizeInBytes: Boolean, - expectedRowCounts: Option[Int]): Option[Statistics] = { + expectedRowCounts: Option[Int]): Option[CatalogStatistics] = { val df = sql(s"SELECT * FROM $tableName") val stats = df.queryExecution.analyzed.collect { case rel: MetastoreRelation => @@ -435,10 +435,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } /** Used to test refreshing cached metadata once table stats are updated. */ - private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean): (Statistics, Statistics) = { + private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean) + : (CatalogStatistics, CatalogStatistics) = { val tableName = "tbl" - var statsBeforeUpdate: Statistics = null - var statsAfterUpdate: Statistics = null + var statsBeforeUpdate: CatalogStatistics = null + var statsAfterUpdate: CatalogStatistics = null withTable(tableName) { val tableIndent = TableIdentifier(tableName, Some("default")) val catalog = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]