@@ -32,8 +32,9 @@ class AnalysisException protected[sql] (
val message: String,
val line: Option[Int] = None,
val startPosition: Option[Int] = None,
val plan: Option[LogicalPlan] = None)
extends Exception with Serializable {
val plan: Option[LogicalPlan] = None,
val cause: Option[Throwable] = None)
extends Exception(message, cause.orNull) with Serializable {

def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = {
val newException = new AnalysisException(message, line, startPosition)
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
@@ -556,9 +556,9 @@ class SparkSession private(
}


/* ------------------------ *
| Catalog-related methods |
* ----------------- ------ */
/* ------------------------- *
| Catalog-related methods |
* ------------------------- */

/**
* Interface through which the user may create, drop, alter or query underlying
@@ -118,8 +118,8 @@ case class CreateDataSourceTableCommand(
/**
* A command used to create a data source table using the result of a query.
*
* Note: This is different from [[CreateTableAsSelect]]. Please check the syntax for difference.
* This is not intended for temporary tables.
* Note: This is different from [[CreateTableAsSelectLogicalPlan]]. Please check the syntax for
* difference. This is not intended for temporary tables.
*
* The syntax of using this command in SQL is:
* {{{
@@ -77,4 +77,3 @@ object HiveSerDe {
serdeMap.get(key)
}
}

@@ -21,6 +21,8 @@ import java.util

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.thrift.TException

@@ -35,7 +37,9 @@ import org.apache.spark.sql.hive.client.HiveClient
* A persistent implementation of the system catalog using Hive.
* All public methods must be synchronized for thread-safety.
*/
private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging {
private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
extends ExternalCatalog with Logging {

import CatalogTypes.TablePartitionSpec

// Exceptions thrown by the hive client that we would like to wrap
@@ -68,7 +72,8 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
body
} catch {
case NonFatal(e) if isClientException(e) =>
throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
throw new AnalysisException(
e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
Review comment (Contributor Author):
Preserve the original exception so that we can see the Hive internal stack trace.

}
}
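
A minimal, self-contained sketch of why chaining the cause matters (this is not Spark code; the plain `Exception` below stands in for the new `AnalysisException(message, cause)` constructor):

```scala
object CauseChainingSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for an exception thrown inside the Hive client.
    val hiveError = new RuntimeException("metastore rejected the table definition")

    // Old behaviour: only the flattened message survived; the original trace was lost.
    val withoutCause = new Exception("java.lang.RuntimeException: " + hiveError.getMessage)
    assert(withoutCause.getCause == null)

    // New behaviour: the original Throwable rides along as the cause, so
    // printStackTrace() also prints a "Caused by:" section with the Hive frames.
    val withCause = new Exception("java.lang.RuntimeException: " + hiveError.getMessage, hiveError)
    assert(withCause.getCause eq hiveError)
    withCause.printStackTrace()
  }
}
```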

@@ -147,7 +152,41 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
ignoreIfExists: Boolean): Unit = withClient {
requireDbExists(db)
requireDbMatches(db, tableDefinition)
client.createTable(tableDefinition, ignoreIfExists)

if (
// If this is an external data source table...
tableDefinition.properties.contains("spark.sql.sources.provider") &&
tableDefinition.tableType == CatalogTableType.EXTERNAL &&
// ... that is not persisted as Hive compatible format (external tables in Hive compatible
// format always set `locationUri` to the actual data location and should NOT be hacked as
// following.)
tableDefinition.storage.locationUri.isEmpty
) {
// !! HACK ALERT !!
//
// Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
// directory that doesn't exist yet but can definitely be successfully created, and then
// delete it right after creating the external data source table. This location will be
// persisted to Hive metastore as standard Hive table location URI, but Spark SQL doesn't
// really use it. Also, since we only do this workaround for external tables, deleting the
// directory after the fact doesn't do any harm.
//
// Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
val tempPath = {
val dbLocation = getDatabase(tableDefinition.database).locationUri
new Path(dbLocation, tableDefinition.identifier.table + "-__PLACEHOLDER__")
}

try {
client.createTable(
tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)),
Review comment (Contributor):
So, this location is stored in the metastore?

Review comment (Contributor Author):
Yes.

ignoreIfExists)
} finally {
FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
}
} else {
client.createTable(tableDefinition, ignoreIfExists)
}
}

override def dropTable(
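
The create-then-delete dance above can be summarized by the following illustrative sketch. It is not the real `HiveExternalCatalog` code: `createInMetastore` is a made-up stand-in for the Hive client call, and the real code passes `CatalogTable` instances rather than raw strings.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object PlaceholderLocationSketch {
  // Hypothetical stand-in for client.createTable: the metastore insists on a
  // creatable location URI even for tables whose data lives elsewhere.
  def createInMetastore(tableName: String, locationUri: String): Unit =
    println(s"createTable($tableName) with location $locationUri")

  def createExternalDataSourceTable(
      dbLocationUri: String,
      tableName: String,
      hadoopConf: Configuration): Unit = {
    // Placeholder directory under the database location; it never holds real data.
    val tempPath = new Path(dbLocationUri, tableName + "-__PLACEHOLDER__")
    try {
      createInMetastore(tableName, tempPath.toString)
    } finally {
      // Remove the placeholder right away so nothing lingers on the filesystem.
      FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
    }
  }
}
```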
@@ -51,6 +51,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
/**
* A catalog that interacts with the Hive metastore.
*/
override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)

override lazy val externalCatalog =
new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration)
}
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.util.{CircularBuffer, Utils}

/**
@@ -323,7 +324,7 @@ private[hive] class HiveClientImpl(
}

override def listDatabases(pattern: String): Seq[String] = withHiveState {
client.getDatabasesByPattern(pattern).asScala.toSeq
client.getDatabasesByPattern(pattern).asScala
}

override def getTableOption(
@@ -351,6 +352,8 @@ private[hive] class HiveClientImpl(
unsupportedFeatures += "bucketing"
}

val properties = h.getParameters.asScala.toMap

CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
@@ -368,14 +371,27 @@
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
storage = CatalogStorageFormat(
locationUri = shim.getDataLocation(h),
locationUri = shim.getDataLocation(h).filterNot { _ =>
// SPARK-15269: Persisted data source tables always store the location URI as a SerDe
// property named "path" instead of standard Hive `dataLocation`, because Hive only
// allows directory paths as location URIs while Spark SQL data source tables also
// allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
// data source tables.
DDLUtils.isDatasourceTable(properties) &&
h.getTableType == HiveTableType.EXTERNAL_TABLE &&
// Spark SQL may also save external data source in Hive compatible format when
// possible, so that these tables can be directly accessed by Hive. For these tables,
// `dataLocation` is still necessary. Here we also check for input format class
// because only these Hive compatible tables set this field.
h.getInputFormatClass == null
},
Review comment (Contributor):
Why do we need this check?

Review comment (Contributor Author):
Because we have to store the placeholder location URI in the metastore for external data source tables, and I'd like to avoid exposing it to user space.

Review comment (Contributor Author, @liancheng, May 31, 2016):
Technically, it does no harm to keep this field, since Spark SQL doesn't use it. But this placeholder location URI doesn't make sense anywhere, and it would be error-prone to keep it in the locationUri field, since future maintainers may think it is the real data location.

inputFormat = Option(h.getInputFormatClass).map(_.getName),
outputFormat = Option(h.getOutputFormatClass).map(_.getName),
serde = Option(h.getSerializationLib),
compressed = h.getTTable.getSd.isCompressed,
serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
),
properties = h.getParameters.asScala.toMap,
properties = properties,
viewOriginalText = Option(h.getViewOriginalText),
viewText = Option(h.getViewExpandedText),
unsupportedFeatures = unsupportedFeatures)
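
The condition inside `filterNot` above can be hard to parse at a glance. Below is an illustrative reduction of the decision it encodes (the field and type names are made up, not Spark's): hide the stored location only for external data source tables that were not persisted in a Hive-compatible format.

```scala
object LocationMaskingSketch extends App {
  // Simplified, hypothetical view of the raw Hive table metadata involved here.
  case class RawHiveTable(
      dataLocation: Option[String],
      isDataSourceTable: Boolean,
      isExternal: Boolean,
      hasInputFormatClass: Boolean)

  def visibleLocationUri(t: RawHiveTable): Option[String] =
    t.dataLocation.filterNot { _ =>
      // Same shape as the diff: mask the placeholder location of external data source
      // tables; Hive-compatible tables (which set an input format class) keep theirs.
      t.isDataSourceTable && t.isExternal && !t.hasInputFormatClass
    }

  // Placeholder location of a non-Hive-compatible data source table is hidden:
  assert(visibleLocationUri(
    RawHiveTable(Some("/warehouse/db/t-__PLACEHOLDER__"), true, true, false)).isEmpty)
  // A Hive-compatible data source table keeps its real location:
  assert(visibleLocationUri(
    RawHiveTable(Some("/warehouse/db/t"), true, true, true)).contains("/warehouse/db/t"))
}
```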
@@ -37,7 +37,8 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
override def newEmptyCatalog(): ExternalCatalog = new HiveExternalCatalog(client)
override def newEmptyCatalog(): ExternalCatalog =
new HiveExternalCatalog(client, new Configuration())
}

protected override def resetState(): Unit = client.reset()
@@ -1104,4 +1104,17 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
}

test("SPARK-15269 external data source table creation") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(1).write.json(path)

withTable("t") {
sql(s"CREATE TABLE t USING json OPTIONS (PATH '$path')")
sql("DROP TABLE t")
sql(s"CREATE TABLE t USING json AS SELECT 1 AS c")
}
}
}
}
@@ -28,11 +28,11 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
import testImplicits._

test("data source table with user specified schema") {
withTable("ddl_test1") {
withTable("ddl_test") {
Review comment (Contributor Author, @liancheng, May 26, 2016):
I first noticed this bug while writing tests in this suite: test cases always failed when I reused the same table name across multiple test cases. That's why the changes to this file also serve as additional tests.

val jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile

sql(
s"""CREATE TABLE ddl_test1 (
s"""CREATE TABLE ddl_test (
| a STRING,
| b STRING,
| `extra col` ARRAY<INT>,
@@ -45,78 +45,78 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
""".stripMargin
)

checkCreateTable("ddl_test1")
checkCreateTable("ddl_test")
}
}

test("data source table CTAS") {
withTable("ddl_test2") {
withTable("ddl_test") {
sql(
s"""CREATE TABLE ddl_test2
s"""CREATE TABLE ddl_test
|USING json
|AS SELECT 1 AS a, "foo" AS b
""".stripMargin
)

checkCreateTable("ddl_test2")
checkCreateTable("ddl_test")
}
}

test("partitioned data source table") {
withTable("ddl_test3") {
withTable("ddl_test") {
sql(
s"""CREATE TABLE ddl_test3
s"""CREATE TABLE ddl_test
|USING json
|PARTITIONED BY (b)
|AS SELECT 1 AS a, "foo" AS b
""".stripMargin
)

checkCreateTable("ddl_test3")
checkCreateTable("ddl_test")
}
}

test("bucketed data source table") {
withTable("ddl_test3") {
withTable("ddl_test") {
sql(
s"""CREATE TABLE ddl_test3
s"""CREATE TABLE ddl_test
|USING json
|CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
|AS SELECT 1 AS a, "foo" AS b
""".stripMargin
)

checkCreateTable("ddl_test3")
checkCreateTable("ddl_test")
}
}

test("partitioned bucketed data source table") {
withTable("ddl_test4") {
withTable("ddl_test") {
sql(
s"""CREATE TABLE ddl_test4
s"""CREATE TABLE ddl_test
|USING json
|PARTITIONED BY (c)
|CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
|AS SELECT 1 AS a, "foo" AS b, 2.5 AS c
""".stripMargin
)

checkCreateTable("ddl_test4")
checkCreateTable("ddl_test")
}
}

test("data source table using Dataset API") {
withTable("ddl_test5") {
withTable("ddl_test") {
spark
.range(3)
.select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd, 'id as 'e)
.write
.mode("overwrite")
.partitionBy("a", "b")
.bucketBy(2, "c", "d")
.saveAsTable("ddl_test5")
.saveAsTable("ddl_test")

checkCreateTable("ddl_test5")
checkCreateTable("ddl_test")
}
}
