Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4b82a2c
Import yjshen's PR.
yjshen Aug 11, 2015
eddcc47
Use newer shaded docker-client artifact
JoshRosen Oct 19, 2015
8bb62ea
Use map for setting environment variables.
JoshRosen Oct 19, 2015
23443a5
Ensure that Docker container is cleaned up after error.
JoshRosen Oct 19, 2015
e6c7709
Remove a bunch of layers of indirection / abstraction.
JoshRosen Nov 5, 2015
d06847b
Fix networking for boot2docker.
JoshRosen Nov 5, 2015
93bcf45
Log warning in case container cleanup fails
JoshRosen Nov 5, 2015
bf98ae0
Rename class and freeze image versions
JoshRosen Nov 5, 2015
1e389e2
Automatically get docker IP from docker-machine and boot2docker.
JoshRosen Nov 5, 2015
d9c37df
Print nicer message if Docker connection fails.
JoshRosen Nov 5, 2015
95f00e1
Add test tag for excluding Docker tests in SBT.
JoshRosen Nov 5, 2015
fcf5dc4
Upgrade to non-ancient version of docker-client.
JoshRosen Nov 6, 2015
2273f87
Upgrade Jersey to avoid ASM incompatibilities / classpath ordering is…
JoshRosen Nov 6, 2015
65315c4
Merge remote-tracking branch 'origin/master' into docker-jdbc-tests
JoshRosen Nov 6, 2015
cba340f
Experiment with bumping Jersey, even though I know we can't do that.
JoshRosen Nov 6, 2015
1a09a65
Hack to fix IP address binding in Jenkins.
JoshRosen Nov 9, 2015
af84a47
Move docker tests to own subproject.
JoshRosen Nov 9, 2015
4899b2e
Add DockerTest tag.
JoshRosen Nov 9, 2015
ef85200
Use non-shaded Docker client.
JoshRosen Nov 9, 2015
6db2c1c
Fix dependency problems.
JoshRosen Nov 10, 2015
9011bc5
Merge remote-tracking branch 'origin/master' into docker-jdbc-tests
JoshRosen Nov 10, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sql/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.spotify</groupId>
<artifactId>docker-client</artifactId>
<classifier>shaded</classifier>
<version>2.7.7</version>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woah, this is an ancient version of docker-client. I'll try using a newer release to see if that magically fixes the ASM issues.

<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.annotation.ElementType;

import org.scalatest.TagAnnotation;

@TagAnnotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DockerTestTag {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import java.net.ServerSocket
import java.sql.Connection

import scala.collection.JavaConverters._
import scala.sys.process._
import scala.util.Try
import scala.util.control.NonFatal

import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding}
import com.spotify.docker.client._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils

abstract class DatabaseOnDocker {
/**
* The docker image to be pulled.
*/
val imageName: String

/**
* Environment variables to set inside of the Docker container while launching it.
*/
val env: Map[String, String]

/**
* The container-internal JDBC port that the database listens on.
*/
val jdbcPort: Int

/**
* Return a JDBC URL that connects to the database running at the given IP address and port.
*/
def getJdbcUrl(ip: String, port: Int): String
}

abstract class DockerJDBCIntegrationSuite
extends SparkFunSuite
with BeforeAndAfterAll
with Eventually
with SharedSQLContext {

val db: DatabaseOnDocker

private var docker: DockerClient = _
private var containerId: String = _
protected var jdbcUrl: String = _

override def beforeAll() {
super.beforeAll()
try {
docker = DefaultDockerClient.fromEnv.build()
// Check that Docker is actually up
try {
docker.ping()
} catch {
case NonFatal(e) =>
log.error("Exception while connecting to Docker. Check whether Docker is running.")
throw e
}
// Ensure that the Docker image is installed:
try {
docker.inspectImage(db.imageName)
} catch {
case e: ImageNotFoundException =>
log.warn(s"Docker image ${db.imageName} not found; pulling image from registry")
docker.pull(db.imageName)
}
// Create the database container:
val config = ContainerConfig.builder()
.image(db.imageName)
.networkDisabled(false)
.env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
.exposedPorts(s"${db.jdbcPort}/tcp")
.build()
containerId = docker.createContainer(config).id
// Configure networking (necessary for boot2docker / Docker Machine)
val externalPort: Int = {
val sock = new ServerSocket(0)
val port = sock.getLocalPort
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always forget your can do this...

sock.close()
port
}
val dockerIp = getDockerIp()
val hostConfig: HostConfig = HostConfig.builder()
.networkMode("bridge")
.portBindings(
Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava)
.build()
// Start the container and wait until the database can accept JDBC connections:
docker.startContainer(containerId, hostConfig)
jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
eventually(timeout(60.seconds), interval(1.seconds)) {
val conn = java.sql.DriverManager.getConnection(jdbcUrl)
conn.close()
}
// Run any setup queries:
val conn: Connection = java.sql.DriverManager.getConnection(jdbcUrl)
try {
dataPreparation(conn)
} finally {
conn.close()
}
} catch {
case NonFatal(e) =>
try {
afterAll()
} finally {
throw e
}
}
}

override def afterAll() {
try {
if (docker != null) {
try {
if (containerId != null) {
docker.killContainer(containerId)
docker.removeContainer(containerId)
}
} catch {
case NonFatal(e) =>
logWarning(s"Could not stop container $containerId", e)
} finally {
docker.close()
}
}
} finally {
super.afterAll()
}
}

private def getDockerIp(): String = {
/** If docker-machine is setup on this box, attempts to find the ip from it. */
def findFromDockerMachine(): Option[String] = {
sys.env.get("DOCKER_MACHINE_NAME").flatMap { name =>
Try(Seq("/bin/bash", "-c", s"docker-machine ip $name 2>/dev/null").!!.trim).toOption
}
}
sys.env.get("DOCKER_IP")
.orElse(findFromDockerMachine())
.orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 2>/dev/null").!!.trim).toOption)
.getOrElse(Utils.localHostName())
}

/**
* Prepare databases and tables for testing.
*/
def dataPreparation(connection: Connection): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
import java.util.Properties

@DockerTestTag
class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
override val db = new DatabaseOnDocker {
override val imageName = "mysql:5.7.9"
override val env = Map(
"MYSQL_ROOT_PASSWORD" -> "rootpass"
)
override val jdbcPort: Int = 3306
override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
}

override def dataPreparation(conn: Connection): Unit = {
conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y TEXT(8))").executeUpdate()
conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate()
conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate()

conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), "
+ "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, "
+ "dbl DOUBLE)").executeUpdate()
conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
+ "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, "
+ "42.75, 1.0000000000000002)").executeUpdate()

conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, "
+ "yr YEAR)").executeUpdate()
conn.prepareStatement("INSERT INTO dates VALUES ('1991-11-09', '13:31:24', "
+ "'1996-01-01 01:23:45', '2009-02-13 23:31:30', '2001')").executeUpdate()

// TODO: Test locale conversion for strings.
conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c TINYTEXT, "
+ "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i BLOB)"
).executeUpdate()
conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', " +
"'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()
}

test("Basic test") {
val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
val rows = df.collect()
assert(rows.length == 2)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 2)
assert(types(0).equals("class java.lang.Integer"))
assert(types(1).equals("class java.lang.String"))
}

test("Numeric types") {
val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 9)
assert(types(0).equals("class java.lang.Boolean"))
assert(types(1).equals("class java.lang.Long"))
assert(types(2).equals("class java.lang.Integer"))
assert(types(3).equals("class java.lang.Integer"))
assert(types(4).equals("class java.lang.Integer"))
assert(types(5).equals("class java.lang.Long"))
assert(types(6).equals("class java.math.BigDecimal"))
assert(types(7).equals("class java.lang.Double"))
assert(types(8).equals("class java.lang.Double"))
assert(rows(0).getBoolean(0) == false)
assert(rows(0).getLong(1) == 0x225)
assert(rows(0).getInt(2) == 17)
assert(rows(0).getInt(3) == 77777)
assert(rows(0).getInt(4) == 123456789)
assert(rows(0).getLong(5) == 123456789012345L)
val bd = new BigDecimal("123456789012345.12345678901234500000")
assert(rows(0).getAs[BigDecimal](6).equals(bd))
assert(rows(0).getDouble(7) == 42.75)
assert(rows(0).getDouble(8) == 1.0000000000000002)
}

test("Date types") {
val df = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 5)
assert(types(0).equals("class java.sql.Date"))
assert(types(1).equals("class java.sql.Timestamp"))
assert(types(2).equals("class java.sql.Timestamp"))
assert(types(3).equals("class java.sql.Timestamp"))
assert(types(4).equals("class java.sql.Date"))
assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09")))
assert(rows(0).getAs[Timestamp](1).equals(Timestamp.valueOf("1970-01-01 13:31:24")))
assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("1996-01-01 01:23:45")))
assert(rows(0).getAs[Timestamp](3).equals(Timestamp.valueOf("2009-02-13 23:31:30")))
assert(rows(0).getAs[Date](4).equals(Date.valueOf("2001-01-01")))
}

test("String types") {
val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 9)
assert(types(0).equals("class java.lang.String"))
assert(types(1).equals("class java.lang.String"))
assert(types(2).equals("class java.lang.String"))
assert(types(3).equals("class java.lang.String"))
assert(types(4).equals("class java.lang.String"))
assert(types(5).equals("class java.lang.String"))
assert(types(6).equals("class [B"))
assert(types(7).equals("class [B"))
assert(types(8).equals("class [B"))
assert(rows(0).getString(0).equals("the"))
assert(rows(0).getString(1).equals("quick"))
assert(rows(0).getString(2).equals("brown"))
assert(rows(0).getString(3).equals("fox"))
assert(rows(0).getString(4).equals("jumps"))
assert(rows(0).getString(5).equals("over"))
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](116, 104, 101, 0)))
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), Array[Byte](108, 97, 122, 121)))
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), Array[Byte](100, 111, 103)))
}

test("Basic write test") {
val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
}
}
Loading