Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,80 +33,53 @@ import org.apache.spark.util.Utils
class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import spark.implicits._

test("SPARK-5068: query data when path doesn't exist") {
withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) {
val testData = sparkContext.parallelize(
(1 to 10).map(i => TestData(i, i.toString))).toDF()
testData.createOrReplaceTempView("testData")

val tmpDir = Files.createTempDir()
// create the table for test
sql(s"CREATE TABLE table_with_partition(key int,value string) " +
s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
"SELECT key,value FROM testData")

// test for the exist path
checkAnswer(sql("select key,value from table_with_partition"),
testData.toDF.collect ++ testData.toDF.collect
++ testData.toDF.collect ++ testData.toDF.collect)

// delete the path of one partition
tmpDir.listFiles
.find { f => f.isDirectory && f.getName().startsWith("ds=") }
.foreach { f => Utils.deleteRecursively(f) }

// test for after delete the path
checkAnswer(sql("select key,value from table_with_partition"),
testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
private def queryWhenPathNotExist(): Unit = {
withTempView("testData") {
withTable("table_with_partition", "createAndInsertTest") {
withTempDir { tmpDir =>
val testData = sparkContext.parallelize(
(1 to 10).map(i => TestData(i, i.toString))).toDF()
testData.createOrReplaceTempView("testData")

// create the table for test
sql(s"CREATE TABLE table_with_partition(key int,value string) " +
s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
"SELECT key,value FROM testData")

// test for the exist path
checkAnswer(sql("select key,value from table_with_partition"),
testData.union(testData).union(testData).union(testData))

// delete the path of one partition
tmpDir.listFiles
.find { f => f.isDirectory && f.getName().startsWith("ds=") }
.foreach { f => Utils.deleteRecursively(f) }

// test for after delete the path
checkAnswer(sql("select key,value from table_with_partition"),
testData.union(testData).union(testData))
}
}
}
}

sql("DROP TABLE IF EXISTS table_with_partition")
sql("DROP TABLE IF EXISTS createAndInsertTest")
test("SPARK-5068: query data when path doesn't exist") {
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
queryWhenPathNotExist()
}
}

test("Replace spark.sql.hive.verifyPartitionPath by spark.files.ignoreMissingFiles") {
withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "false")) {
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false") {
sparkContext.conf.set(IGNORE_MISSING_FILES.key, "true")
val testData = sparkContext.parallelize(
(1 to 10).map(i => TestData(i, i.toString))).toDF()
testData.createOrReplaceTempView("testData")

val tmpDir = Files.createTempDir()
// create the table for test
sql(s"CREATE TABLE table_with_partition(key int,value string) " +
s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
"SELECT key,value FROM testData")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
"SELECT key,value FROM testData")

// test for the exist path
checkAnswer(sql("select key,value from table_with_partition"),
testData.toDF.collect ++ testData.toDF.collect
++ testData.toDF.collect ++ testData.toDF.collect)

// delete the path of one partition
tmpDir.listFiles
.find { f => f.isDirectory && f.getName().startsWith("ds=") }
.foreach { f => Utils.deleteRecursively(f) }

// test for after delete the path
checkAnswer(sql("select key,value from table_with_partition"),
testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)

sql("DROP TABLE IF EXISTS table_with_partition")
sql("DROP TABLE IF EXISTS createAndInsertTest")
queryWhenPathNotExist()
}
}

Expand Down