sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,8 +21,8 @@ import java.util.Date

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.{StructField, StructType}

@@ -161,7 +161,7 @@ case class CatalogTable(
    createTime: Long = System.currentTimeMillis,
    lastAccessTime: Long = -1,
    properties: Map[String, String] = Map.empty,
-    stats: Option[Statistics] = None,
+    stats: Option[CatalogStatistics] = None,
    viewOriginalText: Option[String] = None,
    viewText: Option[String] = None,
    comment: Option[String] = None,
@@ -237,6 +237,34 @@ case class CatalogTable(
}


+/**
+ * This class of statistics is used in [[CatalogTable]] to interact with the metastore.
+ * We define this new class instead of directly using [[Statistics]] here because there are no
+ * concepts of attributes or broadcast hint in the catalog.
+ */

> Member: Can you add a few words explaining why we don't use Statistics for CatalogTable?
> Author: ok

+case class CatalogStatistics(
+    sizeInBytes: BigInt,
+    rowCount: Option[BigInt] = None,
+    colStats: Map[String, ColumnStat] = Map.empty) {

+  /**
+   * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
+   * on column names.
+   */
+  def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
+    val matched = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
+    Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
+      attributeStats = AttributeMap(matched))
+  }

+  /** Readable string representation for the CatalogStatistics. */
+  def simpleString: String = {
+    val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else ""
+    s"$sizeInBytes bytes$rowCountString"
+  }
+}

> Contributor: Why do you define a simpleString instead of overriding toString?
> Author: Because we don't print column stats in it, it's not a "complete" string representation. Column stats can be too verbose and make CatalogTable unreadable.


case class CatalogTableType private(name: String)
object CatalogTableType {
  val EXTERNAL = new CatalogTableType("EXTERNAL")
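
To make the intended split concrete, here is a minimal sketch of the round trip (assumptions: spark-catalyst on the classpath; the numbers and the single INT column are invented, not from this patch):

    import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
    import org.apache.spark.sql.types.IntegerType

    // Catalog side: stats are keyed by plain column name, since the
    // metastore has no notion of exprIds or broadcast hints.
    val catalogStats = CatalogStatistics(
      sizeInBytes = 1024,
      rowCount = Some(100),
      colStats = Map("c1" -> ColumnStat(0, None, None, 0, 4, 4)))

    catalogStats.simpleString                 // "1024 bytes, 100 rows"

    // Plan side: output attributes carry exprIds; toPlanStats re-keys the
    // column stats by matching attribute names against colStats.
    val output = Seq(AttributeReference("c1", IntegerType)())
    val planStats = catalogStats.toPlanStats(output)
    planStats.attributeStats(output.head)     // the ColumnStat defined above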
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -41,13 +41,13 @@ import org.apache.spark.sql.types._
 * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
 *                    defaults to the product of children's `sizeInBytes`.
 * @param rowCount Estimated number of rows.
- * @param colStats Column-level statistics.
+ * @param attributeStats Statistics for Attributes.
 * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
 */
case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
-    colStats: Map[String, ColumnStat] = Map.empty,
+    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
    isBroadcastable: Boolean = false) {

> Member: Will we estimate statistics for all attributes in the logical plan? I mean, if an attribute comes not from a leaf node but from a later plan node like a Join, do we still have a ColumnStat for it? If not, I don't think we need to rename this parameter to attributeStats; the original colStats would do.
> Author: We will estimate attributes in the logical plan from the bottom up.

  override def toString: String = "Statistics(" + simpleString + ")"
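
The bottom-up estimation mentioned in the reply above is also why the new parameter is an AttributeMap rather than a name-keyed map: past the leaf nodes, two output attributes can share a name (for example after a self-join), and only the exprId disambiguates them. A small illustrative sketch (the stats values are invented):

    import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
    import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
    import org.apache.spark.sql.types.IntegerType

    // Two attributes with the same name but distinct exprIds, as the two
    // sides of a self-join would produce.
    val left = AttributeReference("c1", IntegerType)()
    val right = AttributeReference("c1", IntegerType)()

    val attrStats = AttributeMap(Seq(
      left -> ColumnStat(10, None, None, 0, 4, 4),
      right -> ColumnStat(20, None, None, 0, 4, 4)))

    attrStats(left).distinctCount   // 10: looked up by exprId, not by name,
    attrStats(right).distinctCount  // 20: so the two "c1"s never collide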
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -64,7 +64,7 @@ case class AnalyzeColumnCommand(
    AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)

    // We also update table-level stats in order to keep them consistent with column-level stats.
-    val statistics = Statistics(
+    val statistics = CatalogStatistics(
      sizeInBytes = sizeInBytes,
      rowCount = Some(rowCount),
      // Newly computed column stats should override the existing ones.
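
For orientation, a sketch of how this command is reached and what it leaves behind (assumptions: a SparkSession named spark and an existing table t with an INT column c1; none of this is part of the patch):

    import org.apache.spark.sql.catalyst.TableIdentifier

    spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c1")

    // The command persists a CatalogStatistics in the table metadata,
    // which can be read back through the session catalog:
    val stats = spark.sessionState.catalog
      .getTableMetadata(TableIdentifier("t"))
      .stats                                    // Option[CatalogStatistics]
    stats.foreach(s => println(s.simpleString)) // e.g. "1024 bytes, 100 rows"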
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -25,8 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SessionState

@@ -62,9 +61,9 @@ case class AnalyzeTableCommand(
  def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
    val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
    val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-    var newStats: Option[Statistics] = None
+    var newStats: Option[CatalogStatistics] = None
    if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-      newStats = Some(Statistics(sizeInBytes = newTotalSize))
+      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
    }
    // We only set rowCount when noscan is false, because otherwise:
    // 1. when total size is not changed, we don't need to alter the table;
@@ -76,7 +75,8 @@
      newStats = if (newStats.isDefined) {
        newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
      } else {
-        Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+        Some(CatalogStatistics(
+          sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
      }
    }
  }
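
The noscan flag above maps directly onto the two SQL forms; a usage sketch against a hypothetical table t:

    // Full analyze scans the data: both sizeInBytes and rowCount in the
    // resulting CatalogStatistics can be updated.
    spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")

    // NOSCAN skips the scan: only the total size is recomputed, and per the
    // logic above the table metadata is altered only when that size changed.
    spark.sql("ANALYZE TABLE t COMPUTE STATISTICS NOSCAN")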
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -73,7 +73,7 @@ case class LogicalRelation(
  override lazy val cleanArgs: Seq[Any] = Seq(relation)

  @transient override lazy val statistics: Statistics = {
-    catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+    catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
      Statistics(sizeInBytes = relation.sizeInBytes))
  }

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
import scala.util.Random

import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.StaticSQLConf
@@ -39,7 +40,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
  import testImplicits._

  private def checkTableStats(tableName: String, expectedRowCount: Option[Int])
-    : Option[Statistics] = {
+    : Option[CatalogStatistics] = {
    val df = spark.table(tableName)
    val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
      assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
@@ -260,4 +261,46 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
}
}
}

+  // This test will be run twice: with and without Hive support.
+  test("conversion from CatalogStatistics to Statistics") {
+    withTable("ds_tbl", "hive_tbl") {
+      // Test data source table.
+      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
+      // Test Hive serde table.
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
+      }
+    }
+  }

+  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
+    // Create an empty table and run the analyze command on it.
+    val createTableSql = if (isDatasourceTable) {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
+    } else {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
+    }
+    sql(createTableSql)
+    // Analyze only one column.
+    sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
+    val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
+      case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable)
+      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
+    }.head
+    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+    // Check catalog statistics.
+    assert(catalogTable.stats.isDefined)
+    assert(catalogTable.stats.get.sizeInBytes == 0)
+    assert(catalogTable.stats.get.rowCount == Some(0))
+    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+
+    // Check relation statistics.
+    assert(relation.statistics.sizeInBytes == 0)
+    assert(relation.statistics.rowCount == Some(0))
+    assert(relation.statistics.attributeStats.size == 1)
+    val (attribute, colStat) = relation.statistics.attributeStats.head
+    assert(attribute.name == "c1")
+    assert(colStat == emptyColStat)
+  }
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -656,7 +656,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    }

    table = table.copy(
-      stats = Some(Statistics(
+      stats = Some(CatalogStatistics(
        sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)),
        rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
        colStats = colStats.toMap)))
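
Because the metastore persists everything as string properties, the stats survive as entries under the STATISTICS_* keys read above. A hypothetical helper sketching the reverse (write) direction; the literal key names are assumptions, not taken from this patch:

    import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

    // Assumed to mirror STATISTICS_TOTAL_SIZE / STATISTICS_NUM_ROWS above.
    def statsToProperties(stats: CatalogStatistics): Map[String, String] = {
      Map("spark.sql.statistics.totalSize" -> stats.sizeInBytes.toString) ++
        stats.rowCount.map(n => "spark.sql.statistics.numRows" -> n.toString)
    }

    statsToProperties(CatalogStatistics(sizeInBytes = 1024, rowCount = Some(100)))
    // Map(spark.sql.statistics.totalSize -> 1024, spark.sql.statistics.numRows -> 100)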
sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -113,7 +113,7 @@ private[hive] case class MetastoreRelation(
  }

  @transient override lazy val statistics: Statistics = {
-    catalogTable.stats.getOrElse(Statistics(
+    catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
      sizeInBytes = {
        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.joins._
@@ -152,7 +152,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
  }

  private def checkTableStats(
-      stats: Option[Statistics],
+      stats: Option[CatalogStatistics],
      hasSizeInBytes: Boolean,
      expectedRowCounts: Option[Int]): Unit = {
    if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
@@ -168,7 +168,7 @@
      tableName: String,
      isDataSourceTable: Boolean,
      hasSizeInBytes: Boolean,
-      expectedRowCounts: Option[Int]): Option[Statistics] = {
+      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
    val df = sql(s"SELECT * FROM $tableName")
    val stats = df.queryExecution.analyzed.collect {
      case rel: MetastoreRelation =>
@@ -435,10 +435,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
  }

  /** Used to test refreshing cached metadata once table stats are updated. */
-  private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean): (Statistics, Statistics) = {
+  private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean)
+    : (CatalogStatistics, CatalogStatistics) = {
    val tableName = "tbl"
-    var statsBeforeUpdate: Statistics = null
-    var statsAfterUpdate: Statistics = null
+    var statsBeforeUpdate: CatalogStatistics = null
+    var statsAfterUpdate: CatalogStatistics = null
    withTable(tableName) {
      val tableIndent = TableIdentifier(tableName, Some("default"))
      val catalog = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]