@@ -160,6 +160,14 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}

/**
* Get the maximum remember duration across all the input streams. This is a conservative but
* safe remember duration which can be used to perform cleanup operations.
*/
def getMaxInputStreamRememberDuration(): Duration = {
Contributor:
I would add a doc here that explains that the max remember duration can be used to set a conservative threshold on when it is safe to delete older data. Otherwise it's not super obvious why callers are using this particular time.

inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds }
}
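To make the intent concrete: the method takes the largest rememberDuration across all registered input streams, so the threshold it yields is safe even for the stream that must keep data the longest. A minimal standalone sketch of the same computation (the FakeInput case class and the durations are illustrative, not from this patch):

```scala
import org.apache.spark.streaming.{Duration, Seconds}

// Hypothetical stand-ins for registered input streams.
case class FakeInput(rememberDuration: Duration)

val inputs = Seq(
  FakeInput(Seconds(1)),   // plain stream: remembers roughly one batch
  FakeInput(Seconds(30)))  // windowed stream: must remember the whole window

// Same computation as getMaxInputStreamRememberDuration(): pick the
// largest remember duration, comparing by milliseconds.
val maxRemember = inputs.map(_.rememberDuration).maxBy(_.milliseconds)

// Cleaning up only data older than (batchTime - maxRemember) is safe:
// even the 30-second-window stream still has everything it needs.
assert(maxRemember == Seconds(30))
```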

@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug("DStreamGraph.writeObject used")
@@ -94,15 +94,4 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext
}
Some(blockRDD)
}

/**
* Clear metadata that are older than `rememberDuration` of this DStream.
* This is an internal method that should not be called directly. This
* implementation overrides the default implementation to clear received
* block information.
*/
private[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
ssc.scheduler.receiverTracker.cleanupOldMetadata(time - rememberDuration)
}
}
@@ -17,7 +17,10 @@

package org.apache.spark.streaming.receiver

/** Messages sent to the NetworkReceiver. */
import org.apache.spark.streaming.Time

/** Messages sent to the Receiver. */
private[streaming] sealed trait ReceiverMessage extends Serializable
private[streaming] object StopReceiver extends ReceiverMessage
private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage

@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{AkkaUtils, Utils}

@@ -89,6 +90,9 @@ private[streaming] class ReceiverSupervisorImpl(
case StopReceiver =>
logInfo("Received stop signal")
stop("Stopped by driver", None)
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
}

def ref = self
@@ -200,4 +204,9 @@

/** Generate new block ID */
private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)

private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
}
}
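The patch does not show the receiver-side handler, but the cleanupOldBlocks call above presumably bottoms out in the same kind of cleanupOldLogs call that ReceivedBlockTracker makes later in this diff. A rough sketch under that assumption (the trait and class names here are illustrative, not the actual handler):

```scala
// Illustrative sketch only: a WAL-backed block handler that deletes old
// write-ahead-log segments. Compare with the cleanupOldLogs call made by
// ReceivedBlockTracker below.
trait LogManagerLike {
  def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit
}

class WalBlockHandlerSketch(logManager: LogManagerLike) {
  def cleanupOldBlocks(threshTime: Long): Unit = {
    // Delete log segments whose records are all older than threshTime.
    // waitForCompletion = false keeps the receiver from blocking on
    // file-system deletes.
    logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
  }
}
```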
@@ -238,20 +238,29 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
ssc.graph.clearMetadata(time)
jobScheduler.receiverTracker.cleanupOldMetadata(time - graph.batchDuration)

// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
if (shouldCheckpoint) {
eventActor ! DoCheckpoint(time)
} else {
// If checkpointing is not enabled, then delete metadata information about
// received blocks (block data not saved in any case). Otherwise, wait for
// checkpointing of this batch to complete.
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
}

/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
ssc.graph.clearCheckpointData(time)

// All the checkpoint information about which batches have been processed, etc. has
// been saved to checkpoints, so it's safe to delete block metadata and data WAL files
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
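The interplay between the two methods above is easy to lose in the diff: when checkpointing is on, block cleanup is deferred from clearMetadata to clearCheckpointData, so blocks stay replayable until the checkpoint is safely written. A condensed, self-contained restatement (the helper names are stand-ins, not the JobGenerator API):

```scala
object CleanupFlowSketch {
  val maxRememberMs = 30000L     // e.g. largest window across input streams
  val shouldCheckpoint = true

  def doCheckpoint(timeMs: Long): Unit = println(s"checkpoint batch $timeMs")
  def cleanup(threshMs: Long): Unit = println(s"delete blocks/WAL older than $threshMs")
  def markDone(timeMs: Long): Unit = println(s"batch $timeMs fully processed")

  // Mirrors clearMetadata: clean up now only if recovery never needs the blocks.
  def onClearMetadata(timeMs: Long): Unit =
    if (shouldCheckpoint) doCheckpoint(timeMs)  // defer to onClearCheckpointData
    else { cleanup(timeMs - maxRememberMs); markDone(timeMs) }

  // Mirrors clearCheckpointData: the checkpoint is written, so it is now
  // safe to drop block metadata and WAL files for this batch.
  def onClearCheckpointData(timeMs: Long): Unit = {
    cleanup(timeMs - maxRememberMs)
    markDone(timeMs)
  }
}
```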

@@ -150,7 +150,6 @@ private[streaming] class ReceivedBlockTracker(
writeToLog(BatchCleanupEvent(timesToCleanup))
timeToAllocatedBlocks --= timesToCleanup
logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
log
}
Contributor:
Assuming this is just random clean-up?


/** Stop the block tracker. */
@@ -24,9 +24,8 @@ import scala.language.existentials
import akka.actor._

import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver}

/**
* Messages used by the NetworkReceiver and the ReceiverTracker to communicate
@@ -119,9 +118,20 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}

/** Clean up metadata older than the given threshold time */
def cleanupOldMetadata(cleanupThreshTime: Time) {
/**
* Clean up the data and metadata of blocks and batches that are strictly
* older than the threshold time. Note that this does not wait for the
* cleanup to complete: the batch metadata cleanup runs with
* waitForCompletion = false, and the receivers delete their old block
* data asynchronously.
*/
def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
// Clean up old block and batch metadata
receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)

// Signal the receivers to delete old block data
if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
receiverInfo.values.flatMap { info => Option(info.actor) }
Contributor:
I tried to dig around and understand why ReceiverInfos might have a null actor reference. There seems to be only one case when this happens, and it's when an error is received. But in that case, doesn't the error report itself come from the actor (so you can have a reference to the actor via the sender)?

At a minimum, should we maybe indicate in ReceiverInfo that users should defend against the possibility of the actor ref being null? And would it be possible to avoid the possibility of a null reference altogether (not for this patch specifically, but in general)?
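One way to realize the reviewer's last suggestion is to make the possible absence explicit in the type rather than relying on callers remembering to wrap with Option(info.actor). A sketch (hypothetical shape, not the patch's ReceiverInfo):

```scala
import akka.actor.ActorRef

// Hypothetical: model "no actor yet / receiver errored" with Option so
// callers cannot dereference a null by accident.
case class SafeReceiverInfo(streamId: Int, name: String, actor: Option[ActorRef])

def cleanupAll(infos: Iterable[SafeReceiverInfo], msg: Any): Unit =
  // flatMap over the Option drops receivers without a live actor,
  // exactly like Option(info.actor) above, but enforced by the type.
  infos.flatMap(_.actor).foreach(_ ! msg)
```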

.foreach { _ ! CleanupOldBlocks(cleanupThreshTime) }
}
}

/** Register a receiver */
@@ -63,7 +63,7 @@ private[streaming] object HdfsUtils {
}

def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
// For local file systems, return the raw loca file system, such calls to flush()
// For local file systems, return the raw local file system, so that calls to flush()
// actually flush the stream.
val fs = path.getFileSystem(conf)
fs match {
157 changes: 126 additions & 31 deletions streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -17,21 +17,26 @@

package org.apache.spark.streaming

import java.io.File
import java.nio.ByteBuffer
import java.util.concurrent.Semaphore

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver, ReceiverSupervisor}
import org.scalatest.FunSuite
import com.google.common.io.Files
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._

/** Testsuite for testing the network receiver behavior */
class ReceiverSuite extends FunSuite with Timeouts {
class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {

test("receiver life cycle") {

@@ -192,7 +197,6 @@ class ReceiverSuite extends FunSuite with Timeouts {
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes)
assert(
// the first and last block may be incomplete, so we slice them out
recordedBlocks.drop(1).dropRight(1).forall { block =>
@@ -203,39 +207,91 @@
)
}


Contributor (author): Just moved the FakeReceiver out of the ReceiverSuite to avoid serialization issues. The diff is very confusing, so just compare directly.

/**
* Test whether write ahead logs are generated by receivers,
* and automatically cleaned up. The clean up must be aware of the
* remember duration of the input streams. E.g., input streams on which window()
* has been applied must remember the data for longer, and hence corresponding
* WALs should be cleaned later.
*/
test("write ahead log - generating and cleaning") {
val sparkConf = new SparkConf()
.setMaster("local[4]") // must be at least 3 as we are going to start 2 receivers
.setAppName(framework)
.set("spark.ui.enabled", "true")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
val batchDuration = Milliseconds(500)
val tempDirectory = Files.createTempDir()
val logDirectory1 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 0))
val logDirectory2 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 1))
val allLogFiles1 = new mutable.HashSet[String]()
val allLogFiles2 = new mutable.HashSet[String]()
logInfo("Temp checkpoint directory = " + tempDirectory)

def getBothCurrentLogFiles(): (Seq[String], Seq[String]) = {
(getCurrentLogFiles(logDirectory1), getCurrentLogFiles(logDirectory2))
}

def getCurrentLogFiles(logDirectory: File): Seq[String] = {
try {
if (logDirectory.exists()) {
logDirectory.listFiles().filter { _.getName.startsWith("log") }.map { _.toString }
} else {
Seq.empty
}
} catch {
case e: Exception =>
Seq.empty
}

}

def printLogFiles(message: String, files: Seq[String]) {
logInfo(s"$message (${files.size} files):\n" + files.mkString("\n"))
}

withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
tempDirectory.deleteOnExit()
val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiverStream1 = ssc.receiverStream(receiver1)
val receiverStream2 = ssc.receiverStream(receiver2)
receiverStream1.register()
receiverStream2.window(batchDuration * 6).register() // 3 second window
ssc.checkpoint(tempDirectory.getAbsolutePath())
ssc.start()

// Run until sufficient WAL files have been generated and
// the first WAL file has been deleted
eventually(timeout(20 seconds), interval(batchDuration.milliseconds millis)) {
val (logFiles1, logFiles2) = getBothCurrentLogFiles()
allLogFiles1 ++= logFiles1
allLogFiles2 ++= logFiles2
if (allLogFiles1.size > 0) {
assert(!logFiles1.contains(allLogFiles1.toSeq.sorted.head))
}
if (allLogFiles2.size > 0) {
assert(!logFiles2.contains(allLogFiles2.toSeq.sorted.head))
}
assert(allLogFiles1.size >= 7)
assert(allLogFiles2.size >= 7)
}
ssc.stop(stopSparkContext = true, stopGracefully = true)

val sortedAllLogFiles1 = allLogFiles1.toSeq.sorted
val sortedAllLogFiles2 = allLogFiles2.toSeq.sorted
val (leftLogFiles1, leftLogFiles2) = getBothCurrentLogFiles()

printLogFiles("Receiver 0: all", sortedAllLogFiles1)
printLogFiles("Receiver 0: left", leftLogFiles1)
printLogFiles("Receiver 1: all", sortedAllLogFiles2)
printLogFiles("Receiver 1: left", leftLogFiles2)

// Verify that necessary latest log files are not deleted
// receiverStream1 needs to retain just the last batch = 1 log file
// receiverStream2 needs to retain 3 seconds (3-second window) = 3 log files
assert(sortedAllLogFiles1.takeRight(1).forall(leftLogFiles1.contains))
assert(sortedAllLogFiles2.takeRight(3).forall(leftLogFiles2.contains))
}
}
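The expected retention counts in the final asserts follow directly from the test's configuration; a worked restatement of the arithmetic (values copied from the test above):

```scala
// Values from the test configuration.
val batchMs = 500L         // batchDuration = Milliseconds(500)
val rollIntervalSec = 1L   // spark.streaming.receiver.writeAheadLog.rollingInterval
val windowBatches = 6L     // receiverStream2.window(batchDuration * 6)

// receiverStream1 remembers only about one batch, so everything it still
// needs fits in the newest log file: 1 file must survive cleanup.
val needed1 = 1L

// receiverStream2 must remember the full window: 6 * 500 ms = 3 s of data,
// and with one log file rolled per second that is 3 files.
val needed2 = windowBatches * batchMs / 1000 / rollIntervalSec
assert(needed2 == 3L)
```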

@@ -315,3 +371,42 @@
}
}

/**
* An implementation of Receiver that is used for testing a receiver's life cycle.
*/
class FakeReceiver(sendData: Boolean = false) extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
@volatile var otherThread: Thread = null
@volatile var receiving = false
@volatile var onStartCalled = false
@volatile var onStopCalled = false

def onStart() {
otherThread = new Thread() {
override def run() {
receiving = true
var count = 0
while(!isStopped()) {
if (sendData) {
store(count)
count += 1
}
Thread.sleep(10)
}
}
}
onStartCalled = true
otherThread.start()
}

def onStop() {
onStopCalled = true
otherThread.join()
}

def reset() {
receiving = false
onStartCalled = false
onStopCalled = false
}
}