Changes from 1 commit
mv the logic to makeRDDForTable
windpiger committed Feb 21, 2017
commit 4493a8f96320720e82dd8a66f61a3b4ebf920116
38 changes: 23 additions & 15 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -114,22 +114,30 @@ class HadoopTableReader(
val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)

// logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)

val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))

val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHadoopConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}
val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
Member
How about replacing sparkSession.sessionState.newHadoopConf() with broadcastedHadoopConf.value.value?

Contributor Author
ok~
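A minimal sketch of the suggested change, reusing only names that already appear in this diff:

```scala
// Sketch of the reviewer's suggestion: reuse the Hadoop configuration that
// is already broadcast to executors (and used below for deserialization)
// instead of building a fresh one via sparkSession.sessionState.newHadoopConf().
val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value)
```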


// if the table location is not exists, return an empty RDD
Member
is not exists -> does not exist

if (!fs.exists(locationPath)) {
Contributor
hmmm, what about Hive serde tables with a storage handler? Their data is stored somewhere else (maybe in HBase), and the table path is always non-existent.

Contributor Author
good catch! Thanks!
I tested this in Hive: when a table is created with STORED BY (e.g. HBase), a table path is created under the warehouse path at creation time, but no data files exist there after we insert into the table, and selecting data still works after we delete the table path.
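For illustration only, a hypothetical helper capturing this thread's concern. The helper name and signature are invented here, not part of the PR; `getStorageHandler` is the accessor on Hive's `Table` class, and the sketch assumes the file's existing imports (Path/FileSystem from org.apache.hadoop.fs and the HiveTable alias from org.apache.hadoop.hive.ql.metadata):

```scala
// Hypothetical helper (not in the PR): a missing table path should only be
// read as "no data" for path-backed tables. Storage-handler tables
// (e.g. HBase-backed ones) keep their data elsewhere, so their path may
// legitimately not exist.
private def missingPathImpliesEmpty(
    hiveTable: HiveTable,
    fs: FileSystem,
    locationPath: Path): Boolean = {
  hiveTable.getStorageHandler == null && !fs.exists(locationPath)
}
```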

new EmptyRDD[InternalRow](sparkSession.sparkContext)
} else {
// logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)

deserializedHadoopRDD
val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))

val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHadoopConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}

deserializedHadoopRDD
}
}
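Taken together, the new branch means that scanning a table whose location directory has been removed yields no rows instead of failing. A sketch mirroring the HiveDDLSuite test further down:

```scala
// Mirrors the test below: once the location directory is gone, the scan
// short-circuits to an EmptyRDD rather than throwing
// "Input path does not exist".
withTempDir { dir =>
  spark.sql(
    s"""
       |CREATE TABLE t(a string, b int)
       |USING hive
       |OPTIONS(path "file:${dir.getCanonicalPath}")
     """.stripMargin)
  dir.delete()                        // remove the table location
  checkAnswer(spark.table("t"), Nil)  // empty result, no exception
}
```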

override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow] = {
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -140,13 +140,6 @@ case class HiveTableScanExec(
}

protected override def doExecute(): RDD[InternalRow] = {
val locationPath = new Path(relation.catalogTable.location)
val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())

// if the table location is not exists, return an empty RDD
if (!fs.exists(locationPath)) {
return new EmptyRDD[InternalRow](sparkSession.sparkContext)
}
// Using dummyCallSite, as getCallSite can turn out to be expensive with
// multiple partitions.
val rdd = if (!relation.hiveQlTable.isPartitioned) {
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1501,9 +1501,9 @@ class HiveDDLSuite
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getCanonicalPath}")
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getCanonicalPath}")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
@@ -1522,13 +1522,12 @@ class HiveDDLSuite
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
val newDirFile = new File(newDir)
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t SET LOCATION '${newDirFile.getAbsolutePath}'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir)
assert(table1.location.stripSuffix("/") == newDirFile.getAbsolutePath.stripSuffix("/"))
assert(!newDirFile.exists())

spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
Expand All @@ -1543,10 +1542,10 @@ class HiveDDLSuite
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
@@ -1563,9 +1562,9 @@ class HiveDDLSuite
assert(partLoc.exists())
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
val newDirFile = new File(newDir)
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'")
val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " +
s"'${newDirFile.getAbsolutePath}'")
Member
shorten it to a single line?
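That is, the same statement joined into one interpolated string:

```scala
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '${newDirFile.getAbsolutePath}'")
```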

assert(!newDirFile.exists())

// insert into a partition whose location does not exist.
@@ -1581,9 +1580,9 @@ class HiveDDLSuite
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getAbsolutePath}")
|CREATE TABLE t(a string, b int)
|USING hive
|OPTIONS(path "file:${dir.getAbsolutePath}")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
@@ -1592,12 +1591,12 @@ class HiveDDLSuite
dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t SET LOCATION '${newDirFile.getAbsolutePath}'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir)
assert(!new File(newDir).exists())
assert(table1.location.stripSuffix("/") == newDirFile.getAbsolutePath.stripSuffix("/"))
assert(!newDirFile.exists())
checkAnswer(spark.table("t"), Nil)
}
}
@@ -1608,19 +1607,19 @@ class HiveDDLSuite
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
Member
useless?


spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
val newDirFile = new File(newDir)
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'")
val newDirFile = new File(dir, "x")
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " +
s"'${newDirFile.getAbsolutePath}'")
Member
shorten it to a single line?

Contributor Author
101 characters...
let me modify some code.

Member
Actually, 101 is still ok

Contributor Author
e... isn't it 100? let me test it...
I have modified some code to make it clearer.

assert(!newDirFile.exists())
// select from a partition whose location has changed to a nonexistent location
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
Contributor
why set this conf?

Contributor Author
If we don't set it, it throws an exception. If we set it, it checks whether the partition path exists and, instead of throwing, just returns an empty RDD even when the path does not exist.

Contributor
is this expected? I think Hive will always return an empty result, right?

Contributor
BTW this conf will be removed soon, as it has bugs.

Contributor Author
ok~ thanks~ Then do we also need to modify something here?

Contributor Author
Yes, Hive returns empty. If there is a bug here (could you describe what the bug is?), we can remove the conf and always return the result?
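A compact sketch of the behavior described in this thread, using the test's own names (outcomes as stated in the comments above, not independently verified):

```scala
// With path verification on, a partition whose location is missing is
// skipped, so the scan returns an empty result.
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
  checkAnswer(spark.table("t"), Nil)
}

// With it off (the default), the scan is said above to throw because the
// partition's input path does not exist.
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false") {
  intercept[Exception] { spark.table("t").collect() }
}
```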
