Reimplemented FlumeStreamSuite to be more robust.
tdas committed Oct 12, 2014
commit 93cd7f62bf31c9015f30eb25439d368bac3c57c5
@@ -17,103 +17,141 @@

package org.apache.spark.streaming.flume

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import java.net.InetSocketAddress
import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
import java.nio.charset.Charset

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.compression._
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
import org.apache.spark.util.Utils

import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.handler.codec.compression._
class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")

var ssc: StreamingContext = null
var transceiver: NettyTransceiver = null

class FlumeStreamSuite extends TestSuiteBase {
after {
if (ssc != null) {
ssc.stop()
}
if (transceiver != null) {
transceiver.close()
}
}

test("flume input stream") {
runFlumeStreamTest(false)
testFlumeStream(testCompression = false)
}

test("flume input compressed stream") {
runFlumeStreamTest(true)
testFlumeStream(testCompression = true)
}

/** Run test on flume stream */
private def testFlumeStream(testCompression: Boolean): Unit = {
val input = (1 to 100).map { _.toString }
val testPort = findFreePort()
val outputBuffer = startContext(testPort, testCompression)
writeAndVerify(input, testPort, outputBuffer, testCompression)
}

/** Find a free port */
private def findFreePort(): Int = {
Utils.startServiceOnPort(23456, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
})._2
}

def runFlumeStreamTest(enableDecompression: Boolean) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val (flumeStream, testPort) =
Utils.startServiceOnPort(9997, (trialPort: Int) => {
val dstream = FlumeUtils.createStream(
ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
(dstream, trialPort)
})

/** Setup and start the streaming context */
private def startContext(
testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
ssc = new StreamingContext(conf, Milliseconds(200))
val flumeStream = FlumeUtils.createStream(
ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
outputBuffer
}

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
var client: AvroSourceProtocol = null

if (enableDecompression) {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol],
new NettyTransceiver(new InetSocketAddress("localhost", testPort),
new CompressionChannelFactory(6)))
} else {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver)
}
/** Send data to the flume receiver and verify whether the data was received */
private def writeAndVerify(
input: Seq[String],
testPort: Int,
outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
enableCompression: Boolean
) {
val testAddress = new InetSocketAddress("localhost", testPort)

for (i <- 0 until input.size) {
val inputEvents = input.map { item =>
val event = new AvroFlumeEvent
event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
client.append(event)
Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
event
}

Thread.sleep(1000)

val startTime = System.currentTimeMillis()
while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
Thread.sleep(100)
eventually(timeout(10 seconds), interval(100 milliseconds)) {
Contributor

Why do we need to retry this test multiple times? The usual case where the test fails is mainly because of bind issues, correct? Since findFreePort (sort of) takes care of that, this does not seem to help.

There is a small race condition that can be taken care of using eventually, though - where the free port is taken before the bind, in which case we can use a new free port by calling findFreePort inside the eventually, roughly as sketched below.
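
A rough sketch of that idea (an editorial illustration only, not part of this patch; it assumes the findFreePort helper defined in this file and ScalaTest's Eventually):

import java.net.ServerSocket
import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.concurrent.Eventually._

// Re-pick the port on every retry: if another process grabs the port between
// findFreePort() and the bind, the failed bind just triggers another attempt
// with a fresh port instead of failing the whole test.
val boundPort = eventually(timeout(10 seconds), interval(100 milliseconds)) {
  val trialPort = findFreePort()
  val socket = new ServerSocket(trialPort) // throws if the port was taken in the meantime
  socket.close()
  trialPort
}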

Contributor Author

Because I found that a lot of the time the test was failing because of the uncertainty about when the Flume receiver is ready to receive a new connection. Even after the connection gets accepted, sending data does not return Status.OK (and the data that streaming receives has empty strings). I am not sure what the reason behind this is, but this seems like a fairly robust way to send all the data at once in a single shot.

Contributor

OK, sounds good!
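
For reference, another way to narrow the receiver-readiness window mentioned above would be to block on Spark's StreamingListener callback before opening the transceiver. This is a sketch only, not part of this patch, and it would not by itself address the Status.OK issue the author describes; it assumes the suite's ssc field:

import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}

val receiverStarted = new CountDownLatch(1)
ssc.addStreamingListener(new StreamingListener {
  override def onReceiverStarted(started: StreamingListenerReceiverStarted): Unit = {
    receiverStarted.countDown()
  }
})
ssc.start()
// Wait (bounded) for the Flume receiver to report that it has started before
// opening the Avro transceiver and sending events.
assert(receiverStarted.await(10, TimeUnit.SECONDS), "Flume receiver did not start in time")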

// if last attempted transceiver had succeeded, close it
if (transceiver != null) {
transceiver.close()
transceiver = null
}

// Create transceiver
transceiver = {
if (enableCompression) {
new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
} else {
new NettyTransceiver(testAddress)
}
}

// Create Avro client with the transceiver
val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
client should not be null

// Send data
val status = client.appendBatch(inputEvents.toList)
status should be (avro.Status.OK)
}
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping context")
ssc.stop()

val decoder = Charset.forName("UTF-8").newDecoder()

assert(outputBuffer.size === input.length)
for (i <- 0 until outputBuffer.size) {
assert(outputBuffer(i).size === 1)
val str = decoder.decode(outputBuffer(i).head.event.getBody)
assert(str.toString === input(i).toString)
assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")

val decoder = Charset.forName("UTF-8").newDecoder()
eventually(timeout(10 seconds), interval(100 milliseconds)) {
Contributor

Same as above.

Contributor Author

This is absolutely necessary! Since we have no control over how long Spark Streaming will take to receive the data, run the jobs, and return the data, this eventually clause repeatedly tests whether the current state of the output buffer has all the records that were sent or not.

This was handled earlier using manual clock increments and sleeps to allow the job to complete, which was (unnecessarily) increasing processing times.

Contributor

Makes sense. Thanks!
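
In other words, ScalaTest's eventually simply re-runs the assertion block on the given interval until it passes or the timeout expires, so the check below tolerates however long the batches take without any manual clock manipulation. A minimal usage sketch, assuming the suite's Matchers and the outputBuffer and input values from this test:

import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.concurrent.Eventually._

// Re-evaluates the block every 100 ms; passes as soon as the assertion holds,
// and rethrows the last failure if 10 seconds elapse without success.
eventually(timeout(10 seconds), interval(100 milliseconds)) {
  outputBuffer.flatten.size should be (input.size)
}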

val outputEvents = outputBuffer.flatten.map { _.event }
outputEvents.foreach {
event =>
event.getHeaders.get("test") should be("header")
}
val output = outputEvents.map(event => decoder.decode(event.getBody()).toString)
output should be (input)
}
}

class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
/** Class to create socket channel with compression */
private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
val encoder = new ZlibEncoder(compressionLevel)
pipeline.addFirst("deflater", encoder)