
Commit 49da287

Use correct timezone for partition values.
1 parent dae7eba commit 49da287

8 files changed, 21 insertions(+), 14 deletions(-)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 2 additions & 1 deletion

@@ -251,7 +251,8 @@ abstract class ExternalCatalog {
   def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition]
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition]
 
   // --------------------------------------------------------------------------
   // Functions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 2 additions & 1 deletion

@@ -547,7 +547,8 @@ class InMemoryCatalog(
   override def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition] = {
     // TODO: Provide an implementation
     throw new UnsupportedOperationException(
       "listPartitionsByFilter is not implemented for InMemoryCatalog")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 1 addition & 1 deletion

@@ -838,7 +838,7 @@ class SessionCatalog(
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.listPartitionsByFilter(db, table, predicates)
+    externalCatalog.listPartitionsByFilter(db, table, predicates, conf.sessionLocalTimeZone)
   }
 
   /**
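SessionCatalog now forwards the session-local time zone to the external catalog instead of leaving the choice to the JVM default. The zone it forwards is the one users control via the SQL config; a minimal usage sketch (hypothetical session `spark`, not part of the commit):

    spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    // Partition pruning after this point parses date/timestamp partition
    // values in this zone rather than in the JVM default zone.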

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 5 additions & 5 deletions

@@ -25,8 +25,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types.StructType
 
 
@@ -112,11 +112,11 @@ case class CatalogTablePartition(
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
-  def toRow(partitionSchema: StructType): InternalRow = {
+  def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
+    val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
+    val timeZoneId = caseInsensitiveProperties.getOrElse("timeZone", defaultTimeZondId)
     InternalRow.fromSeq(partitionSchema.map { field =>
-      // TODO: use correct timezone for partition values.
-      Cast(Literal(spec(field.name)), field.dataType,
-        Option(DateTimeUtils.defaultTimeZone().getID)).eval()
+      Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval()
     })
   }
 }
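The toRow change above is the core of the fix: partition value strings are now cast with an explicit time zone id (the table's own "timeZone" storage property when present, else the session default) instead of always using the JVM default zone. A minimal sketch of why the zone matters, using the same Cast API the commit touches (standalone snippet, not part of the commit; assumes a Spark build from this era, where Cast takes an optional time zone id):

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.TimestampType

    // The same partition value parsed in two zones yields timestamps
    // (microseconds since the epoch) that differ by the zone offset.
    val value = "2017-01-01 00:00:00"
    val inGmt = Cast(Literal(value), TimestampType, Option("GMT")).eval()
    val inPst = Cast(Literal(value), TimestampType, Option("America/Los_Angeles")).eval()
    assert(inGmt != inPst)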

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala

Lines changed: 2 additions & 1 deletion

@@ -72,7 +72,8 @@ class CatalogFileIndex(
         val path = new Path(p.location)
         val fs = path.getFileSystem(hadoopConf)
         PartitionPath(
-          p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+          p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
+          path.makeQualified(fs.getUri, fs.getWorkingDirectory))
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       new PrunedInMemoryFileIndex(

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 6 additions & 3 deletions

@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.hive.client.HiveClient
@@ -1014,7 +1014,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
     val rawTable = getRawTable(db, table)
     val catalogTable = restoreTableMetadata(rawTable)
     val partitionColumnNames = catalogTable.partitionColumnNames.toSet
@@ -1040,7 +1041,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           val index = partitionSchema.indexWhere(_.name == att.name)
           BoundReference(index, partitionSchema(index).dataType, nullable = true)
         })
-      clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
+      clientPrunedPartitions.filter { p =>
+        boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId))
+      }
     } else {
       client.getPartitions(catalogTable).map { part =>
         part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
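For context, the pruning branch above binds each partition column reference to its ordinal in the partition schema and then evaluates the bound predicate against the row produced by toRow. A minimal sketch of that bind-and-evaluate step (hypothetical one-column example, not from the commit):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, EqualTo, Literal}
    import org.apache.spark.sql.types.IntegerType

    // Bind `a = 1` to ordinal 0 (assuming `a` is the first partition column),
    // then evaluate it against a single-column partition row.
    val predicate = EqualTo(BoundReference(0, IntegerType, nullable = true), Literal(1))
    val row = InternalRow(1)
    assert(predicate.eval(row).asInstanceOf[Boolean])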

sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala

Lines changed: 2 additions & 1 deletion

@@ -108,7 +108,8 @@ private[hive] case class MetastoreRelation(
       sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
         catalogTable.database,
         catalogTable.identifier.table,
-        predicates)
+        predicates,
+        sparkSession.sessionState.conf.sessionLocalTimeZone)
     } else {
       allPartitions
     }

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -52,7 +52,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
 
   test("list partitions by filter") {
     val catalog = newBasicCatalog()
-    val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1))
+    val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1), "GMT")
     assert(selectedPartitions.length == 1)
     assert(selectedPartitions.head.spec == part1.spec)
   }
