
Commit 5ad0360

viirya authored and cloud-fan committed
[SPARK-25271][SQL] Hive ctas commands should use data source if it is convertible
## What changes were proposed in this pull request?

In Spark 2.3.0 and earlier, the Hive CTAS command converts to a data source write when the target table is convertible. This behavior is controlled by configs such as HiveUtils.CONVERT_METASTORE_ORC and HiveUtils.CONVERT_METASTORE_PARQUET.

In 2.3.1 this optimization was dropped by mistake in the PR for [SPARK-22977](https://github.com/apache/spark/pull/20521/files#r217254430). Since then, the Hive CTAS command only uses Hive SerDe to write data.

This patch adds the optimization back to the Hive CTAS command by introducing OptimizedCreateHiveTableAsSelectCommand, which writes the query result through the data source writer.

## How was this patch tested?

Added test.

Closes apache#22514 from viirya/SPARK-25271-2.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
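For illustration, here is a minimal sketch of the kind of statement this change affects. The session setup, table name, and query below are hypothetical (not taken from the patch): with the Parquet conversion config and the new `spark.sql.hive.convertMetastoreCtas` flag enabled, an unpartitioned Parquet-backed Hive CTAS is planned as OptimizedCreateHiveTableAsSelectCommand and written with the built-in data source writer instead of Hive SerDe.

```scala
import org.apache.spark.sql.SparkSession

object HiveCtasConversionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical session; requires a Spark build with Hive support.
    val spark = SparkSession.builder()
      .appName("hive-ctas-conversion-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Both flags default to true; set explicitly here for clarity.
    spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
    spark.conf.set("spark.sql.hive.convertMetastoreCtas", "true")

    // Unpartitioned, Parquet-backed Hive CTAS: eligible for conversion, so the
    // query result is written by the built-in Parquet writer rather than Hive SerDe.
    spark.sql(
      """CREATE TABLE ctas_target STORED AS PARQUET
        |AS SELECT id, id * 2 AS doubled FROM range(10)""".stripMargin)

    spark.stop()
  }
}
```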
1 parent 61c443a commit 5ad0360


8 files changed, +230 -91 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 8 additions & 0 deletions
@@ -820,6 +820,14 @@ object DDLUtils {
     table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
   }
 
+  def readHiveTable(table: CatalogTable): HiveTableRelation = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
+  }
+
   /**
    * Throws a standard error for actions that require partitionProvider = hive.
    */

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 10 deletions
@@ -244,27 +244,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     })
   }
 
-  private def readHiveTable(table: CatalogTable): LogicalPlan = {
-    HiveTableRelation(
-      table,
-      // Hive table columns are always nullable.
-      table.dataSchema.asNullable.toAttributes,
-      table.partitionSchema.asNullable.toAttributes)
-  }
-
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta))
 
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
-      i.copy(table = readHiveTable(tableMeta))
+      i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta)
 
     case UnresolvedCatalogRelation(tableMeta) =>
-      readHiveTable(tableMeta)
+      DDLUtils.readHiveTable(tableMeta)
   }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 42 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.util.Locale
+
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -113,7 +117,44 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  def convertToLogicalRelation(
+  // Return true for Apache ORC and Hive ORC-related configuration names.
+  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
+  private def isOrcProperty(key: String) =
+    key.startsWith("orc.") || key.contains(".orc.")
+
+  private def isParquetProperty(key: String) =
+    key.startsWith("parquet.") || key.contains(".parquet.")
+
+  def convert(relation: HiveTableRelation): LogicalRelation = {
+    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+
+    // Consider table and storage properties. For properties existing in both sides, storage
+    // properties will supersede table properties.
+    if (serde.contains("parquet")) {
+      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
+        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
+        SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+      convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
+    } else {
+      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
+        relation.tableMeta.storage.properties
+      if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
+        convertToLogicalRelation(
+          relation,
+          options,
+          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
+          "orc")
+      } else {
+        convertToLogicalRelation(
+          relation,
+          options,
+          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
+          "orc")
      }
+    }
+  }
+
+  private def convertToLogicalRelation(
       relation: HiveTableRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
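The relocated `convert` method above picks the file format from the table's serde; for ORC tables it also consults `spark.sql.orc.impl`. As a hedged usage note (assuming a spark-shell style session where `spark` is the predefined SparkSession), that choice can be steered per session:

```scala
// "native" routes converted ORC tables through
// org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
// "hive" routes them through org.apache.spark.sql.hive.orc.OrcFileFormat.
spark.conf.set("spark.sql.orc.impl", "native")
```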

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 19 additions & 43 deletions
@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 
@@ -181,62 +180,39 @@
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+    isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-    key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-    key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-    // Consider table and storage properties. For properties existing in both sides, storage
-    // properties will supersede table properties.
-    if (serde.contains("parquet")) {
-      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
-        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-      sessionCatalog.metastoreCatalog
-        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
-    } else {
-      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-        relation.tableMeta.storage.properties
-      if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-          "orc")
-      } else {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-          "orc")
-      }
-    }
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+    val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+      serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
       // Write path
       case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
         if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
           !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
+        InsertIntoTable(metastoreCatalog.convert(r), partition,
+          query, overwrite, ifPartitionNotExists)
 
       // Read path
       case relation: HiveTableRelation
          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        convert(relation)
+        metastoreCatalog.convert(relation)
+
+      // CTAS
+      case CreateTable(tableDesc, mode, Some(query))
+          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
+            isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
+        DDLUtils.checkDataColNames(tableDesc)
+        OptimizedCreateHiveTableAsSelectCommand(
+          tableDesc, query, query.output.map(_.name), mode)
     }
   }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 8 additions & 0 deletions
@@ -110,6 +110,14 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)
 
+  val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
+    .doc("When set to true, Spark will try to use built-in data source writer " +
+      "instead of Hive serde in CTAS. This flag is effective only if " +
+      "`spark.sql.hive.convertMetastoreParquet` or `spark.sql.hive.convertMetastoreOrc` is " +
+      "enabled respectively for Parquet and ORC formats")
+    .booleanConf
+    .createWithDefault(true)
+
   val HIVE_METASTORE_SHARED_PREFIXES = buildConf("spark.sql.hive.metastore.sharedPrefixes")
     .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
      "that is shared between Spark SQL and a specific version of Hive. An example of classes " +

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

Lines changed: 97 additions & 37 deletions
@@ -20,32 +20,26 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.hive.HiveSessionCatalog
 
+trait CreateHiveTableAsSelectBase extends DataWritingCommand {
+  val tableDesc: CatalogTable
+  val query: LogicalPlan
+  val outputColumnNames: Seq[String]
+  val mode: SaveMode
 
-/**
- * Create table and insert the query result into it.
- *
- * @param tableDesc the Table Describe, which may contain serde, storage handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param mode SaveMode
- */
-case class CreateHiveTableAsSelectCommand(
-    tableDesc: CatalogTable,
-    query: LogicalPlan,
-    outputColumnNames: Seq[String],
-    mode: SaveMode)
-  extends DataWritingCommand {
-
-  private val tableIdentifier = tableDesc.identifier
+  protected val tableIdentifier = tableDesc.identifier
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (catalog.tableExists(tableIdentifier)) {
+    val tableExists = catalog.tableExists(tableIdentifier)
+
+    if (tableExists) {
       assert(mode != SaveMode.Overwrite,
         s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
 
@@ -57,15 +51,8 @@ case class CreateHiveTableAsSelectCommand(
         return Seq.empty
       }
 
-      // For CTAS, there is no static partition values to insert.
-      val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
-      InsertIntoHiveTable(
-        tableDesc,
-        partition,
-        query,
-        overwrite = false,
-        ifPartitionNotExists = false,
-        outputColumnNames = outputColumnNames).run(sparkSession, child)
+      val command = getWritingCommand(catalog, tableDesc, tableExists = true)
+      command.run(sparkSession, child)
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
@@ -77,15 +64,8 @@ case class CreateHiveTableAsSelectCommand(
       try {
         // Read back the metadata of the table which was created just now.
         val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
-        // For CTAS, there is no static partition values to insert.
-        val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
-        InsertIntoHiveTable(
-          createdTableMeta,
-          partition,
-          query,
-          overwrite = true,
-          ifPartitionNotExists = false,
-          outputColumnNames = outputColumnNames).run(sparkSession, child)
+        val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
+        command.run(sparkSession, child)
       } catch {
         case NonFatal(e) =>
           // drop the created table.
@@ -97,9 +77,89 @@ case class CreateHiveTableAsSelectCommand(
     Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` which actually writes data into the table.
+  def getWritingCommand(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable,
+      tableExists: Boolean): DataWritingCommand
+
   override def argString: String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def getWritingCommand(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable,
+      tableExists: Boolean): DataWritingCommand = {
+    // For CTAS, there is no static partition values to insert.
+    val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+    InsertIntoHiveTable(
+      tableDesc,
+      partition,
+      query,
+      overwrite = if (tableExists) false else true,
+      ifPartitionNotExists = false,
+      outputColumnNames = outputColumnNames)
+  }
+}
+
+/**
+ * Create table and insert the query result into it. This creates Hive table but inserts
+ * the query result into it by using data source.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class OptimizedCreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def getWritingCommand(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable,
+      tableExists: Boolean): DataWritingCommand = {
+    val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+    val hiveTable = DDLUtils.readHiveTable(tableDesc)
+
+    val hadoopRelation = metastoreCatalog.convert(hiveTable) match {
+      case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+      case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " +
+        "HadoopFsRelation.")
+    }
+
+    InsertIntoHadoopFsRelationCommand(
+      hadoopRelation.location.rootPaths.head,
+      Map.empty, // We don't support to convert partitioned table.
+      false,
+      Seq.empty, // We don't support to convert partitioned table.
+      hadoopRelation.bucketSpec,
+      hadoopRelation.fileFormat,
+      hadoopRelation.options,
+      query,
+      if (tableExists) mode else SaveMode.Overwrite,
+      Some(tableDesc),
+      Some(hadoopRelation.location),
+      query.output.map(_.name))
+  }
+}
