23 commits
5384f7a  [SPARK-21213][SQL] Support collecting partition-level statistics: row…  (mbasmanova, Jun 12, 2017)
3ee5ebf  [SPARK-21213][SQL] review comments  (mbasmanova, Jun 28, 2017)
d17aa4b  [SPARK-21213][SQL] improved comments per review feedback  (mbasmanova, Jun 28, 2017)
e0e351e  [SPARK-21213][SQL] typo  (mbasmanova, Jun 28, 2017)
8dad9bc  [SPARK-21213][SQL] add support for partial partition specs  (mbasmanova, Jun 29, 2017)
4fdefd5  [SPARK-21213][SQL] add support for partition specs where some partiti…  (mbasmanova, Jun 29, 2017)
1d696c3  [SPARK-21213][SQL] comment update  (mbasmanova, Jun 29, 2017)
89c0767  [SPARK-21213][SQL] removed extra space  (mbasmanova, Jun 29, 2017)
7210568  [SPARK-21213][SQL] addressed easy review comments  (mbasmanova, Jul 5, 2017)
9aa2a1e  [SPARK-21213][SQL] addressed remaining review comments  (mbasmanova, Jul 5, 2017)
fa21860  [SPARK-21213][SQL] added test case for (ds, hr=11) partition spec  (mbasmanova, Jul 5, 2017)
f76f49f  [SPARK-21213][SQL] addressed review comments; fixed PARTITION (ds, hr…  (mbasmanova, Jul 11, 2017)
8f31f53  [SPARK-21213][SQL] shorted new test  (mbasmanova, Jul 11, 2017)
fae6d49  [SPARK-21213][SQL] added documentation; added test for an empty table  (mbasmanova, Jul 11, 2017)
8880fbd  [SPARK-21213][SQL] review comments  (mbasmanova, Jul 31, 2017)
1053991  [SPARK-21213][SQL] fixed bad merge of SPARK-21599  (mbasmanova, Aug 7, 2017)
41ab30d  [SPARK-21213][SQL] added support for spark.sql.caseSensitive; address…  (mbasmanova, Aug 8, 2017)
dc488e5  [SPARK-21213][SQL] addressed remaining review comments  (mbasmanova, Aug 8, 2017)
c839855  [SPARK-21213][SQL] Added a test for DESC PARTITION after ANALYZE; rev…  (mbasmanova, Aug 10, 2017)
72e2cd5  [SPARK-21213][SQL] added DROP TABLE to describe-part-after-analyze.sql  (mbasmanova, Aug 10, 2017)
87594d6  [SPARK-21213][SQL] check that partition columns in the partition spec…  (mbasmanova, Aug 17, 2017)
3353afa  [SPARK-21213][SQL] use PartitioningUtils.normalizePartitionSpec to ha…  (mbasmanova, Aug 18, 2017)
8ffb140  [SPARK-21213][SQL] review comments  (mbasmanova, Aug 18, 2017)
[SPARK-21213][SQL] addressed easy review comments
mbasmanova committed Aug 16, 2017
commit 7210568198e3b60ce3e255a1c8c5f46faa64b41f
@@ -110,11 +110,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {

     val partitionSpec =
       if (ctx.partitionSpec != null) {
-        val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(x => x._2.isDefined)
+        val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(_._2.isDefined)
         if (filteredSpec.isEmpty) {
           None
         } else {
-          Some(filteredSpec.mapValues(v => v.get))
+          Some(filteredSpec.mapValues(_.get))
         }
       } else {
         None
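Reviewer note: `visitPartitionSpec` yields a `Map[String, Option[String]]` in which partition columns named without a value map to `None`; the filter-then-unwrap idiom above collapses that into an optional fully specified spec. A minimal standalone sketch of the same idiom (the `spec` value is made up for illustration, and `.toMap` is added here only to force a strict map):

    // Stand-in for the map produced by visitPartitionSpec; a value is None
    // when the PARTITION clause names a column without giving it a value.
    val spec: Map[String, Option[String]] = Map("ds" -> Some("2017-01-01"), "hr" -> None)

    // Keep only fully specified columns, then unwrap the Options.
    val filteredSpec = spec.filter(_._2.isDefined)
    val partitionSpec: Option[Map[String, String]] =
      if (filteredSpec.isEmpty) None else Some(filteredSpec.mapValues(_.get).toMap)
    // partitionSpec == Some(Map("ds" -> "2017-01-01"))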
@@ -77,8 +77,9 @@ case class AnalyzeTableCommand(
         calculateRowCountsPerPartition(sparkSession, tableMeta)
       }

-      partitions.foreach(p => {
-        val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta, p)
+      partitions.foreach { p =>
+        val newTotalSize = CommandUtils.calculateLocationSize(sessionState,
+          tableMeta.identifier, p.storage.locationUri)
         val newRowCount = rowCounts.get(p.spec)

         def updateStats(newStats: CatalogStatistics): Unit = {
@@ -87,7 +88,7 @@ case class AnalyzeTableCommand(
         }

         calculateAndUpdateStats(p.stats, newTotalSize, newRowCount, updateStats)
-      })
+      }
     }

     Seq.empty[Row]
@@ -99,27 +100,22 @@ case class AnalyzeTableCommand(
     val filters = partitionSpec.get.map {
      case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value))
     }
-    val filter = filters match {
-      case head :: tail =>
-        if (tail.isEmpty) head
-        else tail.foldLeft(head: Expression)((a, b) => And(a, b))
-    }
+    val filter = filters.reduce(And)

-    val partitionColumns = tableMeta.partitionColumnNames.map(c => Column(c))
+    val partitionColumns = tableMeta.partitionColumnNames.map(Column(_))

     val df = sparkSession.table(tableMeta.identifier).filter(Column(filter))
       .groupBy(partitionColumns: _*).count()

     val numPartitionColumns = partitionColumns.size
-    val partitionColumnIndexes = 0 to (numPartitionColumns - 1)

-    df.collect().map(r => {
-      val partitionColumnValues = partitionColumnIndexes.map(i => r.get(i).toString)
+    df.collect().map { r =>
+      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
       val spec: TablePartitionSpec =
         tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
       val count = BigInt(r.getLong(numPartitionColumns))
       (spec, count)
-    }).toMap
+    }.toMap
   }

   private def calculateAndUpdateStats(
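Reviewer note: `filters.reduce(And)` works because a case-class companion like `And` is itself a `(Expression, Expression) => Expression` function, so `reduce` builds the same left-nested conjunction the removed `foldLeft` did; `reduce` throws on an empty collection, which is safe here since the partition spec is guaranteed non-empty. A small self-contained sketch with simplified stand-ins for the Catalyst types:

    // Illustrative stand-ins; not Spark's Catalyst classes.
    sealed trait Expr
    case class EqualTo(column: String, value: String) extends Expr
    case class And(left: Expr, right: Expr) extends Expr

    val filters = Seq[Expr](
      EqualTo("ds", "2017-01-01"),
      EqualTo("hr", "10"),
      EqualTo("min", "59"))

    // The companion And is a Function2, so it can be passed to reduce directly.
    // Result: And(And(EqualTo(ds, ...), EqualTo(hr, ...)), EqualTo(min, ...))
    val filter: Expr = filters.reduce(And)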
@@ -59,13 +59,6 @@ object CommandUtils extends Logging {
     }
   }

-  def calculateTotalSize(
-      sessionState: SessionState,
-      catalogTable: CatalogTable,
-      partition: CatalogTablePartition): Long = {
-    calculateLocationSize(sessionState, catalogTable.identifier, partition.storage.locationUri)
-  }
-
   def calculateLocationSize(
       sessionState: SessionState,
       identifier: TableIdentifier,
@@ -646,7 +646,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     // convert table statistics to properties so that we can persist them through hive client
     var statsProperties =
       if (stats.isDefined) {
-        statsToHiveProperties(stats.get, rawTable.schema)
+        statsToProperties(stats.get, rawTable.schema)
       } else {
         new mutable.HashMap[String, String]()
       }
@@ -696,7 +696,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)

     // Restore Spark's statistics from information in Metastore.
     val restoredStats =
-      statsFromHiveProperties(table.properties, table.identifier.table, table.schema)
+      statsFromProperties(table.properties, table.identifier.table, table.schema)
     if (restoredStats.isDefined) {
       table = table.copy(stats = restoredStats)
     }
@@ -1002,7 +1002,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     currentFullPath
   }

-  private def statsToHiveProperties(
+  private def statsToProperties(
       stats: CatalogStatistics,
       schema: StructType): Map[String, String] = {

@@ -1023,7 +1023,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     statsProperties
   }

-  private def statsFromHiveProperties(
+  private def statsFromProperties(
       properties: Map[String, String],
       table: String,
       schema: StructType): Option[CatalogStatistics] = {
@@ -1075,7 +1075,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     // convert partition statistics to properties so that we can persist them through hive api
     val withStatsProps = lowerCasedParts.map(p => {
       if (p.stats.isDefined) {
-        val statsProperties = statsToHiveProperties(p.stats.get, rawTable.schema)
+        val statsProperties = statsToProperties(p.stats.get, rawTable.schema)
         p.copy(parameters = p.parameters ++ statsProperties)
       } else {
         p
@@ -1105,7 +1105,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)

     // construct Spark's statistics from information in Hive metastore
     val restoredStats =
-      statsFromHiveProperties(partition.parameters, table.identifier.table, table.schema)
+      statsFromProperties(partition.parameters, table.identifier.table, table.schema)
     if (restoredStats.isDefined) {
       partition.copy(
         spec = restoredSpec,
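Reviewer note: the renamed `statsToProperties`/`statsFromProperties` pair (nothing about them is Hive-specific, hence dropping "Hive" from the names) serializes `CatalogStatistics` into flat string key-value properties the metastore client can persist, and rebuilds the statistics on the way back out. A rough sketch of that round-trip idea under simplified types; the key names follow the `spark.sql.statistics.*` convention but the code and keys here are illustrative, not the catalog's actual implementation:

    // Simplified model of the stats round-trip; not Spark's exact code or keys.
    case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt])

    def statsToProperties(stats: Stats): Map[String, String] = {
      val base = Map("spark.sql.statistics.totalSize" -> stats.sizeInBytes.toString)
      stats.rowCount.fold(base)(n => base + ("spark.sql.statistics.numRows" -> n.toString))
    }

    def statsFromProperties(props: Map[String, String]): Option[Stats] =
      props.get("spark.sql.statistics.totalSize").map { size =>
        Stats(BigInt(size), props.get("spark.sql.statistics.numRows").map(BigInt(_)))
      }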
@@ -279,9 +279,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
       """.stripMargin)
     sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') SELECT * FROM src")

-    sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS").collect()
+    sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS")

-    sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS").collect()
+    sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS")

     assert(queryStats("2010-01-01").rowCount.get === 500)
     assert(queryStats("2010-01-01").sizeInBytes === 5812)
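Reviewer note on dropping the trailing `.collect()` calls: `sql(...)` executes command plans such as ANALYZE TABLE eagerly, so the statement has already run by the time the DataFrame is returned; only query plans stay lazy. A sketch of the distinction, reusing the test's `sql` and `tableName` scope:

    // Eager: the command runs inside sql() itself; no action is needed.
    sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS")

    // Lazy by contrast: a query needs an action like collect() to execute.
    val df = sql(s"SELECT COUNT(*) FROM $tableName WHERE ds = '2010-01-01'")
    df.collect()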