Commits (23)
5384f7a  [SPARK-21213][SQL] Support collecting partition-level statistics: row… (mbasmanova, Jun 12, 2017)
3ee5ebf  [SPARK-21213][SQL] review comments (mbasmanova, Jun 28, 2017)
d17aa4b  [SPARK-21213][SQL] improved comments per review feedback (mbasmanova, Jun 28, 2017)
e0e351e  [SPARK-21213][SQL] typo (mbasmanova, Jun 28, 2017)
8dad9bc  [SPARK-21213][SQL] add support for partial partition specs (mbasmanova, Jun 29, 2017)
4fdefd5  [SPARK-21213][SQL] add support for partition specs where some partiti… (mbasmanova, Jun 29, 2017)
1d696c3  [SPARK-21213][SQL] comment update (mbasmanova, Jun 29, 2017)
89c0767  [SPARK-21213][SQL] removed extra space (mbasmanova, Jun 29, 2017)
7210568  [SPARK-21213][SQL] addressed easy review comments (mbasmanova, Jul 5, 2017)
9aa2a1e  [SPARK-21213][SQL] addressed remaining review comments (mbasmanova, Jul 5, 2017)
fa21860  [SPARK-21213][SQL] added test case for (ds, hr=11) partition spec (mbasmanova, Jul 5, 2017)
f76f49f  [SPARK-21213][SQL] addressed review comments; fixed PARTITION (ds, hr… (mbasmanova, Jul 11, 2017)
8f31f53  [SPARK-21213][SQL] shorted new test (mbasmanova, Jul 11, 2017)
fae6d49  [SPARK-21213][SQL] added documentation; added test for an empty table (mbasmanova, Jul 11, 2017)
8880fbd  [SPARK-21213][SQL] review comments (mbasmanova, Jul 31, 2017)
1053991  [SPARK-21213][SQL] fixed bad merge of SPARK-21599 (mbasmanova, Aug 7, 2017)
41ab30d  [SPARK-21213][SQL] added support for spark.sql.caseSensitive; address… (mbasmanova, Aug 8, 2017)
dc488e5  [SPARK-21213][SQL] addressed remaining review comments (mbasmanova, Aug 8, 2017)
c839855  [SPARK-21213][SQL] Added a test for DESC PARTITION after ANALYZE; rev… (mbasmanova, Aug 10, 2017)
72e2cd5  [SPARK-21213][SQL] added DROP TABLE to describe-part-after-analyze.sql (mbasmanova, Aug 10, 2017)
87594d6  [SPARK-21213][SQL] check that partition columns in the partition spec… (mbasmanova, Aug 17, 2017)
3353afa  [SPARK-21213][SQL] use PartitioningUtils.normalizePartitionSpec to ha… (mbasmanova, Aug 18, 2017)
8ffb140  [SPARK-21213][SQL] review comments (mbasmanova, Aug 18, 2017)
[SPARK-21213][SQL] addressed review comments; fixed PARTITION (ds, hr) case
mbasmanova committed Aug 16, 2017
commit f76f49ffc4358208651fc8876ba978ed73f336ba
@@ -109,28 +109,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
     }
A reviewer (Member) suggested:
    if (ctx.identifier != null &&
        ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
      throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
    }

Another reviewer (Member) suggested:
val partitionSpec = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)


-    val partitionSpec =
-      if (ctx.partitionSpec != null) {
-        val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(_._2.isDefined)
-        if (filteredSpec.isEmpty) {
-          None
-        } else {
-          Some(filteredSpec.mapValues(_.get))
-        }
-      } else {
-        None
-      }

     val table = visitTableIdentifier(ctx.tableIdentifier)
     if (ctx.identifierSeq() == null) {
-      if (partitionSpec.isDefined) {
-        AnalyzePartitionCommand(table, partitionSpec.get, noscan = ctx.identifier != null)
+      if (ctx.partitionSpec != null) {
+        AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec),
+          noscan = ctx.identifier != null)
       } else {
         AnalyzeTableCommand(table, noscan = ctx.identifier != null)
wzhfy (Contributor) commented on Jul 8, 2017:
@mbasmanova IIUC, the logic is wrong here. For example, when analyzing PARTITION (ds, hr), we should not drop the partition columns in the parser. Currently we parse it to AnalyzeTableCommand, which collects table-level stats, but what we need to do is collect partition-level stats for all partitions.
Please check Hive's behavior here.
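For illustration, a minimal sketch of the semantics in question, using the table t and partition columns ds, hr from this PR's tests (spark is a SparkSession; the statements come from the test cases, the expected behaviors from this discussion):

    // Fully specified partition: statistics for exactly (ds='2008-04-09', hr=11).
    spark.sql("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS")

    // No values supplied: per Hive semantics this should gather partition-level
    // statistics for ALL partitions of t, not fall back to a single table-level
    // statistic, which is what the parser did before this fix.
    spark.sql("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS")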

mbasmanova (Contributor, Author) replied:
@wzhfy, good catch. Thank you! I'll fix this.

       }
     } else {
-      if (partitionSpec.isDefined) {
-        logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}")
+      if (ctx.partitionSpec != null) {
+        logWarning("Partition specification is ignored when collecting column statistics: " +
+          ctx.partitionSpec.getText)
A reviewer (Member) asked:
Why?

gatorsmile (Member) commented on Jul 19, 2017:
We should issue an exception here; otherwise, users might get confused because the behavior differs from Hive. See the following Hive doc:

If you issue the command:
ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS FOR COLUMNS;
then column statistics are gathered for all columns for partition3 (ds='2008-04-09', hr=11). This is available in Hive 0.10.0 and later.

mbasmanova (Contributor, Author) replied on Jul 31, 2017:
This is the existing behavior: the ANALYZE TABLE command simply ignores the PARTITION clause and issues a warning. This PR adds support for the PARTITION clause to COMPUTE STATISTICS but keeps the COMPUTE STATISTICS FOR COLUMNS behavior unmodified. I'm planning to add partition support to FOR COLUMNS in a follow-up PR.

Changing this code to throw an exception in this PR would break existing uses if folks are relying on the skip-with-warning behavior.

Would it be OK to keep this functionality unchanged in this PR?

The reviewer replied:
OK. Let us keep it unchanged.
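A sketch of the behavior being kept, then (t is the test table; the column name key is a placeholder):

    // PARTITION is now honored for plain COMPUTE STATISTICS (this PR)...
    spark.sql("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS")

    // ...but COMPUTE STATISTICS FOR COLUMNS still ignores the partition spec with
    // a warning and computes column statistics over the whole table; partition
    // support for FOR COLUMNS is deferred to a follow-up PR.
    spark.sql("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS FOR COLUMNS key")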

       }
       AnalyzeColumnCommand(
         table,
@@ -0,0 +1,138 @@ (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}

/**
 * Analyzes a given set of partitions to generate per-partition statistics, which will be used in
 * query optimizations.
 */
case class AnalyzePartitionCommand(
    tableIdent: TableIdentifier,
    partitionSpec: Map[String, Option[String]],
    noscan: Boolean = true) extends RunnableCommand {

  private def validatePartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = {
gatorsmile (Member) commented on Aug 7, 2017:
validatePartitionSpec -> getStaticPartitionSpec?

    val partitionColumnNames = table.partitionColumnNames.toSet
    val invalidColumnNames = partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
A reviewer (Member) commented:
Here, we also need to consider the conf spark.sql.caseSensitive. There are multiple examples of similar issues.
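A minimal sketch of the direction this eventually took: a later commit in this PR (3353afa) resolves the spec with PartitioningUtils.normalizePartitionSpec, which honors spark.sql.caseSensitive through the session resolver. The call below is illustrative of that API, not the code at this commit:

    import org.apache.spark.sql.execution.datasources.PartitioningUtils

    // Resolves user-supplied partition column names against the table's partition
    // columns using the session resolver (case-insensitive unless
    // spark.sql.caseSensitive=true); unknown columns raise an AnalysisException.
    val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
      partitionSpec,                 // Map[String, Option[String]] from the parser
      table.partitionColumnNames,
      table.identifier.quotedString,
      sparkSession.sessionState.conf.resolver)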

    if (invalidColumnNames.nonEmpty) {
      val tableId = table.identifier
      throw new AnalysisException(s"Partition specification for table '${tableId.table}' " +
        s"in database '${tableId.database}' refers to unknown partition column(s): " +
        invalidColumnNames.mkString(","))
    }

    val filteredSpec = partitionSpec.filter(_._2.isDefined)
    if (filteredSpec.isEmpty) {
      None
    } else {
      Some(filteredSpec.mapValues(_.get))
    }
  }
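A worked example of the filtering above, with values taken from the parser tests in this PR:

    // ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS
    val spec = Map("ds" -> Some("2008-04-09"), "hr" -> None)
    // validatePartitionSpec drops the valueless column, yielding the partial spec
    // Some(Map("ds" -> "2008-04-09")); listPartitions then matches every hr.
    // For PARTITION(ds, hr) the result is None, so all partitions are listed.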

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val sessionState = sparkSession.sessionState
    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
    if (tableMeta.tableType == CatalogTableType.VIEW) {
      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
    }

    val partitionValueSpec = validatePartitionSpec(tableMeta)

    val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)

    if (partitions.isEmpty) {
      if (partitionValueSpec.isDefined) {
        throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get)
      } else {
        // the user requested to analyze all partitions for a table which has no partitions
        // return normally, since there is nothing to do
        return Seq.empty[Row]
      }
    }

    // Compute statistics for individual partitions
    val rowCounts: Map[TablePartitionSpec, BigInt] =
      if (noscan) {
        Map.empty
      } else {
        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
      }

    // Update the metastore if newly computed statistics are different from those
    // recorded in the metastore.
    val partitionStats = partitions.map { p =>
      val newTotalSize = CommandUtils.calculateLocationSize(sessionState,
        tableMeta.identifier, p.storage.locationUri)
A reviewer (Member) commented:
Nit:

      val newTotalSize = CommandUtils.calculateLocationSize(
        sessionState, tableMeta.identifier, p.storage.locationUri)

      val newRowCount = rowCounts.get(p.spec)
      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
      (p, newStats)
    }

    val newPartitions = partitionStats.filter(_._2.isDefined).map { case (p, newStats) =>
      p.copy(stats = newStats)
    }.toList
A reviewer (Member) commented:
Lines 98-108 can be simplified to:

    val newPartitions = partitions.flatMap { p =>
      val newTotalSize = CommandUtils.calculateLocationSize(
        sessionState, tableMeta.identifier, p.storage.locationUri)
      val newRowCount = rowCounts.get(p.spec)
      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
      newStats.map(_ => p.copy(stats = newStats))
    }


    if (newPartitions.nonEmpty) {
      sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
    }

    Seq.empty[Row]
  }

  private def calculateRowCountsPerPartition(
      sparkSession: SparkSession,
      tableMeta: CatalogTable,
      partitionValueSpec: Option[TablePartitionSpec]): Map[TablePartitionSpec, BigInt] = {
    val filter = if (partitionValueSpec.isDefined) {
      val filters = partitionValueSpec.get.map {
        case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value))
      }
      Some(filters.reduce(And))
    } else {
      None
A reviewer (Member) commented:
Change it to Literal.TrueLiteral

    }

    val tableDf = sparkSession.table(tableMeta.identifier)
    val partitionColumns = tableMeta.partitionColumnNames.map(Column(_))

    val df = if (filter.isDefined) {
      tableDf.filter(Column(filter.get)).groupBy(partitionColumns: _*).count()
    } else {
      tableDf.groupBy(partitionColumns: _*).count()
    }
A reviewer (Member) commented:
After the above change, this line can be simplified to:

val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()


    val numPartitionColumns = partitionColumns.size
A reviewer (Member) commented:
Please inline this one.


    df.collect().map { r =>
      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
      val spec: TablePartitionSpec =
        tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
A reviewer (Member) commented:
Nit: val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap

      val count = BigInt(r.getLong(numPartitionColumns))
      (spec, count)
    }.toMap
  }
}
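For context, the statements the new command handles, drawn from the parser tests below (t is the test table):

    // Full scan: size and row count are computed for the single matching partition.
    spark.sql("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS")

    // NOSCAN: only partition sizes are collected; row counts are skipped.
    spark.sql("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS NOSCAN")

    // Partition columns without values act as wildcards, so statistics are
    // gathered for every matching partition of t.
    spark.sql("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS")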
@@ -17,17 +17,13 @@

package org.apache.spark.sql.execution.command

-import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType}

 /**
- * Analyzes the given table to generate statistics, which will be used in
- * query optimizations.
+ * Analyzes the given table to generate statistics, which will be used in query optimizations.
  */
 case class AnalyzeTableCommand(
     tableIdent: TableIdentifier,
@@ -61,81 +57,3 @@ case class AnalyzeTableCommand(
     Seq.empty[Row]
   }
 }
-
-/**
- * Analyzes a given set of partitions to generate per-partition statistics, which will be used in
- * query optimizations.
- */
-case class AnalyzePartitionCommand(
-    tableIdent: TableIdentifier,
-    partitionSpec: TablePartitionSpec,
-    noscan: Boolean = true) extends RunnableCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val sessionState = sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
-    if (tableMeta.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
-    }
-
-    val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, Some(partitionSpec))
-
-    if (partitions.isEmpty) {
-      throw new NoSuchPartitionException(db, tableIdent.table, partitionSpec)
-    }
-
-    // Compute statistics for individual partitions
-    val rowCounts: Map[TablePartitionSpec, BigInt] =
-      if (noscan) {
-        Map.empty
-      } else {
-        calculateRowCountsPerPartition(sparkSession, tableMeta)
-      }
-
-    // Update the metastore if newly computed statistics are different from those
-    // recorded in the metastore.
-    val partitionStats = partitions.map { p =>
-      val newTotalSize = CommandUtils.calculateLocationSize(sessionState,
-        tableMeta.identifier, p.storage.locationUri)
-      val newRowCount = rowCounts.get(p.spec)
-      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
-      (p, newStats)
-    }
-
-    val newPartitions = partitionStats.filter(_._2.isDefined).map { case (p, newStats) =>
-      p.copy(stats = newStats)
-    }.toList
-
-    if (newPartitions.nonEmpty) {
-      sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
-    }
-
-    Seq.empty[Row]
-  }
-
-  private def calculateRowCountsPerPartition(
-      sparkSession: SparkSession,
-      tableMeta: CatalogTable): Map[TablePartitionSpec, BigInt] = {
-    val filters = partitionSpec.map {
-      case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value))
-    }
-    val filter = filters.reduce(And)
-
-    val partitionColumns = tableMeta.partitionColumnNames.map(Column(_))
-
-    val df = sparkSession.table(tableMeta.identifier).filter(Column(filter))
-      .groupBy(partitionColumns: _*).count()
-
-    val numPartitionColumns = partitionColumns.size
-
-    df.collect().map { r =>
-      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
-      val spec: TablePartitionSpec =
-        tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
-      val count = BigInt(r.getLong(numPartitionColumns))
-      (spec, count)
-    }.toMap
-  }
-}
@@ -259,31 +259,33 @@ class SparkSqlParserSuite extends AnalysisTest {
assertEqual("analyze table t compute statistics noscan",
AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
assertEqual("analyze table t partition (a) compute statistics nOscAn",
AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
AnalyzePartitionCommand(TableIdentifier("t"), Map("a" -> None), noscan = true))

// Partitions specified
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
partitionSpec = Map("ds" -> "2008-04-09", "hr" -> "11")))
partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11"))))
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> "2008-04-09", "hr" -> "11")))
partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11"))))
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> "2008-04-09")))
partitionSpec = Map("ds" -> Some("2008-04-09"))))
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
partitionSpec = Map("ds" -> "2008-04-09")))
partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None)))
assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> "2008-04-09")))
partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None)))
assertEqual("ANALYZE TABLE t PARTITION(ds, hr=11) COMPUTE STATISTICS noscan",
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("hr" -> "11")))
partitionSpec = Map("ds" -> None, "hr" -> Some("11"))))
assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
partitionSpec = Map("ds" -> None, "hr" -> None)))
assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan",
AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
partitionSpec = Map("ds" -> None, "hr" -> None)))

intercept("analyze table t compute statistics xxxx",
"Expected `NOSCAN` instead of `xxxx`")