[SPARK-15703] Fixes test failures to ensure single sc
dhruve committed Jul 20, 2016
commit 09e855ede8d4c7b01d48467b126368a048ea398d
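
The fix mixes Spark's LocalSparkContext test trait into the affected suites, so each test owns a single `sc` that is reliably stopped afterwards. A simplified sketch of that pattern (an approximation for readers; the actual trait has more to it, and this is not part of the diff below):

```scala
import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterEach, Suite}

// Approximation of the LocalSparkContext pattern: the suite holds at most one
// SparkContext at a time and always stops it after each test, so no two tests
// can end up with live contexts simultaneously.
trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def afterEach(): Unit = {
    try {
      if (sc != null) {
        sc.stop()
        sc = null
      }
    } finally {
      super.afterEach()
    }
  }
}
```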
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -108,12 +108,5 @@ package object config {
   private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
JoshRosen (Contributor) commented on May 25, 2017:
Another post-hoc review/complaint: I think that `size` might be misleading in this context, since the size of a queue refers to the number of elements currently in it, while its capacity refers to the maximum number of elements it can hold. This configuration name caused confusion in https://github.com/apache/spark/pull/18083/files/378206efb9f5c9628a678ba7defb536252f5cbcb#r118413115

Instead, it might have been better to call it `capacity`.

dhruve (Contributor, Author) replied:
I agree. Capacity would have been a better choice.
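
For readers following the thread, the distinction is easy to see on the JDK queue class that backs the listener bus; a short illustrative snippet (not part of this diff):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Capacity is fixed at construction; size varies as elements are added and drained.
val queue = new LinkedBlockingQueue[Int](10000) // capacity = 10000
queue.put(1)
queue.put(2)
println(queue.size())              // 2: elements currently in the queue
println(queue.remainingCapacity()) // 9998: capacity minus current size
```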

       .intConf
-      .transform((x: Int) => {
-        if (x <= 0) {
-          throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
-        } else {
-          x
-        }
-      })
       .createWithDefault(10000)
 }
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -21,9 +21,8 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean

 import scala.util.DynamicVariable

 import org.apache.spark.internal.config._
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.util.Utils

 /**
@@ -41,9 +40,18 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {

   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
   private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
 
+
+  private def validateAndGetQueueSize(): Int = {
+    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+    if (queueSize <= 0) {
+      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
+    }
+    queueSize
+  }
+
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
   // Indicate if `stop()` is called
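
With the validation in the bus itself, a bad setting now fails loudly. A hypothetical way to exercise it from a suite that mixes in LocalSparkContext (assumed test shell, not part of this commit):

```scala
// Hypothetical check: exactly where the SparkException surfaces depends on
// when the lazily built queue is first touched during startup, but a
// non-positive size now yields a clear error instead of a later OOM.
val conf = new SparkConf().set("spark.scheduler.listenerbus.eventqueue.size", "0")
intercept[SparkException] {
  sc = new SparkContext("local", "test", conf)
}
```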
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -52,6 +52,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
   }
 
   test("basic creation and shutdown of LiveListenerBus") {
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus(sc)
     bus.addListener(counter)
@@ -106,7 +107,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
         drained = true
       }
     }
-
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val bus = new LiveListenerBus(sc)
     val blockingListener = new BlockingListener
 
@@ -353,6 +354,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
     val badListener = new BadListener
     val jobCounter1 = new BasicJobCounter
     val jobCounter2 = new BasicJobCounter
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val bus = new LiveListenerBus(sc)
 
     // Propagate events to bad listener first
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
 /** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class BlockManagerReplicationSuite extends SparkFunSuite
+  with Matchers
+  with BeforeAndAfter
+  with LocalSparkContext {
 
   private val conf = new SparkConf(false).set("spark.app.id", "test")
   private var rpcEnv: RpcEnv = null
@@ -91,9 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite
     // to make cached peers refresh frequently
     conf.set("spark.storage.cachedPeersTtl", "10")
 
+    sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true)
+        new LiveListenerBus(sc))), conf, true)
     allStores.clear()
   }

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
-  with PrivateMethodTester with ResetSystemProperties {
+  with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {
 
   import BlockManagerSuite._

@@ -107,9 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
+    sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true)
+        new LiveListenerBus(sc))), conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -18,16 +18,15 @@
 package org.apache.spark.ui.storage
 
 import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, Success}
+import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 
 /**
  * Test various functionality in the StorageListener that supports the StorageTab.
  */
-class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
   private var bus: LiveListenerBus = _
   private var storageStatusListener: StorageStatusListener = _
   private var storageListener: StorageListener = _
@@ -44,7 +43,8 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {

   before {
     val conf = new SparkConf()
-    bus = new LiveListenerBus(new SparkContext("local", "test", conf))
+    sc = new SparkContext("local", "test", conf)
+    bus = new LiveListenerBus(sc)
     storageStatusListener = new StorageStatusListener(conf)
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -47,6 +47,7 @@ class ReceivedBlockHandlerSuite
   extends SparkFunSuite
   with BeforeAndAfter
   with Matchers
+  with LocalSparkContext
A reviewer (Contributor) commented:
The fact that this change was necessary is another weird code smell, which suggests to me that putting `SparkContext` into the constructor was not a good idea.
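
One shape the suggested alternative could take, sketched hypothetically (later Spark releases did eventually move the bus onto a SparkConf-only constructor): depend on the narrow thing the bus actually reads rather than the whole context:

```scala
import java.util.concurrent.LinkedBlockingQueue

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical refactor, not part of this PR: constructing the bus from a
// SparkConf alone would let test suites avoid creating a SparkContext at all.
private[spark] class LiveListenerBus(conf: SparkConf) {
  private val queueCapacity: Int = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](queueCapacity)
  // ... listener registration, start()/stop(), and the draining thread as before
}
```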

   with Logging {

   import WriteAheadLogBasedBlockHandler._

@@ -77,9 +78,10 @@ class ReceivedBlockHandlerSuite
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
+    sc = new SparkContext("local", "test", conf)
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true)
+        new LiveListenerBus(sc))), conf, true)
 
     storageLevel = StorageLevel.MEMORY_ONLY_SER
     blockManager = createBlockManager(blockManagerSize, conf)