sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala (27 additions, 13 deletions)
@@ -83,6 +83,8 @@ class HadoopTableReader(
private val _broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

private lazy val STORAGE_HANDLER_KEY = "storage_handler"
Member: This can be removed.


override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
hiveTable,
@@ -114,22 +116,34 @@ class HadoopTableReader(
val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)

val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value)

// If the location of a table that was not created with 'stored by' does not exist,
// return an empty RDD.
// TODO: once Spark SQL supports 'stored by', check whether this condition
// is still appropriate.
val storageHandler = hiveTable.getParameters.getOrDefault(STORAGE_HANDLER_KEY, null)
Member: Replace STORAGE_HANDLER_KEY with META_TABLE_STORAGE, which is already defined in hive_metastoreConstants.
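A minimal sketch of the suggested change (assuming org.apache.hadoop.hive.metastore.api.hive_metastoreConstants is available here; its META_TABLE_STORAGE constant holds the same "storage_handler" key):

import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants

// Use the shared Hive constant instead of a locally defined key.
val storageHandler =
  hiveTable.getParameters.getOrDefault(hive_metastoreConstants.META_TABLE_STORAGE, null)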

if (storageHandler == null && !fs.exists(locationPath)) {
  new EmptyRDD[InternalRow](sparkSession.sparkContext)
} else {
  // logDebug("Table input: %s".format(tablePath))
  val ifc = hiveTable.getInputFormatClass
    .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
  val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)

  val attrsWithIndex = attributes.zipWithIndex
  val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))

  val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
    val hconf = broadcastedHadoopConf.value.value
    val deserializer = deserializerClass.newInstance()
    deserializer.initialize(hconf, tableDesc.getProperties)
    HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
  }

  deserializedHadoopRDD
}
}

override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow] = {
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1494,4 +1495,151 @@ class HiveDDLSuite
}
}
}

test("insert data to a hive serde table which has a not existed location should succeed") {
Contributor: nit: non-existing location

withTable("t") {
withTempDir { dir =>
val dirPath = dir.getAbsolutePath.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:$dirPath")
Contributor: nit: use LOCATION ..., and I think we don't need the file: prefix.
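A minimal sketch of the suggested form, reusing the test's dirPath (that the file: prefix can be dropped here is the reviewer's suggestion, not verified):

spark.sql(
  s"""
     |CREATE TABLE t(a string, b int)
     |USING hive
     |LOCATION '$dirPath'
   """.stripMargin)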

""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:$dirPath"
assert(table.location.stripSuffix("/") == expectedPath)
Contributor: please convert to Path and compare.
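A minimal sketch of the Path-based comparison (assuming org.apache.hadoop.fs.Path; its constructor normalizes trailing slashes, so the stripSuffix("/") calls become unnecessary):

import org.apache.hadoop.fs.Path

assert(new Path(table.location) == new Path(expectedPath))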


val tableLocFile = new File(table.location.stripPrefix("file:"))
Contributor: new File(new URI(table.location))? Please avoid .stripPrefix("file:"), which looks very hacky.
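A minimal sketch of the suggested conversion (assuming table.location is a well-formed file: URI string and java.net.URI is imported):

import java.net.URI

// Let File/URI handle the scheme instead of string-stripping it.
val tableLocFile = new File(new URI(table.location))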

tableLocFile.delete()
assert(!tableLocFile.exists())
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

Utils.deleteRecursively(dir)
assert(!tableLocFile.exists())
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/")
Contributor: is .stripSuffix("/") needed?

spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location.stripSuffix("/") == newDirPath)
assert(!newDirFile.exists())

spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
assert(newDirFile.exists())
}
}
}

test("insert into a hive serde table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
val dirPath = dir.getAbsolutePath.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:$dirPath"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:$dirPath"
assert(table.location.stripSuffix("/") == expectedPath)

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val partLoc = new File(s"$dirPath/a=1")
Utils.deleteRecursively(partLoc)
assert(!partLoc.exists())
// insert overwrite into a partition whose location has been deleted.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
assert(partLoc.exists())
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/")
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'")
assert(!newDirFile.exists())

// insert into a partition whose location does not exist.
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 9, 10")
assert(newDirFile.exists())
checkAnswer(spark.table("t"), Row(9, 10, 1, 2) :: Nil)
}
}
}

test("read data from a hive serde table which has a not existed location should succeed") {
Member: This is the only test case that fails without this fix, right?

Contributor Author: Yes, it is.

withTable("t") {
withTempDir { dir =>
val dirPath = dir.getAbsolutePath.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getAbsolutePath}")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:$dirPath"
assert(table.location.stripSuffix("/") == expectedPath)

dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/")
spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location.stripSuffix("/") == newDirPath)
assert(!newDirFile.exists())
checkAnswer(spark.table("t"), Nil)
}
}
}

test("read data from a hive serde table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'")
assert(!newDirFile.exists())
// select from a partition whose location has been changed to a non-existing location
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
Contributor: why set this conf?

Contributor Author: If we don't set it, the query throws an exception. If we set it, Spark checks whether the partition path exists and, instead of throwing, just returns an empty RDD when the path does not exist.

Contributor: Is this expected? I think Hive will always return an empty result, right?

Contributor: BTW, this conf will be removed soon, as it has bugs.

Contributor Author: OK, thanks. Then do we also need to modify something here?

Contributor Author: Yes, Hive returns empty. If there is a bug here (could you describe what the bug is?), can we remove the conf and always return results?
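// A hedged note on the two behaviors described in the thread above (the
// concrete exception type is an assumption, not taken from the discussion):
// with the conf disabled (the default), reading the missing partition path
// throws (e.g. an InvalidInputException from the input format); with it
// enabled, missing partition paths are skipped and the query returns an
// empty result, which is what the assertion below checks.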

checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6")
checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil)
assert(newDirFile.exists())

// select from a partition whose location has been deleted.
Utils.deleteRecursively(newDirFile)
assert(!newDirFile.exists())
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}
}
}
}
}