Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Moved around packages.
  • Loading branch information
tdas committed Oct 21, 2014
commit acd94673948e4433fb216df5577c7f9e669c3bbc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.SerializableWritable
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.streaming.storage.{FileSegment, WriteAheadLogManager}
import org.apache.spark.streaming.storage.{ReceivedBlockTracker, FileSegment, WriteAheadLogManager}
import org.apache.spark.util.Utils

/** Information about blocks received by the receiver */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends Re

private[streaming] trait ReceivedBlockHandler {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef]
def clearOldBlocks(threshTime: Long)
def cleanupOldBlock(threshTime: Long)
}

private[streaming] class BlockManagerBasedBlockHandler(
Expand All @@ -45,7 +45,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
None
}

def clearOldBlocks(threshTime: Long) {
def cleanupOldBlock(threshTime: Long) {
// this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
// of BlockRDDs.
}
Expand Down Expand Up @@ -104,7 +104,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
Some(Await.result(combinedFuture, blockStoreTimeout))
}

def clearOldBlocks(threshTime: Long) {
def cleanupOldBlock(threshTime: Long) {
logManager.cleanupOldLogs(threshTime)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.spark.streaming.scheduler
package org.apache.spark.streaming.storage

import java.nio.ByteBuffer

Expand All @@ -13,6 +13,7 @@ import org.apache.spark.streaming.storage.WriteAheadLogManager
import org.apache.spark.util.Utils
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.Clock
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo

private[streaming] sealed trait ReceivedBlockTrackerRecord

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.storage.WriteAheadLogReader
import org.apache.spark.streaming.storage._
import org.apache.spark.streaming.storage.WriteAheadLogSuite._
import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock}
import org.apache.spark.util.Utils
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.storage.StreamBlockId
import scala.Some
import org.apache.spark.streaming.storage.BlockAddition

class ReceivedBlockTrackerSuite
extends FunSuite with BeforeAndAfter with Matchers with Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
manualClock.currentTime() shouldEqual 5000L

val cleanupThreshTime = 3000L
receivedBlockHandler.clearOldBlocks(cleanupThreshTime)
receivedBlockHandler.cleanupOldBlock(cleanupThreshTime)
eventually(timeout(10000 millis), interval(10 millis)) {
getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
}
Expand Down