From 73549ffa481038fc01b54f196ceb570b180231d2 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Thu, 21 May 2015 11:45:26 -0600 Subject: [PATCH 01/18] Add func setupStreamingListeners to add listeners from spark.streamingListeners --- .../spark/streaming/StreamingContext.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 160fc42c57d1..eaad3d14d10c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import java.io.{InputStream, NotSerializableException} +import java.lang.reflect.Constructor import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import scala.collection.Map @@ -201,6 +202,8 @@ class StreamingContext private[streaming] ( private val startSite = new AtomicReference[CallSite](null) + setupStreamingListeners() + /** * Return the associated Spark context */ @@ -525,6 +528,54 @@ class StreamingContext private[streaming] ( scheduler.listenerBus.addListener(streamingListener) } + /** + * Registers streamingListeners specified in spark.streamingListeners + */ + private def setupStreamingListeners(): Unit = { + // Use reflection to instantiate listeners specified via `spark.extraListeners` + try { + val listenerClassNames: Seq[String] = + conf.get("spark.streamingListeners", "").split(',').map(_.trim).filter(_ != "") + for (className <- listenerClassNames) { + // Use reflection to find the right constructor + val constructors = { + val listenerClass = Class.forName(className) + listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]] + } + val constructorTakingSparkConf = constructors.find { c => + c.getParameterTypes.sameElements(Array(classOf[SparkConf])) + } + lazy val zeroArgumentConstructor = constructors.find { c => + c.getParameterTypes.isEmpty + } + val listener: StreamingListener = { + if (constructorTakingSparkConf.isDefined) { + constructorTakingSparkConf.get.newInstance(conf) + } else if (zeroArgumentConstructor.isDefined) { + zeroArgumentConstructor.get.newInstance() + } else { + throw new SparkException( + s"$className did not have a zero-argument constructor or a" + + " single-argument constructor that accepts SparkConf. Note: if the class is" + + " defined inside of another Scala class, then its constructors may accept an" + + " implicit parameter that references the enclosing class; in this case, you must" + + " define the listener as a top-level class in order to prevent this extra" + + " parameter from breaking Spark's ability to find a valid constructor.") + } + } + scheduler.listenerBus.addListener(listener) + logInfo(s"Registered streaming listener $className") + } + } catch { + case e: Exception => + try { + stop() + } finally { + throw new SparkException(s"Exception when registering StreamingListener", e) + } + } + } + private def validate() { assert(graph != null, "Graph is null") graph.validate() From f8ad6293ccd8dcd8c0c58b7a6a1ea2fe378c204b Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Sat, 23 May 2015 12:08:19 -0600 Subject: [PATCH 02/18] Use class API to add listener. Modify StatsReportListener to have a zero argument constructor in order to make it compatible with config option. 
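For context: the constructor discovery added in patch 01 accepts either a zero-argument constructor or a single-argument constructor taking a SparkConf. A minimal sketch of the kind of user-supplied listener the zero-argument path could instantiate (the class name and log line are hypothetical; only the existing StreamingListener and BatchInfo APIs are assumed):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener with a zero-argument constructor, which the reflection in
// setupStreamingListeners() can instantiate and register on the listener bus.
class BatchDelayLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // processingDelay is an Option[Long] of milliseconds for the completed batch.
    val delayMs = batchCompleted.batchInfo.processingDelay.getOrElse(-1L)
    println(s"Batch completed with processing delay $delayMs ms")
  }
}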
--- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 +- .../apache/spark/streaming/scheduler/StreamingListener.scala | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index bc130108dea4..bdf30120d550 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -564,7 +564,7 @@ class StreamingContext private[streaming] ( " parameter from breaking Spark's ability to find a valid constructor.") } } - scheduler.listenerBus.addListener(listener) + addStreamingListener(listener) logInfo(s"Registered streaming listener $className") } } catch { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 74dbba453f02..602848891105 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.scheduler +import org.apache.spark.scheduler.StatsReportListener + import scala.collection.mutable.Queue import org.apache.spark.util.Distribution @@ -85,6 +87,9 @@ trait StreamingListener { */ @DeveloperApi class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { + + def this() = this(10) + // Queue containing latest completed batches val batchInfos = new Queue[BatchInfo]() From 3c1a19d70d1a20a882f1a44215a7a0bb6a5320c6 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Sat, 23 May 2015 12:15:59 -0600 Subject: [PATCH 03/18] Reduced ambiguity by removing default values from StatsReportListener's single-valued constructor. Added docs for config option spark.streamingListeners --- docs/configuration.md | 11 +++++++++++ .../spark/streaming/scheduler/StreamingListener.scala | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 30508a617fdd..47b556bd3fd9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1481,6 +1481,17 @@ Apart from these, the following properties are also available, and may be useful How many batches the Spark Streaming UI and status APIs remember before garbage collecting. + + spark.streamingListeners + (none) + + A comma-separated list of classes that implement StreamingListener; when initializing + SparkContext, instances of these classes will be created and registered with Spark's listener + bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor + will be called; otherwise, a zero-argument constructor will be called. If no valid constructor + can be found, the SparkContext creation will fail with an exception. 
+ + #### Cluster Managers diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 602848891105..d981c1a642d4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -86,7 +86,7 @@ trait StreamingListener { * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) */ @DeveloperApi -class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { +class StatsReportListener(numBatchInfos: Int) extends StreamingListener { def this() = this(10) From f3a3fee1af2ada6eb430894f4421ef6bfbd76ad5 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Sat, 23 May 2015 12:57:13 -0600 Subject: [PATCH 04/18] Add test --- .../streaming/StreamingListenerSuite.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 312cce408cfe..af499d82d09e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.streaming +package org.apache.spark.streaming: import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} +import scala.collection.JavaConversions._ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global @@ -29,7 +30,7 @@ import org.apache.spark.streaming.scheduler._ import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, Logging} class StreamingListenerSuite extends TestSuiteBase with Matchers { @@ -138,6 +139,18 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { } true } + + test("registering listeners via spark.streamingListeners") { + val conf = new SparkConf().setMaster("local").setAppName("test") + .set("spark.streamingListeners", classOf[BatchInfoCollector].getName + "," + + classOf[ReceiverInfoCollector].getName) + val scc = new StreamingContext(conf, Seconds(1)) + + scc.scheduler.listenerBus.listeners.collect { case x: BatchInfoCollector => x}.size should be (1) + scc.scheduler.listenerBus.listeners.collect { + case x: ReceiverInfoCollector => + }.size should be (1) + } } /** Listener that collects information on processed batches */ From e8506d4087c174cfa6e1994f66c1f30560ba8dc7 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Sat, 23 May 2015 13:01:19 -0600 Subject: [PATCH 05/18] Fix minor syntax derp --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index af499d82d09e..95f32f0ca0f4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.streaming: +package org.apache.spark.streaming import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.collection.JavaConversions._ From 6453c90cffd17705c22f214b67f03eedca8baf24 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Sun, 24 May 2015 08:58:19 -0600 Subject: [PATCH 06/18] Fix style issues and small typo in StreamingListenerSuite --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 95f32f0ca0f4..16a6879192b3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -146,10 +146,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { classOf[ReceiverInfoCollector].getName) val scc = new StreamingContext(conf, Seconds(1)) - scc.scheduler.listenerBus.listeners.collect { case x: BatchInfoCollector => x}.size should be (1) scc.scheduler.listenerBus.listeners.collect { + case x: BatchInfoCollector => x }.size should be (1) scc.scheduler.listenerBus.listeners.collect { + case x: ReceiverInfoCollector => x }.size should be (1) } } From d92d55bc5c4813dbeb0aec8697ff6ea924b1eccd Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Tue, 26 May 2015 20:30:03 -0600 Subject: [PATCH 07/18] Maintain consistency with property naming conventions --- docs/configuration.md | 2 +- .../scala/org/apache/spark/streaming/StreamingContext.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 47b556bd3fd9..8e1c47c330ea 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1482,7 +1482,7 @@ Apart from these, the following properties are also available, and may be useful - spark.streamingListeners + spark.streaming.listeners (none) A comma-separated list of classes that implement StreamingListener; when initializing diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index bdf30120d550..fb44f73fb634 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -530,13 +530,13 @@ class StreamingContext private[streaming] ( } /** - * Registers streamingListeners specified in spark.streamingListeners + * Registers streamingListeners specified in spark.streaming.listeners */ private def setupStreamingListeners(): Unit = { // Use reflection to instantiate listeners specified via `spark.extraListeners` try { val listenerClassNames: Seq[String] = - conf.get("spark.streamingListeners", "").split(',').map(_.trim).filter(_ != "") + conf.get("spark.streaming.listeners", "").split(',').map(_.trim).filter(_ != "") for (className <- listenerClassNames) { // Use reflection to find the right constructor val constructors = { From 00c0409ca2fa3d62859d2c7ac0d2fdddbf4e98f8 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Wed, 27 May 2015 17:16:39 -0600 Subject: [PATCH 08/18] Fix test for new format --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 16a6879192b3..202e07e78dda 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -142,7 +142,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { test("registering listeners via spark.streamingListeners") { val conf = new SparkConf().setMaster("local").setAppName("test") - .set("spark.streamingListeners", classOf[BatchInfoCollector].getName + "," + + .set("spark.streaming.listeners", classOf[BatchInfoCollector].getName + "," + classOf[ReceiverInfoCollector].getName) val scc = new StreamingContext(conf, Seconds(1)) From c94982f25f57abf488bc75a253be44e3bfbab20d Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Wed, 27 May 2015 20:08:32 -0600 Subject: [PATCH 09/18] Test renamed to spark.streaming.listeners --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 202e07e78dda..45b8ea17bb34 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -140,7 +140,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { true } - test("registering listeners via spark.streamingListeners") { + test("registering listeners via spark.streaming.listeners") { val conf = new SparkConf().setMaster("local").setAppName("test") .set("spark.streaming.listeners", classOf[BatchInfoCollector].getName + "," + classOf[ReceiverInfoCollector].getName) From d7e7b2ec2bcc5401a1de44bfc8429c5077a50e9d Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Thu, 9 Jul 2015 20:13:21 -0600 Subject: [PATCH 10/18] Update name to spark.streaming.extraListeners / fix docs / change Class.forName to org.apache.spark.util.Utils.classForName --- docs/configuration.md | 6 +++--- .../org/apache/spark/streaming/StreamingContext.scala | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 802c58335ee6..46da11cca4ac 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1494,14 +1494,14 @@ Apart from these, the following properties are also available, and may be useful - spark.streaming.listeners + spark.streaming.extraListeners (none) A comma-separated list of classes that implement StreamingListener; when initializing - SparkContext, instances of these classes will be created and registered with Spark's listener + SparkContext, instances of these classes will be created and registered with Spark's streaming listener bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor will be called; otherwise, a zero-argument constructor will be called. If no valid constructor - can be found, the SparkContext creation will fail with an exception. + can be found, the StreamingContext creation will fail with an exception. 
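A usage sketch for the renamed option documented above; com.example.BatchDelayLogger is a placeholder for any StreamingListener implementation available on the driver's classpath:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Listeners named in spark.streaming.extraListeners are instantiated and
// registered while the StreamingContext is being constructed.
val conf = new SparkConf()
  .setAppName("extra-listeners-example")
  .setMaster("local[2]")
  .set("spark.streaming.extraListeners", "com.example.BatchDelayLogger")
val ssc = new StreamingContext(conf, Seconds(1))

The same property can also be supplied with --conf on spark-submit instead of being set in application code.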
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1c67453b50dc..70fed9dec119 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -541,17 +541,17 @@ class StreamingContext private[streaming] ( } /** - * Registers streamingListeners specified in spark.streaming.listeners + * Registers streamingListeners specified in spark.streaming.extraListeners */ private def setupStreamingListeners(): Unit = { // Use reflection to instantiate listeners specified via `spark.extraListeners` try { val listenerClassNames: Seq[String] = - conf.get("spark.streaming.listeners", "").split(',').map(_.trim).filter(_ != "") + conf.get("spark.streaming.extraListeners", "").split(',').map(_.trim).filter(_ != "") for (className <- listenerClassNames) { // Use reflection to find the right constructor val constructors = { - val listenerClass = Class.forName(className) + val listenerClass = org.apache.spark.util.Utils.classForName(className) listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]] } val constructorTakingSparkConf = constructors.find { c => @@ -576,7 +576,7 @@ class StreamingContext private[streaming] ( } } addStreamingListener(listener) - logInfo(s"Registered streaming listener $className") + logInfo(s"Registered StreamingListener $className") } } catch { case e: Exception => From 33d3179c141c74214866e573147018dd36444b78 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Thu, 9 Jul 2015 20:19:17 -0600 Subject: [PATCH 11/18] Default to 100 --- .../apache/spark/streaming/scheduler/StreamingListener.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index d981c1a642d4..82891bbe6bc6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -83,12 +83,12 @@ trait StreamingListener { /** * :: DeveloperApi :: * A simple StreamingListener that logs summary statistics across Spark Streaming batches - * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) + * @param numBatchInfos Number of last batches to consider for generating statistics (default: 100) */ @DeveloperApi class StatsReportListener(numBatchInfos: Int) extends StreamingListener { - def this() = this(10) + def this() = this(100) // Queue containing latest completed batches val batchInfos = new Queue[BatchInfo]() From 233335d734c8f58253b1b56a2fa07bf8faa1efdb Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Thu, 9 Jul 2015 20:21:38 -0600 Subject: [PATCH 12/18] Update Class.forName to org.apache.spark.util.Utils.classForName to avoid problems with classloaders --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 82704b1ab218..89a4b56e8bd1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1968,7 +1968,7 @@ class SparkContext(config: SparkConf) 
extends Logging with ExecutorAllocationCli for (className <- listenerClassNames) { // Use reflection to find the right constructor val constructors = { - val listenerClass = Class.forName(className) + val listenerClass = org.apache.spark.util.Utils.classForName(className) listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]] } val constructorTakingSparkConf = constructors.find { c => From 28e7f2726a35f5063655eae0b1abcda32f87f8c8 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Fri, 10 Jul 2015 14:01:20 -0600 Subject: [PATCH 13/18] Use spark.streaming.extraListeners --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 9c96543d5188..3cc033426175 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -144,7 +144,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { test("registering listeners via spark.streaming.listeners") { val conf = new SparkConf().setMaster("local").setAppName("test") - .set("spark.streaming.listeners", classOf[BatchInfoCollector].getName + "," + + .set("spark.streaming.extraListeners", classOf[BatchInfoCollector].getName + "," + classOf[ReceiverInfoCollector].getName) val scc = new StreamingContext(conf, Seconds(1)) From af41098fe40df289fdd5e9e3e91435a6b32754f5 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Fri, 10 Jul 2015 17:40:51 -0600 Subject: [PATCH 14/18] checking --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 3cc033426175..bebf62f5744b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -142,7 +142,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { true } - test("registering listeners via spark.streaming.listeners") { + test("registering listeners via spark.streaming.extraListeners") { val conf = new SparkConf().setMaster("local").setAppName("test") .set("spark.streaming.extraListeners", classOf[BatchInfoCollector].getName + "," + classOf[ReceiverInfoCollector].getName) From 9ec68c15b6b836c5c4489ffc9925f43c7507f427 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Mon, 13 Jul 2015 16:43:08 -0600 Subject: [PATCH 15/18] Changes in clarity, removed redundant error handling --- .../spark/streaming/StreamingContext.scala | 70 ++++++++----------- .../streaming/StreamingListenerSuite.scala | 6 +- 2 files changed, 33 insertions(+), 43 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 70fed9dec119..49330294611a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -544,47 +544,39 @@ class StreamingContext private[streaming] ( * Registers streamingListeners specified in spark.streaming.extraListeners */ 
private def setupStreamingListeners(): Unit = { - // Use reflection to instantiate listeners specified via `spark.extraListeners` - try { - val listenerClassNames: Seq[String] = - conf.get("spark.streaming.extraListeners", "").split(',').map(_.trim).filter(_ != "") - for (className <- listenerClassNames) { - // Use reflection to find the right constructor - val constructors = { - val listenerClass = org.apache.spark.util.Utils.classForName(className) - listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]] - } - val constructorTakingSparkConf = constructors.find { c => - c.getParameterTypes.sameElements(Array(classOf[SparkConf])) - } - lazy val zeroArgumentConstructor = constructors.find { c => - c.getParameterTypes.isEmpty - } - val listener: StreamingListener = { - if (constructorTakingSparkConf.isDefined) { - constructorTakingSparkConf.get.newInstance(conf) - } else if (zeroArgumentConstructor.isDefined) { - zeroArgumentConstructor.get.newInstance() - } else { - throw new SparkException( - s"$className did not have a zero-argument constructor or a" + - " single-argument constructor that accepts SparkConf. Note: if the class is" + - " defined inside of another Scala class, then its constructors may accept an" + - " implicit parameter that references the enclosing class; in this case, you must" + - " define the listener as a top-level class in order to prevent this extra" + - " parameter from breaking Spark's ability to find a valid constructor.") - } - } - addStreamingListener(listener) - logInfo(s"Registered StreamingListener $className") + // Use reflection to instantiate listeners specified via `spark.streaming.extraListeners` + val listenerClassNames: Seq[String] = + conf.get("spark.streaming.extraListeners", "").split(',').map(_.trim).filter(_ != "") + for (className <- listenerClassNames) { + // Use reflection to find the right constructor + val constructors = { + val listenerClass = org.apache.spark.util.Utils.classForName(className) + listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: StreamingListener]]] + } + val constructorTakingSparkConf = constructors.find { c => + c.getParameterTypes.sameElements(Array(classOf[SparkConf])) } - } catch { - case e: Exception => - try { - stop() - } finally { - throw new SparkException(s"Exception when registering StreamingListener", e) + lazy val zeroArgumentConstructor = constructors.find { c => + c.getParameterTypes.isEmpty + } + val listener: StreamingListener = { + if (constructorTakingSparkConf.isDefined) { + constructorTakingSparkConf.get.newInstance(conf) + } else if (zeroArgumentConstructor.isDefined) { + zeroArgumentConstructor.get.newInstance() + } else { + throw new SparkException( + s"Exception when registering Streaming Listener:" + + " $className did not have a zero-argument constructor or a" + + " single-argument constructor that accepts SparkConf. 
Note: if the class is" + + " defined inside of another Scala class, then its constructors may accept an" + + " implicit parameter that references the enclosing class; in this case, you must" + + " define the listener as a top-level class in order to prevent this extra" + + " parameter from breaking Spark's ability to find a valid constructor.") } + } + addStreamingListener(listener) + logInfo(s"Registered StreamingListener $className") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index bebf62f5744b..767696965ac6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -148,10 +148,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { classOf[ReceiverInfoCollector].getName) val scc = new StreamingContext(conf, Seconds(1)) - scc.scheduler.listenerBus.listeners.collect { - case x: BatchInfoCollector => x }.size should be (1) - scc.scheduler.listenerBus.listeners.collect { - case x: ReceiverInfoCollector => x }.size should be (1) + scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[BatchInfoCollector] } + scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[ReceiverInfoCollector] } } } From 70c31e4e0f62461df83fd125b2d33f96ee597a10 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Thu, 16 Jul 2015 15:13:11 -0600 Subject: [PATCH 16/18] Fix documentation, add more/clearer test cases --- docs/configuration.md | 4 +-- .../streaming/StreamingListenerSuite.scala | 35 ++++++++++++++++--- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 46da11cca4ac..253edcc1ed4f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1498,8 +1498,8 @@ Apart from these, the following properties are also available, and may be useful (none) A comma-separated list of classes that implement StreamingListener; when initializing - SparkContext, instances of these classes will be created and registered with Spark's streaming listener - bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor + StreamingContext, instances of these classes will be created and registered with StreamingContext's listener + bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor will be called; otherwise, a zero-argument constructor will be called. If no valid constructor can be found, the StreamingContext creation will fail with an exception. 
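To illustrate why the SparkConf-taking constructor described above is preferred when present, a sketch of a listener that reads its own setting from the conf it receives; the class and the spark.example.delayThresholdMs key are illustrative only, not part of this patch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Because this class exposes a single-argument SparkConf constructor, the
// registration code passes it the application's conf at construction time.
class SlowBatchWarner(conf: SparkConf) extends StreamingListener {
  private val thresholdMs = conf.getLong("spark.example.delayThresholdMs", 1000L)

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    for (delay <- batchCompleted.batchInfo.processingDelay if delay > thresholdMs) {
      println(s"Batch took $delay ms, above the configured $thresholdMs ms threshold")
    }
  }
}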
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 767696965ac6..1048bcf30ff6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -27,10 +27,10 @@ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler._ -import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkConf, Logging} +import org.scalatest.{Matchers, Assertions} +import org.apache.spark.{SparkException, SparkConf, Logging} class StreamingListenerSuite extends TestSuiteBase with Matchers { @@ -143,13 +143,30 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { } test("registering listeners via spark.streaming.extraListeners") { + // Test for success with zero-argument constructor and sparkConf constructor val conf = new SparkConf().setMaster("local").setAppName("test") - .set("spark.streaming.extraListeners", classOf[BatchInfoCollector].getName + "," + + .set("spark.streaming.extraListeners", classOf[StreamingListenerThatAcceptsSparkConf].getName + "," + classOf[ReceiverInfoCollector].getName) val scc = new StreamingContext(conf, Seconds(1)) - scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[BatchInfoCollector] } + scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[StreamingListenerThatAcceptsSparkConf] } scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[ReceiverInfoCollector] } + + // Test for failure with too many arguments in constructor + val failingConf = new SparkConf().setMaster("local").setAppName("failingTest") + .set("spark.streaming.extraListeners", classOf[StreamingListenerTooManyArguments].getName) + val thrown = intercept[SparkException] { + val failingScc = new StreamingContext(failingConf, Seconds(1)) + } + val expectedErrorMessage = + s"Exception when registering Streaming Listener:" + + " StreamingListenerTooManyArguments did not have a zero-argument constructor or a" + + " single-argument constructor that accepts SparkConf. Note: if the class is" + + " defined inside of another Scala class, then its constructors may accept an" + + " implicit parameter that references the enclosing class; in this case, you must" + + " define the listener as a top-level class in order to prevent this extra" + + " parameter from breaking Spark's ability to find a valid constructor." 
+ assert(thrown.getMessage === expectedErrorMessage) } } @@ -183,7 +200,7 @@ class ReceiverInfoCollector extends StreamingListener { startedReceiverStreamIds += receiverStarted.receiverInfo.streamId } - override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { + override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopListped) { stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId } @@ -207,3 +224,11 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O } def onStop() { } } + +class StreamingListenerThatAcceptsSparkConf(conf: SparkConf) extends StreamingListener { + // Empty dummy class used for testing +} + +class StreamingListenerTooManyArguments(conf: SparkConf, failInt: Int) extends StreamingListener { + // Empty dummy class used for testing +} From aff08ff61073f2998310bc436253094424adae66 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Thu, 16 Jul 2015 15:16:24 -0600 Subject: [PATCH 17/18] Accidental insertion removed --- .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 1048bcf30ff6..b2975cd0db4a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -200,7 +200,7 @@ class ReceiverInfoCollector extends StreamingListener { startedReceiverStreamIds += receiverStarted.receiverInfo.streamId } - override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopListped) { + override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId } From fa8c752d6a78d1c54717819f37ffe2a0f0ec48a5 Mon Sep 17 00:00:00 2001 From: Jeff Harrison Date: Thu, 16 Jul 2015 16:39:43 -0600 Subject: [PATCH 18/18] Correct test text --- .../apache/spark/streaming/StreamingContext.scala | 4 ++-- .../spark/streaming/StreamingListenerSuite.scala | 14 +++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 49330294611a..8e9b724c9b4f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -566,8 +566,8 @@ class StreamingContext private[streaming] ( zeroArgumentConstructor.get.newInstance() } else { throw new SparkException( - s"Exception when registering Streaming Listener:" + - " $className did not have a zero-argument constructor or a" + + "Exception when registering Streaming Listener:" + + s" $className did not have a zero-argument constructor or a" + " single-argument constructor that accepts SparkConf. 
Note: if the class is" + " defined inside of another Scala class, then its constructors may accept an" + " implicit parameter that references the enclosing class; in this case, you must" + diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index b2975cd0db4a..3007fea1b9b1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -145,12 +145,15 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { test("registering listeners via spark.streaming.extraListeners") { // Test for success with zero-argument constructor and sparkConf constructor val conf = new SparkConf().setMaster("local").setAppName("test") - .set("spark.streaming.extraListeners", classOf[StreamingListenerThatAcceptsSparkConf].getName + "," + + .set("spark.streaming.extraListeners", + classOf[StreamingListenerThatAcceptsSparkConf].getName + "," + classOf[ReceiverInfoCollector].getName) val scc = new StreamingContext(conf, Seconds(1)) - scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[StreamingListenerThatAcceptsSparkConf] } - scc.scheduler.listenerBus.listeners.exists { _.isInstanceOf[ReceiverInfoCollector] } + scc.scheduler.listenerBus.listeners.exists { + _.isInstanceOf[StreamingListenerThatAcceptsSparkConf] } + scc.scheduler.listenerBus.listeners.exists { + _.isInstanceOf[ReceiverInfoCollector] } // Test for failure with too many arguments in constructor val failingConf = new SparkConf().setMaster("local").setAppName("failingTest") @@ -159,8 +162,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { val failingScc = new StreamingContext(failingConf, Seconds(1)) } val expectedErrorMessage = - s"Exception when registering Streaming Listener:" + - " StreamingListenerTooManyArguments did not have a zero-argument constructor or a" + + "Exception when registering Streaming Listener:" + + " org.apache.spark.streaming.StreamingListenerTooManyArguments" + + " did not have a zero-argument constructor or a" + " single-argument constructor that accepts SparkConf. Note: if the class is" + " defined inside of another Scala class, then its constructors may accept an" + " implicit parameter that references the enclosing class; in this case, you must" +