Added ReceiverTrackerSuite
tdas committed Oct 8, 2014
commit 4d9c830d91af30dc1daaa21064d64d1583e7d9d5
@@ -150,7 +150,8 @@ private[streaming] class ReceiverSupervisorImpl(

val blockInfo = ReceivedBlockInfo(streamId,
blockId, numRecords, optionalMetadata.orNull, fileSegmentOption)
trackerActor ! AddBlock(blockInfo)
val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
Await.result(future, askTimeout)
logDebug("Reported block " + blockId)
}

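The hunk above changes block reporting from a fire-and-forget `!` send into Akka's ask pattern: the supervisor now blocks on the returned Future until the tracker acknowledges the block (the tracker side of that reply appears in the ReceiverTracker changes below). A minimal stand-alone sketch of ask + Await, assuming the Akka 2.x classic actor API Spark used at the time; the actor, message, and timeout names are invented for illustration.

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

// Toy message and actor standing in for AddBlock and the tracker actor.
case class AddRecord(data: String)

class AckingActor extends Actor {
  def receive = {
    case AddRecord(data) =>
      // ... persist the record here ...
      sender ! true            // reply, completing the asker's Future
  }
}

object AskExample extends App {
  val system = ActorSystem("ask-example")
  val tracker = system.actorOf(Props(new AckingActor), "tracker")
  val askTimeout = 30.seconds

  // ask(...) returns a Future of the reply; Await.result blocks the caller
  // until the acknowledgement arrives or the timeout expires.
  val future = tracker.ask(AddRecord("block-0"))(Timeout(askTimeout))
  val acked = Await.result(future, askTimeout)
  println(s"tracker acknowledged: $acked")

  system.shutdown()            // Akka 2.2/2.3-era API
}
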
@@ -58,16 +58,23 @@ private[streaming] case class ReportError(streamId: Int, message: String, error:
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
extends ReceiverTrackerMessage

class ReceivedBlockInfoCheckpointer(
logDirectory: String, conf: SparkConf, hadoopConf: Configuration) {

/**
 * Writes metadata of received blocks to a write ahead log in the checkpoint directory,
 * so that the metadata can be recovered when restarting from the checkpoint.
 */
private[streaming] class ReceivedBlockInfoCheckpointer(
checkpointDirectory: String, conf: SparkConf, hadoopConf: Configuration) {

import ReceivedBlockInfoCheckpointer._

private val logDirectory = checkpointDirToLogDir(checkpointDirectory)
private val logManager = new WriteAheadLogManager(
logDirectory, hadoopConf, threadPoolName = "ReceiverTracker.WriteAheadLogManager")
logDirectory, hadoopConf, callerName = "ReceiverTracker")

def read(): Iterator[ReceivedBlockInfo] = {
def recover(): Seq[ReceivedBlockInfo] = {
logManager.readFromLog().map { byteBuffer =>
Utils.deserialize[ReceivedBlockInfo](byteBuffer.array)
}
}.toSeq
}

def write(receivedBlockInfo: ReceivedBlockInfo) {
@@ -78,15 +85,27 @@ class ReceivedBlockInfoCheckpointer(
def clear(threshTime: Long) {
logManager.clearOldLogs(threshTime)
}

def stop() {
logManager.stop()
}
}

private[streaming] object ReceivedBlockInfoCheckpointer {
def checkpointDirToLogDir(checkpointDir: String): String = {
new Path(checkpointDir, "receivedBlockMetadata").toString
}
}

/**
* This class manages the execution of the receivers of NetworkInputDStreams. An instance of
* this class must be created after all input streams have been added and StreamingContext.start()
* has been called because it needs the final set of input streams at the time of instantiation.
*
* @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
*/
private[streaming]
class ReceiverTracker(ssc: StreamingContext) extends Logging {
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
@@ -95,12 +114,9 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
private val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
private val listenerBus = ssc.scheduler.listenerBus
private val receivedBlockCheckpointerOption = Option(ssc.checkpointDir) map { _ =>
new ReceivedBlockInfoCheckpointer(
new Path(ssc.checkpointDir, "receivedBlockMetadata").toString,
ssc.sparkContext.conf,
ssc.sparkContext.hadoopConfiguration
)
val receivedBlockCheckpointerOption = Option(ssc.checkpointDir) map { dir =>
new ReceivedBlockInfoCheckpointer(dir, ssc.sparkContext.conf,
ssc.sparkContext.hadoopConfiguration)
}

// actor is created when generator starts.
@@ -109,9 +125,11 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
var currentTime: Time = null

receivedBlockCheckpointerOption.foreach { checkpointer =>
checkpointer.read().foreach { info =>
val recoveredBlockInfo = checkpointer.recover()
recoveredBlockInfo.foreach { info =>
getReceivedBlockInfoQueue(info.streamId) += info
}
logInfo(s"Recovered info on ${recoveredBlockInfo.size} blocks from write ahead log")
}

/** Start the actor and receiver execution thread. */
@@ -123,7 +141,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
if (!receiverInputStreams.isEmpty) {
actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
"ReceiverTracker")
receiverExecutor.start()
if (!skipReceiverLaunch) receiverExecutor.start()
logInfo("ReceiverTracker started")
}
}
@@ -132,11 +150,13 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
def stop() = synchronized {
if (!receiverInputStreams.isEmpty && actor != null) {
// First, stop the receivers
receiverExecutor.stop()
if (!skipReceiverLaunch) receiverExecutor.stop()

// Finally, stop the actor
ssc.env.actorSystem.stop(actor)
actor = null

receivedBlockCheckpointerOption.foreach { _.stop() }
logInfo("ReceiverTracker stopped")
}
}
@@ -148,6 +168,10 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
receivedBlockInfo.toArray
}

def getReceiverInfo(streamId: Int): Option[ReceiverInfo] = {
receiverInfo.get(streamId)
}

private def getReceivedBlockInfoQueue(streamId: Int) = {
receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
}
@@ -189,10 +213,17 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
}

/** Add new blocks for the given stream */
def addBlock(receivedBlockInfo: ReceivedBlockInfo) {
receivedBlockCheckpointerOption.foreach { _.write(receivedBlockInfo) }
getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
logDebug(s"Stream ${receivedBlockInfo.streamId} received block ${receivedBlockInfo.blockId}")
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
try {
receivedBlockCheckpointerOption.foreach { _.write(receivedBlockInfo) }
getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
logDebug(s"Stream ${receivedBlockInfo.streamId} received block ${receivedBlockInfo.blockId}")
true
} catch {
case e: Exception =>
logError("Error adding block " + receivedBlockInfo, e)
false
}
}

/** Report error sent by a receiver */
@@ -226,7 +257,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
case AddBlock(receivedBlockInfo) =>
addBlock(receivedBlockInfo)
sender ! addBlock(receivedBlockInfo)
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
case DeregisterReceiver(streamId, message, error) =>
@@ -299,9 +330,11 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
"Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
supervisor.start()
logInfo("Supervisor started()")
supervisor.awaitTermination()
logInfo("Supervisor terminated")
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This prevents all the receivers from being scheduled on the same node.
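The tracker now follows a write-ahead discipline: addBlock appends the block metadata to the log via the checkpointer before updating the in-memory queue and reports success or failure back to the caller, while recover() replays the log on restart to rebuild that queue. A self-contained sketch of the same write-then-replay idea, using a local file and Java serialization instead of Spark's HDFS-backed WriteAheadLogManager; all names below are illustrative.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream,
  DataOutputStream, EOFException, File, FileInputStream, FileOutputStream,
  ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Illustrative record type; Spark's real ReceivedBlockInfo carries more fields.
case class BlockRecord(streamId: Int, blockId: String) extends Serializable

// Minimal length-prefixed write-ahead log: append a record before updating any
// in-memory state, replay all records on restart.
class SimpleMetadataLog(file: File) {

  // Serialize the record and append it as <length><bytes>.
  def write(record: BlockRecord): Unit = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(record)
    oos.close()
    val bytes = bos.toByteArray
    val out = new DataOutputStream(new FileOutputStream(file, true)) // append mode
    try {
      out.writeInt(bytes.length)
      out.write(bytes)
      out.flush()
    } finally {
      out.close()
    }
  }

  // Replay every record written so far (analogous to recover() above).
  def recover(): Seq[BlockRecord] = {
    if (!file.exists()) return Seq.empty
    val in = new DataInputStream(new FileInputStream(file))
    val records = new ArrayBuffer[BlockRecord]
    try {
      while (true) {
        val length = in.readInt()               // throws EOFException at end of log
        val buffer = new Array[Byte](length)
        in.readFully(buffer)
        val ois = new ObjectInputStream(new ByteArrayInputStream(buffer))
        records += ois.readObject().asInstanceOf[BlockRecord]
      }
    } catch {
      case _: EOFException =>                   // reached end of log, recovery complete
    } finally {
      in.close()
    }
    records
  }
}

object SimpleMetadataLogExample extends App {
  val log = new SimpleMetadataLog(new File("/tmp/metadata-wal"))
  log.write(BlockRecord(0, "input-0-0"))
  log.write(BlockRecord(0, "input-0-1"))
  println(log.recover())   // both records come back after a "restart"
}
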
@@ -71,7 +71,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
private val logManager = new WriteAheadLogManager(
checkpointDirToLogDir(checkpointDir, streamId),
hadoopConf, rollingInterval, maxFailures,
threadPoolName = "WriteAheadLogBasedHandler.WriteAheadLogManager",
callerName = "WriteAheadLogBasedHandler",
clock = clock
)

@@ -17,13 +17,15 @@ private[streaming] class WriteAheadLogManager(
hadoopConf: Configuration,
rollingIntervalSecs: Int = 60,
maxFailures: Int = 3,
threadPoolName: String = "WriteAheadLogManager",
callerName: String = null,
clock: Clock = new SystemClock
) extends Logging {

private val pastLogs = new ArrayBuffer[LogInfo]
implicit private val executionContext =
ExecutionContext.fromExecutorService(Utils.newDaemonFixedThreadPool(1, threadPoolName))
private val threadpoolName = "WriteAheadLogManager" +
Option(callerName).map { s => s" for $s" }.getOrElse("")
implicit private val executionContext = ExecutionContext.fromExecutorService(
Utils.newDaemonFixedThreadPool(1, threadpoolName))

private var currentLogPath: String = null
private var currentLogWriter: WriteAheadLogWriter = null
@@ -57,9 +59,9 @@ private[streaming] class WriteAheadLogManager(

def readFromLog(): Iterator[ByteBuffer] = synchronized {
val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
println("Reading from the logs: " + logFilesToRead.mkString("\n"))
logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
logFilesToRead.iterator.map { file =>
println(s"Creating log reader with $file")
logDebug(s"Creating log reader with $file")
new WriteAheadLogReader(file, hadoopConf)
} flatMap { x => x }
}
@@ -74,7 +76,7 @@
*/
def clearOldLogs(threshTime: Long): Unit = {
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
println(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")

def deleteFiles() {
@@ -92,15 +94,17 @@
}
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
}

Future { deleteFiles() }
if (!executionContext.isShutdown) {
Future { deleteFiles() }
}
}

def stop(): Unit = synchronized {
println("Stopping log manager")
if (currentLogWriter != null) {
currentLogWriter.close()
}
executionContext.shutdown()
logInfo("Stopped log manager")
}

private def getLogWriter: WriteAheadLogWriter = synchronized {
@@ -124,11 +128,11 @@
val logDirectoryPath = new Path(logDirectory)
val fs = logDirectoryPath.getFileSystem(hadoopConf)
if (fs.exists(logDirectoryPath) && fs.getFileStatus(logDirectoryPath).isDir) {
val logFiles = fs.listStatus(logDirectoryPath).map { _.getPath }
val logFileInfo = logFilesTologInfo(fs.listStatus(logDirectoryPath).map { _.getPath })
pastLogs.clear()
pastLogs ++= logFilesTologInfo(logFiles)
logInfo(s"Recovered ${logFiles.size} log files from $logDirectory")
logDebug(s"Recovered files are:\n${logFiles.mkString("\n")}")
pastLogs ++= logFileInfo
logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory")
logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
}
}

@@ -139,30 +143,6 @@
currentLogWriter = null
}
}

/*
private def tryMultipleTimes[T](message: String)(body: => T): T = {
var result: T = null.asInstanceOf[T]
var failures = 0
var lastException: Exception = null
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
result = body
succeeded = true
} catch {
case ex: Exception =>
lastException = ex
resetWriter()
failures += 1
logWarning(message, ex)
}
}
if (!succeeded) {
throw new Exception(s"Failed $message after $failures failures", lastException)
}
result
} */
}

private[storage] object WriteAheadLogManager {
@@ -176,14 +156,14 @@ private[storage] object WriteAheadLogManager {
}

def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
println("Creating log info with " + files.mkString("[","],[","]"))
files.flatMap { file =>
logFileRegex.findFirstIn(file.getName()) match {
case logFileRegex(startTimeStr, stopTimeStr) =>
case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
val startTime = startTimeStr.toLong
val stopTime = stopTimeStr.toLong
Some(LogInfo(startTime, stopTime, file.toString))
case _ => None
case None =>
None
}
}.sortBy { _.startTime }
}
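One bug fix in logFilesTologInfo deserves a note: findFirstIn returns an Option[String], so the old pattern `case logFileRegex(startTimeStr, stopTimeStr)` could never match the Option directly and every file fell through to None; the fix wraps the regex extractor in Some(...). A small sketch of the corrected pattern, with a made-up file name format since the actual logFileRegex definition is not part of this diff.

object LogFileNameExample extends App {
  // Hypothetical file name format "log-<startTime>-<stopTime>"; the real
  // logFileRegex is defined elsewhere in WriteAheadLogManager and not shown here.
  val logFileRegex = """log-(\d+)-(\d+)""".r

  def parse(fileName: String): Option[(Long, Long)] = {
    // findFirstIn returns Option[String], so the regex extractor must be
    // applied inside Some(...), as in the corrected case clause above.
    logFileRegex.findFirstIn(fileName) match {
      case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
        Some((startTimeStr.toLong, stopTimeStr.toLong))
      case None => None
    }
  }

  println(parse("log-1414455852000-1414455912000"))  // Some((1414455852000,1414455912000))
  println(parse("not-a-log"))                        // None
}
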
@@ -20,9 +20,10 @@ import java.io.{EOFException, Closeable}
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging

private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
extends Iterator[ByteBuffer] with Closeable {
extends Iterator[ByteBuffer] with Closeable with Logging {

private val instream = HdfsUtils.getInputStream(path, conf)
private var closed = false
@@ -41,12 +42,15 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
val buffer = new Array[Byte](length)
instream.readFully(buffer)
nextItem = Some(ByteBuffer.wrap(buffer))
logTrace("Read next item " + nextItem.get)
true
} catch {
case e: EOFException =>
logDebug("Error reading next item, EOF reached", e)
close()
false
case e: Exception =>
logDebug("Error reading next item, EOF reached", e)
close()
throw e
}
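The reader follows the usual read-ahead Iterator shape visible above: hasNext eagerly reads one record and caches it in nextItem, next() hands the cached record out and clears it, and the stream is closed once EOF is reached. A generic sketch of that pattern, detached from HDFS and with invented names.

import java.io.Closeable

// Generic read-ahead iterator: `fetch` produces the next item or None at end of
// data, `onClose` releases the underlying resource. Illustrative names only.
class ReadAheadIterator[T](fetch: () => Option[T], onClose: () => Unit)
  extends Iterator[T] with Closeable {

  private var nextItem: Option[T] = None
  private var closed = false

  override def hasNext: Boolean = synchronized {
    if (closed) {
      false
    } else if (nextItem.isDefined) {
      true                      // an item has already been read ahead
    } else {
      nextItem = fetch()        // read ahead now
      if (nextItem.isEmpty) close()
      nextItem.isDefined
    }
  }

  override def next(): T = synchronized {
    val item = nextItem.getOrElse {
      close()
      throw new IllegalStateException("next called without hasNext, or after end of data")
    }
    nextItem = None
    item
  }

  override def close(): Unit = synchronized {
    if (!closed) onClose()
    closed = true
  }
}

object ReadAheadIteratorExample extends App {
  val source = Iterator(1, 2, 3)
  val iter = new ReadAheadIterator[Int](
    fetch = () => if (source.hasNext) Some(source.next()) else None,
    onClose = () => println("source closed"))
  println(iter.toList)   // prints "source closed" once the source is exhausted, then List(1, 2, 3)
}
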
@@ -55,12 +55,10 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration)

override private[streaming] def close(): Unit = synchronized {
closed = true
hflushOrSync()
stream.close()
}

private def hflushOrSync() {
stream.getWrappedStream.flush()
hflushMethod.foreach(_.invoke(stream))
}
