Closed
Changes from all commits (23 commits)
5384f7a
[SPARK-21213][SQL] Support collecting partition-level statistics: row…
mbasmanova Jun 12, 2017
3ee5ebf
[SPARK-21213][SQL] review comments
mbasmanova Jun 28, 2017
d17aa4b
[SPARK-21213][SQL] improved comments per review feedback
mbasmanova Jun 28, 2017
e0e351e
[SPARK-21213][SQL] typo
mbasmanova Jun 28, 2017
8dad9bc
[SPARK-21213][SQL] add support for partial partition specs
mbasmanova Jun 29, 2017
4fdefd5
[SPARK-21213][SQL] add support for partition specs where some partiti…
mbasmanova Jun 29, 2017
1d696c3
[SPARK-21213][SQL] comment update
mbasmanova Jun 29, 2017
89c0767
[SPARK-21213][SQL] removed extra space
mbasmanova Jun 29, 2017
7210568
[SPARK-21213][SQL] addressed easy review comments
mbasmanova Jul 5, 2017
9aa2a1e
[SPARK-21213][SQL] addressed remaining review comments
mbasmanova Jul 5, 2017
fa21860
[SPARK-21213][SQL] added test case for (ds, hr=11) partition spec
mbasmanova Jul 5, 2017
f76f49f
[SPARK-21213][SQL] addressed review comments; fixed PARTITION (ds, hr…
mbasmanova Jul 11, 2017
8f31f53
[SPARK-21213][SQL] shorted new test
mbasmanova Jul 11, 2017
fae6d49
[SPARK-21213][SQL] added documentation; added test for an empty table
mbasmanova Jul 11, 2017
8880fbd
[SPARK-21213][SQL] review comments
mbasmanova Jul 31, 2017
1053991
[SPARK-21213][SQL] fixed bad merge of SPARK-21599
mbasmanova Aug 7, 2017
41ab30d
[SPARK-21213][SQL] added support for spark.sql.caseSensitive; address…
mbasmanova Aug 8, 2017
dc488e5
[SPARK-21213][SQL] addressed remaining review comments
mbasmanova Aug 8, 2017
c839855
[SPARK-21213][SQL] Added a test for DESC PARTITION after ANALYZE; rev…
mbasmanova Aug 10, 2017
72e2cd5
[SPARK-21213][SQL] added DROP TABLE to describe-part-after-analyze.sql
mbasmanova Aug 10, 2017
87594d6
[SPARK-21213][SQL] check that partition columns in the partition spec…
mbasmanova Aug 17, 2017
3353afa
[SPARK-21213][SQL] use PartitioningUtils.normalizePartitionSpec to ha…
mbasmanova Aug 18, 2017
8ffb140
[SPARK-21213][SQL] review comments
mbasmanova Aug 18, 2017
@@ -91,12 +91,14 @@ object CatalogStorageFormat {
*
* @param spec partition spec values indexed by column name
* @param storage storage format of the partition
- * @param parameters some parameters for the partition, for example, stats.
+ * @param parameters some parameters for the partition
+ * @param stats optional statistics (number of rows, total size, etc.)
*/
case class CatalogTablePartition(
spec: CatalogTypes.TablePartitionSpec,
storage: CatalogStorageFormat,
- parameters: Map[String, String] = Map.empty) {
+ parameters: Map[String, String] = Map.empty,
+ stats: Option[CatalogStatistics] = None) {

def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
val map = new mutable.LinkedHashMap[String, String]()
@@ -106,6 +108,7 @@ case class CatalogTablePartition(
if (parameters.nonEmpty) {
map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
}
+ stats.foreach(s => map.put("Partition Statistics", s.simpleString))
gatorsmile (Member) commented on Aug 7, 2017:

This needs a test case. Could you add a new test file to this test suite for analyzing and describing table partitions? You can generate the result file by running the command:

SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite"

mbasmanova (Contributor, Author) replied:

@gatorsmile, this code doesn't make partition stats appear in the output of the DESC command. It only adds stats to the CatalogTablePartition.toString output (similar to CatalogTable.toString). Do you still want me to add some tests for this functionality?

gatorsmile (Member) replied:

Yes, please add it. Try it; we should expose it to external users.

mbasmanova (Contributor, Author) replied:

Indeed. It works like you said it would. :-) Adding a test.
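
For context, a rough sketch of what DESC EXTENDED is expected to show for an analyzed partition once this change is in (values are illustrative; the entry is populated from CatalogStatistics.simpleString):

DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
...
Partition Statistics    1121 bytes, 3 rows
...

The row count appears only after a full scan; ANALYZE ... NOSCAN records only the size in bytes.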

map
}

@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}

/**
- * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command.
- * Example SQL for analyzing table :
+ * Create an [[AnalyzeTableCommand]] command, or an [[AnalyzePartitionCommand]]
+ * or an [[AnalyzeColumnCommand]] command.
+ * Example SQL for analyzing a table or a set of partitions :
* {{{
- * ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+ * ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
+ *   COMPUTE STATISTICS [NOSCAN];
* }}}
*
* Example SQL for analyzing columns :
* {{{
- * ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+ * ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
* }}}
*/
override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
- if (ctx.partitionSpec != null) {
- logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}")
+ if (ctx.identifier != null &&
+     ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+   throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
}
- if (ctx.identifier != null) {
- if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
- throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)

+ val table = visitTableIdentifier(ctx.tableIdentifier)
+ if (ctx.identifierSeq() == null) {
+   if (ctx.partitionSpec != null) {
+     AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec),
+       noscan = ctx.identifier != null)
+   } else {
+     AnalyzeTableCommand(table, noscan = ctx.identifier != null)
}
- AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
- } else if (ctx.identifierSeq() == null) {
- AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false)
} else {
+   if (ctx.partitionSpec != null) {
+     logWarning("Partition specification is ignored when collecting column statistics: " +
+       ctx.partitionSpec.getText)
A reviewer (Member) commented:

Why?

gatorsmile (Member) commented on Jul 19, 2017:

We should issue an exception here; otherwise, users might get confused since it has different behaviors from Hive. See the following Hive doc:

"If you issue the command:
ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS FOR COLUMNS;
then column statistics are gathered for all columns for partition3 (ds='2008-04-09', hr=11). This is available in Hive 0.10.0 and later."

mbasmanova (Contributor, Author) replied on Jul 31, 2017:

This is the existing behavior. The ANALYZE TABLE command simply ignores the PARTITION clause and issues a warning. This PR adds support for the PARTITION clause to COMPUTE STATISTICS but keeps the COMPUTE STATISTICS FOR COLUMNS behavior unmodified. I'm planning to add partition support to FOR COLUMNS in a follow-up PR.

Changing this code to throw an exception in this PR would break existing uses if folks are relying on the skip-with-warning behavior.

Would it be OK to keep this functionality unchanged in this PR?

A reviewer (Member) replied:

Ok. Let us keep it unchanged.

+   }
AnalyzeColumnCommand(
- visitTableIdentifier(ctx.tableIdentifier),
+ table,
visitIdentifierSeq(ctx.identifierSeq()))
}
}
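
In summary, the rewritten visitAnalyze dispatches the ANALYZE variants as follows (table, partition, and column names are illustrative):

ANALYZE TABLE t COMPUTE STATISTICS;                                      -- AnalyzeTableCommand(noscan = false)
ANALYZE TABLE t COMPUTE STATISTICS NOSCAN;                               -- AnalyzeTableCommand(noscan = true)
ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS;          -- AnalyzePartitionCommand(noscan = false)
ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS NOSCAN;   -- AnalyzePartitionCommand(noscan = true)
ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value;               -- AnalyzeColumnCommand
ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS FOR COLUMNS key;  -- AnalyzeColumnCommand (PARTITION ignored with a warning)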
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
import org.apache.spark.sql.execution.datasources.PartitioningUtils

/**
* Analyzes a given set of partitions to generate per-partition statistics, which will be used in
* query optimizations.
*
* When `partitionSpec` is empty, statistics for all partitions are collected and stored in the
* Metastore.
*
* When `partitionSpec` mentions only some of the partition columns, all partitions with
* matching values for the specified columns are processed.
*
* If `partitionSpec` mentions an unknown partition column, an `AnalysisException` is raised.
*
* By default, both the total number of rows and the total size in bytes are calculated. When
* `noscan` is `true`, only the total size in bytes is computed.
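*
* Illustrative usage, assuming a table partitioned by columns `ds` and `hr`:
* {{{
*   ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS;   -- a single partition
*   ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS;          -- all hr values for that ds
*   ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS NOSCAN;            -- all partitions, sizes only
* }}}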
*/
case class AnalyzePartitionCommand(
tableIdent: TableIdentifier,
partitionSpec: Map[String, Option[String]],
noscan: Boolean = true) extends RunnableCommand {

private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = {
val normalizedPartitionSpec =
PartitioningUtils.normalizePartitionSpec(partitionSpec, table.partitionColumnNames,
table.identifier.quotedString, conf.resolver)

// Report an error if partition columns in partition specification do not form
// a prefix of the list of partition columns defined in the table schema
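// For example, for a table partitioned by (ds, hr), specs such as PARTITION (ds='2017-08-01')
// and PARTITION (ds='2017-08-01', hr=10) are accepted, while PARTITION (hr=10) is rejected.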
val isNotSpecified =
table.partitionColumnNames.map(normalizedPartitionSpec.getOrElse(_, None).isEmpty)
if (isNotSpecified.init.zip(isNotSpecified.tail).contains((true, false))) {
val tableId = table.identifier
val schemaColumns = table.partitionColumnNames.mkString(",")
val specColumns = normalizedPartitionSpec.keys.mkString(",")
throw new AnalysisException("The list of partition columns with values " +
s"in partition specification for table '${tableId.table}' " +
s"in database '${tableId.database.get}' is not a prefix of the list of " +
"partition columns defined in the table schema. " +
s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].")
}

val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
if (filteredSpec.isEmpty) {
None
} else {
Some(filteredSpec)
}
}

override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
if (tableMeta.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}

val partitionValueSpec = getPartitionSpec(tableMeta)
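// partitionValueSpec is None when no partition column was given a value; listPartitions then
// returns all partitions of the table, otherwise only those matching the given values.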

val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)

if (partitions.isEmpty) {
if (partitionValueSpec.isDefined) {
throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get)
} else {
// the user requested to analyze all partitions for a table which has no partitions
// return normally, since there is nothing to do
return Seq.empty[Row]
}
}

// Compute statistics for individual partitions
val rowCounts: Map[TablePartitionSpec, BigInt] =
if (noscan) {
Map.empty
} else {
calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
}

// Update the metastore if newly computed statistics are different from those
// recorded in the metastore.
val newPartitions = partitions.flatMap { p =>
val newTotalSize = CommandUtils.calculateLocationSize(
sessionState, tableMeta.identifier, p.storage.locationUri)
val newRowCount = rowCounts.get(p.spec)
val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
newStats.map(_ => p.copy(stats = newStats))
}

if (newPartitions.nonEmpty) {
sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
}

Seq.empty[Row]
}

private def calculateRowCountsPerPartition(
sparkSession: SparkSession,
tableMeta: CatalogTable,
partitionValueSpec: Option[TablePartitionSpec]): Map[TablePartitionSpec, BigInt] = {
val filter = if (partitionValueSpec.isDefined) {
val filters = partitionValueSpec.get.map {
case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value))
}
filters.reduce(And)
} else {
Literal.TrueLiteral
}
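
// Count rows per partition in a single scan: restrict to the requested partitions (if any),
// then group by the partition columns.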

val tableDf = sparkSession.table(tableMeta.identifier)
val partitionColumns = tableMeta.partitionColumnNames.map(Column(_))

val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()

df.collect().map { r =>
val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
val count = BigInt(r.getLong(partitionColumns.size))
(spec, count)
}.toMap
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
- import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
+ import org.apache.spark.sql.catalyst.catalog.CatalogTableType


/**
@@ -37,31 +37,15 @@ case class AnalyzeTableCommand(
if (tableMeta.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}

+ // Compute stats for the whole table
val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+ val newRowCount =
+   if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))

- val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
- val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
- var newStats: Option[CatalogStatistics] = None
- if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
-   newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
- }
- // We only set rowCount when noscan is false, because otherwise:
- // 1. when total size is not changed, we don't need to alter the table;
- // 2. when total size is changed, `oldRowCount` becomes invalid.
- // This is to make sure that we only record the right statistics.
- if (!noscan) {
-   val newRowCount = sparkSession.table(tableIdentWithDB).count()
-   if (newRowCount >= 0 && newRowCount != oldRowCount) {
-     newStats = if (newStats.isDefined) {
-       newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
-     } else {
-       Some(CatalogStatistics(
-         sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
-     }
-   }
- }
+ // Update the metastore if the above statistics of the table are different from those
+ // recorded in the metastore.
+ val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
if (newStats.isDefined) {
sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
// Refresh the cached data source table in the catalog.
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
- import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
+ import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.internal.SessionState


@@ -112,4 +112,29 @@ object CommandUtils extends Logging {
size
}

def compareAndGetNewStats(
oldStats: Option[CatalogStatistics],
newTotalSize: BigInt,
newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
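// Returns updated statistics only when they differ from `oldStats`; `None` tells the caller
// that the metastore entry does not need to be altered.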
val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(-1L)
val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
var newStats: Option[CatalogStatistics] = None
if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
}
// We only set rowCount when noscan is false, because otherwise:
// 1. when total size is not changed, we don't need to alter the table;
// 2. when total size is changed, `oldRowCount` becomes invalid.
// This is to make sure that we only record the right statistics.
if (newRowCount.isDefined) {
if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) {
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = newRowCount))
} else {
Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount))
}
}
}
newStats
}
}
@@ -0,0 +1,34 @@
CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet
PARTITIONED BY (ds, hr);

INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10)
VALUES ('k1', 100), ('k2', 200), ('k3', 300);

INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11)
VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401);

INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5)
VALUES ('k1', 102), ('k2', 202);

DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);

-- Collect stats for a single partition
ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS;

DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);

-- Collect stats for 2 partitions
ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS;

DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11);

-- Collect stats for all partitions
ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS;

DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11);
DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5);
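
-- At this point all three partitions are expected to carry statistics: 3, 4 and 2 rows
-- respectively (byte sizes depend on the Parquet files written above).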

-- DROP TEST TABLES/VIEWS
DROP TABLE t;