Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5384f7a
[SPARK-21213][SQL] Support collecting partition-level statistics: row…
mbasmanova Jun 12, 2017
3ee5ebf
[SPARK-21213][SQL] review comments
mbasmanova Jun 28, 2017
d17aa4b
[SPARK-21213][SQL] improved comments per review feedback
mbasmanova Jun 28, 2017
e0e351e
[SPARK-21213][SQL] typo
mbasmanova Jun 28, 2017
8dad9bc
[SPARK-21213][SQL] add support for partial partition specs
mbasmanova Jun 29, 2017
4fdefd5
[SPARK-21213][SQL] add support for partition specs where some partiti…
mbasmanova Jun 29, 2017
1d696c3
[SPARK-21213][SQL] comment update
mbasmanova Jun 29, 2017
89c0767
[SPARK-21213][SQL] removed extra space
mbasmanova Jun 29, 2017
7210568
[SPARK-21213][SQL] addressed easy review comments
mbasmanova Jul 5, 2017
9aa2a1e
[SPARK-21213][SQL] addressed remaining review comments
mbasmanova Jul 5, 2017
fa21860
[SPARK-21213][SQL] added test case for (ds, hr=11) partition spec
mbasmanova Jul 5, 2017
f76f49f
[SPARK-21213][SQL] addressed review comments; fixed PARTITION (ds, hr…
mbasmanova Jul 11, 2017
8f31f53
[SPARK-21213][SQL] shorted new test
mbasmanova Jul 11, 2017
fae6d49
[SPARK-21213][SQL] added documentation; added test for an empty table
mbasmanova Jul 11, 2017
8880fbd
[SPARK-21213][SQL] review comments
mbasmanova Jul 31, 2017
1053991
[SPARK-21213][SQL] fixed bad merge of SPARK-21599
mbasmanova Aug 7, 2017
41ab30d
[SPARK-21213][SQL] added support for spark.sql.caseSensitive; address…
mbasmanova Aug 8, 2017
dc488e5
[SPARK-21213][SQL] addressed remaining review comments
mbasmanova Aug 8, 2017
c839855
[SPARK-21213][SQL] Added a test for DESC PARTITION after ANALYZE; rev…
mbasmanova Aug 10, 2017
72e2cd5
[SPARK-21213][SQL] added DROP TABLE to describe-part-after-analyze.sql
mbasmanova Aug 10, 2017
87594d6
[SPARK-21213][SQL] check that partition columns in the partition spec…
mbasmanova Aug 17, 2017
3353afa
[SPARK-21213][SQL] use PartitioningUtils.normalizePartitionSpec to ha…
mbasmanova Aug 18, 2017
8ffb140
[SPARK-21213][SQL] review comments
mbasmanova Aug 18, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-21213][SQL] check that partition columns in the partition spec…
… form a prefix of the partition columns defined in table schema
  • Loading branch information
mbasmanova committed Aug 17, 2017
commit 87594d6aee3e7e1d5a3e5e340331dd7965ea5426
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,37 @@ case class AnalyzePartitionCommand(

private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = {
val partitionColumnNames = table.partitionColumnNames.toSet
val keys =
if (conf.caseSensitiveAnalysis) partitionSpec.keys
else partitionSpec.keys.map(_.toLowerCase)
val invalidColumnNames = keys.filterNot(partitionColumnNames.contains(_))
val partitionSpecWithCase =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of changing the case, could you call sparkSession.sessionState.conf.resolver? There are many examples in the code base. Thanks!

if (conf.caseSensitiveAnalysis) partitionSpec
else partitionSpec.map { case (k, v) => (k.toLowerCase, v)}
val invalidColumnNames = partitionSpecWithCase.keys.filterNot(partitionColumnNames.contains(_))
if (invalidColumnNames.nonEmpty) {
val tableId = table.identifier
throw new AnalysisException(s"Partition specification for table '${tableId.table}' " +
s"in database '${tableId.database.get}' refers to unknown partition column(s): " +
invalidColumnNames.mkString(","))
}

val filteredSpec = partitionSpec.filter(_._2.isDefined)
// Report an error if partition columns in partition specification do not form
// a prefix of the list of partition columns defined in the table schema
val isSpecified =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> isNotSpecified

table.partitionColumnNames.map(partitionSpecWithCase.getOrElse(_, None).isEmpty)
if (isSpecified.init.zip(isSpecified.tail).contains((true, false))) {
val tableId = table.identifier
val schemaColumns = table.partitionColumnNames.mkString(",")
val specColumns = partitionSpecWithCase.keys.mkString(",")
throw new AnalysisException("The list of partition columns with values " +
s"in partition specification for table '${tableId.table}' " +
s"in database '${tableId.database.get}' is not a prefix of the list of " +
"partition columns defined in the table schema. " +
s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].")
}

val filteredSpec = partitionSpecWithCase.filter(_._2.isDefined).mapValues(_.get)
if (filteredSpec.isEmpty) {
None
} else {
if (conf.caseSensitiveAnalysis) Some(filteredSpec.mapValues(_.get))
else Some(filteredSpec.map { case (key, value) => (key.toLowerCase, value.get) })
Some(filteredSpec)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,45 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
}

test("analyze partial partition specifications") {

val tableName = "analyzeTable_part"

def assertAnalysisException(partitionSpec: String): Unit = {
val message = intercept[AnalysisException] {
sql(s"ANALYZE TABLE $tableName $partitionSpec COMPUTE STATISTICS")
}.getMessage
assert(message.contains("The list of partition columns with values " +
s"in partition specification for table '${tableName.toLowerCase}' in database 'default' " +
"is not a prefix of the list of partition columns defined in the table schema"))
}

withTable(tableName) {
sql(
s"""
|CREATE TABLE $tableName (key STRING, value STRING)
|PARTITIONED BY (a STRING, b INT, c STRING)
""".stripMargin)

sql(s"INSERT INTO TABLE $tableName PARTITION (a='a1', b=10, c='c1') SELECT * FROM src")

sql(s"ANALYZE TABLE $tableName PARTITION (a='a1') COMPUTE STATISTICS")
sql(s"ANALYZE TABLE $tableName PARTITION (a='a1', b=10) COMPUTE STATISTICS")
sql(s"ANALYZE TABLE $tableName PARTITION (A='a1', b=10) COMPUTE STATISTICS")
sql(s"ANALYZE TABLE $tableName PARTITION (b=10, a='a1') COMPUTE STATISTICS")
sql(s"ANALYZE TABLE $tableName PARTITION (b=10, A='a1') COMPUTE STATISTICS")

assertAnalysisException("PARTITION (b=10)")
assertAnalysisException("PARTITION (a, b=10)")
assertAnalysisException("PARTITION (b=10, c='c1')")
assertAnalysisException("PARTITION (a, b=10, c='c1')")
assertAnalysisException("PARTITION (c='c1')")
assertAnalysisException("PARTITION (a, b, c='c1')")
assertAnalysisException("PARTITION (a='a1', c='c1')")
assertAnalysisException("PARTITION (a='a1', b, c='c1')")
}
}

test("analyze non-existent partition") {

def assertAnalysisException(analyzeCommand: String, errorMessage: String): Unit = {
Expand Down