Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.File
import java.io.IOException

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
Expand Down Expand Up @@ -222,4 +223,115 @@ class HiveParquetSourceSuite extends ParquetPartitioningTest {
assert(df4.columns === Array("str", "max_int"))
}
}

test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
Seq("true", "false").foreach { parquetConversion =>
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
withTempPath { path =>
withTable("tbl1", "tbl2", "tbl3", "tbl4", "tbl5", "tbl6") {
val someDF1 = Seq((1, 1, "parq1"), (2, 2, "parq2")).
toDF("c1", "c2", "c3").repartition(1)
val someDF2 = Seq((3, 3, "parq3"), (4, 4, "parq4")).
toDF("c1", "c2", "c3").repartition(1)
val someDF3 = Seq((5, 5, "parq5"), (6, 6, "parq6")).
toDF("c1", "c2", "c3").repartition(1)
someDF1.write.parquet(s"${path.getCanonicalPath}/l1/")
someDF2.write.parquet(s"${path.getCanonicalPath}/l1/l2/")
someDF3.write.parquet(s"${path.getCanonicalPath}/l1/l2/l3/")

val topDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl1(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}"}'""".stripMargin
sql(topDirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("select * from tbl1"), Nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we capitalize the SQL statement like SELECT * FROM tbl1?

} else {
intercept[IOException](sql("select * from tbl1").show())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had better check the exception message.

}

val l1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl2(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}/l1/"}'""".stripMargin
sql(l1DirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("select * from tbl2"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SELECT * FROM tbl2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

(1 to 2).map(i => Row(i, i, s"parq$i")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we merge 269 and 270 into one line here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

} else {
intercept[IOException](sql("select * from tbl2").show())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the exception message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole exception message is
Not a file: file:/Users/qianyangyu/IdeaProjects/spark/target/tmp/spark-abc8c1ad-4a3a-420f-b4fc-58d995be9bb0/l1. I will check only the first part, Not a file:.

}

val l2DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl3(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}/l1/l2/"}'""".stripMargin
sql(l2DirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("select * from tbl3"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SELECT * FROM tbl3.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

(3 to 4).map(i => Row(i, i, s"parq$i")))
} else {
intercept[IOException](sql("select * from tbl3").show())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the exception message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the check.

}

// tbl4: external Parquet table whose LOCATION is a glob ("*") directly under the temp path.
val topGlobUri = new File(s"$path/*").toURI
sql(
  s"""
     |CREATE EXTERNAL TABLE tbl4(
     | c1 int,
     | c2 int,
     | c3 string)
     |STORED AS parquet
     |LOCATION '$topGlobUri'""".stripMargin)
parquetConversion match {
  case "true" =>
    // With metastore Parquet conversion on, rows 1 and 2 are expected back.
    checkAnswer(sql("select * from tbl4"), (1 to 2).map(i => Row(i, i, s"parq$i")))
  case _ =>
    // With conversion off, materializing the query is expected to throw an IOException.
    intercept[IOException](sql("select * from tbl4").show())
}

// tbl5: external Parquet table whose LOCATION is a glob ("*") under the l1 directory.
val l1GlobUri = new File(s"$path/l1/*").toURI
sql(
  s"""
     |CREATE EXTERNAL TABLE tbl5(
     | c1 int,
     | c2 int,
     | c3 string)
     |STORED AS parquet
     |LOCATION '$l1GlobUri'""".stripMargin)
val tbl5Query = "select * from tbl5"
if (parquetConversion != "true") {
  // With conversion off, materializing the query is expected to throw an IOException.
  intercept[IOException](sql(tbl5Query).show())
} else {
  // With metastore Parquet conversion on, rows 1 through 4 are expected back.
  checkAnswer(sql(tbl5Query), (1 to 4).map(i => Row(i, i, s"parq$i")))
}

// tbl6: external Parquet table whose LOCATION is a glob ("*") under the l1/l2 directory.
val l2GlobUri = new File(s"$path/l1/l2/*").toURI
val createTbl6 =
  s"""
     |CREATE EXTERNAL TABLE tbl6(
     | c1 int,
     | c2 int,
     | c3 string)
     |STORED AS parquet
     |LOCATION '$l2GlobUri'""".stripMargin
sql(createTbl6)
// Unlike the preceding tables, this check does not branch on parquetConversion:
// rows 3 through 6 are expected for both settings.
val expectedTbl6Rows = (3 to 6).map(i => Row(i, i, s"parq$i"))
checkAnswer(sql("select * from tbl6"), expectedTbl6Rows)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,155 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
// SPARK-11412: calls testMergeSchemasInParallel, passing
// OrcFileOperator.readOrcSchemasInParallel as its argument.
test("SPARK-11412 read and merge orc schemas in parallel") {
testMergeSchemasInParallel(OrcFileOperator.readOrcSchemasInParallel)
}

test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
Seq(true, false).foreach { convertMetastore =>
withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> s"$convertMetastore") {
withTempDir { dir =>
try {
hiveClient.runSqlHive("USE default")
hiveClient.runSqlHive(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use runSqlHive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I will change to sql.

"""
|CREATE EXTERNAL TABLE hive_orc(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused here.
@kevinyu98 . Do you want to get a table created by Hive here?
Usually, we use the table name, hive_orc, for that table. Please see https://github.com/apache/spark/pull/27130/files#diff-a8c26a35def87a13e6b59db19d9fb8a1R68 .

Also, you are still using hiveClient.runSqlHive at line 192. I'm wondering what the test target of this PR is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun Thanks for pointing this out. I had copied the pattern from other test cases without thinking it through. I have changed the name, and I also replaced hiveClient.runSqlHive for the insert statements.

| C1 INT,
| C2 INT,
| C3 STRING)
|STORED AS orc""".stripMargin)
// Hive throws an exception when the location is assigned in the CREATE TABLE statement.
// Instead, hive_orc is pointed at each nested directory in turn with ALTER TABLE, and two
// rows are inserted after every relocation.
val nestedDirRows = Seq(
  "l1/" -> "(1, 1, 'orc1'), (2, 2, 'orc2')",
  "l1/l2/" -> "(3, 3, 'orc3'), (4, 4, 'orc4')",
  "l1/l2/l3/" -> "(5, 5, 'orc5'), (6, 6, 'orc6')")
nestedDirRows.foreach { case (subDir, rows) =>
  val targetUri = new File(s"${dir.getCanonicalPath}/$subDir").toURI
  hiveClient.runSqlHive(s"ALTER TABLE hive_orc SET LOCATION '$targetUri'")
  hiveClient.runSqlHive(
    s"""
       |INSERT INTO TABLE hive_orc
       |VALUES $rows""".stripMargin)
}

withTable("tbl1", "tbl2", "tbl3", "tbl4", "tbl5", "tbl6") {
// tbl1: external ORC table located at the top-level temp directory itself.
sql(
  s"""
     |CREATE EXTERNAL TABLE tbl1(
     | c1 int,
     | c2 int,
     | c3 string)
     |STORED AS orc
     |LOCATION '${dir.getCanonicalPath}'""".stripMargin)
// With metastore ORC conversion on, no rows are expected; with it off, all six rows are.
val expectedTopDirRows: Seq[Row] =
  if (convertMetastore) Nil else (1 to 6).map(i => Row(i, i, s"orc$i"))
checkAnswer(sql("select * from tbl1"), expectedTopDirRows)

val l1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl2(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${s"${dir.getCanonicalPath}/l1/"}'""".stripMargin
sql(l1DirStatement)
val l1DirSqlStatement = s"select * from tbl2"
if (convertMetastore) {
checkAnswer(sql(l1DirSqlStatement),
(1 to 2).map(i => Row(i, i, s"orc$i")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's merge lines 250 and 251 into one line, as in HiveParquetSourceSuite.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

} else {
checkAnswer(sql(l1DirSqlStatement),
(1 to 6).map(i => Row(i, i, s"orc$i")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

val l2DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl3(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${s"${dir.getCanonicalPath}/l1/l2/"}'""".stripMargin
sql(l2DirStatement)
val l2DirSqlStatement = s"select * from tbl3"
if (convertMetastore) {
checkAnswer(sql(l2DirSqlStatement),
(3 to 4).map(i => Row(i, i, s"orc$i")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

} else {
checkAnswer(sql(l2DirSqlStatement),
(3 to 6).map(i => Row(i, i, s"orc$i")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

val wildcardTopDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl4(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/*").toURI}'""".stripMargin
sql(wildcardTopDirStatement)
val wildcardTopDirSqlStatement = s"select * from tbl4"
if (convertMetastore) {
checkAnswer(sql(wildcardTopDirSqlStatement),
(1 to 2).map(i => Row(i, i, s"orc$i")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

} else {
checkAnswer(sql(wildcardTopDirSqlStatement), Nil)
}

val wildcardL1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl5(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/l1/*").toURI}'""".stripMargin
sql(wildcardL1DirStatement)
val wildcardL1DirSqlStatement = s"select * from tbl5"
if (convertMetastore) {
checkAnswer(sql(wildcardL1DirSqlStatement),
(1 to 4).map(i => Row(i, i, s"orc$i")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

} else {
checkAnswer(sql(wildcardL1DirSqlStatement), Nil)
}

val wildcardL2Statement =
s"""
|CREATE EXTERNAL TABLE tbl6(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/l1/l2/*").toURI}'""".stripMargin
sql(wildcardL2Statement)
val wildcardL2SqlStatement = s"select * from tbl6"
if (convertMetastore) {
checkAnswer(sql(wildcardL2SqlStatement),
(3 to 6).map(i => Row(i, i, s"orc$i")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

} else {
checkAnswer(sql(wildcardL2SqlStatement), Nil)
}
}
} finally {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we replace this try..finally with withTable, as in HiveParquetSourceSuite?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
}
}
}
}
}
}