Changes shown below are from 1 of the 23 commits in this pull request:
- 51ca7bd Improve building with maven docs (Mar 6, 2014)
- cda381f SPARK-1184: Update the distribution tar.gz to include spark-assembly jar (markgrover, Mar 6, 2014)
- 3eb009f SPARK-1156: allow user to login into a cluster without slaves (CodingCat, Mar 6, 2014)
- 3d3acef SPARK-1187, Added missing Python APIs (Mar 6, 2014)
- 40566e1 SPARK-942: Do not materialize partitions when DISK_ONLY storage level… (kellrott, Mar 6, 2014)
- 7edbea4 SPARK-1189: Add Security to Spark - Akka, Http, ConnectionManager, UI… (tgravescs, Mar 7, 2014)
- 328c73d SPARK-1197. Change yarn-standalone to yarn-cluster and fix up running… (sryza, Mar 7, 2014)
- 9ae919c Example for cassandra CQL read/write from spark (anitatailor, Mar 7, 2014)
- 33baf14 Small clean-up to flatmap tests (pwendell, Mar 7, 2014)
- dabeb6f SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1 (aarondav, Mar 7, 2014)
- b7cd9e9 SPARK-1195: set map_input_file environment variable in PipedRDD (tgravescs, Mar 7, 2014)
- 6e730ed Spark 1165 rdd.intersection in python and java (ScrapCodes, Mar 8, 2014)
- a99fb37 SPARK-1193. Fix indentation in pom.xmls (sryza, Mar 8, 2014)
- 8ad486a Allow sbt to use more than 1G of heap. (rxin, Mar 8, 2014)
- 0b7b7fd [SPARK-1194] Fix the same-RDD rule for cache replacement (liancheng, Mar 8, 2014)
- c2834ec Update junitxml plugin to the latest version to avoid recompilation i… (rxin, Mar 8, 2014)
- e59a3b6 SPARK-1190: Do not initialize log4j if slf4j log4j backend is not bei… (pwendell, Mar 9, 2014)
- 52834d7 SPARK-929: Fully deprecate usage of SPARK_MEM (aarondav, Mar 9, 2014)
- f6f9d02 Add timeout for fetch file (guojc, Mar 9, 2014)
- faf4cad Fix markup errors introduced in #33 (SPARK-1189) (pwendell, Mar 9, 2014)
- b9be160 SPARK-782 Clean up for ASM dependency. (pwendell, Mar 9, 2014)
- 5d98cfc maintain arbitrary state data for each key (CrazyJvm, Mar 10, 2014)
- 32ad348 [SPARK-1186] : Enrich the Spark Shell to support additional arguments. (berngp, Mar 10, 2014)
SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
This patch allows the FaultToleranceTest to work in newer versions of Docker.
See https://spark-project.atlassian.net/browse/SPARK-1136 for more details.

Besides changing the Docker and FaultToleranceTest internals, this patch also changes the Master's behavior to accept a new Worker that shares an address with a Worker we are currently trying to recover. This can only happen when the Worker itself was restarted and reacquired the same IP address and port while a Master recovery was in progress.
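For reference, the new acceptance rule reduces to the following check in the Master's worker-registration path (a condensed excerpt of the Master.scala hunk shown later in this diff; surrounding registration bookkeeping is elided):

```scala
// Condensed excerpt of the new registration check (see the Master.scala hunk
// below): a worker stuck in UNKNOWN during recovery is presumed dead, so a
// fresh registration from the same address replaces it instead of being rejected.
val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
  val oldWorker = addressToWorker(workerAddress)
  if (oldWorker.state == WorkerState.UNKNOWN) {
    removeWorker(oldWorker)   // accept the restarted worker at the same address
  } else {
    logInfo("Attempted to re-register worker at same address: " + workerAddress)
    return false              // reject a genuine duplicate registration
  }
}
```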

Finally, this adds a good bit of ASCII art to the test to make failures, successes, and actions more apparent. This is very much needed.

Author: Aaron Davidson <[email protected]>

Closes #5 from aarondav/zookeeper and squashes the following commits:

5d7a72a [Aaron Davidson] SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
aarondav committed Mar 7, 2014
commit dabeb6f160f7ad7df1c54b1b8b069700dd4b74dd
core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -30,31 +30,41 @@ import scala.sys.process._
import org.json4s._
import org.json4s.jackson.JsonMethods

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}

/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
* In order to mimic a real distributed cluster more closely, Docker is used.
* Execute using
* ./spark-class org.apache.spark.deploy.FaultToleranceTest
* ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
*
* Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
* Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
* *and* SPARK_JAVA_OPTS:
* - spark.deploy.recoveryMode=ZOOKEEPER
* - spark.deploy.zookeeper.url=172.17.42.1:2181
* Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
*
* In case of failure, make sure to kill off prior docker containers before restarting:
* docker kill $(docker ps -q)
*
* Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
* working installation of Docker. In addition to having Docker, the following are assumed:
* - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
* - The docker images tagged spark-test-master and spark-test-worker are built from the
* docker/ directory. Run 'docker/spark-test/build' to generate these.
*/
private[spark] object FaultToleranceTest extends App with Logging {

val conf = new SparkConf()
val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")

val masters = ListBuffer[TestMasterInfo]()
val workers = ListBuffer[TestWorkerInfo]()
var sc: SparkContext = _

val zk = SparkCuratorUtil.newClient(conf)

var numPassed = 0
var numFailed = 0

@@ -72,6 +82,10 @@ private[spark] object FaultToleranceTest extends App with Logging {
sc = null
}
terminateCluster()

// Clear ZK directories in between tests (for speed purposes)
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
}

test("sanity-basic") {
@@ -168,26 +182,34 @@ private[spark] object FaultToleranceTest extends App with Logging {
try {
fn
numPassed += 1
logInfo("==============================================")
logInfo("Passed: " + name)
logInfo("==============================================")
} catch {
case e: Exception =>
numFailed += 1
logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logError("FAILED: " + name, e)
logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
sys.exit(1)
}
afterEach()
}

def addMasters(num: Int) {
logInfo(s">>>>> ADD MASTERS $num <<<<<")
(1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
}

def addWorkers(num: Int) {
logInfo(s">>>>> ADD WORKERS $num <<<<<")
val masterUrls = getMasterUrls(masters)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
}

/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
logInfo(">>>>> CREATE CLIENT <<<<<")
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
@@ -206,6 +228,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}

def killLeader(): Unit = {
logInfo(">>>>> KILL LEADER <<<<<")
masters.foreach(_.readState())
val leader = getLeader
masters -= leader
@@ -215,6 +238,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)

def terminateCluster() {
logInfo(">>>>> TERMINATE CLUSTER <<<<<")
masters.foreach(_.kill())
workers.foreach(_.kill())
masters.clear()
@@ -245,6 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
* are all alive in a proper configuration (e.g., only one leader).
*/
def assertValidClusterState() = {
logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
assertUsable()
var numAlive = 0
var numStandby = 0
@@ -326,7 +351,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val

val workers = json \ "workers"
val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
// Extract the worker IP from "webuiaddress" (rather than "host") because the host name
// on containers is a weird hash instead of the actual IP address.
liveWorkerIPs = liveWorkers.map {
w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
}

numLiveApps = (json \ "activeapps").children.size

@@ -403,7 +432,7 @@ private[spark] object Docker extends Logging {
def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""

val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args)
logDebug("Run command: " + cmd)
cmd
}
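To show how these helpers and the new log banners fit together, here is an illustrative sketch (not a verbatim excerpt from the patch) of a fault-tolerance scenario written against the methods above; the `>>>>> ... <<<<<` markers make it obvious from the logs which step was in progress when something failed:

```scala
// Illustrative sketch only: addMasters, addWorkers, createClient, killLeader,
// delay, assertValidClusterState and test all come from FaultToleranceTest;
// this particular scenario is hypothetical.
test("example-kill-leader") {
  addMasters(2)               // logs ">>>>> ADD MASTERS 2 <<<<<"
  addWorkers(2)               // logs ">>>>> ADD WORKERS 2 <<<<<"
  createClient()              // logs ">>>>> CREATE CLIENT <<<<<"
  assertValidClusterState()   // logs ">>>>> ASSERT VALID CLUSTER STATE <<<<<"

  killLeader()                // logs ">>>>> KILL LEADER <<<<<"
  delay(30.seconds)           // give a standby master time to take over
  assertValidClusterState()
}
```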
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -531,8 +531,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,

val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
val oldWorker = addressToWorker(workerAddress)
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}

workers += worker
core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
@@ -17,11 +17,13 @@

package org.apache.spark.deploy.master

import org.apache.spark.{SparkConf, Logging}
import scala.collection.JavaConversions._

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.KeeperException

import org.apache.spark.{Logging, SparkConf}

object SparkCuratorUtil extends Logging {

@@ -50,4 +52,13 @@ object SparkCuratorUtil extends Logging {
}
}
}

def deleteRecursive(zk: CuratorFramework, path: String) {
if (zk.checkExists().forPath(path) != null) {
for (child <- zk.getChildren.forPath(path)) {
zk.delete().forPath(path + "/" + child)
}
zk.delete().forPath(path)
}
}
}
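A minimal usage sketch of the new deleteRecursive helper, mirroring how afterEach() above clears ZooKeeper state between tests; it assumes a reachable ZooKeeper quorum at spark.deploy.zookeeper.url:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.SparkCuratorUtil

// Wipe the recovery znodes so state from one test run cannot leak into the
// next; the base directory defaults to "/spark" unless
// spark.deploy.zookeeper.dir is set.
val conf = new SparkConf()
val zkDir = conf.get("spark.deploy.zookeeper.dir", "/spark")
val zk = SparkCuratorUtil.newClient(conf)
SparkCuratorUtil.deleteRecursive(zk, zkDir + "/spark_leader")
SparkCuratorUtil.deleteRecursive(zk, zkDir + "/master_status")
zk.close()
```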
4 changes: 3 additions & 1 deletion docker/README.md
@@ -2,4 +2,6 @@ Spark docker files
===========

Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).

Tested with Docker version 0.8.1.
8 changes: 7 additions & 1 deletion docker/spark-test/master/default_cmd
@@ -19,4 +19,10 @@

IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
export SPARK_LOCAL_IP=$IP
export SPARK_PUBLIC_DNS=$IP

# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
umount /etc/hosts

/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP
8 changes: 7 additions & 1 deletion docker/spark-test/worker/default_cmd
@@ -19,4 +19,10 @@

IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
export SPARK_LOCAL_IP=$IP
export SPARK_PUBLIC_DNS=$IP

# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
umount /etc/hosts

/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1