sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.hive.execution

-import java.io.IOException
+import java.io.{File, IOException}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
@@ -97,12 +97,24 @@ case class InsertIntoHiveTable(
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-    val stagingPathName: String =
+    var stagingPathName: String =
       if (inputPathName.indexOf(stagingDir) == -1) {
         new Path(inputPathName, stagingDir).toString
       } else {
         inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
       }
+
+    // SPARK-20594: This is a workaround for a Hive bug. When users set hive.exec.stagingdir
+    // under the table directory, the staging directory must be a hidden child (name starting
+    // with ".") so that Hive does not delete it together with the old table data.
+    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
+      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
+      logDebug(s"The staging dir '$stagingPathName' should be a child directory starting " +
+        "with '.' to avoid being deleted if hive.exec.stagingdir is set under the table " +
+        "directory.")
+      stagingPathName = new Path(inputPathName, ".hive-staging").toString
+    }
+
     val dir: Path =
       fs.makeQualified(
         new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
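To see the new check in isolation: a minimal, self-contained sketch of the same logic, with Hadoop's FileUtils.isSubDir replaced by a plain string prefix check and a made-up table path (both are illustrative assumptions, not the actual Spark code):

  import java.io.File

  object StagingDirSketch {
    // Stand-in for Hadoop's FileUtils.isSubDir: a plain prefix check,
    // used here purely for illustration.
    private def isSubDir(candidate: String, parent: String): Boolean =
      candidate.startsWith(parent + File.separator)

    // Mirrors the fix: a staging dir that is a visible (non-dot) child of the
    // table directory would be wiped by Hive on INSERT OVERWRITE, so fall back
    // to the hidden ".hive-staging" name.
    def resolveStagingDir(tableDir: String, stagingPathName: String): String = {
      val relative = stagingPathName.stripPrefix(tableDir).stripPrefix(File.separator)
      if (isSubDir(stagingPathName, tableDir) && !relative.startsWith(".")) {
        new File(tableDir, ".hive-staging").getPath
      } else {
        stagingPathName
      }
    }

    def main(args: Array[String]): Unit = {
      val tableDir = "/user/hive/warehouse/test_table" // made-up path
      // hive.exec.stagingdir=./test resolves to a visible child => rewritten
      println(resolveStagingDir(tableDir, s"$tableDir/test"))
      // the default .hive-staging is already hidden => kept as-is
      println(resolveStagingDir(tableDir, s"$tableDir/.hive-staging"))
    }
  }
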
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -494,4 +494,34 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       spark.table("t").write.insertInto(tableName)
     }
   }
+
+  test(
+    """SPARK-20594: This is a workaround for a Hive bug: the staging directory must not
+      |be deleted when users set hive.exec.stagingdir under the table
+      |directory.""".stripMargin) {
Member commented:

Without your fix, this test case still passes. Could you please check it?

Author (@zuotingbing) replied on May 11, 2017:

Hmm, as I tested again, we must create the table with CREATE TABLE rather than simplifying it to spark.range(1).write.saveAsTable("test_table"). Thanks again. :)

Member replied:

Uh, that is because saveAsTable does not create a Hive table. How about simplifying the test case to this?

  test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
    // Set hive.exec.stagingdir under the table directory, not starting with ".".
    withSQLConf("hive.exec.stagingdir" -> "./test") {
      withTable("test_table") {
        sql("CREATE TABLE test_table (key int)")
        sql("INSERT OVERWRITE TABLE test_table SELECT 1")
        checkAnswer(sql("SELECT * FROM test_table"), Row(1))
      }
    }
  }

Author replied:

Good choice. Thanks for your time!
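
For context on the exchange above: the two statements create different kinds of tables, which is why the earlier version of the test never reached the patched staging-directory logic. A hedged sketch of the contrast (Spark 2.x default settings assumed):

  // Creates a data source table (Parquet under the default settings); inserts
  // into it do not go through InsertIntoHiveTable, so no Hive staging
  // directory is involved.
  spark.range(1).write.saveAsTable("test_table")

  // Creates a Hive SerDe table; INSERT OVERWRITE on it goes through
  // InsertIntoHiveTable and therefore exercises the staging-directory fix.
  sql("CREATE TABLE test_table (key int)")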


+    withTable("test_table") {
+      spark.range(1).write.saveAsTable("test_table")
+
+      // Make sure the table has been created and populated.
+      checkAnswer(
+        sql("SELECT * FROM test_table"),
+        Row(0)
+      )
+
+      // Set hive.exec.stagingdir under the table directory, not starting with ".".
+      sql("set hive.exec.stagingdir=./test")
Member commented:

Could you please set it back after this test case?
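
One way to guarantee the conf is restored even if the test fails midway, assuming the suite mixes in Spark's SQLTestUtils (which provides withSQLConf), is to scope the override:

  // withSQLConf applies the overrides, runs the block, and restores the
  // previous values on exit, even when the block throws.
  withSQLConf("hive.exec.stagingdir" -> "./test") {
    sql("INSERT OVERWRITE TABLE test_table SELECT 1")
    checkAnswer(sql("SELECT * FROM test_table"), Row(1))
  }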


+      // Now overwrite.
+      sql("INSERT OVERWRITE TABLE test_table SELECT 1")
+
+      // Make sure the table has also been updated.
+      checkAnswer(
+        sql("SELECT * FROM test_table"),
+        Row(1)
+      )
+
+      sql("reset")
+    }
+  }
 }