Commit 81bcc48

Clean up HDFSMetadataLog for Hadoop 2.6
1 parent e02ac30 commit 81bcc48

8 files changed: +18 additions, −68 deletions

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
   }
 
 
-  testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
+  test("OffsetSeqLog serialization - deserialization") {
     withTempDir { temp =>
       // use non-existent directory to test whether log make the dir
       val dir = new File(temp, "dir")
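For context on the rename above: testWithUninterruptibleThread is a SparkFunSuite helper whose name indicates it runs the test body on an org.apache.spark.util.UninterruptibleThread, which the deleted branch of HDFSMetadataLog.add required when writing to a local file system. A rough sketch of what such a helper could look like follows; this is an illustration under that assumption, not the actual SparkFunSuite code:

  // Hypothetical sketch: run the test body on an UninterruptibleThread and
  // rethrow any failure on the calling thread so the test framework sees it.
  def testWithUninterruptibleThread(name: String)(body: => Unit): Unit = test(name) {
    var failure: Option[Throwable] = None
    val thread = new UninterruptibleThread(s"test thread for '$name'") {
      override def run(): Unit = {
        try body catch { case e: Throwable => failure = Some(e) }
      }
    }
    thread.start()
    thread.join()
    failure.foreach(e => throw e)
  }

With this commit, a plain ScalaTest test(...) suffices because HDFSMetadataLog.add no longer insists on running on an UninterruptibleThread.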

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 5 additions & 54 deletions
@@ -109,43 +109,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   override def add(batchId: Long, metadata: T): Boolean = {
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
-      if (fileManager.isLocalFileSystem) {
-        Thread.currentThread match {
-          case ut: UninterruptibleThread =>
-            // When using a local file system, "writeBatch" must be called on a
-            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-            // while writing the batch file. This is because there is a potential dead-lock in
-            // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-            // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-            // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-            // the file permission if using the local file system, and can get deadlocked if the
-            // stream execution thread is stopped by interrupt. Hence, we make sure that
-            // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-            // interrupts here. Also see SPARK-14131.
-            ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
-          case _ =>
-            throw new IllegalStateException(
-              "HDFSMetadataLog.add() on a local file system must be executed on " +
-                "a o.a.spark.util.UninterruptibleThread")
-        }
-      } else {
-        // For a distributed file system, such as HDFS or S3, if the network is broken, write
-        // operations may just hang until timeout. We should enable interrupts to allow stopping
-        // the query fast.
-        writeBatch(batchId, metadata, serialize)
-      }
+      writeBatch(batchId, metadata)
       true
     }
   }
 
-  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
-    var nextId = 0
+  def writeTempBatch(metadata: T): Option[Path] = {
     while (true) {
       val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
       try {
         val output = fileManager.create(tempPath)
         try {
-          writer(metadata, output)
+          serialize(metadata, output)
           return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
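The hunk above simplifies add() to call writeBatch unconditionally and drops the pluggable writer from writeTempBatch. As a standalone illustration of the temp-file step, here is a minimal sketch against the raw Hadoop FileSystem API; the real code goes through the FileManager abstraction and retries in a loop, both omitted here, and writeTemp and the serialize callback are stand-in names:

  import java.io.OutputStream
  import java.util.UUID

  import org.apache.commons.io.IOUtils
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Write the batch metadata to a hidden, UUID-named temp file under metadataPath.
  // The caller later renames this file to the real batch file to "commit" it.
  def writeTemp(fs: FileSystem, metadataPath: Path)(serialize: OutputStream => Unit): Path = {
    val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
    val output = fs.create(tempPath)
    try {
      serialize(output)
      tempPath
    } finally {
      IOUtils.closeQuietly(output)
    }
  }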
@@ -164,7 +139,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // big problem because it requires the attacker must have the permission to write the
         // metadata path. In addition, the old Streaming also have this issue, people can create
         // malicious checkpoint files to crash a Streaming application too.
-          nextId += 1
       }
     }
     None
@@ -176,8 +150,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
    * valid behavior, we still need to prevent it from destroying the files.
    */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
-    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+  private def writeBatch(batchId: Long, metadata: T): Unit = {
+    val tempPath = writeTempBatch(metadata).getOrElse(
       throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
     try {
       // Try to commit the batch
@@ -195,12 +169,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // So throw an exception to tell the user this is not a valid behavior.
         throw new ConcurrentModificationException(
           s"Multiple HDFSMetadataLog are using $path", e)
-      case e: FileNotFoundException =>
-        // Sometimes, "create" will succeed when multiple writers are calling it at the same
-        // time. However, only one writer can call "rename" successfully, others will get
-        // FileNotFoundException because the first writer has removed it.
-        throw new ConcurrentModificationException(
-          s"Multiple HDFSMetadataLog are using $path", e)
     } finally {
       fileManager.delete(tempPath)
     }
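The removed FileNotFoundException case, like the ConcurrentModificationException case kept above it, handles concurrent writers racing to commit the same batch. The rename-based commit itself is not visible in this diff; the following sketch only shows the general shape of that step, using FileSystem.rename as a stand-in for the internal FileManager call:

  import java.util.ConcurrentModificationException

  import org.apache.hadoop.fs.{FileSystem, Path}

  // Commit a previously written temp file by renaming it to the final batch file.
  // A failed rename is treated as another writer using the same metadata path.
  def commitBatch(fs: FileSystem, tempPath: Path, batchFile: Path, logPath: Path): Unit = {
    try {
      if (!fs.rename(tempPath, batchFile)) {
        throw new ConcurrentModificationException(s"Multiple HDFSMetadataLog are using $logPath")
      }
    } finally {
      fs.delete(tempPath, false)   // best-effort cleanup of the temp file
    }
  }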
@@ -332,9 +300,6 @@ object HDFSMetadataLog {
 
     /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
     def delete(path: Path): Unit
-
-    /** Whether the file systme is a local FS. */
-    def isLocalFileSystem: Boolean
   }
 
   /**
@@ -379,13 +344,6 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
-
-    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
-      case _: local.LocalFs | _: local.RawLocalFs =>
-        // LocalFs = RawLocalFs + ChecksumFs
-        true
-      case _ => false
-    }
   }
 
   /**
@@ -442,12 +400,5 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
-
-    override def isLocalFileSystem: Boolean = fs match {
-      case _: LocalFileSystem | _: RawLocalFileSystem =>
-        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
-        true
-      case _ => false
-    }
   }
 }
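The two overrides removed above, one for the FileContext-based manager and one for the FileSystem-based manager, were the only users of isLocalFileSystem once add() stopped special-casing local file systems. For reference, an equivalent standalone check against a Hadoop Path could look like the sketch below; the path and configuration are illustrative:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{LocalFileSystem, Path, RawLocalFileSystem}

  // Mirrors the removed check: LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem.
  def isLocalFs(path: Path, conf: Configuration): Boolean =
    path.getFileSystem(conf) match {
      case _: LocalFileSystem | _: RawLocalFileSystem => true
      case _ => false
    }

  val onLocalDisk = isLocalFs(new Path("file:///tmp/metadata"), new Configuration())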

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 2 additions & 2 deletions
@@ -179,8 +179,8 @@ class StreamExecution(
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
-   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
-   * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential endless loop in
+   * `KafkaConsumer`. See KAFKA-1894 for more details.
    */
   val microBatchThread =
     new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
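The updated comment keeps microBatchThread on an UninterruptibleThread, but now only to guard against the KafkaConsumer busy-loop described in KAFKA-1894 rather than the HDFSMetadataLog deadlock. A minimal sketch of how runUninterruptibly defers interrupts, assuming access to Spark's internal org.apache.spark.util.UninterruptibleThread:

  import org.apache.spark.util.UninterruptibleThread

  val worker = new UninterruptibleThread("example") {
    override def run(): Unit = {
      runUninterruptibly {
        // Interrupts delivered while this block runs are deferred until it finishes,
        // protecting code that cannot tolerate Thread.interrupt(), e.g. a Kafka poll.
      }
    }
  }
  worker.start()
  worker.interrupt()   // takes effect only after the uninterruptible block completes
  worker.join()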

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -156,7 +156,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     })
   }
 
-  testWithUninterruptibleThread("compact") {
+  test("compact") {
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
@@ -174,7 +174,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     })
   }
 
-  testWithUninterruptibleThread("delete expired file") {
+  test("delete expired file") {
     // Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = 0,

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -129,7 +129,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("compact") {
+  test("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         for (batchId <- 0 to 10) {
@@ -149,7 +149,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("delete expired file") {
+  test("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
     // deterministically and one min batches to retain
     withSQLConf(

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 4 additions & 5 deletions
@@ -57,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("HDFSMetadataLog: basic") {
+  test("HDFSMetadataLog: basic") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -82,8 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread(
-    "HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
+  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
     spark.conf.set(
       s"fs.$scheme.impl",
       classOf[FakeFileSystem].getName)
@@ -103,7 +102,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("HDFSMetadataLog: purge") {
+  test("HDFSMetadataLog: purge") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -128,7 +127,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  testWithUninterruptibleThread("HDFSMetadataLog: restart") {
+  test("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
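Based on the test code above, basic use of HDFSMetadataLog after this change no longer needs any special thread. A rough usage sketch, assuming an active SparkSession named spark and a writable path (the path and payloads are illustrative):

  val metadataLog = new HDFSMetadataLog[String](spark, "/tmp/metadata-log")
  assert(metadataLog.add(0, "batch0"))          // first write of batch 0 succeeds
  assert(!metadataLog.add(0, "batch0-again"))   // returns false: batch 0 is already written
  assert(metadataLog.get(0) == Some("batch0"))
  metadataLog.purge(1)                          // drop metadata for batches with id below 1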

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
       OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
   }
 
-  testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") {
+  test("OffsetSeqLog - serialization - deserialization") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
       val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -1174,7 +1174,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(map.isNewFile("b", 10))
   }
 
-  testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
+  test("do not recheck that files exist during getBatch") {
     withTempDir { temp =>
       spark.conf.set(
         s"fs.$scheme.impl",
