Clean up HDFSMetadataLog for Hadoop 2.6
zsxwing committed Feb 14, 2017
commit 81bcc48e0c608a2c98369c7598d5040d3b39197e
@@ -55,7 +55,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
}


testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
test("OffsetSeqLog serialization - deserialization") {
withTempDir { temp =>
// use a non-existent directory to test whether the log creates the dir
val dir = new File(temp, "dir")
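Several hunks in this commit make the same change as above: the `testWithUninterruptibleThread` wrapper is replaced with a plain `test`, because the cleaned-up `HDFSMetadataLog` no longer requires its caller to be uninterruptible. As a hedged illustration of what such a wrapper typically does (a sketch, not the actual SparkFunSuite helper; the name below is assumed), it runs the test body on an `UninterruptibleThread` and rethrows any failure on the calling thread:

import org.apache.spark.util.UninterruptibleThread

// Illustrative sketch only: run a block on an UninterruptibleThread and
// rethrow any failure on the calling (test) thread. Thread.join() provides
// the happens-before edge that makes `failure` safely visible here.
def runOnUninterruptibleThread(name: String)(body: => Unit): Unit = {
  var failure: Option[Throwable] = None
  val thread = new UninterruptibleThread(name) {
    override def run(): Unit = {
      try body catch { case t: Throwable => failure = Some(t) }
    }
  }
  thread.start()
  thread.join()
  failure.foreach(e => throw e)
}

After this cleanup the body can run on an ordinary test thread, which is why the suites switch back to plain `test(...)`.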
@@ -109,43 +109,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
override def add(batchId: Long, metadata: T): Boolean = {
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
if (fileManager.isLocalFileSystem) {
Thread.currentThread match {
case ut: UninterruptibleThread =>
// When using a local file system, "writeBatch" must be called on a
// [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
// while writing the batch file. This is because there is a potential deadlock in
// Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
// "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
// `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
// the file permission if using the local file system, and can get deadlocked if the
// stream execution thread is stopped by interrupt. Hence, we make sure that
// "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
// interrupts here. Also see SPARK-14131.
ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
case _ =>
throw new IllegalStateException(
"HDFSMetadataLog.add() on a local file system must be executed on " +
"a o.a.spark.util.UninterruptibleThread")
}
} else {
// For a distributed file system, such as HDFS or S3, if the network is broken, write
// operations may just hang until timeout. We should enable interrupts to allow stopping
// the query fast.
writeBatch(batchId, metadata, serialize)
}
writeBatch(batchId, metadata)
true
}
}

def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
var nextId = 0
def writeTempBatch(metadata: T): Option[Path] = {
while (true) {
val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
try {
val output = fileManager.create(tempPath)
try {
writer(metadata, output)
serialize(metadata, output)
return Some(tempPath)
} finally {
IOUtils.closeQuietly(output)
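For context on the block removed from `add` above: `UninterruptibleThread.runUninterruptibly` defers any interrupt delivered during its body until the body finishes, which is what protected the local-filesystem write from the HADOOP-10622 `Shell.runCommand` deadlock described in the deleted comment. A minimal sketch of that now-removed guard (the `write` callback is a placeholder; this is not the exact old code):

import org.apache.spark.util.UninterruptibleThread

// Sketch of the removed pattern: defer interrupts around an interrupt-sensitive
// write. This only works if the current thread is an UninterruptibleThread,
// so any other caller is rejected up front.
def writeShieldedFromInterrupts(write: () => Unit): Unit = {
  Thread.currentThread match {
    case ut: UninterruptibleThread =>
      // An interrupt arriving inside this block is re-delivered after it returns.
      ut.runUninterruptibly { write() }
    case _ =>
      throw new IllegalStateException(
        "must be called on an org.apache.spark.util.UninterruptibleThread")
  }
}

Since Spark now assumes Hadoop 2.6+, where the HADOOP-10622 deadlock (fixed in 2.5.0) no longer applies, the new code calls `writeBatch(batchId, metadata)` directly and stays interruptible even on a local file system.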
@@ -164,7 +139,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// big problem because it requires that the attacker have permission to write to the
// metadata path. In addition, the old Streaming API also has this issue; people can create
// malicious checkpoint files to crash a Streaming application too.
nextId += 1
}
}
None
@@ -176,8 +150,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
* There may be multiple [[HDFSMetadataLog]] instances using the same metadata path. Although this
* is not valid behavior, we still need to prevent it from destroying the files.
*/
private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
val tempPath = writeTempBatch(metadata, writer).getOrElse(
private def writeBatch(batchId: Long, metadata: T): Unit = {
val tempPath = writeTempBatch(metadata).getOrElse(
throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
try {
// Try to commit the batch
@@ -195,12 +169,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// So throw an exception to tell the user this is not a valid behavior.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
case e: FileNotFoundException =>
// Sometimes, "create" will succeed when multiple writers are calling it at the same
// time. However, only one writer can call "rename" successfully; the others will get
// FileNotFoundException because the first writer has removed it.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
} finally {
fileManager.delete(tempPath)
}
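The commit path above keeps the write-to-a-temp-file-then-rename pattern; only the pluggable `writer` parameter and the now-redundant `FileNotFoundException` branch are dropped. A standalone sketch of that pattern against the Hadoop FileSystem API (the method shape, return value, and error handling are simplified assumptions, not the exact HDFSMetadataLog code):

import java.io.OutputStream
import java.util.UUID
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: serialize metadata to a hidden temp file, then rename it to the
// final batch file. The rename is the "commit": if the destination already
// exists, another writer won the race and this attempt is abandoned.
def commitBatch(fs: FileSystem, metadataDir: Path, batchId: Long)
    (serialize: OutputStream => Unit): Boolean = {
  val tempPath = new Path(metadataDir, s".${UUID.randomUUID()}.tmp")
  val out = fs.create(tempPath)
  try serialize(out) finally out.close()
  try {
    // Most FileSystem implementations refuse to overwrite an existing
    // destination and return false instead.
    fs.rename(tempPath, new Path(metadataDir, batchId.toString))
  } finally {
    fs.delete(tempPath, false) // no-op if the rename already moved the file
  }
}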
@@ -332,9 +300,6 @@ object HDFSMetadataLog {

/** Recursively delete a path if it exists. Should not throw an exception if the file doesn't exist. */
def delete(path: Path): Unit

/** Whether the file system is a local FS. */
def isLocalFileSystem: Boolean
}

/**
@@ -379,13 +344,6 @@ object HDFSMetadataLog {
// ignore if file has already been deleted
}
}

override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
case _: local.LocalFs | _: local.RawLocalFs =>
// LocalFs = RawLocalFs + ChecksumFs
true
case _ => false
}
}

/**
@@ -442,12 +400,5 @@ object HDFSMetadataLog {
// ignore if file has already been deleted
}
}

override def isLocalFileSystem: Boolean = fs match {
case _: LocalFileSystem | _: RawLocalFileSystem =>
// LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
true
case _ => false
}
}
}
@@ -179,8 +179,8 @@ class StreamExecution(

/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
* [[HDFSMetadataLog]]. See SPARK-14131 for more details.
* [[org.apache.spark.util.UninterruptibleThread]] to avoid a potential endless loop in
* `KafkaConsumer`. See KAFKA-1894 for more details.
*/
val microBatchThread =
new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
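The revised comment above keeps `microBatchThread` as an UninterruptibleThread, but the justification shifts from HDFSMetadataLog to the Kafka consumer: per KAFKA-1894, interrupting a running `KafkaConsumer` can leave it in an endless loop. A hedged sketch of how interrupt-sensitive work can be fenced off on such a thread (the poll placeholder is illustrative, not the actual StreamExecution code):

import org.apache.spark.util.UninterruptibleThread

// Illustrative only: run interrupt-sensitive work (e.g. a KafkaConsumer.poll,
// see KAFKA-1894) inside runUninterruptibly so a stop() interrupt is deferred
// until the sensitive section has finished.
val worker = new UninterruptibleThread("stream execution sketch") {
  override def run(): Unit = {
    runUninterruptibly {
      // ... fetch offsets / poll the consumer here ...
    }
    // An interrupt received during the block above is delivered here instead.
  }
}
worker.start()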
@@ -156,7 +156,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
})
}

testWithUninterruptibleThread("compact") {
test("compact") {
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
defaultCompactInterval = 3,
@@ -174,7 +174,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
})
}

testWithUninterruptibleThread("delete expired file") {
test("delete expired file") {
// Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = 0,
@@ -129,7 +129,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread("compact") {
test("compact") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withFileStreamSinkLog { sinkLog =>
for (batchId <- 0 to 10) {
@@ -149,7 +149,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread("delete expired file") {
test("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically, and set the minimum number of batches to retain to one
withSQLConf(
@@ -57,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread("HDFSMetadataLog: basic") {
test("HDFSMetadataLog: basic") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use a non-existent directory to test whether the log creates the dir
val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -82,8 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread(
"HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
spark.conf.set(
s"fs.$scheme.impl",
classOf[FakeFileSystem].getName)
@@ -103,7 +102,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread("HDFSMetadataLog: purge") {
test("HDFSMetadataLog: purge") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
@@ -128,7 +127,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

testWithUninterruptibleThread("HDFSMetadataLog: restart") {
test("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
@@ -36,7 +36,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
}

testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") {
test("OffsetSeqLog - serialization - deserialization") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use a non-existent directory to test whether the log creates the dir
val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
@@ -1174,7 +1174,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
assert(map.isNewFile("b", 10))
}

testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
test("do not recheck that files exist during getBatch") {
withTempDir { temp =>
spark.conf.set(
s"fs.$scheme.impl",