Changes from 1 commit
ready for PR
brkyvz committed Oct 30, 2015
commit 06da0d1389f658a1eb3e9aaaf1750fd7ad85567a
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.util

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentSkipListSet
Contributor Author:
NoteToSelf: Remove

import java.util.{Iterator => JIterator}

import scala.collection.JavaConverters._
@@ -149,27 +150,26 @@ private[streaming] class FileBasedWriteAheadLog(
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")

def deleteFiles() {
oldLogFiles.foreach { logInfo =>
try {
val path = new Path(logInfo.path)
val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
fs.delete(path, true)
synchronized { pastLogs -= logInfo }
logDebug(s"Cleared log file $logInfo")
} catch {
case ex: Exception =>
logWarning(s"Error clearing write ahead log file $logInfo", ex)
}
synchronized { pastLogs --= oldLogFiles }
Contributor Author:
I have a test to make sure that even if the delete is not successful, the recovery is robust, and will delete the file once the next cleanup request is sent.

Contributor:
You can merge these two synchronized blocks here and move the print later.
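A minimal sketch of what that merged block might look like (hypothetical; it reuses the surrounding names pastLogs, threshTime and oldLogFiles, and moves the log message after the block):

val oldLogFiles = synchronized {
  // filter and remove the expired entries under one lock
  val expired = pastLogs.filter { _.endTime < threshTime }
  pastLogs --= expired
  expired
}
// the "Attempting to clear ..." logInfo call would then follow this block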

def deleteFile(walInfo: LogInfo): Unit = {
Contributor:
nit: why rename this to walInfo?

Contributor:
nit: empty line missing.

Contributor Author:
logInfo is Spark's logging method, so keeping that as the parameter name would shadow it.
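A tiny hypothetical illustration of the clash the rename avoids: with the closure parameter still named logInfo, the Logging trait's logInfo(...) method is shadowed inside the closure.

oldLogFiles.foreach { logInfo =>
  // 'logInfo' now refers to the LogInfo element, so a call such as
  // logInfo(s"Cleared log file $logInfo") would no longer resolve to
  // Logging.logInfo and would not compile here
}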

try {
val path = new Path(walInfo.path)
val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
fs.delete(path, true)
logDebug(s"Cleared log file $walInfo")
} catch {
case ex: Exception =>
logWarning(s"Error clearing write ahead log file $walInfo", ex)
}
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
}
if (!executionContext.isShutdown) {
val f = Future { deleteFiles() }
if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
oldLogFiles.foreach { logInfo =>
Contributor:
nit: When waitForCompletion is true, this whole deletion is done one by one. It might be better to compose all the futures with Future.sequence into a single Future to wait on; then they will run in parallel. Not a problem really, as waitForCompletion is true only in tests. Do this only if there is any other critical feedback.
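A rough sketch of that suggestion (assuming the class's execution context is in implicit scope and deleteFile is defined as above):

val deletionFutures = oldLogFiles.map { logInfo => Future { deleteFile(logInfo) } }
if (waitForCompletion) {
  import scala.concurrent.duration._
  Await.ready(Future.sequence(deletionFutures), 1 second)
}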

if (!executionContext.isShutdown) {
val f = Future { deleteFile(logInfo) }
Contributor:
Again, this should not use the default execution context. Please create a dedicated execution context for this.

Contributor Author:
The execution context was defined implicitly in the class definition. I made it non-implicit for better readability.
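A minimal sketch of such a dedicated, non-implicit execution context (the helper and thread name below are assumptions, not part of this patch):

// assumes: import scala.concurrent.{ExecutionContext, Future}
// assumes: import org.apache.spark.util.ThreadUtils
private val fileDeletionExecutionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonSingleThreadExecutor("wal-file-deletion"))
// the Future is then created with the context passed explicitly:
val f = Future { deleteFile(logInfo) }(fileDeletionExecutionContext)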

if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
}
}
}
}
@@ -225,7 +225,11 @@ private[streaming] class FileBasedWriteAheadLog(

private[streaming] object FileBasedWriteAheadLog {

case class LogInfo(startTime: Long, endTime: Long, path: String)
case class LogInfo(startTime: Long, endTime: Long, path: String) extends Comparable[LogInfo] {
override def compareTo(o: LogInfo): Int = {
endTime.compareTo(o.endTime)
}
}
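A minimal illustration of what the new compareTo gives (hypothetical values): LogInfo instances order by endTime, so they can be held in a naturally ordered collection such as the ConcurrentSkipListSet imported above.

val logs = new ConcurrentSkipListSet[LogInfo]()
logs.add(LogInfo(0L, 3000L, "log-0-3000"))
logs.add(LogInfo(0L, 1000L, "log-0-1000"))
logs.first().path  // "log-0-1000" -- the entry with the smallest endTime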

val logFileRegex = """log-(\d+)-(\d+)""".r

@@ -32,7 +32,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
import org.apache.spark.streaming.util.{WriteAheadLogSuite, WriteAheadLogUtils, FileBasedWriteAheadLogReader}
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

@@ -207,6 +207,87 @@ class ReceivedBlockTrackerSuite
tracker1.isWriteAheadLogEnabled should be (false)
}

test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
val manualClock = new ManualClock
conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
val tracker = createTracker(clock = manualClock)

val addBlocks = generateBlockInfos()
val batch1 = addBlocks.slice(0, 1)
val batch2 = addBlocks.slice(1, 3)
val batch3 = addBlocks.slice(3, 6)

def advanceTime(): Unit = manualClock.advance(1000)

assert(getWriteAheadLogFiles().length === 0)

Contributor:
Can you add inline comments to explain each step, so that the reader can understand what's going on?

val start = manualClock.getTimeMillis()
manualClock.advance(500)
tracker.cleanupOldBatches(start, waitForCompletion = false)
assert(getWriteAheadLogFiles().length === 1)
advanceTime()
batch1.foreach(tracker.addBlock)
assert(getWriteAheadLogFiles().length === 1)
advanceTime()

val batch1Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch1Time)
advanceTime()

batch2.foreach { block =>
tracker.addBlock(block)
advanceTime()
}
assert(getWriteAheadLogFiles().length === 3)

advanceTime()

val batch2Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch2Time)

advanceTime()

assert(getWriteAheadLogFiles().length === 4)
tracker.cleanupOldBatches(batch1Time, waitForCompletion = true)
assert(getWriteAheadLogFiles().length === 3)

batch3.foreach { block =>
tracker.addBlock(block)
advanceTime()
}
val batch3Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch3Time)

advanceTime()
assert(getWriteAheadLogFiles().length === 4)
advanceTime()
tracker.cleanupOldBatches(batch2Time, waitForCompletion = true)
assert(getWriteAheadLogFiles().length === 3)

def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
subject.getBlocksOfBatchAndStream(batch3Time, streamId) should be(
base.getBlocksOfBatchAndStream(batch3Time, streamId))
subject.getBlocksOfBatchAndStream(batch2Time, streamId) should be(
base.getBlocksOfBatchAndStream(batch2Time, streamId))
subject.getBlocksOfBatchAndStream(batch1Time, streamId) should be(Nil)
}

val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
compareTrackers(tracker, tracker2)

WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(start), Seq(createBatchCleanup(start)))

assert(getWriteAheadLogFiles().length === 4)
val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
compareTrackers(tracker, tracker3)
WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(batch1Time),
Seq(createBatchAllocation(batch1Time, batch1)))
assert(getWriteAheadLogFiles().length === 5)
val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
compareTrackers(tracker, tracker4)
}

/**
* Create tracker object with the optional provided clock. Use fake clock if you
* want to control time by manually incrementing it to test log clean.
@@ -233,6 +314,12 @@ class ReceivedBlockTrackerSuite
getWrittenLogData(Seq(logFile))
}

/** Get the log file name for the given log start time. */
def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
}

/**
* Get all the data written in the given write ahead log files. By default, it will read all
* files in the test log directory.
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.concurrent.Eventually._
import org.scalatest.BeforeAndAfter

import org.apache.spark.streaming.scheduler.ReceivedBlockTrackerLogEvent
import org.apache.spark.util.{ManualClock, Utils}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}

@@ -366,6 +367,21 @@ object WriteAheadLogSuite {
segments
}

/**
* Write received block tracker events to a file using the writer class and return the
* sequence of file segments written.
*/
def writeEventsUsingWriter(
Contributor:
This method is related to ReceivedBlockTracker and should not be in this file, as this file is totally independent. Move this method to ReceivedBlockTrackerSuite.

filePath: String,
events: Seq[ReceivedBlockTrackerLogEvent]): Seq[FileBasedWriteAheadLogSegment] = {
val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf)
val segments = events.map {
item => writer.write(ByteBuffer.wrap(Utils.serialize(item)))
}
writer.close()
segments
}

/** Write data to rotating files in log directory using the WriteAheadLog class. */
def writeDataUsingWriteAheadLog(
logDirectory: String,