diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 82704b1ab218..89a4b56e8bd1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1968,7 +1968,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli for (className <- listenerClassNames) { // Use reflection to find the right constructor val constructors = { - val listenerClass = Class.forName(className) + val listenerClass = org.apache.spark.util.Utils.classForName(className) listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]] } val constructorTakingSparkConf = constructors.find { c => diff --git a/docs/configuration.md b/docs/configuration.md index 892c02b27df3..253edcc1ed4f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1493,6 +1493,17 @@ Apart from these, the following properties are also available, and may be useful How many batches the Spark Streaming UI and status APIs remember before garbage collecting. + + spark.streaming.extraListeners + (none) + + A comma-separated list of classes that implement StreamingListener; when initializing + StreamingContext, instances of these classes will be created and registered with StreamingContext's listener + bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor + will be called; otherwise, a zero-argument constructor will be called. If no valid constructor + can be found, the StreamingContext creation will fail with an exception. + + #### SparkR diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ec49d0f42d12..8e9b724c9b4f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import java.io.{InputStream, NotSerializableException} +import java.lang.reflect.Constructor import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import scala.collection.Map @@ -202,6 +203,7 @@ class StreamingContext private[streaming] ( private val startSite = new AtomicReference[CallSite](null) + setupStreamingListeners() private var shutdownHookRef: AnyRef = _ /** @@ -538,6 +540,46 @@ class StreamingContext private[streaming] ( scheduler.listenerBus.addListener(streamingListener) } + /** + * Registers streamingListeners specified in spark.streaming.extraListeners + */ + private def setupStreamingListeners(): Unit = { + // Use reflection to instantiate listeners specified via `spark.streaming.extraListeners` + val listenerClassNames: Seq[String] = + conf.get("spark.streaming.extraListeners", "").split(',').map(_.trim).filter(_ != "") + for (className <- listenerClassNames) { + // Use reflection to find the right constructor + val constructors = { + val listenerClass = org.apache.spark.util.Utils.classForName(className) + listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]] + } + val constructorTakingSparkConf = constructors.find { c => + c.getParameterTypes.sameElements(Array(classOf[SparkConf])) + } + lazy val zeroArgumentConstructor = constructors.find { c => + c.getParameterTypes.isEmpty + } + val listener: StreamingListener = { + if (constructorTakingSparkConf.isDefined) { + constructorTakingSparkConf.get.newInstance(conf) + } else if (zeroArgumentConstructor.isDefined) { + zeroArgumentConstructor.get.newInstance() + } else { + throw new SparkException( + "Exception when registering Streaming Listener:" + + s" $className did not have a zero-argument constructor or a" + + " single-argument constructor that accepts SparkConf. Note: if the class is" + + " defined inside of another Scala class, then its constructors may accept an" + + " implicit parameter that references the enclosing class; in this case, you must" + + " define the listener as a top-level class in order to prevent this extra" + + " parameter from breaking Spark's ability to find a valid constructor.") + } + } + addStreamingListener(listener) + logInfo(s"Registered StreamingListener $className") + } + } + private def validate() { assert(graph != null, "Graph is null") graph.validate() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 74dbba453f02..82891bbe6bc6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.scheduler +import org.apache.spark.scheduler.StatsReportListener + import scala.collection.mutable.Queue import org.apache.spark.util.Distribution @@ -81,10 +83,13 @@ trait StreamingListener { /** * :: DeveloperApi :: * A simple StreamingListener that logs summary statistics across Spark Streaming batches - * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) + * @param numBatchInfos Number of last batches to consider for generating statistics (default: 100) */ @DeveloperApi -class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { +class StatsReportListener(numBatchInfos: Int) extends StreamingListener { + + def this() = this(100) + // Queue containing latest completed batches val batchInfos = new Queue[BatchInfo]() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 4bc1dd4a30fc..3007fea1b9b1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} +import scala.collection.JavaConversions._ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global @@ -26,10 +27,10 @@ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler._ -import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.Logging +import org.scalatest.{Matchers, Assertions} +import org.apache.spark.{SparkException, SparkConf, Logging} class StreamingListenerSuite extends TestSuiteBase with Matchers { @@ -140,6 +141,37 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { } true } + + test("registering listeners via spark.streaming.extraListeners") { + // Test for success with zero-argument constructor and sparkConf constructor + val conf = new SparkConf().setMaster("local").setAppName("test") + .set("spark.streaming.extraListeners", + classOf[StreamingListenerThatAcceptsSparkConf].getName + "," + + classOf[ReceiverInfoCollector].getName) + val scc = new StreamingContext(conf, Seconds(1)) + + scc.scheduler.listenerBus.listeners.exists { + _.isInstanceOf[StreamingListenerThatAcceptsSparkConf] } + scc.scheduler.listenerBus.listeners.exists { + _.isInstanceOf[ReceiverInfoCollector] } + + // Test for failure with too many arguments in constructor + val failingConf = new SparkConf().setMaster("local").setAppName("failingTest") + .set("spark.streaming.extraListeners", classOf[StreamingListenerTooManyArguments].getName) + val thrown = intercept[SparkException] { + val failingScc = new StreamingContext(failingConf, Seconds(1)) + } + val expectedErrorMessage = + "Exception when registering Streaming Listener:" + + " org.apache.spark.streaming.StreamingListenerTooManyArguments" + + " did not have a zero-argument constructor or a" + + " single-argument constructor that accepts SparkConf. Note: if the class is" + + " defined inside of another Scala class, then its constructors may accept an" + + " implicit parameter that references the enclosing class; in this case, you must" + + " define the listener as a top-level class in order to prevent this extra" + + " parameter from breaking Spark's ability to find a valid constructor." + assert(thrown.getMessage === expectedErrorMessage) + } } /** Listener that collects information on processed batches */ @@ -196,3 +228,11 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O } def onStop() { } } + +class StreamingListenerThatAcceptsSparkConf(conf: SparkConf) extends StreamingListener { + // Empty dummy class used for testing +} + +class StreamingListenerTooManyArguments(conf: SparkConf, failInt: Int) extends StreamingListener { + // Empty dummy class used for testing +}