support creating hive table with DataFrameWriter and Catalog
cloud-fan committed Jan 10, 2017
commit 4e06830dbfa41f972ea5e63c656f39ac009f1dbb
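
In short, the change enables calls like the following (table names and path here are illustrative, not from the PR; the "fileFormat" option is the one exercised by the tests below):

      // DataFrameWriter: create a Hive serde table from a DataFrame.
      Seq(1 -> "a").toDF("i", "j")
        .write.format("hive").option("fileFormat", "avro").saveAsTable("hive_tbl")

      // Catalog: create an external Hive serde table with an explicit schema.
      spark.catalog.createExternalTable(
        "ext_hive_tbl", "hive", new StructType().add("i", "int"),
        Map("path" -> "/tmp/ext_hive_tbl", "fileFormat" -> "parquet"))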
DataFrameWriter.scala

@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -361,10 +360,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
-    }
-
     val catalog = df.sparkSession.sessionState.catalog
     val tableExists = catalog.tableExists(tableIdent)
     val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
@@ -385,6 +380,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         }
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
           // Only do the check if the table is a data source table (the relation is a BaseRelation).
+          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
+          // for creating hive tables.
Member: +1

Member: Are we also facing the same issue in the insertInto(tableIdent: TableIdentifier) API?

Contributor Author: insertInto is different; it generates an InsertIntoTable plan instead of a CreateTable plan.

Member:

      Seq((1, 2)).toDF("i", "j").write.format("parquet").saveAsTable(tableName)
      table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)

We capture the exception when the format is parquet. Now, when the format is hive, should we do the same thing?

Contributor Author: DataFrameWriter.insertInto ignores the specified provider, doesn't it?

Member: Although we ignore the specified provider, we still respect the actual format of the table. For example, below is a Hive table, and we are not blocking it. Should we block it to make them consistent?

      sql(s"CREATE TABLE $tableName STORED AS SEQUENCEFILE AS SELECT 1 AS key, 'abc' AS value")
      val df = sql(s"SELECT key, value FROM $tableName")
      df.write.mode("overwrite").insertInto(tableName)

Contributor Author: We should not block it. This generates InsertIntoTable, which supports hive tables. What we should block is saveAsTable with Overwrite mode, which generates CreateTable.

INSERT OVERWRITE is different from creating a table with overwrite mode.
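
To make the distinction concrete, a minimal illustrative sketch (df and table t are hypothetical, not from the PR):

      // insertInto writes into an existing table: it plans an InsertIntoTable
      // node, which already supports Hive serde tables, so it stays allowed.
      df.write.mode("overwrite").insertInto("t")

      // saveAsTable (re)defines the table from the query: it plans a CreateTable
      // node, and hive-format CTAS with overwrite mode is what remains blocked.
      df.write.format("hive").mode("overwrite").saveAsTable("t")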

           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
CatalogImpl.scala

@@ -347,10 +347,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    if (source.toLowerCase == "hive") {
-      throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
-    }
-
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
     val tableDesc = CatalogTable(
       identifier = tableIdent,
HiveDDLSuite.scala

@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,66 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("create hive serde table with Catalog") {
+    withTable("t") {
+      withTempDir { dir =>
+        val df = spark.catalog.createExternalTable(
+          "t",
+          "hive",
+          new StructType().add("i", "int"),
+          Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
Member: If path is not provided, it still works. However, based on our latest design decision, users must provide a path when they create an external table.

Contributor Author: In the design decision, we want to hide the managed/external concept from users. I'm not sure if we want to rename this API...

Member: Maybe just issue an exception when users do not provide a path? Otherwise, we have to add new APIs.

Contributor Author: I'll address this problem in a follow-up PR. Other data sources also have this problem, e.g. users can create an external parquet table without a path, so this PR doesn't introduce new problems.
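
For reference, a sketch of the pre-existing behavior mentioned above (table name and schema are illustrative, not from the PR):

      // Creating an "external" parquet table without a "path" option already
      // succeeds today; the data simply lands in the default location.
      spark.catalog.createExternalTable(
        "no_path_tbl", "parquet", new StructType().add("i", "int"), Map.empty[String, String])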

+        assert(df.collect().isEmpty)
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.inputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+        assert(table.storage.outputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+        assert(table.storage.serde ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+      }
+    }
+  }
+
+  test("create hive serde table with DataFrameWriter.saveAsTable") {
+    withTable("t", "t2") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      val e = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
+      }
+      assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
+        "to create a partitioned table using Hive"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+      }
+      assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
+      }
+      assert(e3.message.contains(
+        "CTAS for hive serde tables does not support append or overwrite semantics"))
+    }
+  }
 }