Initial version of ReceivedDataManager
tdas committed Oct 4, 2014
commit a33bf483760f6721dfd80ea96a1ad89a89d6c2f1
@@ -35,6 +35,7 @@ import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler.DeregisterReceiver
import org.apache.spark.streaming.scheduler.AddBlock
import org.apache.spark.streaming.scheduler.RegisterReceiver
import org.apache.spark.streaming.storage._

/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -47,9 +48,8 @@ private[streaming] class ReceiverSupervisorImpl(
env: SparkEnv
) extends ReceiverSupervisor(receiver, env.conf) with Logging {

private val blockManager = env.blockManager

private val storageLevel = receiver.storageLevel
private val receivedBlockHandler =
new BlockManagerBasedBlockHandler(env.blockManager, receiver.streamId, receiver.storageLevel)

/** Remote Akka actor for the ReceiverTracker */
private val trackerActor = {
@@ -108,11 +108,7 @@ private[streaming] class ReceiverSupervisorImpl(
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
pushAndReportBlock(ArrayBufferBlock(arrayBuffer), optionalMetadata, optionalBlockId)
}

/** Store an iterator of received data as a data block into Spark's memory. */
@@ -121,11 +117,7 @@ private[streaming] class ReceiverSupervisorImpl(
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, -1, optionalMetadata)
pushAndReportBlock(IteratorBlock(iterator), optionalMetadata, optionalBlockId)
}

/** Store the bytes of received data as a data block into Spark's memory. */
@@ -134,16 +126,30 @@ private[streaming] class ReceiverSupervisorImpl(
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
pushAndReportBlock(ByteBufferBlock(bytes), optionalMetadata, optionalBlockId)
}

/** Store block and report it to driver */
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case _ => -1
}

val time = System.currentTimeMillis
blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, -1, optionalMetadata)
}
val fileSegmentOption = receivedBlockHandler.store(blockId, receivedBlock) match {
case f: FileSegment => Some(f)
case _ => None
}
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")

/** Report pushed block */
def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
val blockInfo = ReceivedBlockInfo(streamId,
blockId, numRecords, optionalMetadata.orNull, fileSegmentOption)
trackerActor ! AddBlock(blockInfo)
logDebug("Reported block " + blockId)
}
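For context, a minimal sketch (not part of this diff) of how data reaches the new pushAndReportBlock() path: a user-defined receiver calls store(), and ReceiverSupervisorImpl wraps the data as an ArrayBufferBlock, IteratorBlock, or ByteBufferBlock before handing it to the ReceivedBlockHandler. The DummySource class and its sample records below are hypothetical.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver used only to illustrate the call path.
class DummySource extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // A real receiver would consume an external source on a background thread.
    val batch = new ArrayBuffer[String]()
    batch += "record-1"
    batch += "record-2"
    // store(ArrayBuffer) lands in ReceiverSupervisorImpl.pushArrayBuffer,
    // which now delegates to pushAndReportBlock(ArrayBufferBlock(batch), ...).
    store(batch)
  }

  def onStop() { }
}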
@@ -21,19 +21,24 @@ import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
import scala.language.existentials

import akka.actor._
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.util.AkkaUtils
import org.apache.spark.streaming.storage.{WriteAheadLogManager, FileSegment}
import org.apache.spark.util.Utils
import org.apache.hadoop.fs.Path
import java.nio.ByteBuffer

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
numRecords: Long,
metadata: Any
metadata: Any,
fileSegmentOption: Option[FileSegment]
)

/**
@@ -53,6 +58,28 @@ private[streaming] case class ReportError(streamId: Int, message: String, error:
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
extends ReceiverTrackerMessage


class ReceivedBlockInfoCheckpointer(
logDirectory: String, conf: SparkConf, hadoopConf: Configuration) {

private val logManager = new WriteAheadLogManager(logDirectory, conf, hadoopConf)

def read(): Iterator[ReceivedBlockInfo] = {
logManager.readFromLog().map { byteBuffer =>
Utils.deserialize[ReceivedBlockInfo](byteBuffer.array)
}
}

def write(receivedBlockInfo: ReceivedBlockInfo) {
val bytes = Utils.serialize(receivedBlockInfo)
logManager.writeToLog(ByteBuffer.wrap(bytes))
}

def clear(threshTime: Long) {
logManager.clear(threshTime)
}
}

/**
* This class manages the execution of the receivers of NetworkInputDStreams. Instance of
* this class must be created after all input streams have been added and StreamingContext.start()
@@ -61,20 +88,32 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
private[streaming]
class ReceiverTracker(ssc: StreamingContext) extends Logging {

val receiverInputStreams = ssc.graph.getReceiverInputStreams()
val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
val receiverExecutor = new ReceiverLauncher()
val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
private val receiverExecutor = new ReceiverLauncher()
private val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
private val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
val timeout = AkkaUtils.askTimeout(ssc.conf)
val listenerBus = ssc.scheduler.listenerBus
private val listenerBus = ssc.scheduler.listenerBus
private val receivedBlockCheckpointerOption = Option(ssc.checkpointDir) map { _ =>
new ReceivedBlockInfoCheckpointer(
new Path(ssc.checkpointDir, "receivedBlockMetadata").toString,
ssc.sparkContext.conf,
ssc.sparkContext.hadoopConfiguration
)
}

// actor is created when generator starts.
// This not being null means the tracker has been started and not stopped
var actor: ActorRef = null
var currentTime: Time = null

receivedBlockCheckpointerOption.foreach { checkpointer =>
checkpointer.read().foreach { info =>
getReceivedBlockInfoQueue(info.streamId) += info
}
}

/** Start the actor and receiver execution thread. */
def start() = synchronized {
if (actor != null) {
@@ -126,7 +165,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
}
receiverInfo(streamId) = ReceiverInfo(
streamId, s"${typ}-${streamId}", receiverActor, true, host)
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
}

@@ -140,7 +179,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
}
receiverInfo(streamId) = newReceiverInfo
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
@@ -150,10 +189,10 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
}

/** Add new blocks for the given stream */
def addBlocks(receivedBlockInfo: ReceivedBlockInfo) {
def addBlock(receivedBlockInfo: ReceivedBlockInfo) {
receivedBlockCheckpointerOption.foreach { _.write(receivedBlockInfo) }
getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " +
receivedBlockInfo.blockId)
logDebug(s"Stream ${receivedBlockInfo.streamId} received block ${receivedBlockInfo.blockId}")
}

/** Report error sent by a receiver */
@@ -166,7 +205,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
}
receiverInfo(streamId) = newReceiverInfo
ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
@@ -187,7 +226,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
case AddBlock(receivedBlockInfo) =>
addBlocks(receivedBlockInfo)
addBlock(receivedBlockInfo)
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
case DeregisterReceiver(streamId, message, error) =>
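A minimal sketch of the ReceivedBlockInfoCheckpointer lifecycle the tracker follows above: write every reported block, replay the log when the tracker is constructed, and clear old entries. The log directory, block values, and the enclosing object are assumptions for illustration; the sketch lives in the scheduler package so it can see the private[streaming] classes.

package org.apache.spark.streaming.scheduler

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.storage.StreamBlockId

object ReceivedBlockCheckpointerSketch {
  def main(args: Array[String]) {
    // Hypothetical log directory; the tracker itself uses <checkpointDir>/receivedBlockMetadata.
    val checkpointer = new ReceivedBlockInfoCheckpointer(
      "/tmp/receivedBlockMetadata", new SparkConf(), new Configuration())

    // On AddBlock: persist the metadata before queueing it in memory.
    val info = ReceivedBlockInfo(
      streamId = 0,
      blockId = StreamBlockId(0, 1L),
      numRecords = 2,
      metadata = null,
      fileSegmentOption = None)
    checkpointer.write(info)

    // On driver restart: replay previously reported blocks into the queues.
    checkpointer.read().foreach(recovered => println(recovered.blockId))

    // Periodically: drop log entries older than a threshold time (ms).
    checkpointer.clear(System.currentTimeMillis() - 60 * 1000)
  }
}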
@@ -0,0 +1,91 @@
package org.apache.spark.streaming.storage

import scala.language.postfixOps

import java.nio.ByteBuffer
import java.util.concurrent.Executors

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.{BlockManager, StorageLevel, StreamBlockId}

private[streaming] sealed trait ReceivedBlock
private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock

private[streaming] trait ReceivedBlockHandler {
def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef]
def clear(threshTime: Long) { }
}

private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager,
streamId: Int,
storageLevel: StorageLevel
) extends ReceivedBlockHandler {

def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
receivedBlock match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
case IteratorBlock(iterator) =>
blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case _ =>
throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
}
None
}
}

private[streaming] class WriteAheadLogBasedBlockHandler(
blockManager: BlockManager,
streamId: Int,
storageLevel: StorageLevel,
conf: SparkConf,
hadoopConf: Configuration,
checkpointDir: String
) extends ReceivedBlockHandler with Logging {

private val logManager = new WriteAheadLogManager(
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString,
conf, hadoopConf
)

private val blockStoreTimeout =
conf.getInt("spark.streaming.receiver.blockStoreTimeout", 30) seconds

implicit private val executionContext =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
val serializedBlock = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
blockManager.dataSerialize(blockId, iterator)
case ByteBufferBlock(byteBuffer) =>
byteBuffer
case _ =>
throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
}

val pushToBlockManagerFuture = Future {
blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
}
val pushToLogFuture = Future { logManager.writeToLog(serializedBlock) }
val combinedFuture = for {
_ <- pushToBlockManagerFuture
fileSegment <- pushToLogFuture
} yield fileSegment

Some(Await.result(combinedFuture, blockStoreTimeout))
}
}
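In this commit ReceiverSupervisorImpl still constructs a BlockManagerBasedBlockHandler; the write-ahead-log variant is only defined here. A minimal sketch of how the two handlers might be chosen between, assuming a hypothetical factory object and a freshly created Hadoop Configuration (the real wiring would need the proper Hadoop configuration and checkpoint settings):

package org.apache.spark.streaming.storage

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkEnv
import org.apache.spark.storage.StorageLevel

// Hypothetical factory, not part of this commit.
object ReceivedBlockHandlerSketch {
  def createHandler(
      env: SparkEnv,
      streamId: Int,
      storageLevel: StorageLevel,
      checkpointDirOption: Option[String]): ReceivedBlockHandler = {
    checkpointDirOption match {
      case Some(checkpointDir) =>
        // Stores each block in the BlockManager and in a per-stream
        // write-ahead log under <checkpointDir>/receivedData/<streamId>.
        new WriteAheadLogBasedBlockHandler(
          env.blockManager, streamId, storageLevel,
          env.conf, new Configuration(), checkpointDir)
      case None =>
        // Falls back to BlockManager-only storage, as the supervisor does today.
        new BlockManagerBasedBlockHandler(env.blockManager, streamId, storageLevel)
    }
  }
}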