Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4b82a2c
Import yjshen's PR.
yjshen Aug 11, 2015
eddcc47
Use newer shaded docker-client artifact
JoshRosen Oct 19, 2015
8bb62ea
Use map for setting environment variables.
JoshRosen Oct 19, 2015
23443a5
Ensure that Docker container is cleaned up after error.
JoshRosen Oct 19, 2015
e6c7709
Remove a bunch of layers of indirection / abstraction.
JoshRosen Nov 5, 2015
d06847b
Fix networking for boot2docker.
JoshRosen Nov 5, 2015
93bcf45
Log warning in case container cleanup fails
JoshRosen Nov 5, 2015
bf98ae0
Rename class and freeze image versions
JoshRosen Nov 5, 2015
1e389e2
Automatically get docker IP from docker-machine and boot2docker.
JoshRosen Nov 5, 2015
d9c37df
Print nicer message if Docker connection fails.
JoshRosen Nov 5, 2015
95f00e1
Add test tag for excluding Docker tests in SBT.
JoshRosen Nov 5, 2015
fcf5dc4
Upgrade to non-ancient version of docker-client.
JoshRosen Nov 6, 2015
2273f87
Upgrade Jersey to avoid ASM incompatibilities / classpath ordering is…
JoshRosen Nov 6, 2015
65315c4
Merge remote-tracking branch 'origin/master' into docker-jdbc-tests
JoshRosen Nov 6, 2015
cba340f
Experiment with bumping Jersey, even though I know we can't do that.
JoshRosen Nov 6, 2015
1a09a65
Hack to fix IP address binding in Jenkins.
JoshRosen Nov 9, 2015
af84a47
Move docker tests to own subproject.
JoshRosen Nov 9, 2015
4899b2e
Add DockerTest tag.
JoshRosen Nov 9, 2015
ef85200
Use non-shaded Docker client.
JoshRosen Nov 9, 2015
6db2c1c
Fix dependency problems.
JoshRosen Nov 10, 2015
9011bc5
Merge remote-tracking branch 'origin/master' into docker-jdbc-tests
JoshRosen Nov 10, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove a bunch of layers of indirection / abstraction.
  • Loading branch information
JoshRosen committed Nov 5, 2015
commit e6c7709cfde47c87f995b7e55786c59360039bf7
167 changes: 58 additions & 109 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import scala.util.control.NonFatal
import com.spotify.docker.client.messages.ContainerConfig
import com.spotify.docker.client._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSQLContext
Expand All @@ -33,145 +35,92 @@ abstract class DatabaseOnDocker {
/**
* The docker image to be pulled.
*/
def imageName: String
val imageName: String

/**
* Environment variables to set inside of the Docker container while lauching it.
* Environment variables to set inside of the Docker container while launching it.
*/
def env: Map[String, String]
val env: Map[String, String]

/**
* jdbcUrl should be a lazy val or a function since `ip` it relies on is only available after
* the docker container starts
* Return a JDBC URL that connects to the database running at the given IP address.
*/
def jdbcUrl: String
def getJdbcUrl(ip: String): String
}

