
Commit e991255

SlavikBaranov authored and marmbrus committed
[SPARK-6913][SQL] Fixed "java.sql.SQLException: No suitable driver found"
Fixed `java.sql.SQLException: No suitable driver found` when loading a DataFrame into Spark SQL if the driver is supplied with the `--jars` argument. The problem is that the `java.sql.DriverManager` class cannot access drivers loaded by the Spark ClassLoader, so thin wrappers that forward requests are registered for these drivers. Also, it is no longer necessary to include JDBC drivers in `--driver-class-path` in local mode; specifying them with the `--jars` argument is sufficient.

Author: Vyacheslav Baranov <[email protected]>

Closes #5782 from SlavikBaranov/SPARK-6913 and squashes the following commits:

510c43f [Vyacheslav Baranov] [SPARK-6913] Fixed review comments
b2a727c [Vyacheslav Baranov] [SPARK-6913] Fixed thread race on driver registration
c8294ae [Vyacheslav Baranov] [SPARK-6913] Fixed "No suitable driver found" when using JDBC driver added with SparkContext.addJar
1 parent a0d8a61 commit e991255
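
For illustration, a minimal usage sketch of the behaviour this fix enables, assuming the Spark 1.4 DataFrameReader API and a SQLContext in scope; the jar path, URL, table name, and driver class below are placeholders rather than anything taken from the patch:

    // Submit with the JDBC driver only on --jars (no --driver-class-path needed):
    //   spark-submit --jars /path/to/postgresql-driver.jar ...
    val df = sqlContext.read.format("jdbc").options(Map(
      "url" -> "jdbc:postgresql://localhost/testdb",  // placeholder URL
      "dbtable" -> "my_table",                         // placeholder table
      "driver" -> "org.postgresql.Driver"              // driver shipped via --jars
    )).load()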


3 files changed: +62 −4 lines


sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ private[sql] object JDBCRDD extends Logging {
   def getConnector(driver: String, url: String, properties: Properties): () => Connection = {
     () => {
       try {
-        if (driver != null) Utils.getContextOrSparkClassLoader.loadClass(driver)
+        if (driver != null) DriverRegistry.register(driver)
       } catch {
         case e: ClassNotFoundException => {
           logWarning(s"Couldn't find class $driver", e);

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala

Lines changed: 2 additions & 2 deletions
@@ -100,7 +100,7 @@ private[sql] class DefaultSource extends RelationProvider {
     val upperBound = parameters.getOrElse("upperBound", null)
     val numPartitions = parameters.getOrElse("numPartitions", null)

-    if (driver != null) Utils.getContextOrSparkClassLoader.loadClass(driver)
+    if (driver != null) DriverRegistry.register(driver)

     if (partitionColumn != null
       && (lowerBound == null || upperBound == null || numPartitions == null)) {
@@ -136,7 +136,7 @@ private[sql] case class JDBCRelation(
   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    val driver: String = DriverManager.getDriver(url).getClass.getCanonicalName
+    val driver: String = DriverRegistry.getDriverClassName(url)
     JDBCRDD.scanTable(
       sqlContext.sparkContext,
       schema,
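
The second hunk matters because, once drivers are wrapped, `DriverManager.getDriver(url)` can return the `DriverWrapper` rather than the vendor driver, so taking its class name would no longer identify a driver class that executors can load and register themselves. A small illustrative comparison (URL and driver class are placeholders; `DriverRegistry` is internal, `private[sql]`):

    // Illustrative only; assumes a driver supplied via --jars has been registered for this URL.
    val url = "jdbc:postgresql://localhost/testdb"
    DriverManager.getDriver(url).getClass.getCanonicalName
    // => the wrapper's class name, not useful for re-registering the driver on executors
    DriverRegistry.getDriverClassName(url)
    // => the wrapped vendor driver's class name, e.g. "org.postgresql.Driver"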

sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala

Lines changed: 59 additions & 1 deletion
@@ -17,10 +17,14 @@

 package org.apache.spark.sql

-import java.sql.{Connection, DriverManager, PreparedStatement}
+import java.sql.{Connection, Driver, DriverManager, DriverPropertyInfo, PreparedStatement}
+import java.util.Properties
+
+import scala.collection.mutable

 import org.apache.spark.Logging
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils

 package object jdbc {
   private[sql] object JDBCWriteDetails extends Logging {
@@ -179,4 +183,58 @@ package object jdbc {
     }

   }
+
+  private [sql] class DriverWrapper(val wrapped: Driver) extends Driver {
+    override def acceptsURL(url: String): Boolean = wrapped.acceptsURL(url)
+
+    override def jdbcCompliant(): Boolean = wrapped.jdbcCompliant()
+
+    override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] = {
+      wrapped.getPropertyInfo(url, info)
+    }
+
+    override def getMinorVersion: Int = wrapped.getMinorVersion
+
+    override def getParentLogger: java.util.logging.Logger = wrapped.getParentLogger
+
+    override def connect(url: String, info: Properties): Connection = wrapped.connect(url, info)
+
+    override def getMajorVersion: Int = wrapped.getMajorVersion
+  }
+
+  /**
+   * java.sql.DriverManager is always loaded by bootstrap classloader,
+   * so it can't load JDBC drivers accessible by Spark ClassLoader.
+   *
+   * To solve the problem, drivers from user-supplied jars are wrapped
+   * into thin wrapper.
+   */
+  private [sql] object DriverRegistry extends Logging {
+
+    private val wrapperMap: mutable.Map[String, DriverWrapper] = mutable.Map.empty
+
+    def register(className: String): Unit = {
+      val cls = Utils.getContextOrSparkClassLoader.loadClass(className)
+      if (cls.getClassLoader == null) {
+        logTrace(s"$className has been loaded with bootstrap ClassLoader, wrapper is not required")
+      } else if (wrapperMap.get(className).isDefined) {
+        logTrace(s"Wrapper for $className already exists")
+      } else {
+        synchronized {
+          if (wrapperMap.get(className).isEmpty) {
+            val wrapper = new DriverWrapper(cls.newInstance().asInstanceOf[Driver])
+            DriverManager.registerDriver(wrapper)
+            wrapperMap(className) = wrapper
+            logTrace(s"Wrapper for $className registered")
+          }
+        }
+      }
+    }
+
+    def getDriverClassName(url: String): String = DriverManager.getDriver(url) match {
+      case wrapper: DriverWrapper => wrapper.wrapped.getClass.getCanonicalName
+      case driver => driver.getClass.getCanonicalName
+    }
+  }
+
 } // package object jdbc
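
A note on the root cause the scaladoc above alludes to: `DriverManager` only hands out registered drivers whose classes are also visible from the caller's classloader, and a driver that lives only in Spark's added-jars classloader never passes that test, hence "No suitable driver found". The `DriverWrapper` class itself ships with spark-sql, so it is visible to callers and simply forwards to the real driver. A simplified sketch of that visibility test (the real logic lives in the private `java.sql.DriverManager#isDriverAllowed`; this is an approximation, not the JDK code):

    // Simplified sketch of the per-caller visibility check DriverManager applies.
    def visibleToCaller(driver: java.sql.Driver, callerLoader: ClassLoader): Boolean =
      try {
        // The registered driver only "counts" if the caller's loader resolves
        // the same Class object for the driver's class name.
        Class.forName(driver.getClass.getName, true, callerLoader) == driver.getClass
      } catch {
        case _: ClassNotFoundException => false
      }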
