From 573b657bca5a77297cafbde489ba380b3372c81c Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 29 Oct 2015 13:54:02 -0700 Subject: [PATCH 01/18] progress --- .../spark/streaming/scheduler/JobScheduler.scala | 6 ++++-- .../streaming/util/FileBasedWriteAheadLog.scala | 4 ++-- .../util/FileBasedWriteAheadLogReader.scala | 14 +++++++++++++- .../apache/spark/streaming/util/HdfsUtils.scala | 5 +++++ 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 2480b4ec093e..1ed6fb0aa9d5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { if (eventLoop == null) return // scheduler has already been stopped logDebug("Stopping JobScheduler") - // First, stop receiving - receiverTracker.stop(processAllReceivedData) + if (receiverTracker != null) { + // First, stop receiving + receiverTracker.stop(processAllReceivedData) + } // Second, stop generating jobs. If it has to process all received data, // then this will wait for all the processing through JobScheduler to be over. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index bc3f2486c21f..e7d0c5f9fff2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -126,11 +126,11 @@ private[streaming] class FileBasedWriteAheadLog( val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) - logFilesToRead.iterator.map { file => + logFilesToRead.par.map { file => logDebug(s"Creating log reader with $file") val reader = new FileBasedWriteAheadLogReader(file, hadoopConf) CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _) - }.flatten.asJava + }.flatten.toIterator.asJava } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala index c3bb59f3fef9..3077728bd6f3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.streaming.util -import java.io.{Closeable, EOFException} +import java.io.{IOException, Closeable, EOFException} import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration @@ -55,6 +55,18 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config logDebug("Error reading next item, EOF reached", e) close() false + case e: IOException => + logWarning("Error while trying to read data. If the file was deleted, " + + "this should be okay.", e) + if (HdfsUtils.checkFileExists(path, conf)) { + throw e + } else { + // file was deleted. This can occur when the daemon cleanup thread takes time to + // delete the file during recovery. 
+ close() + false + } + case e: Exception => logWarning("Error while trying to read data from HDFS.", e) close() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index f60688f173c4..976ab8046626 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -71,4 +71,9 @@ private[streaming] object HdfsUtils { case _ => fs } } + + def checkFileExists(path: Path, conf: Configuration): Boolean = { + val fs = getFileSystemForPath(path, conf) + fs.isFile(path) + } } From 655f4bff61f1ffa21565c73eb0fe732c8ffada3e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 29 Oct 2015 16:53:23 -0700 Subject: [PATCH 02/18] ready for PR --- .../streaming/util/FileBasedWriteAheadLogReader.scala | 3 ++- .../scala/org/apache/spark/streaming/util/HdfsUtils.scala | 8 +++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala index 3077728bd6f3..825a5bd0e0e8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala @@ -58,12 +58,13 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config case e: IOException => logWarning("Error while trying to read data. If the file was deleted, " + "this should be okay.", e) + close() if (HdfsUtils.checkFileExists(path, conf)) { + // if file exists, this could be a legitimate error throw e } else { // file was deleted. This can occur when the daemon cleanup thread takes time to // delete the file during recovery. - close() false } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index 976ab8046626..dfe38ba7220c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -72,8 +72,10 @@ private[streaming] object HdfsUtils { } } - def checkFileExists(path: Path, conf: Configuration): Boolean = { - val fs = getFileSystemForPath(path, conf) - fs.isFile(path) + /** Check if the file exists at the given path. 
*/ + def checkFileExists(path: String, conf: Configuration): Boolean = { + val hdpPath = new Path(path) + val fs = getFileSystemForPath(hdpPath, conf) + fs.isFile(hdpPath) } } From 06da0d1389f658a1eb3e9aaaf1750fd7ad85567a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 29 Oct 2015 23:12:41 -0700 Subject: [PATCH 03/18] ready for PR --- .../util/FileBasedWriteAheadLog.scala | 42 +++++---- .../streaming/ReceivedBlockTrackerSuite.scala | 89 ++++++++++++++++++- .../streaming/util/WriteAheadLogSuite.scala | 16 ++++ 3 files changed, 127 insertions(+), 20 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index e7d0c5f9fff2..2b3278c5141f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.util import java.nio.ByteBuffer +import java.util.concurrent.ConcurrentSkipListSet import java.util.{Iterator => JIterator} import scala.collection.JavaConverters._ @@ -149,27 +150,26 @@ private[streaming] class FileBasedWriteAheadLog( val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } } logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " + s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}") - - def deleteFiles() { - oldLogFiles.foreach { logInfo => - try { - val path = new Path(logInfo.path) - val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf) - fs.delete(path, true) - synchronized { pastLogs -= logInfo } - logDebug(s"Cleared log file $logInfo") - } catch { - case ex: Exception => - logWarning(s"Error clearing write ahead log file $logInfo", ex) - } + synchronized { pastLogs --= oldLogFiles } + def deleteFile(walInfo: LogInfo): Unit = { + try { + val path = new Path(walInfo.path) + val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf) + fs.delete(path, true) + logDebug(s"Cleared log file $walInfo") + } catch { + case ex: Exception => + logWarning(s"Error clearing write ahead log file $walInfo", ex) } logInfo(s"Cleared log files in $logDirectory older than $threshTime") } - if (!executionContext.isShutdown) { - val f = Future { deleteFiles() } - if (waitForCompletion) { - import scala.concurrent.duration._ - Await.ready(f, 1 second) + oldLogFiles.foreach { logInfo => + if (!executionContext.isShutdown) { + val f = Future { deleteFile(logInfo) } + if (waitForCompletion) { + import scala.concurrent.duration._ + Await.ready(f, 1 second) + } } } } @@ -225,7 +225,11 @@ private[streaming] class FileBasedWriteAheadLog( private[streaming] object FileBasedWriteAheadLog { - case class LogInfo(startTime: Long, endTime: Long, path: String) + case class LogInfo(startTime: Long, endTime: Long, path: String) extends Comparable[LogInfo] { + override def compareTo(o: LogInfo): Int = { + endTime.compareTo(o.endTime) + } + } val logFileRegex = """log-(\d+)-(\d+)""".r diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index f793a12843b2..65ae4103a188 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -32,7 +32,7 @@ import 
org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader} +import org.apache.spark.streaming.util.{WriteAheadLogSuite, WriteAheadLogUtils, FileBasedWriteAheadLogReader} import org.apache.spark.streaming.util.WriteAheadLogSuite._ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} @@ -207,6 +207,87 @@ class ReceivedBlockTrackerSuite tracker1.isWriteAheadLogEnabled should be (false) } + test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") { + val manualClock = new ManualClock + conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1") + require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1) + val tracker = createTracker(clock = manualClock) + + val addBlocks = generateBlockInfos() + val batch1 = addBlocks.slice(0, 1) + val batch2 = addBlocks.slice(1, 3) + val batch3 = addBlocks.slice(3, 6) + + def advanceTime(): Unit = manualClock.advance(1000) + + assert(getWriteAheadLogFiles().length === 0) + + val start = manualClock.getTimeMillis() + manualClock.advance(500) + tracker.cleanupOldBatches(start, waitForCompletion = false) + assert(getWriteAheadLogFiles().length === 1) + advanceTime() + batch1.foreach(tracker.addBlock) + assert(getWriteAheadLogFiles().length === 1) + advanceTime() + + val batch1Time = manualClock.getTimeMillis() + tracker.allocateBlocksToBatch(batch1Time) + advanceTime() + + batch2.foreach { block => + tracker.addBlock(block) + advanceTime() + } + assert(getWriteAheadLogFiles().length === 3) + + advanceTime() + + val batch2Time = manualClock.getTimeMillis() + tracker.allocateBlocksToBatch(batch2Time) + + advanceTime() + + assert(getWriteAheadLogFiles().length === 4) + tracker.cleanupOldBatches(batch1Time, waitForCompletion = true) + assert(getWriteAheadLogFiles().length === 3) + + batch3.foreach { block => + tracker.addBlock(block) + advanceTime() + } + val batch3Time = manualClock.getTimeMillis() + tracker.allocateBlocksToBatch(batch3Time) + + advanceTime() + assert(getWriteAheadLogFiles().length === 4) + advanceTime() + tracker.cleanupOldBatches(batch2Time, waitForCompletion = true) + assert(getWriteAheadLogFiles().length === 3) + + def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = { + subject.getBlocksOfBatchAndStream(batch3Time, streamId) should be( + base.getBlocksOfBatchAndStream(batch3Time, streamId)) + subject.getBlocksOfBatchAndStream(batch2Time, streamId) should be( + base.getBlocksOfBatchAndStream(batch2Time, streamId)) + subject.getBlocksOfBatchAndStream(batch1Time, streamId) should be(Nil) + } + + val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock) + compareTrackers(tracker, tracker2) + + WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(start), Seq(createBatchCleanup(start))) + + assert(getWriteAheadLogFiles().length === 4) + val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock) + compareTrackers(tracker, tracker3) + WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(batch1Time), + Seq(createBatchAllocation(batch1Time, batch1))) + assert(getWriteAheadLogFiles().length === 5) + val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock) + compareTrackers(tracker, tracker4) + } + 
/** * Create tracker object with the optional provided clock. Use fake clock if you * want to control time by manually incrementing it to test log clean. @@ -233,6 +314,12 @@ class ReceivedBlockTrackerSuite getWrittenLogData(Seq(logFile)) } + /** Get the log file name for the given log start time. */ + def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = { + checkpointDirectory.toString + File.separator + "receivedBlockMetadata" + + File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}" + } + /** * Get all the data written in the given write ahead log files. By default, it will read all * files in the test log directory. diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 93ae41a3d2ec..5ec3c01b9676 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path import org.scalatest.concurrent.Eventually._ import org.scalatest.BeforeAndAfter +import org.apache.spark.streaming.scheduler.ReceivedBlockTrackerLogEvent import org.apache.spark.util.{ManualClock, Utils} import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} @@ -366,6 +367,21 @@ object WriteAheadLogSuite { segments } + /** + * Write received block tracker events to a file using the writer class and return an array of + * the file segments written. + */ + def writeEventsUsingWriter( + filePath: String, + events: Seq[ReceivedBlockTrackerLogEvent]): Seq[FileBasedWriteAheadLogSegment] = { + val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf) + val segments = events.map { + item => writer.write(ByteBuffer.wrap(Utils.serialize(item))) + } + writer.close() + segments + } + /** Write data to rotating files in log directory using the WriteAheadLog class. 
*/ def writeDataUsingWriteAheadLog( logDirectory: String, From be5a2ab7f1dcd655eefea48918e445ed2bebad55 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 29 Oct 2015 23:20:31 -0700 Subject: [PATCH 04/18] minor --- .../spark/streaming/util/FileBasedWriteAheadLog.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 2b3278c5141f..2c5b44526841 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -225,11 +225,7 @@ private[streaming] class FileBasedWriteAheadLog( private[streaming] object FileBasedWriteAheadLog { - case class LogInfo(startTime: Long, endTime: Long, path: String) extends Comparable[LogInfo] { - override def compareTo(o: LogInfo): Int = { - endTime.compareTo(o.endTime) - } - } + case class LogInfo(startTime: Long, endTime: Long, path: String) val logFileRegex = """log-(\d+)-(\d+)""".r From 7f8cfe340010e867ab73c40fc0ba39b0d0144695 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 29 Oct 2015 23:29:08 -0700 Subject: [PATCH 05/18] fix whitespace --- .../org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 5ec3c01b9676..bbcdc62ead6f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -368,7 +368,7 @@ object WriteAheadLogSuite { } /** - * Write received block tracker events to a file using the writer class and return an array of + * Write received block tracker events to a file using the writer class and return an array of * the file segments written. 
*/ def writeEventsUsingWriter( From 98da092d14dfa3f9f95261fe1b307f0dda7a785f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 9 Nov 2015 17:15:23 -0800 Subject: [PATCH 06/18] update --- .../util/FileBasedWriteAheadLog.scala | 31 +++-- .../util/FileBasedWriteAheadLogReader.scala | 4 +- .../streaming/ReceivedBlockTrackerSuite.scala | 106 +++++++++--------- .../streaming/util/WriteAheadLogSuite.scala | 15 --- 4 files changed, 77 insertions(+), 79 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 2c5b44526841..62086acdfc1b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -17,11 +17,11 @@ package org.apache.spark.streaming.util import java.nio.ByteBuffer -import java.util.concurrent.ConcurrentSkipListSet import java.util.{Iterator => JIterator} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import scala.collection.parallel.{ThreadPoolTaskSupport, ForkJoinTaskSupport} import scala.concurrent.{Await, ExecutionContext, Future} import scala.language.postfixOps @@ -58,8 +58,8 @@ private[streaming] class FileBasedWriteAheadLog( private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("") private val threadpoolName = s"WriteAheadLogManager $callerNameTag" - implicit private val executionContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName)) + private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName) + private val executionContext = ExecutionContext.fromExecutorService(threadpool) override protected val logName = s"WriteAheadLogManager $callerNameTag" private var currentLogPath: Option[String] = None @@ -125,13 +125,21 @@ private[streaming] class FileBasedWriteAheadLog( */ def readAll(): JIterator[ByteBuffer] = synchronized { val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath - logInfo("Reading from the logs: " + logFilesToRead.mkString("\n")) - - logFilesToRead.par.map { file => + logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n")) + def readFile(file: String): Iterator[ByteBuffer] = { logDebug(s"Creating log reader with $file") val reader = new FileBasedWriteAheadLogReader(file, hadoopConf) CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _) - }.flatten.toIterator.asJava + } + if (!closeFileAfterWrite) { + logFilesToRead.iterator.map(readFile).flatten.asJava + } else { + // For performance gains, it makes sense to parallelize the recovery if + // closeFileAfterWrite = true + val parallelFilesToRead = logFilesToRead.par + parallelFilesToRead.tasksupport = new ThreadPoolTaskSupport(threadpool) + parallelFilesToRead.map(readFile).flatten.toIterator.asJava + } } /** @@ -147,10 +155,13 @@ private[streaming] class FileBasedWriteAheadLog( * asynchronously. 
*/ def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { - val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } } + val oldLogFiles = synchronized { + val expiredLogs = pastLogs.filter { _.endTime < threshTime } + pastLogs --= expiredLogs + expiredLogs + } logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " + s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}") - synchronized { pastLogs --= oldLogFiles } def deleteFile(walInfo: LogInfo): Unit = { try { val path = new Path(walInfo.path) @@ -165,7 +176,7 @@ private[streaming] class FileBasedWriteAheadLog( } oldLogFiles.foreach { logInfo => if (!executionContext.isShutdown) { - val f = Future { deleteFile(logInfo) } + val f = Future { deleteFile(logInfo) }(executionContext) if (waitForCompletion) { import scala.concurrent.duration._ Await.ready(f, 1 second) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala index 825a5bd0e0e8..804ab033f83b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala @@ -60,10 +60,10 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config "this should be okay.", e) close() if (HdfsUtils.checkFileExists(path, conf)) { - // if file exists, this could be a legitimate error + // If file exists, this could be a legitimate error throw e } else { - // file was deleted. This can occur when the daemon cleanup thread takes time to + // File was deleted. This can occur when the daemon cleanup thread takes time to // delete the file during recovery. 
false } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 65ae4103a188..7db17abb7947 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import java.io.File +import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ @@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.util.{WriteAheadLogSuite, WriteAheadLogUtils, FileBasedWriteAheadLogReader} +import org.apache.spark.streaming.util._ import org.apache.spark.streaming.util.WriteAheadLogSuite._ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} @@ -208,83 +209,71 @@ class ReceivedBlockTrackerSuite } test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") { - val manualClock = new ManualClock conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1") require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1) - val tracker = createTracker(clock = manualClock) val addBlocks = generateBlockInfos() val batch1 = addBlocks.slice(0, 1) val batch2 = addBlocks.slice(1, 3) - val batch3 = addBlocks.slice(3, 6) - - def advanceTime(): Unit = manualClock.advance(1000) + val batch3 = addBlocks.slice(3, addBlocks.length) assert(getWriteAheadLogFiles().length === 0) - val start = manualClock.getTimeMillis() - manualClock.advance(500) - tracker.cleanupOldBatches(start, waitForCompletion = false) - assert(getWriteAheadLogFiles().length === 1) - advanceTime() - batch1.foreach(tracker.addBlock) + // list of timestamps for files + val t = Seq.tabulate(5)(i => i * 1000) + + writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0)))) assert(getWriteAheadLogFiles().length === 1) - advanceTime() - val batch1Time = manualClock.getTimeMillis() - tracker.allocateBlocksToBatch(batch1Time) - advanceTime() + // The goal is to create several log files which should have been cleaned up. 
+ // If we face any issue during recovery, because these old files exist, then we need to make + // deletion more robust rather than a parallelized operation where we fire and forget + val batch1Allocation = createBatchAllocation(t(1), batch1) + writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation) - batch2.foreach { block => - tracker.addBlock(block) - advanceTime() - } - assert(getWriteAheadLogFiles().length === 3) + writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1)))) - advanceTime() + val batch2Allocation = createBatchAllocation(t(3), batch2) + writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation) - val batch2Time = manualClock.getTimeMillis() - tracker.allocateBlocksToBatch(batch2Time) + writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent)) - advanceTime() + // We should have 5 different log files as we called `writeEventsManually` with 5 different + // timestamps + assert(getWriteAheadLogFiles().length === 5) - assert(getWriteAheadLogFiles().length === 4) - tracker.cleanupOldBatches(batch1Time, waitForCompletion = true) - assert(getWriteAheadLogFiles().length === 3) + // Create the tracker to recover from the log files. We're going to ask the tracker to clean + // things up, and then we're going to rewrite that data, and recover using a different tracker. + // They should have identical data no matter what + val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4))) - batch3.foreach { block => - tracker.addBlock(block) - advanceTime() + def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = { + subject.getBlocksOfBatchAndStream(t(3), streamId) should be( + base.getBlocksOfBatchAndStream(t(3), streamId)) + subject.getBlocksOfBatchAndStream(t(1), streamId) should be( + base.getBlocksOfBatchAndStream(t(1), streamId)) + subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil) } - val batch3Time = manualClock.getTimeMillis() - tracker.allocateBlocksToBatch(batch3Time) - advanceTime() - assert(getWriteAheadLogFiles().length === 4) - advanceTime() - tracker.cleanupOldBatches(batch2Time, waitForCompletion = true) + // ask the tracker to clean up some old files + tracker.cleanupOldBatches(t(3), waitForCompletion = true) assert(getWriteAheadLogFiles().length === 3) - def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = { - subject.getBlocksOfBatchAndStream(batch3Time, streamId) should be( - base.getBlocksOfBatchAndStream(batch3Time, streamId)) - subject.getBlocksOfBatchAndStream(batch2Time, streamId) should be( - base.getBlocksOfBatchAndStream(batch2Time, streamId)) - subject.getBlocksOfBatchAndStream(batch1Time, streamId) should be(Nil) - } - - val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock) + val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4))) compareTrackers(tracker, tracker2) - WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(start), Seq(createBatchCleanup(start))) - + // rewrite first file + writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0)))) assert(getWriteAheadLogFiles().length === 4) - val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock) + // make sure trackers are consistent + val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4))) compareTrackers(tracker, tracker3) - 
WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(batch1Time), - Seq(createBatchAllocation(batch1Time, batch1))) + + // rewrite second file + writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation) assert(getWriteAheadLogFiles().length === 5) - val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock) + // make sure trackers are consistent + val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4))) compareTrackers(tracker, tracker4) } @@ -309,6 +298,19 @@ class ReceivedBlockTrackerSuite BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L)))) } + /** + * Write received block tracker events to a file manually. + */ + def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = { + val writer = HdfsUtils.getOutputStream(filePath, hadoopConf) + events.foreach { event => + val bytes = Utils.serialize(event) + writer.writeInt(bytes.size) + writer.write(bytes) + } + writer.close() + } + /** Get all the data written in the given write ahead log file. */ def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = { getWrittenLogData(Seq(logFile)) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index bbcdc62ead6f..0eaeb12ef4bd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -367,21 +367,6 @@ object WriteAheadLogSuite { segments } - /** - * Write received block tracker events to a file using the writer class and return an array of - * the file segments written. - */ - def writeEventsUsingWriter( - filePath: String, - events: Seq[ReceivedBlockTrackerLogEvent]): Seq[FileBasedWriteAheadLogSegment] = { - val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf) - val segments = events.map { - item => writer.write(ByteBuffer.wrap(Utils.serialize(item))) - } - writer.close() - segments - } - /** Write data to rotating files in log directory using the WriteAheadLog class. 
*/ def writeDataUsingWriteAheadLog( logDirectory: String, From 0b7279fdda081e8f4557cc0fc0366331380e79e0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 9 Nov 2015 17:23:58 -0800 Subject: [PATCH 07/18] minor --- .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 62086acdfc1b..fd6d04f26087 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -21,7 +21,7 @@ import java.util.{Iterator => JIterator} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.collection.parallel.{ThreadPoolTaskSupport, ForkJoinTaskSupport} +import scala.collection.parallel.ThreadPoolTaskSupport import scala.concurrent.{Await, ExecutionContext, Future} import scala.language.postfixOps From 83aa28e05de4874eebc89be11dce0a0b8213007e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 10 Nov 2015 10:06:02 -0800 Subject: [PATCH 08/18] add check for openInputStream --- .../FileBasedWriteAheadLogRandomReader.scala | 2 +- .../util/FileBasedWriteAheadLogReader.scala | 2 +- .../apache/spark/streaming/util/HdfsUtils.scala | 17 +++++++++++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala index f7168229ec15..56d4977da0b5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala @@ -30,7 +30,7 @@ private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: extends Closeable { private val instream = HdfsUtils.getInputStream(path, conf) - private var closed = false + private var closed = (instream == null) // the file may be deleted as we're opening the stream def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized { assertOpen() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala index 804ab033f83b..a375c0729534 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala @@ -32,7 +32,7 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config extends Iterator[ByteBuffer] with Closeable with Logging { private val instream = HdfsUtils.getInputStream(path, conf) - private var closed = false + private var closed = (instream == null) // the file may be deleted as we're opening the stream private var nextItem: Option[ByteBuffer] = None override def hasNext: Boolean = synchronized { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index dfe38ba7220c..13a765d035ee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.streaming.util +import java.io.IOException + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ @@ -42,8 +44,19 @@ private[streaming] object HdfsUtils { def getInputStream(path: String, conf: Configuration): FSDataInputStream = { val dfsPath = new Path(path) val dfs = getFileSystemForPath(dfsPath, conf) - val instream = dfs.open(dfsPath) - instream + if (dfs.isFile(dfsPath)) { + try { + dfs.open(dfsPath) + } catch { + case e: IOException => + // If we are really unlucky, the file may be deleted as we're opening the stream. + // This can happen as clean up is performed by daemon threads that may be left over from + // previous runs. + if (!dfs.isFile(dfsPath)) null else throw e + } + } else { + null + } } def checkState(state: Boolean, errorMsg: => String) { From 9162ed923e4b55fed58560177bf102f69a0f6fd6 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 10 Nov 2015 18:34:02 -0800 Subject: [PATCH 09/18] save change --- .../util/FileBasedWriteAheadLog.scala | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index fd6d04f26087..5f04ec7fddfb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.util import java.nio.ByteBuffer +import java.util.concurrent.ThreadPoolExecutor import java.util.{Iterator => JIterator} import scala.collection.JavaConverters._ @@ -136,9 +137,7 @@ private[streaming] class FileBasedWriteAheadLog( } else { // For performance gains, it makes sense to parallelize the recovery if // closeFileAfterWrite = true - val parallelFilesToRead = logFilesToRead.par - parallelFilesToRead.tasksupport = new ThreadPoolTaskSupport(threadpool) - parallelFilesToRead.map(readFile).flatten.toIterator.asJava + parallelIteratorCreator(threadpool, logFilesToRead, readFile).asJava } } @@ -262,4 +261,21 @@ private[streaming] object FileBasedWriteAheadLog { } }.sortBy { _.startTime } } + + /** + * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory + * at any given time, where `n` is the size of the thread pool. This is crucial for use cases + * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to + * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize. 
+ */ + def parallelIteratorCreator[I, O]( + threadpool: ThreadPoolExecutor, + source: Seq[I], + handler: I => Iterator[O]): Iterator[O] = { + val parallelCollection = source.grouped(threadpool.getPoolSize).toSeq.flatMap { element => + element.map(handler) + }.par + parallelCollection.tasksupport = new ThreadPoolTaskSupport(threadpool) + parallelCollection.flatten.toIterator + } } From c9ea4238e63b579f42179b1da6c00140d9aa0c0f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 10 Nov 2015 21:52:11 -0800 Subject: [PATCH 10/18] add reader limiting test --- .../util/FileBasedWriteAheadLog.scala | 10 ++-- .../streaming/util/WriteAheadLogSuite.scala | 60 ++++++++++++++++++- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 5f04ec7fddfb..c97637f1779a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -272,10 +272,10 @@ private[streaming] object FileBasedWriteAheadLog { threadpool: ThreadPoolExecutor, source: Seq[I], handler: I => Iterator[O]): Iterator[O] = { - val parallelCollection = source.grouped(threadpool.getPoolSize).toSeq.flatMap { element => - element.map(handler) - }.par - parallelCollection.tasksupport = new ThreadPoolTaskSupport(threadpool) - parallelCollection.flatten.toIterator + source.grouped(threadpool.getCorePoolSize).flatMap { element => + val parallelCollection = element.par + parallelCollection.tasksupport = new ThreadPoolTaskSupport(threadpool) + parallelCollection.map(handler) + }.flatten } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 1b2d11284323..7afe30e20d33 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.util import java.io._ import java.nio.ByteBuffer import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -39,7 +40,7 @@ import org.scalatest.{BeforeAndAfterEach, BeforeAndAfter} import org.scalatest.mock.MockitoSugar import org.apache.spark.streaming.scheduler._ -import org.apache.spark.util.{ThreadUtils, ManualClock, Utils} +import org.apache.spark.util.{CompletionIterator, ThreadUtils, ManualClock, Utils} import org.apache.spark.{SparkConf, SparkFunSuite} /** Common tests for WriteAheadLogs that we would like to test with different configurations. */ @@ -197,6 +198,63 @@ class FileBasedWriteAheadLogSuite import WriteAheadLogSuite._ + test("FileBasedWriteAheadLog - parallel readAll opens at most 'numThreads' files") { + /* + If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of + files. This causes recovery to take a very long time. In order to make it quicker, we + parallelized the reading of these files. This test makes sure that we limit the number of + open files to the size of the number of threads in our thread pool rather than the size of + the list of files. 
+ */ + val numThreads = 8 + val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool") + class GetMaxCounter { + private val value = new AtomicInteger() + @volatile private var max: Int = 0 + def increment(): Unit = { + val atInstant = value.incrementAndGet() + if (atInstant > max) max = atInstant + } + def decrement(): Unit = { value.decrementAndGet() } + def get(): Int = value.get() + def getMax(): Int = max + } + /** + * We need an object that can be iterated through, which will increment our counter once + * initialized, and decrement once closed. This way we can simulate how many "streams" will + * be opened during a real use case. + */ + class ReaderObject(cnt: GetMaxCounter, value: Int) extends Iterator[Int] with Closeable { + cnt.increment() + private var returnedValue: Boolean = false + override def hasNext(): Boolean = !returnedValue + override def next(): Int = { + if (!returnedValue) { + returnedValue = true + value + } else { + -1 + } + } + override def close(): Unit = { + cnt.decrement() + } + } + try { + val testSeq = 1 to 64 + val counter = new GetMaxCounter() + def handle(value: Int): Iterator[Int] = { + val reader = new ReaderObject(counter, value) + CompletionIterator[Int, Iterator[Int]](reader, reader.close) + } + val iterator = FileBasedWriteAheadLog.parallelIteratorCreator(tpool, testSeq, handle) + assert(iterator.toSeq === testSeq) + assert(counter.getMax() <= numThreads) + } finally { + tpool.shutdownNow() + } + } + test("FileBasedWriteAheadLogWriter - writing data") { val dataToWrite = generateRandomData() val segments = writeDataUsingWriter(testFile, dataToWrite) From c250d2e15e9435e8c7e9ac9c4927057e4c6cf9f7 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 10 Nov 2015 23:13:28 -0800 Subject: [PATCH 11/18] minor --- .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index c97637f1779a..3ca81895d186 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -272,9 +272,10 @@ private[streaming] object FileBasedWriteAheadLog { threadpool: ThreadPoolExecutor, source: Seq[I], handler: I => Iterator[O]): Iterator[O] = { + val taskSupport = new ThreadPoolTaskSupport(threadpool) source.grouped(threadpool.getCorePoolSize).flatMap { element => val parallelCollection = element.par - parallelCollection.tasksupport = new ThreadPoolTaskSupport(threadpool) + parallelCollection.tasksupport = taskSupport parallelCollection.map(handler) }.flatten } From 22fbacaa28a877b8f82e430bc3d648fb325cc8e6 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 11 Nov 2015 08:49:41 -0800 Subject: [PATCH 12/18] address comments --- .../util/FileBasedWriteAheadLog.scala | 13 +++--- .../streaming/util/WriteAheadLogSuite.scala | 42 ++++++------------- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 3ca81895d186..660e7c04852a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -137,7 +137,7 @@ private[streaming] class FileBasedWriteAheadLog( } else { // For performance gains, it makes sense to parallelize the recovery if // closeFileAfterWrite = true - parallelIteratorCreator(threadpool, logFilesToRead, readFile).asJava + seqToParIterator(threadpool, logFilesToRead, readFile).asJava } } @@ -268,13 +268,14 @@ private[streaming] object FileBasedWriteAheadLog { * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize. */ - def parallelIteratorCreator[I, O]( - threadpool: ThreadPoolExecutor, + def seqToParIterator[I, O]( + tpool: ThreadPoolExecutor, source: Seq[I], handler: I => Iterator[O]): Iterator[O] = { - val taskSupport = new ThreadPoolTaskSupport(threadpool) - source.grouped(threadpool.getCorePoolSize).flatMap { element => - val parallelCollection = element.par + val taskSupport = new ThreadPoolTaskSupport(tpool) + val groupSize = math.max(math.max(tpool.getCorePoolSize, tpool.getPoolSize), 8) + source.grouped(groupSize).flatMap { group => + val parallelCollection = group.par parallelCollection.tasksupport = taskSupport parallelCollection.map(handler) }.flatten diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 7afe30e20d33..23ba178d46c9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -198,7 +198,7 @@ class FileBasedWriteAheadLogSuite import WriteAheadLogSuite._ - test("FileBasedWriteAheadLog - parallel readAll opens at most 'numThreads' files") { + test("FileBasedWriteAheadLog - seqToParIterator") { /* If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of files. This causes recovery to take a very long time. In order to make it quicker, we @@ -211,43 +211,24 @@ class FileBasedWriteAheadLogSuite class GetMaxCounter { private val value = new AtomicInteger() @volatile private var max: Int = 0 - def increment(): Unit = { + def increment(): Unit = synchronized { val atInstant = value.incrementAndGet() if (atInstant > max) max = atInstant } - def decrement(): Unit = { value.decrementAndGet() } - def get(): Int = value.get() - def getMax(): Int = max - } - /** - * We need an object that can be iterated through, which will increment our counter once - * initialized, and decrement once closed. This way we can simulate how many "streams" will - * be opened during a real use case. 
- */ - class ReaderObject(cnt: GetMaxCounter, value: Int) extends Iterator[Int] with Closeable { - cnt.increment() - private var returnedValue: Boolean = false - override def hasNext(): Boolean = !returnedValue - override def next(): Int = { - if (!returnedValue) { - returnedValue = true - value - } else { - -1 - } - } - override def close(): Unit = { - cnt.decrement() - } + def decrement(): Unit = synchronized { value.decrementAndGet() } + def get(): Int = synchronized { value.get() } + def getMax(): Int = synchronized { max } } try { val testSeq = 1 to 64 val counter = new GetMaxCounter() def handle(value: Int): Iterator[Int] = { - val reader = new ReaderObject(counter, value) - CompletionIterator[Int, Iterator[Int]](reader, reader.close) + new CompletionIterator[Int, Iterator[Int]](Iterator(value)) { + counter.increment() + override def completion() { counter.decrement() } + } } - val iterator = FileBasedWriteAheadLog.parallelIteratorCreator(tpool, testSeq, handle) + val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle) assert(iterator.toSeq === testSeq) assert(counter.getMax() <= numThreads) } finally { @@ -641,6 +622,9 @@ object WriteAheadLogSuite { allowBatching: Boolean): Seq[String] = { val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching) val data = wal.readAll().asScala.map(byteBufferToString).toSeq + // compute data, otherwise the lazy computation will fail because of wal.close() as the + // thread pool for parallel recovery gets killed + data.length wal.close() data } From f43ecbe58b084d02baf01eccb4363dda0e15a384 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 11 Nov 2015 08:51:12 -0800 Subject: [PATCH 13/18] minor2 --- .../org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 23ba178d46c9..9241093f7f20 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -230,6 +230,7 @@ class FileBasedWriteAheadLogSuite } val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle) assert(iterator.toSeq === testSeq) + assert(counter.getMax() > 1) // make sure we are doing a parallel computation! 
assert(counter.getMax() <= numThreads) } finally { tpool.shutdownNow() From 1ba834000b40f0d4cf39be5972cb585f9bbb9006 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 11 Nov 2015 08:53:20 -0800 Subject: [PATCH 14/18] minor grammar --- .../org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 9241093f7f20..b86ad85c31f2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -623,8 +623,8 @@ object WriteAheadLogSuite { allowBatching: Boolean): Seq[String] = { val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching) val data = wal.readAll().asScala.map(byteBufferToString).toSeq - // compute data, otherwise the lazy computation will fail because of wal.close() as the - // thread pool for parallel recovery gets killed + // The thread pool for parallel recovery gets killed with wal.close(). Therefore we need to + // eagerly compute data, otherwise the lazy computation will fail. data.length wal.close() data From 7e1829b87e4809a2096b969cdb8de73f03afd616 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 11 Nov 2015 18:19:06 -0800 Subject: [PATCH 15/18] address 3 --- .../util/FileBasedWriteAheadLog.scala | 2 +- .../streaming/util/WriteAheadLogSuite.scala | 50 +++++++++++++++---- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 660e7c04852a..0755ecb37f11 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -273,7 +273,7 @@ private[streaming] object FileBasedWriteAheadLog { source: Seq[I], handler: I => Iterator[O]): Iterator[O] = { val taskSupport = new ThreadPoolTaskSupport(tpool) - val groupSize = math.max(math.max(tpool.getCorePoolSize, tpool.getPoolSize), 8) + val groupSize = tpool.getMaximumPoolSize.max(8) source.grouped(groupSize).flatMap { group => val parallelCollection = group.par parallelCollection.tasksupport = taskSupport diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index c57845059af9..8ec59b52cef5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -20,7 +20,7 @@ import java.io._ import java.nio.ByteBuffer import java.util.{Iterator => JIterator} import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -219,17 +219,36 @@ class FileBasedWriteAheadLogSuite def getMax(): Int = synchronized { max } } try { - val testSeq = 1 to 64 + // If Jenkins is slow, we may not have a chance to run many threads simultaneously. Having + // a latch will make sure that all the threads can be launched altogether. 
+ val latch = new CountDownLatch(1) + val testSeq = 1 to 1000 val counter = new GetMaxCounter() def handle(value: Int): Iterator[Int] = { new CompletionIterator[Int, Iterator[Int]](Iterator(value)) { counter.increment() + // block so that other threads also launch + latch.await(10, TimeUnit.SECONDS) override def completion() { counter.decrement() } } } - val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle) - assert(iterator.toSeq === testSeq) - assert(counter.getMax() > 1) // make sure we are doing a parallel computation! + @volatile var collected: Seq[Int] = Nil + val t = new Thread() { + override def run() { + // run the calculation on a separate thread so that we can release the latch + val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle) + collected = iterator.toSeq + } + } + t.start() + eventually(Eventually.timeout(10.seconds)) { + // make sure we are doing a parallel computation! + assert(counter.getMax() > 1) + } + latch.countDown() + t.join(10000) + assert(collected === testSeq) + // make sure we didn't open too many Iterators assert(counter.getMax() <= numThreads) } finally { tpool.shutdownNow() @@ -297,6 +316,22 @@ class FileBasedWriteAheadLogSuite assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1))) } + test("FileBasedWriteAheadLogReader - handles errors when file doesn't exist") { + // Write data manually for testing the sequential reader + val dataToWrite = generateRandomData() + writeDataUsingWriter(testFile, dataToWrite) + val tFile = new File(testFile) + assert(tFile.exists()) + // Verify the data can be read and is same as the one correctly written + assert(readDataUsingReader(testFile) === dataToWrite) + + tFile.delete() + assert(!tFile.exists()) + + // Verify that no exception is thrown if file doesn't exist + assert(readDataUsingReader(testFile) === Nil) + } + test("FileBasedWriteAheadLogRandomReader - reading data using random reader") { // Write data manually for testing the random reader val writtenData = generateRandomData() @@ -619,10 +654,7 @@ object WriteAheadLogSuite { closeFileAfterWrite: Boolean, allowBatching: Boolean): Seq[String] = { val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching) - val data = wal.readAll().asScala.map(byteBufferToString).toSeq - // The thread pool for parallel recovery gets killed with wal.close(). Therefore we need to - // eagerly compute data, otherwise the lazy computation will fail. 
- data.length + val data = wal.readAll().asScala.map(byteBufferToString).toArray wal.close() data } From dbb31e372d178c70bb3a6f8b18c931ce0867d4b2 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 11 Nov 2015 22:11:02 -0800 Subject: [PATCH 16/18] address minor --- .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 3 ++- .../org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 0755ecb37f11..620b4be27b49 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -59,7 +59,7 @@ private[streaming] class FileBasedWriteAheadLog( private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("") private val threadpoolName = s"WriteAheadLogManager $callerNameTag" - private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName) + private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 8) private val executionContext = ExecutionContext.fromExecutorService(threadpool) override protected val logName = s"WriteAheadLogManager $callerNameTag" @@ -161,6 +161,7 @@ private[streaming] class FileBasedWriteAheadLog( } logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " + s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}") + def deleteFile(walInfo: LogInfo): Unit = { try { val path = new Path(walInfo.path) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 8ec59b52cef5..4273fd7dda8b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -328,6 +328,10 @@ class FileBasedWriteAheadLogSuite tFile.delete() assert(!tFile.exists()) + val reader = new FileBasedWriteAheadLogReader(testFile, hadoopConf) + assert(!reader.hasNext) + reader.close() + // Verify that no exception is thrown if file doesn't exist assert(readDataUsingReader(testFile) === Nil) } From a31822c1762383f17f379c4794c29c8bbe3203fa Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 12 Nov 2015 08:31:29 -0800 Subject: [PATCH 17/18] increase thread size for Jenkins --- .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 620b4be27b49..b24ab92d3473 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -59,7 +59,7 @@ private[streaming] class FileBasedWriteAheadLog( private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("") private val threadpoolName = s"WriteAheadLogManager $callerNameTag" - private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 8) + private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 64) private val executionContext = 
ExecutionContext.fromExecutorService(threadpool) override protected val logName = s"WriteAheadLogManager $callerNameTag" From 79e9b03e55382d64607ee39ac1a66a102574409a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 12 Nov 2015 14:17:34 -0800 Subject: [PATCH 18/18] reduce max thread limit --- .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index b24ab92d3473..72705f1a9c01 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -59,7 +59,7 @@ private[streaming] class FileBasedWriteAheadLog( private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("") private val threadpoolName = s"WriteAheadLogManager $callerNameTag" - private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 64) + private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 20) private val executionContext = ExecutionContext.fromExecutorService(threadpool) override protected val logName = s"WriteAheadLogManager $callerNameTag"
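
The later patches in this series (09 onward) replace the one-reader-at-a-time recovery path with FileBasedWriteAheadLog.seqToParIterator, which keeps at most a thread-pool's worth of log readers open at once. The standalone sketch below is not part of the patch series; it only mirrors the final shape of that helper so the bounded-parallelism trick can be tried in isolation. The scaffolding around it (the BoundedParallelRecoverySketch object, fakeReadFile, and the fixed 8-thread pool) is invented for the demo, and it assumes a Scala 2.11-era toolchain where ThreadPoolTaskSupport and the standard-library parallel collections are available, as they were in Spark at the time.

    import java.util.concurrent.{Executors, ThreadPoolExecutor}

    import scala.collection.parallel.ThreadPoolTaskSupport

    object BoundedParallelRecoverySketch {

      // Same structure as the final seqToParIterator in the patches above: each group of
      // inputs is handled in parallel on `tpool`, but groups are pulled lazily from
      // `grouped`, so only about one group's worth of handler results is materialized at a
      // time (in the real code that is what bounds the number of open log readers).
      def seqToParIterator[I, O](
          tpool: ThreadPoolExecutor,
          source: Seq[I],
          handler: I => Iterator[O]): Iterator[O] = {
        val taskSupport = new ThreadPoolTaskSupport(tpool)
        val groupSize = tpool.getMaximumPoolSize.max(8)
        source.grouped(groupSize).flatMap { group =>
          val parallelCollection = group.par
          parallelCollection.tasksupport = taskSupport
          parallelCollection.map(handler)
        }.flatten
      }

      // Stand-in for FileBasedWriteAheadLogReader: pretend every "log file" yields two records.
      def fakeReadFile(file: String): Iterator[String] =
        Iterator(s"$file-record-1", s"$file-record-2")

      def main(args: Array[String]): Unit = {
        // newFixedThreadPool returns a ThreadPoolExecutor instance, so the cast is safe here.
        val pool = Executors.newFixedThreadPool(8).asInstanceOf[ThreadPoolExecutor]
        try {
          val files = (1 to 100).map(i => s"log-$i")
          // Records stream out lazily; consuming only a few of them runs only the first
          // group of at most 8 simulated readers instead of all 100.
          seqToParIterator(pool, files, fakeReadFile).take(5).foreach(println)
        } finally {
          pool.shutdownNow()
        }
      }
    }

The WriteAheadLogSuite test added in these patches checks exactly the two properties the sketch relies on: counter.getMax() > 1 shows the per-group reads really run in parallel, while counter.getMax() <= numThreads shows no more iterators are opened than the pool can service.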