Fixed compilation errors.
tdas committed Oct 4, 2014
commit a49fd1d7a30d9fe3e38211e8703a7ec53636d0cc
@@ -58,11 +58,11 @@ private[streaming] case class ReportError(streamId: Int, message: String, error:
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
extends ReceiverTrackerMessage


class ReceivedBlockInfoCheckpointer(
logDirectory: String, conf: SparkConf, hadoopConf: Configuration) {

- private val logManager = new WriteAheadLogManager(logDirectory, conf, hadoopConf)
+ private val logManager = new WriteAheadLogManager(
+   logDirectory, conf, hadoopConf, "ReceiverTracker.WriteAheadLogManager")

def read(): Iterator[ReceivedBlockInfo] = {
logManager.readFromLog().map { byteBuffer =>
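The hunk above threads a caller-supplied pool name through to WriteAheadLogManager, so each owner's write-ahead-log threads are identifiable in a thread dump. A REPL-sized sketch of what such a named daemon pool factory can look like (namedDaemonFixedThreadPool below is hypothetical and only approximates Spark's Utils.newDaemonFixedThreadPool, it is not part of the patch):

    import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical helper approximating Utils.newDaemonFixedThreadPool: a
    // fixed-size pool of daemon threads (they never block JVM shutdown) whose
    // names carry the given prefix, so a stack dump shows which WAL instance
    // owns each thread.
    def namedDaemonFixedThreadPool(nThreads: Int, prefix: String): ExecutorService = {
      val counter = new AtomicInteger(0)
      val factory = new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
          t.setDaemon(true)
          t
        }
      }
      Executors.newFixedThreadPool(nThreads, factory)
    }

    val pool = namedDaemonFixedThreadPool(1, "ReceiverTracker.WriteAheadLogManager")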
@@ -1,18 +1,17 @@
package org.apache.spark.streaming.storage

- import scala.language.postfixOps

import java.nio.ByteBuffer
- import java.util.concurrent.Executors

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
+ import scala.language.postfixOps

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.{BlockManager, StorageLevel, StreamBlockId}
+ import org.apache.spark.util.Utils

private[streaming] sealed trait ReceivedBlock
private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
@@ -56,14 +55,14 @@ private[streaming] class WriteAheadLogBasedBlockHandler(

private val logManager = new WriteAheadLogManager(
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString,
- conf, hadoopConf
+ conf, hadoopConf, "WriteAheadLogBasedHandler.WriteAheadLogManager"
)

private val blockStoreTimeout =
conf.getInt("spark.streaming.receiver.blockStoreTimeout", 30) seconds

- implicit private val executionContext =
-   ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
+ implicit private val executionContext = ExecutionContext.fromExecutorService(
+   Utils.newDaemonFixedThreadPool(1, "WriteAheadLogBasedBlockHandler"))

def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
val serializedBlock = receivedBlock match {
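Both this handler and WriteAheadLogManager below swap Executors.newFixedThreadPool(2) for Utils.newDaemonFixedThreadPool(1, name). Daemon threads do not keep the JVM alive if the pool is never shut down cleanly, and dropping to a single thread serializes the queued work, so blocks reach the log in submission order. A REPL-sized sketch of that ordering guarantee, using only the standard library:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
    import scala.concurrent.duration._

    // With exactly one worker thread, queued Futures execute strictly in the
    // order they were created, so log records cannot interleave or reorder.
    implicit val singleThreadContext: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

    val writes = (1 to 3).map { i =>
      Future { println(s"write $i") }  // always prints write 1, write 2, write 3
    }
    Await.ready(Future.sequence(writes), 10.seconds)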
@@ -1,17 +1,17 @@
package org.apache.spark.streaming.storage

import java.nio.ByteBuffer
- import java.util.concurrent.Executors

import scala.collection.mutable.ArrayBuffer
- import scala.concurrent.{Future, ExecutionContext}
+ import scala.concurrent.{ExecutionContext, Future}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf}
+ import org.apache.spark.util.Utils

- class WriteAheadLogManager(logDirectory: String, conf: SparkConf, hadoopConf: Configuration)
-   extends Logging {
+ private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkConf,
+     hadoopConf: Configuration, threadPoolName: String = "WriteAheadLogManager") extends Logging {

private case class LogInfo(startTime: Long, endTime: Long, path: String)

@@ -21,8 +21,9 @@ class WriteAheadLogManager(logDirectory: String, conf: SparkConf, hadoopConf: Co
conf.getInt("spark.streaming.wal.maxRetries", 3)
private val pastLogs = new ArrayBuffer[LogInfo]
implicit private val executionContext =
-   ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
+   ExecutionContext.fromExecutorService(Utils.newDaemonFixedThreadPool(1, threadPoolName))

+ private var currentLogPath: String = null
private var currentLogWriter: WriteAheadLogWriter = null
private var currentLogWriterStartTime: Long = -1L

@@ -84,9 +85,10 @@ class WriteAheadLogManager(logDirectory: String, conf: SparkConf, hadoopConf: Co
val currentTime = System.currentTimeMillis
if (currentLogWriter == null ||
currentTime - currentLogWriterStartTime > logWriterChangeIntervalSeconds * 1000) {
- pastLogs += LogInfo(currentLogWriterStartTime, currentTime, currentLogWriter.path)
+ pastLogs += LogInfo(currentLogWriterStartTime, currentTime, currentLogPath)
val newLogPath = new Path(logDirectory, s"data-$currentTime".toString)
- currentLogWriter = new WriteAheadLogWriter(newLogPath.toString, hadoopConf)
+ currentLogPath = newLogPath.toString
+ currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
currentLogWriterStartTime = currentTime
}
currentLogWriter
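The rotation hunk stops reading the path off the writer object: given the commit title, currentLogWriter.path presumably no longer compiled, and in any case currentLogWriter is still null the first time this branch runs, so dereferencing it would throw. Keeping the path in the new currentLogPath field sidesteps both. Note that currentLogPath is itself null on that first pass, so a guard along these lines (an illustration, not part of the patch) would keep the placeholder entry out of pastLogs:

    // Sketch only, using the fields from the diff above: record a pastLogs
    // entry only once a real log file actually exists.
    if (currentLogPath != null) {
      pastLogs += LogInfo(currentLogWriterStartTime, currentTime, currentLogPath)
    }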
@@ -98,11 +100,13 @@ class WriteAheadLogManager(logDirectory: String, conf: SparkConf, hadoopConf: Co
}
}

- object WriteAheadLogManager {
+ private[storage] object WriteAheadLogManager {
def logsToIterator(
chronologicallySortedLogFiles: Seq[String],
hadoopConf: Configuration
): Iterator[ByteBuffer] = {
- chronologicallySortedLogFiles map { new WriteAheadLogReader(_, hadoopConf) } flatMap { x => x }
+ chronologicallySortedLogFiles.iterator.map { file =>
+   new WriteAheadLogReader(file, hadoopConf)
+ } flatMap { x => x }
}
}
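The logsToIterator rewrite is more than a style cleanup: mapping over the Seq constructs, and therefore opens, every WriteAheadLogReader up front, while going through .iterator defers each construction until the previous log is exhausted, so at most one reader is open at a time. A REPL-sized sketch of the difference, with a hypothetical FakeReader standing in for WriteAheadLogReader:

    // FakeReader plays the role of WriteAheadLogReader; the println marks the
    // moment a "file" is opened.
    class FakeReader(file: String) extends Iterator[String] {
      println(s"opening $file")
      private var remaining = 2
      def hasNext: Boolean = remaining > 0
      def next(): String = { remaining -= 1; s"$file record" }
    }

    val files = Seq("log-1", "log-2")

    // Eager: the Seq-based map opens both readers right away.
    val eager = files.map(new FakeReader(_)) flatMap { x => x }

    // Lazy: nothing opens until the iterator is consumed; "log-2" opens only
    // after "log-1" has been drained.
    val records = files.iterator.map(new FakeReader(_)) flatMap { x => x }
    records.foreach(println)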