diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index d2003fd6892e1..6911843999392 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -32,8 +32,9 @@ class AnalysisException protected[sql] (
     val message: String,
     val line: Option[Int] = None,
     val startPosition: Option[Int] = None,
-    val plan: Option[LogicalPlan] = None)
-  extends Exception with Serializable {
+    val plan: Option[LogicalPlan] = None,
+    val cause: Option[Throwable] = None)
+  extends Exception(message, cause.orNull) with Serializable {
 
   def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = {
     val newException = new AnalysisException(message, line, startPosition)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index cf9286e6b97a6..371c198aa3493 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 20e22baa351a9..f48b8cde422ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -556,9 +556,9 @@ class SparkSession private(
   }
 
-  /* ------------------------ *
-   |  Catalog-related methods |
-   * ----------------- ------ */
+  /* ------------------------- *
+   |  Catalog-related methods  |
+   * ------------------------- */
 
   /**
    * Interface through which the user may create, drop, alter or query underlying
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4b9aab612e7c3..9956c5b09236d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -118,8 +118,8 @@ case class CreateDataSourceTableCommand(
 /**
  * A command used to create a data source table using the result of a query.
  *
- * Note: This is different from [[CreateTableAsSelect]]. Please check the syntax for difference.
- * This is not intended for temporary tables.
+ * Note: This is different from [[CreateTableAsSelectLogicalPlan]]. Please check the syntax for
+ * difference. This is not intended for temporary tables.
  *
  * The syntax of using this command in SQL is:
  * {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index 38317d46dd82d..d554937d8b400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -77,4 +77,3 @@ object HiveSerDe {
     serdeMap.get(key)
   }
 }
-
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5ffd8ef149a1e..b8bc9ab900ad1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,8 @@ import java.util
 
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.thrift.TException
 
@@ -35,7 +37,9 @@ import org.apache.spark.sql.hive.client.HiveClient
  * A persistent implementation of the system catalog using Hive.
  * All public methods must be synchronized for thread-safety.
  */
-private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging {
+private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
+  extends ExternalCatalog with Logging {
+
   import CatalogTypes.TablePartitionSpec
 
   // Exceptions thrown by the hive client that we would like to wrap
@@ -68,7 +72,8 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
       body
     } catch {
       case NonFatal(e) if isClientException(e) =>
-        throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+        throw new AnalysisException(
+          e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
     }
   }
 
@@ -147,7 +152,41 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
       ignoreIfExists: Boolean): Unit = withClient {
     requireDbExists(db)
     requireDbMatches(db, tableDefinition)
-    client.createTable(tableDefinition, ignoreIfExists)
+
+    if (
+      // If this is an external data source table...
+      tableDefinition.properties.contains("spark.sql.sources.provider") &&
+      tableDefinition.tableType == CatalogTableType.EXTERNAL &&
+      // ... that is not persisted as Hive compatible format (external tables in Hive compatible
+      // format always set `locationUri` to the actual data location and should NOT be hacked as
+      // following.)
+      tableDefinition.storage.locationUri.isEmpty
+    ) {
+      // !! HACK ALERT !!
+      //
+      // Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
+      // directory that doesn't exist yet but can definitely be successfully created, and then
+      // delete it right after creating the external data source table. This location will be
+      // persisted to Hive metastore as standard Hive table location URI, but Spark SQL doesn't
+      // really use it. Also, since we only do this workaround for external tables, deleting the
+      // directory after the fact doesn't do any harm.
+      //
+      // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
+      val tempPath = {
+        val dbLocation = getDatabase(tableDefinition.database).locationUri
+        new Path(dbLocation, tableDefinition.identifier.table + "-__PLACEHOLDER__")
+      }
+
+      try {
+        client.createTable(
+          tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)),
+          ignoreIfExists)
+      } finally {
+        FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
+      }
+    } else {
+      client.createTable(tableDefinition, ignoreIfExists)
+    }
   }
 
   override def dropTable(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index f0d96403e8551..a0106ee882e76 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -51,6 +51,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
   /**
    * A catalog that interacts with the Hive metastore.
    */
-  override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
-
+  override lazy val externalCatalog =
+    new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration)
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 71d5c9960a70c..47fa41823cd09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -323,7 +324,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def listDatabases(pattern: String): Seq[String] = withHiveState {
-    client.getDatabasesByPattern(pattern).asScala.toSeq
+    client.getDatabasesByPattern(pattern).asScala
   }
 
   override def getTableOption(
@@ -351,6 +352,8 @@ private[hive] class HiveClientImpl(
         unsupportedFeatures += "bucketing"
       }
 
+      val properties = h.getParameters.asScala.toMap
+
       CatalogTable(
         identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
@@ -368,14 +371,27 @@ private[hive] class HiveClientImpl(
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
         storage = CatalogStorageFormat(
-          locationUri = shim.getDataLocation(h),
+          locationUri = shim.getDataLocation(h).filterNot { _ =>
+            // SPARK-15269: Persisted data source tables always store the location URI as a SerDe
+            // property named "path" instead of standard Hive `dataLocation`, because Hive only
+            // allows directory paths as location URIs while Spark SQL data source tables also
+            // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
+            // data source tables.
+            DDLUtils.isDatasourceTable(properties) &&
+              h.getTableType == HiveTableType.EXTERNAL_TABLE &&
+              // Spark SQL may also save external data source in Hive compatible format when
+              // possible, so that these tables can be directly accessed by Hive. For these tables,
+              // `dataLocation` is still necessary. Here we also check for input format class
+              // because only these Hive compatible tables set this field.
+              h.getInputFormatClass == null
+          },
           inputFormat = Option(h.getInputFormatClass).map(_.getName),
           outputFormat = Option(h.getOutputFormatClass).map(_.getName),
           serde = Option(h.getSerializationLib),
           compressed = h.getTTable.getSd.isCompressed,
           serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
         ),
-        properties = h.getParameters.asScala.toMap,
+        properties = properties,
         viewOriginalText = Option(h.getViewOriginalText),
         viewText = Option(h.getViewExpandedText),
         unsupportedFeatures = unsupportedFeatures)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index bf9935ae41b30..175889b08b49f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -37,7 +37,8 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
     override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
     override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
-    override def newEmptyCatalog(): ExternalCatalog = new HiveExternalCatalog(client)
+    override def newEmptyCatalog(): ExternalCatalog =
+      new HiveExternalCatalog(client, new Configuration())
   }
 
   protected override def resetState(): Unit = client.reset()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 2c50cc88cc4cc..3d8123d3c06d8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1104,4 +1104,17 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       }
     }
   }
+
+  test("SPARK-15269 external data source table creation") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(1).write.json(path)
+
+      withTable("t") {
+        sql(s"CREATE TABLE t USING json OPTIONS (PATH '$path')")
+        sql("DROP TABLE t")
+        sql(s"CREATE TABLE t USING json AS SELECT 1 AS c")
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index f789d88d5dd4a..3f3dc122093b5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -28,11 +28,11 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
   import testImplicits._
 
   test("data source table with user specified schema") {
-    withTable("ddl_test1") {
+    withTable("ddl_test") {
       val jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
 
       sql(
-        s"""CREATE TABLE ddl_test1 (
+        s"""CREATE TABLE ddl_test (
           |  a STRING,
           |  b STRING,
           |  `extra col` ARRAY,
@@ -45,55 +45,55 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
         """.stripMargin
       )
 
-      checkCreateTable("ddl_test1")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("data source table CTAS") {
-    withTable("ddl_test2") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test2
+        s"""CREATE TABLE ddl_test
           |USING json
          |AS SELECT 1 AS a, "foo" AS b
         """.stripMargin
      )
 
-      checkCreateTable("ddl_test2")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("partitioned data source table") {
-    withTable("ddl_test3") {
+    withTable("ddl_test") {
      sql(
-        s"""CREATE TABLE ddl_test3
+        s"""CREATE TABLE ddl_test
          |USING json
          |PARTITIONED BY (b)
          |AS SELECT 1 AS a, "foo" AS b
        """.stripMargin
      )
 
-      checkCreateTable("ddl_test3")
+      checkCreateTable("ddl_test")
    }
  }
 
  test("bucketed data source table") {
-    withTable("ddl_test3") {
+    withTable("ddl_test") {
      sql(
-        s"""CREATE TABLE ddl_test3
+        s"""CREATE TABLE ddl_test
         |USING json
         |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
         |AS SELECT 1 AS a, "foo" AS b
       """.stripMargin
      )
 
-      checkCreateTable("ddl_test3")
+      checkCreateTable("ddl_test")
    }
  }
 
  test("partitioned bucketed data source table") {
-    withTable("ddl_test4") {
+    withTable("ddl_test") {
      sql(
-        s"""CREATE TABLE ddl_test4
+        s"""CREATE TABLE ddl_test
        |USING json
        |PARTITIONED BY (c)
        |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
@@ -101,12 +101,12 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
         """.stripMargin
       )
 
-      checkCreateTable("ddl_test4")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("data source table using Dataset API") {
-    withTable("ddl_test5") {
+    withTable("ddl_test") {
       spark
         .range(3)
         .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd, 'id as 'e)
@@ -114,9 +114,9 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
         .mode("overwrite")
         .partitionBy("a", "b")
         .bucketBy(2, "c", "d")
-        .saveAsTable("ddl_test5")
+        .saveAsTable("ddl_test")
 
-      checkCreateTable("ddl_test5")
+      checkCreateTable("ddl_test")
     }
   }
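
Illustration (not part of the patch): because `AnalysisException` now passes `cause.orNull` to `Exception`, the original metastore error wrapped by `withClient` stays reachable both through the new `cause` field and through the standard Java exception chain. A minimal sketch of a caller inspecting it, assuming an `ExternalCatalog` named `catalog`, a database name `db`, and a `CatalogTable` named `tableDefinition` are already in scope (all hypothetical names):

import org.apache.spark.sql.AnalysisException

try {
  catalog.createTable(db, tableDefinition, ignoreIfExists = false)
} catch {
  case e: AnalysisException =>
    // The wrapped Hive client exception is available via the new `cause` field
    // and, equivalently, via getCause on the chained Exception.
    e.cause.foreach(t => println(s"Underlying metastore error: $t"))
    assert(e.getCause == e.cause.orNull)
    throw e
}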