[SPARK-9844] Added unit test for closing FileAppender InputStream asynchronously
BryanCutler committed Jan 12, 2016
commit f937e183d466d014f1545d823ecdf30f9becdf77
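
For context on what these tests exercise: FileAppender consumes its InputStream on a background thread and logs an error if a read fails, unless the appender was deliberately stopped first. The following is a minimal sketch of that loop, not the actual Spark source; the names AppenderSketch, markedForStop, and appendToFile are illustrative assumptions.

import java.io.{File, IOException, InputStream}

// Illustrative sketch (assumed names) of the reading thread under test.
class AppenderSketch(inputStream: InputStream, file: File) {
  @volatile private var markedForStop = false

  def stop(): Unit = { markedForStop = true }

  private def appendToFile(buf: Array[Byte], len: Int): Unit = { /* write buf(0 until len) to file */ }

  def appendStreamToFile(): Unit = {
    try {
      val buf = new Array[Byte](8192)
      var n = 0
      while (!markedForStop && n != -1) {
        n = inputStream.read(buf) // throws IOException once the pipe is closed
        if (n > 0) appendToFile(buf, n)
      }
    } catch {
      // An abrupt close surfaces here and is reported; after a graceful stop()
      // the guard suppresses the report, which is what the new tests verify.
      case e: IOException if !markedForStop =>
        System.err.println(s"Error writing stream to $file: $e")
    }
  }
}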
core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala (77 additions, 0 deletions)
@@ -18,12 +18,17 @@
package org.apache.spark.util

import java.io._
import java.util.concurrent.CountDownLatch

import scala.collection.mutable.HashSet
import scala.reflect._

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.log4j.{Appender, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
import org.mockito.ArgumentCaptor
import org.mockito.Mockito.{atLeast, mock, verify}
import org.scalatest.BeforeAndAfter

import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -188,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
    testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
  }

test("file appender async close stream abruptly") {
// Test FileAppender reaction to closing InputStream using a mock logging appender
val mockAppender = mock(classOf[Appender])
val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]

// Make sure only logging errors
val logger = Logger.getRootLogger
logger.setLevel(Level.ERROR)
logger.addAppender(mockAppender)

val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream)

// Close the stream before appender tries to read will cause an IOException
testInputStream.close()
testOutputStream.close()
val appender = FileAppender(testInputStream, testFile, new SparkConf)

appender.awaitTermination()

// If InputStream was closed without first stopping the appender, an exception will be logged
verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
val loggingEvent = loggingEventCaptor.getValue
assert(loggingEvent.getLevel == Level.ERROR)
assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
}
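
The IOException provoked above comes straight from the JDK: once a PipedInputStream has been closed, any further read() throws IOException("Pipe closed"). A self-contained demonstration of just that behavior:

import java.io.{IOException, PipedInputStream, PipedOutputStream}

// Reading from a closed pipe throws, which is what the test relies on.
object PipeClosedDemo extends App {
  val out = new PipedOutputStream()
  val in = new PipedInputStream(out)
  in.close()
  out.close()
  try {
    in.read()
    println("unexpected: read succeeded")
  } catch {
    case e: IOException => println(s"read failed as expected: ${e.getMessage}")
  }
}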

test("file appender async close stream gracefully") {
// Test FileAppender reaction to closing InputStream using a mock logging appender
val mockAppender = mock(classOf[Appender])
val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]

// Make sure only logging errors
val logger = Logger.getRootLogger
logger.setLevel(Level.ERROR)
logger.addAppender(mockAppender)

val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream

// Close the stream before appender tries to read will cause an IOException
testInputStream.close()
testOutputStream.close()
val appender = FileAppender(testInputStream, testFile, new SparkConf)

// Stop the appender before an IOException is called during read
testInputStream.latchReadStarted.await()
appender.stop()
testInputStream.latchReadProceed.countDown()

appender.awaitTermination()

// Make sure no IOException errors have been logged as a result of appender closing gracefully
verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
import scala.collection.JavaConverters._
loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
assert(loggingEvent.getLevel != Level.ERROR
|| !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
}
}
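
The graceful-close test interleaves the two threads deterministically with a pair of CountDownLatches: the reading thread announces that it has entered read(), the test stops the appender while the reader is parked, and only then releases it. The same handshake in isolation, with all names illustrative:

import java.util.concurrent.CountDownLatch

// Generic two-latch handshake: the worker signals it has reached the critical
// point, then blocks until the coordinator has acted.
object HandshakeDemo extends App {
  val reached = new CountDownLatch(1)
  val proceed = new CountDownLatch(1)

  val worker = new Thread(new Runnable {
    def run(): Unit = {
      reached.countDown() // signal: at the critical point
      proceed.await()     // park until released
      println("worker proceeding")
    }
  })
  worker.start()

  reached.await()     // wait for the worker to reach the critical point
  println("coordinator acting while the worker is parked")
  proceed.countDown() // release the worker
  worker.join()
}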

  /**
   * Run the rolling file appender with data and see whether all the data was written correctly
   * across rolled over files.
@@ -228,4 +294,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
      file.getName.startsWith(testFile.getName)
    }.foreach { _.delete() }
  }

  /**
   * Used to synchronize with the point at which read() is called on a stream.
   * PipedInputStream's bulk read(b, off, len) delegates to read() for its first
   * byte, so overriding read() intercepts the appender's reads as well.
   */
  private trait LatchedInputStream extends PipedInputStream {
    val latchReadStarted = new CountDownLatch(1)
    val latchReadProceed = new CountDownLatch(1)
    abstract override def read(): Int = {
      // Signal that a read has started, then block until the test allows it to proceed
      latchReadStarted.countDown()
      latchReadProceed.await()
      super.read()
    }
  }
}
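
A note on LatchedInputStream: it relies on Scala's stackable-trait pattern. The abstract override on read() lets the trait wrap a concrete implementation that is only supplied when the trait is mixed in at instantiation (new PipedInputStream(testOutputStream) with LatchedInputStream above), so super.read() resolves to PipedInputStream's read(). A self-contained sketch of the same pattern, with illustrative names:

import java.io.{ByteArrayInputStream, InputStream}

// Stackable trait: the mixin's read() runs first, then delegates to the
// concrete stream via super.read(), exactly as LatchedInputStream does.
object StackableTraitDemo extends App {
  trait CountingInputStream extends InputStream {
    var readCount = 0
    abstract override def read(): Int = {
      readCount += 1
      super.read()
    }
  }

  val in = new ByteArrayInputStream("ab".getBytes("UTF-8")) with CountingInputStream
  in.read()
  in.read()
  println(s"reads observed: ${in.readCount}") // prints: reads observed: 2
}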