Changes as per review comments (round #1)
- Merged the Parquet and ORC conversion methods into a single method `convertToLogicalRelation`, as suggested by @marmbrus
- Addressed @liancheng's comment about checking for the bucket spec
- 4 tests were failing because this change alters the plans for the affected queries.

e.g., query from `date_serde.q`:

```
select * from date_serde_orc
```

Plan (BEFORE)
```
Project [c1#282,c2#283]
+- MetastoreRelation default, date_serde_orc, None
```

Plan (AFTER)
```
Project [c1#287,c2#288]
+- SubqueryAlias date_serde_orc
   +- Relation[c1#287,c2#288] HadoopFiles
```

Setting `CONVERT_METASTORE_ORC` to `false` by default to mitigate the test failures. The other option was to make `SQLBuilder` work with `Relation`, but that is out of scope for this PR. In my opinion, it would be better to have this config turned on by default so that anyone trying Spark out of the box gets better performance without needing to tweak such configs.
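For illustration, a minimal sketch (not part of this commit) of how a user could opt back into the ORC conversion while the default stays `false`; it assumes an existing `SparkContext` named `sc`:

```
import org.apache.spark.sql.hive.HiveContext

// Hypothetical usage sketch: turn the ORC conversion on explicitly, since
// spark.sql.hive.convertMetastoreOrc now defaults to false.
val hiveContext = new HiveContext(sc) // assumes an existing SparkContext `sc`
hiveContext.setConf("spark.sql.hive.convertMetastoreOrc", "true")

// Scans of metastore ORC tables are then planned through the data source path:
hiveContext.sql("SELECT * FROM date_serde_orc").explain(true)
```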

Open items:
- @liancheng's review comment: the added test case does not verify that the new code path is hit (a possible check is sketched below)
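One possible way to close that last item, sketched under the assumption that it runs inside the existing `OrcQuerySuite` test (so `sqlContext` and the `dummy_orc` table are in scope), is to assert that the analyzed plan contains a `HadoopFsRelation`-backed `LogicalRelation` rather than a `MetastoreRelation`:

```
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Sketch only: check that the read goes through the new data source code path.
// Assumes `sqlContext` and the `dummy_orc` table from the existing test.
val df = sqlContext.sql("SELECT * FROM dummy_orc WHERE key = 0")
val usesDataSource = df.queryExecution.analyzed.collect {
  case r: LogicalRelation if r.relation.isInstanceOf[HadoopFsRelation] => r
}.nonEmpty
assert(usesDataSource, "expected the ORC scan to be converted to HadoopFsRelation")
```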
tejasapatil committed Mar 26, 2016
commit e76b474a01a61d29a758c9dcb100e365d4f2ca00
@@ -450,7 +450,7 @@ private[hive] object HiveContext extends Logging {
"when \"spark.sql.hive.convertMetastoreParquet\" is true.")

val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
defaultValue = Some(true),
defaultValue = Some(false),
doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
"the built in support.")

@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.{datasources, FileRelation}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.client._
@@ -446,6 +446,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
metastoreRelation: MetastoreRelation,
schemaInMetastore: StructType,
expectedFileFormat: Class[_ <: FileFormat],
expectedBucketSpec: Option[BucketSpec],
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {

cachedDataSourceTables.getIfPresent(tableIdentifier) match {
@@ -461,6 +462,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val useCached =
relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
logical.schema.sameType(schemaInMetastore) &&
relation.bucketSpec == expectedBucketSpec &&
relation.partitionSpec == partitionSpecInMetastore.getOrElse {
PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory])
}
@@ -491,24 +493,20 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
}

private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
private def convertToLogicalRelation(metastoreRelation: MetastoreRelation,
options: Map[String, String],
defaultSource: FileFormat,
fileFormatClass: Class[_ <: FileFormat],
fileType: String): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging

val parquetOptions = Map(
ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
metastoreRelation.tableName,
Some(metastoreRelation.databaseName)
).unquotedString
)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.

val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
// We're converting the entire table into ParquetRelation, so predicates to Hive metastore
// We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore
// are empty.
val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
val location = p.getLocation
@@ -523,57 +521,63 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
tableIdentifier,
metastoreRelation,
metastoreSchema,
classOf[ParquetDefaultSource],
fileFormatClass,
bucketSpec,
Some(partitionSpec))

val parquetRelation = cached.getOrElse {
val hadoopFsRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
val format = new ParquetDefaultSource()
val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())

val mergedSchema = inferredSchema.map { inferred =>
ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
}.getOrElse(metastoreSchema)
val inferredSchema = if (fileType.equals("parquet")) {
val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles())
inferredSchema.map { inferred =>
ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
}.getOrElse(metastoreSchema)
} else {
defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get
}

val relation = HadoopFsRelation(
sqlContext = hive,
location = fileCatalog,
partitionSchema = partitionSchema,
dataSchema = mergedSchema,
bucketSpec = None, // We don't support hive bucketed tables, only ones we write out.
fileFormat = new ParquetDefaultSource(),
options = parquetOptions)
dataSchema = inferredSchema,
bucketSpec = bucketSpec,
fileFormat = defaultSource,
options = options)

val created = LogicalRelation(relation)
cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
hadoopFsRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)

val cached = getCached(tableIdentifier,
metastoreRelation,
metastoreSchema,
classOf[ParquetDefaultSource],
fileFormatClass,
bucketSpec,
None)
val parquetRelation = cached.getOrElse {
val logicalRelation = cached.getOrElse {
val created =
LogicalRelation(
DataSource(
sqlContext = hive,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
options = parquetOptions,
className = "parquet").resolveRelation())
bucketSpec = bucketSpec,
options = options,
className = fileType).resolveRelation())

cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
logicalRelation
}
result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
}
@@ -583,6 +587,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
* data source relations for better performance.
*/
object ParquetConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
hive.convertMetastoreParquet
}

private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
val defaultSource = new ParquetDefaultSource()
val fileFormatClass = classOf[ParquetDefaultSource]

val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
val options = Map(
ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
relation.tableName,
Some(relation.databaseName)
).unquotedString
)

convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.resolved || plan.analyzed) {
return plan
@@ -592,123 +617,61 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// Write path
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(r)
InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)

// Write path
case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(r)
InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)

// Read path
case relation: MetastoreRelation if hive.convertMetastoreParquet &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
val parquetRelation = convertToParquetRelation(relation)
SubqueryAlias(relation.alias.getOrElse(relation.tableName), parquetRelation)
}
}
}

private def convertToOrcRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)

val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)

val orcOptions = Map[String, String]()

val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
// We're converting the entire table into OrcRelation, so predicates to Hive metastore
// are empty.
val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
val location = p.getLocation
val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
})
PartitionDirectory(values, location)
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)

val cached = getCached(
tableIdentifier,
metastoreRelation,
metastoreSchema,
classOf[OrcDefaultSource],
Some(partitionSpec))

val orcRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
val format = new OrcDefaultSource()
val inferredSchema = format.inferSchema(hive, orcOptions, fileCatalog.allFiles()).get

val relation = HadoopFsRelation(
sqlContext = hive,
location = fileCatalog,
partitionSchema = partitionSchema,
dataSchema = inferredSchema,
bucketSpec = None, // We don't support hive bucketed tables, only ones we write out.
fileFormat = new OrcDefaultSource(),
options = orcOptions)

val created = LogicalRelation(relation)
cachedDataSourceTables.put(tableIdentifier, created)
created
}

orcRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)

val cached = getCached(tableIdentifier,
metastoreRelation,
metastoreSchema,
classOf[OrcDefaultSource],
None)
val orcRelation = cached.getOrElse {
val created =
LogicalRelation(
DataSource(
sqlContext = hive,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
options = orcOptions,
className = "orc").resolveRelation())

cachedDataSourceTables.put(tableIdentifier, created)
created
}

orcRelation
}
result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
}

/**
* When scanning Metastore ORC tables, convert them to ORC data source relations
* for better performance.
*/
* When scanning Metastore ORC tables, convert them to ORC data source relations
* for better performance.
*/
object OrcConversions extends Rule[LogicalPlan] {
private def isConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
hive.convertMetastoreOrc
}

private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
val defaultSource = new OrcDefaultSource()
val fileFormatClass = classOf[OrcDefaultSource]
val options = Map[String, String]()

convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.resolved || plan.analyzed) {
return plan
}

plan transformUp {
// Write path
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Orc data source (yet).
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)

// Write path
case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Orc data source (yet).
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)

// Read path
case relation: MetastoreRelation if isConvertMetastoreOrc(relation) =>
case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
val orcRelation = convertToOrcRelation(relation)
SubqueryAlias(relation.alias.getOrElse(relation.tableName), orcRelation)
}
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.apache.spark.sql.hive.HiveContext
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql._
@@ -403,7 +404,8 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {

test("SPARK-14070 Use ORC data source for SQL queries on ORC tables") {
withTempPath { dir =>
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true",
HiveContext.CONVERT_METASTORE_ORC.key -> "true") {
val path = dir.getCanonicalPath

withTable("dummy_orc") {
@@ -420,7 +422,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sqlContext.sql(
s"""INSERT INTO TABLE dummy_orc
|SELECT key, value FROM single
""".stripMargin).collect()
""".stripMargin)

val df = sqlContext.sql("SELECT * FROM dummy_orc WHERE key=0")
checkAnswer(df, singleRowDF)