From 65a0d4382f449297966d272d776aa6c03307c9bc Mon Sep 17 00:00:00 2001
From: John Zhuge
Date: Thu, 27 Jun 2019 00:51:48 -0700
Subject: [PATCH 1/2] [SPARK-28178][SQL] DataSourceV2: DataFrameWriter.insertInto

---
 .../apache/spark/sql/DataFrameWriter.scala |  54 ++++++++-
 .../v2/DataSourceV2DataFrameSuite.scala    | 107 ++++++++++++++++++
 2 files changed, 159 insertions(+), 2 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d1d0d83bacb4..e12031ee3f90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,16 +22,19 @@ import java.util.{Locale, Properties, UUID}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.StructType
@@ -356,7 +359,54 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def insertInto(tableName: String): Unit = {
-    insertInto(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName))
+    import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
+
+    df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
+      case CatalogObjectIdentifier(Some(catalog), ident) =>
+        insertInto(catalog, ident)
+      case AsTableIdentifier(tableIdentifier) =>
+        insertInto(tableIdentifier)
+    }
+  }
+
+  private def insertInto(catalog: CatalogPlugin, ident: Identifier): Unit = {
+    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
+    assertNotBucketed("insertInto")
+
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(
+        "insertInto() can't be used together with partitionBy(). " +
+          "Partition columns have already been defined for the table. " +
+          "It is not necessary to use partitionBy()."
+      )
+    }
+
+    val table = DataSourceV2Relation.create(catalog.asTableCatalog.loadTable(ident))
+
+    val command = modeForDSV2 match {
+      case SaveMode.Append =>
+        AppendData.byName(table, df.logicalPlan)
+
+      case SaveMode.Overwrite =>
+        val conf = df.sparkSession.sessionState.conf
+        val dynamicPartitionOverwrite = table.table.partitioning.size > 0 &&
+          conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+
+        if (dynamicPartitionOverwrite) {
+          OverwritePartitionsDynamic.byName(table, df.logicalPlan)
+        } else {
+          OverwriteByExpression.byName(table, df.logicalPlan, Literal(true))
+        }
+
+      case other =>
+        throw new AnalysisException(s"insertInto does not support $other mode, " +
+          s"please use Append or Overwrite mode instead.")
+    }
+
+    runCommand(df.sparkSession, "insertInto") {
+      command
+    }
   }
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
new file mode 100644
index 000000000000..53b5dd18769c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
+  import testImplicits._
+
+  before {
+    spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
+    spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName)
+
+    val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
+    df.createOrReplaceTempView("source")
+    val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data")
+    df2.createOrReplaceTempView("source2")
+  }
+
+  after {
+    spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
+    spark.sql("DROP VIEW source")
+    spark.sql("DROP VIEW source2")
+  }
+
+  test("insertInto: append") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
+      spark.table("source").select("id", "data").write.insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source"))
+    }
+  }
+
+  test("insertInto: append - across catalog") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    val t2 = "testcat2.db.tbl"
+    withTable(t1, t2) {
+      sql(s"CREATE TABLE $t1 USING foo AS TABLE source")
+      sql(s"CREATE TABLE $t2 (id bigint, data string) USING foo")
+      spark.table(t1).write.insertInto(t2)
+      checkAnswer(spark.table(t2), spark.table("source"))
+    }
+  }
+
+  test("insertInto: append partitioned table - dynamic clause") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
+      spark.table("source").write.insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source"))
+    }
+  }
+
+  test("insertInto: overwrite non-partitioned table") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 USING foo AS TABLE source")
+      spark.table("source2").write.mode("overwrite").insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source2"))
+    }
+  }
+
+  test("insertInto: overwrite - dynamic clause - static mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+      val t1 = "testcat.ns1.ns2.tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
+        Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data").write.insertInto(t1)
+        spark.table("source").write.mode("overwrite").insertInto(t1)
+        checkAnswer(spark.table(t1), spark.table("source"))
+      }
+    }
+  }
+
+  test("insertInto: overwrite - dynamic clause - dynamic mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      val t1 = "testcat.ns1.ns2.tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
+        Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data").write.insertInto(t1)
+        spark.table("source").write.mode("overwrite").insertInto(t1)
+        checkAnswer(spark.table(t1),
+          spark.table("source").union(sql("SELECT 4L, 'keep'")))
+      }
+    }
+  }
+}

From c4eeee5b4f8bdaff442fea7db2f26d1db102c3d4 Mon Sep 17 00:00:00 2001
From: John Zhuge
Date: Mon, 29 Jul 2019 09:29:39 -0700
Subject: [PATCH 2/2] Wenchen's comments

---
 .../apache/spark/sql/DataFrameWriter.scala | 30 +++++++-----------
 .../v2/DataSourceV2DataFrameSuite.scala    |  6 ++--
 2 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e12031ee3f90..39b14f5ceb98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,6 +361,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def insertInto(tableName: String): Unit = {
     import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
 
+    assertNotBucketed("insertInto")
+
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(
+        "insertInto() can't be used together with partitionBy(). " +
+          "Partition columns have already been defined for the table. " +
+          "It is not necessary to use partitionBy()."
+      )
+    }
+
     df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case CatalogObjectIdentifier(Some(catalog), ident) =>
         insertInto(catalog, ident)
@@ -372,16 +382,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   private def insertInto(catalog: CatalogPlugin, ident: Identifier): Unit = {
     import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
 
-    assertNotBucketed("insertInto")
-
-    if (partitioningColumns.isDefined) {
-      throw new AnalysisException(
-        "insertInto() can't be used together with partitionBy(). " +
-          "Partition columns have already been defined for the table. " +
-          "It is not necessary to use partitionBy()."
-      )
-    }
-
     val table = DataSourceV2Relation.create(catalog.asTableCatalog.loadTable(ident))
 
     val command = modeForDSV2 match {
@@ -410,16 +410,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
-    assertNotBucketed("insertInto")
-
-    if (partitioningColumns.isDefined) {
-      throw new AnalysisException(
-        "insertInto() can't be used together with partitionBy(). " +
-          "Partition columns have already been defined for the table. " +
-          "It is not necessary to use partitionBy()."
-      )
-    }
-
     runCommand(df.sparkSession, "insertInto") {
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
index 53b5dd18769c..86735c627cc5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
@@ -62,7 +62,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
-  test("insertInto: append partitioned table - dynamic clause") {
+  test("insertInto: append partitioned table") {
     val t1 = "testcat.ns1.ns2.tbl"
     withTable(t1) {
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
@@ -80,7 +80,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
-  test("insertInto: overwrite - dynamic clause - static mode") {
+  test("insertInto: overwrite - static mode") {
     withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
       val t1 = "testcat.ns1.ns2.tbl"
      withTable(t1) {
@@ -92,7 +92,7 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
-  test("insertInto: overwrite - dynamic clause - dynamic mode") {
+  test("insertInto: overwrite - dynamic mode") {
     withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
       val t1 = "testcat.ns1.ns2.tbl"
       withTable(t1) {
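
For orientation only, not part of the patches themselves: a minimal usage sketch of the behaviour these two patches add. It assumes a Spark build that includes them and a TableCatalog implementation available on the classpath; the catalog name testcat, the catalog class used for it, the table name ns.tbl, and the "foo" provider are illustrative placeholders mirroring the test suite. With a multi-part table name, insertInto resolves the table through the v2 catalog and plans AppendData for the default append mode, OverwriteByExpression(true) for overwrite under static partition-overwrite mode, and OverwritePartitionsDynamic for overwrite on a partitioned table when spark.sql.sources.partitionOverwriteMode is dynamic.

import org.apache.spark.sql.SparkSession

object InsertIntoV2Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("insertInto-v2-sketch")
      // Illustrative: any TableCatalog implementation; the test-only
      // TestInMemoryTableCatalog from the suite above is one option.
      .config("spark.sql.catalog.testcat",
        "org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog")
      .getOrCreate()
    import spark.implicits._

    // A partitioned table owned by the 'testcat' v2 catalog.
    spark.sql(
      "CREATE TABLE testcat.ns.tbl (id bigint, data string) USING foo PARTITIONED BY (id)")

    // Default DSV2 mode is append: planned as AppendData.byName.
    Seq((1L, "a"), (2L, "b")).toDF("id", "data").write.insertInto("testcat.ns.tbl")

    // Overwrite on a partitioned table with dynamic partition overwrite:
    // planned as OverwritePartitionsDynamic.byName, so only partition id=1 is rewritten.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    Seq((1L, "a2")).toDF("id", "data").write.mode("overwrite").insertInto("testcat.ns.tbl")

    spark.stop()
  }
}

Under the default static mode, the same overwrite call is planned as OverwriteByExpression with a true condition and replaces the whole table contents rather than just the partitions present in the incoming data, which is exactly the difference the "static mode" and "dynamic mode" tests above assert.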