abstract class DatabaseIntegrationSuite
extends SparkFunSuite
with BeforeAndAfterAll
with Eventually
with SharedSQLContext {

private val docker: DockerClient = DockerClientFactory.get()
private var containerId: String = null
val db: DatabaseOnDocker

lazy val ip = docker.inspectContainer(containerId).networkSettings.ipAddress
private var docker: DockerClient = _
private var containerId: String = _
protected var jdbcUrl: String = _

def start(): Unit = {
while (true) {
override def beforeAll() {
super.beforeAll()
try {
docker = DefaultDockerClient.fromEnv.build()
// Ensure that the Docker image is installed:
try {
val config = ContainerConfig.builder()
.image(imageName)
.env(env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
.build()
containerId = docker.createContainer(config).id
docker.startContainer(containerId)
return
docker.inspectImage(db.imageName)
} catch {
case e: ImageNotFoundException => retry(5)(docker.pull(imageName))
case e: ImageNotFoundException =>
log.warn(s"Docker image ${db.imageName} not found; pulling image from registry")
docker.pull(db.imageName)
}
}
}

private def retry[T](n: Int)(fn: => T): T = {
try {
fn
} catch {
case e if n > 1 =>
retry(n - 1)(fn)
}
}

def close(): Unit = {
docker.killContainer(containerId)
docker.removeContainer(containerId)
DockerClientFactory.close(docker)
}
}

abstract class DatabaseIntegrationSuite extends SparkFunSuite
with BeforeAndAfterAll with SharedSQLContext {

def db: DatabaseOnDocker

def waitForDatabase(ip: String, maxMillis: Long) {
val before = System.currentTimeMillis()
var lastException: java.sql.SQLException = null
while (true) {
if (System.currentTimeMillis() > before + maxMillis) {
throw new java.sql.SQLException(s"Database not up after $maxMillis ms.", lastException)
// Launch the container:
val config = ContainerConfig.builder()
.image(db.imageName)
.env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
.build()
containerId = docker.createContainer(config).id
docker.startContainer(containerId)
// Wait until the database has started and is accepting JDBC connections:
jdbcUrl = db.getJdbcUrl(ip = docker.inspectContainer(containerId).networkSettings.ipAddress)
eventually(timeout(60.seconds), interval(1.seconds)) {
val conn = java.sql.DriverManager.getConnection(jdbcUrl)
conn.close()
}
// Run any setup queries:
val conn: Connection = java.sql.DriverManager.getConnection(jdbcUrl)
try {
val conn = java.sql.DriverManager.getConnection(db.jdbcUrl)
dataPreparation(conn)
} finally {
conn.close()
return
} catch {
case e: java.sql.SQLException =>
lastException = e
java.lang.Thread.sleep(250)
}
}
}

def setupDatabase(ip: String): Unit = {
val conn: Connection = java.sql.DriverManager.getConnection(db.jdbcUrl)
try {
dataPreparation(conn)
} finally {
conn.close()
}
}

/**
* Prepare databases and tables for testing
*/
def dataPreparation(connection: Connection)

override def beforeAll() {
super.beforeAll()
try {
db.start()
waitForDatabase(db.ip, 60000)
setupDatabase(db.ip)
} catch {
case NonFatal(e) =>
try {
afterAll()
} finally {
throw e
}
}

}
}

override def afterAll() {
try {
db.close()
if (docker != null) {
try {
if (containerId != null) {
docker.killContainer(containerId)
docker.removeContainer(containerId)
}
} finally {
docker.close()
}
}
} finally {
super.afterAll()
}
}
}

/**
* A factory and morgue for DockerClient objects. In the DockerClient we use,
* calling close() closes the desired DockerClient but also renders all other
* DockerClients inoperable. This is inconvenient if we have more than one
* open, such as during tests.
*/
object DockerClientFactory {
var numClients: Int = 0
val zombies = new MutableList[DockerClient]()

def get(): DockerClient = {
this.synchronized {
numClients = numClients + 1
DefaultDockerClient.fromEnv.build()
}
}

def close(dc: DockerClient) {
this.synchronized {
numClients = numClients - 1
zombies += dc
if (numClients == 0) {
zombies.foreach(_.close())
zombies.clear()
}
}
}
/**
* Prepare databases and tables for testing.
*/
def dataPreparation(connection: Connection): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite {
val env = Map(
"MYSQL_ROOT_PASSWORD" -> "rootpass"
)
lazy val jdbcUrl = s"jdbc:mysql://$ip:3306/mysql?user=root&password=rootpass"
def getJdbcUrl(ip: String) = s"jdbc:mysql://$ip:3306/mysql?user=root&password=rootpass"
}

override def dataPreparation(conn: Connection) {
override def dataPreparation(conn: Connection): Unit = {
conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y TEXT(8))").executeUpdate()
conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate()
Expand All @@ -57,7 +57,7 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite {
}

test("Basic test") {
val df = sqlContext.read.jdbc(db.jdbcUrl, "tbl", new Properties)
val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
val rows = df.collect()
assert(rows.length == 2)
val types = rows(0).toSeq.map(x => x.getClass.toString)
Expand All @@ -67,7 +67,7 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite {
}

test("Numeric types") {
val df = sqlContext.read.jdbc(db.jdbcUrl, "numbers", new Properties)
val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
Expand All @@ -94,7 +94,7 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite {
}

test("Date types") {
val df = sqlContext.read.jdbc(db.jdbcUrl, "dates", new Properties)
val df = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
Expand All @@ -112,7 +112,7 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite {
}

test("String types") {
val df = sqlContext.read.jdbc(db.jdbcUrl, "strings", new Properties)
val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
Expand All @@ -138,11 +138,11 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite {
}

test("Basic write test") {
val df1 = sqlContext.read.jdbc(db.jdbcUrl, "numbers", new Properties)
val df2 = sqlContext.read.jdbc(db.jdbcUrl, "dates", new Properties)
val df3 = sqlContext.read.jdbc(db.jdbcUrl, "strings", new Properties)
df1.write.jdbc(db.jdbcUrl, "numberscopy", new Properties)
df2.write.jdbc(db.jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(db.jdbcUrl, "stringscopy", new Properties)
val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ class PostgresIntegrationSuite extends DatabaseIntegrationSuite {
val env = Map(
"POSTGRES_PASSWORD" -> "rootpass"
)
lazy val jdbcUrl = s"jdbc:postgresql://$ip:5432/postgres?user=postgres&password=rootpass"
override def getJdbcUrl(ip: String): String =
s"jdbc:postgresql://$ip:5432/postgres?user=postgres&password=rootpass"
}

override def dataPreparation(conn: Connection) {
override def dataPreparation(conn: Connection): Unit = {
conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
conn.setCatalog("foo")
conn.prepareStatement("CREATE TABLE bar (a text, b integer, c double precision, d bigint, "
Expand All @@ -39,7 +40,7 @@ class PostgresIntegrationSuite extends DatabaseIntegrationSuite {
}

test("Type mapping for various types") {
val df = sqlContext.read.jdbc(db.jdbcUrl, "bar", new Properties)
val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
Expand Down Expand Up @@ -70,8 +71,8 @@ class PostgresIntegrationSuite extends DatabaseIntegrationSuite {
}

test("Basic write test") {
val df = sqlContext.read.jdbc(db.jdbcUrl, "bar", new Properties)
df.write.jdbc(db.jdbcUrl, "public.barcopy", new Properties)
// Test only that it doesn't bomb out.
val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
df.write.jdbc(jdbcUrl, "public.barcopy", new Properties)
// Test only that it doesn't crash.
}
}