HiveTableScanExec.scala
@@ -20,13 +20,14 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils

-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{EmptyRDD, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -139,6 +140,13 @@ case class HiveTableScanExec(
}

protected override def doExecute(): RDD[InternalRow] = {
val locationPath = new Path(relation.catalogTable.location)
val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())

// If the table location does not exist, return an empty RDD.
if (!fs.exists(locationPath)) {
return new EmptyRDD[InternalRow](sparkSession.sparkContext)
Member:
Can we do it in makeRDDForTable?

Contributor Author:
I do it here so that it covers both non-partitioned and partitioned tables. A partitioned table already runs fine when the location does not exist as long as verifyPartitionPath is set to true; if we also want it to run fine when verifyPartitionPath is set to false, we would have to add the same check to makeRDDForPartitionedTable as well. Doing it here handles both cases in one place.

Contributor Author:
On second thought, it does not seem reasonable to do this for a partitioned table, because the actual partition paths may not live under the table's location. I have moved this logic into makeRDDForTable.
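
For reference, the resulting shape is roughly the following (an illustrative sketch against HadoopTableReader, not the exact patch; hiveTable.getPath, hadoopConf, and the surrounding plumbing are assumptions):

// Guard in makeRDDForTable: this method only handles non-partitioned tables,
// so the check never fires for partition paths.
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = {
  val tablePath = hiveTable.getPath
  val fs = tablePath.getFileSystem(hadoopConf)
  if (!fs.exists(tablePath)) {
    return new EmptyRDD[InternalRow](sparkSession.sparkContext)
  }
  // ... existing scan logic unchanged ...
}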

}
// Using dummyCallSite, as getCallSite can turn out to be expensive
// with multiple partitions.
val rdd = if (!relation.hiveQlTable.isPartitioned) {
HiveDDLSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1494,4 +1495,149 @@ class HiveDDLSuite
}
}
}

test("insert data to a hive serde table which has a not existed location should succeed") {
Contributor:
nit: non-existing location

withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getCanonicalPath}")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)
Contributor:
Please convert to Path and compare.
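
For instance, something like this one-liner (illustrative; it assumes org.apache.hadoop.fs.Path is imported and relies on Path normalizing away trailing slashes, which should make the stripSuffix("/") calls unnecessary):

assert(new Path(table.location) == new Path(expectedPath))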


val tableLocFile = new File(table.location.stripPrefix("file:"))
Contributor:
How about new File(new URI(table.location))? Please avoid .stripPrefix("file:"), which looks very hacky.
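
That is, roughly (illustrative; assumes an added import java.net.URI, and that table.location is still a String here, as the surrounding assertions suggest):

val tableLocFile = new File(new URI(table.location))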

tableLocFile.delete()
assert(!tableLocFile.exists())
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

Utils.deleteRecursively(dir)
assert(!tableLocFile.exists())
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
val newDirFile = new File(newDir)
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir)
assert(!newDirFile.exists())

spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
assert(newDirFile.exists())
}
}
}

test("insert into a hive serde table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
Utils.deleteRecursively(partLoc)
assert(!partLoc.exists())
// Insert overwrite into a partition whose location has been deleted.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
assert(partLoc.exists())
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
val newDirFile = new File(newDir)
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'")
assert(!newDirFile.exists())

// Insert into a partition whose location does not exist.
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 9, 10")
assert(newDirFile.exists())
checkAnswer(spark.table("t"), Row(9, 10, 1, 2) :: Nil)
}
}
}

test("read data from a hive serde table which has a not existed location should succeed") {
Member:
This is the only test case that fails without this fix, right?

Contributor Author:
Yes, it is.

withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getAbsolutePath}")
Member (@gatorsmile, Feb 20, 2017):
Indent issues.

""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
Member:
Use new File(dir, "x") instead.

spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir)
assert(!new File(newDir).exists())
checkAnswer(spark.table("t"), Nil)
}
}
}

test("read data from a hive serde table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
Member:
Is this variable unused?


spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
val newDirFile = new File(newDir)
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'")
assert(!newDirFile.exists())
// Select from a partition whose location has been changed to a non-existent path.
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
Contributor:
Why set this conf?

Contributor Author:
If we don't set it, the query throws an exception. If we do set it, Spark checks whether each partition path exists and, instead of throwing, just returns an empty RDD when the path does not exist.
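
Conceptually, when spark.sql.hive.verifyPartitionPath is true the verification amounts to something like this (a simplified sketch, not the actual HadoopTableReader code; the method name, hadoopConf, and the surrounding plumbing are assumptions):

// Keep only partitions whose data paths exist on the filesystem, so that a
// missing path yields an empty scan instead of an "input path does not
// exist" exception.
def prunePartitionsByPath(partitions: Seq[HivePartition]): Seq[HivePartition] = {
  partitions.filter { p =>
    val dataPath = p.getDataLocation
    dataPath.getFileSystem(hadoopConf).exists(dataPath)
  }
}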

Contributor:
Is this expected? I think Hive always returns an empty result, right?

Contributor:
BTW this conf will be removed soon, as it has bugs.

Contributor Author:
OK, thanks. Then do we also need to modify something here?

Contributor Author:
Yes, Hive returns an empty result. If there is a bug here (could you describe what the bug is?), can we remove the conf and always return the result?

checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6")
checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil)
assert(newDirFile.exists())

// Select from a partition whose location has been deleted.
Utils.deleteRecursively(newDirFile)
assert(!newDirFile.exists())
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}
}
}
}
}