Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
Address TD's comments
  • Loading branch information
zsxwing committed Jun 30, 2015
commit f1bf3c05e4582c95025e0bd7e1876062db4be1ed
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private class FlumeUtilsPythonHelper {
enableDecompression: Boolean
): JavaPairDStream[Array[Byte], Array[Byte]] = {
val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
FlumeUtilsPythonHelper.toDStreamForPython(dstream)
FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
}

def createPollingStream(
Expand All @@ -274,7 +274,7 @@ private class FlumeUtilsPythonHelper {
}
val dstream = FlumeUtils.createPollingStream(
jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
FlumeUtilsPythonHelper.toDStreamForPython(dstream)
FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
}

}
Expand All @@ -297,7 +297,7 @@ private object FlumeUtilsPythonHelper {
}
}

private def toDStreamForPython(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
JavaPairDStream[Array[Byte], Array[Byte]] = {
dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ import org.apache.spark.streaming.flume.sink.{SparkSinkConfig, SparkSink}
private[flume] class PollingFlumeTestUtils {

private val batchCount = 5
private val eventsPerBatch = 100
val eventsPerBatch = 100
private val totalEventsPerChannel = batchCount * eventsPerBatch
private val channelCapacity = 5000

def getEventsPerBatch: Int = eventsPerBatch

def getTotalEvents: Int = totalEventsPerChannel * channels.size

private val channels = new ArrayBuffer[MemoryChannel]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,9 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")

var utils = new PollingFlumeTestUtils

def beforeFunction() {
logInfo("Using manual clock")
conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
}

before(beforeFunction())
val utils = new PollingFlumeTestUtils

test("flume polling test") {
testMultipleTimes(testFlumePolling)
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ def _writeAndVerify(self, ports):
dstream = FlumeUtils.createPollingStream(
ssc,
addresses,
maxBatchSize=self._utils.getEventsPerBatch(),
maxBatchSize=self._utils.eventsPerBatch(),
parallelism=5)
outputBuffer = []

Expand Down