sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -147,14 +147,7 @@ class JDBCOptions(
""".stripMargin
)

-  val fetchSize = {
-    val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
-    require(size >= 0,
-      s"Invalid value `${size.toString}` for parameter " +
-        s"`$JDBC_BATCH_FETCH_SIZE`. The minimum value is 0. When the value is 0, " +
-        "the JDBC driver ignores the value and does the estimates.")
-    size
-  }
+  val fetchSize = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt

   // ------------------------------------------------------------
   // Optional parameters only for writing

Review thread on the removed `require(size >= 0, ...)` check:

Member: Actually, we could just remove this condition, set the value as-is, and expect the DBMS to throw an error instead. It seems Postgres throws one.

Contributor Author: Just putting the discussion from SPARK-21287 here for reference:
https://issues.apache.org/jira/browse/SPARK-21287

Member: Yeah, I think that's fine too. It isn't really supposed to be negative, but there is one notable exception in MySQL's driver. Even so, we don't manually check other settings here, so I don't know if there's much value in this check in the end. I'd be OK with removing it entirely too.

Contributor Author (@fuwhu, Oct 24, 2019): @srowen @HyukjinKwon I think both solutions should be OK, so I submitted another PR, #26244, which just removes the non-negativity check in JDBCOptions. You can decide which one to follow up; I'll close the other.
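For reference, an end-to-end usage sketch of what this relaxation enables (not part of the diff; the URL, table name, and existing `spark` session are placeholder assumptions):

  // Sketch only: `spark` is an assumed existing SparkSession; URL and table are placeholders.
  val df = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/test")
    .option("dbtable", "large_table")
    // With the require() removed, JDBCOptions.fetchSize accepts this value as-is;
    // MySQLDialect.validateProperties decides whether it is legal for the target DBMS.
    .option("fetchsize", Integer.MIN_VALUE.toString)
    .load()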
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc

 import java.sql.{Connection, PreparedStatement, ResultSet, SQLException}

+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
@@ -184,6 +185,8 @@ private[jdbc] class JDBCRDD(
     options: JDBCOptions)
   extends RDD[InternalRow](sc, Nil) {

+  JdbcDialects.get(url).validateProperties(options.asProperties.asScala.toMap)
+
   /**
   * Retrieve the list of partitions corresponding to this RDD.
   */
@@ -271,7 +274,6 @@ private[jdbc] class JDBCRDD(
     val part = thePart.asInstanceOf[JDBCPartition]
     conn = getConnection()
     val dialect = JdbcDialects.get(url)
-    import scala.collection.JavaConverters._
     dialect.beforeFetch(conn, options.asProperties.asScala.toMap)

     // This executes a generic SQL statement (or PL/SQL block) before reading
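For reference, a sketch of what the new constructor line does (URL and option values are illustrative; `validateProperties` is the method this PR adds):

  import org.apache.spark.sql.jdbc.JdbcDialects

  // JdbcDialects.get resolves a dialect by URL prefix (here, MySQLDialect).
  val dialect = JdbcDialects.get("jdbc:mysql://localhost:3306/test")
  // The JDBCRDD constructor now fails fast at construction time instead of mid-query:
  dialect.validateProperties(Map("fetchsize" -> "-1"))  // throws IllegalArgumentException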
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialect.scala
@@ -22,6 +22,7 @@ import java.sql.{Connection, Date, Timestamp}
 import org.apache.commons.lang3.StringUtils

 import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.types._

 /**
@@ -150,6 +151,20 @@ abstract class JdbcDialect extends Serializable {
   def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
   }

+  /**
+   * Do some extra property validation in addition to the validation
+   * in [[org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions]].
+   * @param properties The connection properties. This is passed through from the relation.
+   */
+  def validateProperties(properties: Map[String, String]): Unit = {
+    val fetchSize = properties.getOrElse(JDBCOptions.JDBC_BATCH_FETCH_SIZE, "0").toInt
+    require(fetchSize >= 0,
+      s"Invalid value `${fetchSize.toString}` for parameter " +
+        s"`${JDBCOptions.JDBC_BATCH_FETCH_SIZE}` for dialect ${this.getClass.getSimpleName}. " +
+        "The minimum value is 0. When the value is 0, " +
+        "the JDBC driver ignores the value and does the estimates.")
+  }

   /**
    * Escape special characters in SQL string literals.
    * @param value The string to be escaped.

Review thread on `def validateProperties(properties: Map[String, String])`:

Member: Hmm, wait: `properties: Map[String, String]` will require a Scala map rather than a Java map if someone implements the dialect from the Java side. We could just pass `Properties` through as-is, I suspect.

Contributor Author (@fuwhu, Oct 24, 2019): I chose a Scala Map because JdbcDialect.beforeFetch also accepts a Scala Map as its parameter. I agree with you; we may want to change the parameter type for both beforeFetch and validateProperties, but I think we can leave that to another PR. WDYT?

Member: I think it's OK to leave it as-is here.
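For reference on the Map-type discussion above, a sketch of the conversion the Scala call sites already perform, and which a Java-side dialect implementer would have to replicate (values are illustrative):

  import java.util.Properties
  import scala.collection.JavaConverters._

  val props = new Properties()
  props.setProperty("fetchsize", "100")
  // Properties -> mutable Scala view -> immutable Map[String, String], as in JDBCRDD:
  val asScalaMap: Map[String, String] = props.asScala.toMap
  // A dialect written in Java would receive this scala.collection.immutable.Map and would
  // need scala-library converters on its side, which is the interop concern raised above.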
sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc

 import java.sql.Types

+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.types.{BooleanType, DataType, LongType, MetadataBuilder}

 private case object MySQLDialect extends JdbcDialect {
@@ -46,4 +47,15 @@ private case object MySQLDialect extends JdbcDialect {
   }

   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
+
+  override def validateProperties(properties: Map[String, String]): Unit = {
+    val fetchSize = properties.getOrElse(JDBCOptions.JDBC_BATCH_FETCH_SIZE, "0").toInt
+    require(fetchSize >= 0 || fetchSize == Integer.MIN_VALUE,
+      s"Invalid value `${fetchSize.toString}` for parameter " +
+        s"`${JDBCOptions.JDBC_BATCH_FETCH_SIZE}` for MySQL. " +
+        "The value should be >= 0 or equal to Integer.MIN_VALUE. " +
+        "When the value is 0, the JDBC driver ignores the value and does the estimates; " +
+        "when the value is Integer.MIN_VALUE, the data is fetched in a streaming manner, " +
+        "namely, one row at a time.")
+  }
 }
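For reference, the MySQL Connector/J behavior that motivates the special case, sketched against the plain JDBC API (connection details are placeholders):

  import java.sql.DriverManager

  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pass")
  val stmt = conn.createStatement()
  // Connector/J convention: Integer.MIN_VALUE switches the driver to streaming mode,
  // returning one row at a time instead of buffering the whole result set in memory.
  stmt.setFetchSize(Integer.MIN_VALUE)
  val rs = stmt.executeQuery("SELECT * FROM large_table")
  while (rs.next()) {
    // process each row as it arrives
  }
  rs.close(); stmt.close(); conn.close()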
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.util.Utils

 class JDBCSuite extends QueryTest
   with BeforeAndAfter with PrivateMethodTester with SharedSparkSession {
+  import scala.collection.JavaConverters._
   import testImplicits._

   val url = "jdbc:h2:mem:testdb0"
@@ -459,6 +460,30 @@ class JDBCSuite extends QueryTest
     assert(e.contains("Invalid value `-1` for parameter `fetchsize`"))
   }

+  test("[SPARK-21287] Dialect validate properties") {
+    val mysqlDialect = JdbcDialects.get("jdbc:mysql:xxx")
+    val h2Dialect = JdbcDialects.get("jdbc:h2:xxx")
+    val properties = new Properties()
+    properties.setProperty(JDBCOptions.JDBC_BATCH_FETCH_SIZE, "-1")
+    val e1 = intercept[IllegalArgumentException] {
+      mysqlDialect.validateProperties(properties.asScala.toMap)
+    }.getMessage
+    val e2 = intercept[IllegalArgumentException] {
+      h2Dialect.validateProperties(properties.asScala.toMap)
+    }.getMessage
+    properties.setProperty(JDBCOptions.JDBC_BATCH_FETCH_SIZE, "1")
+    mysqlDialect.validateProperties(properties.asScala.toMap)
+    h2Dialect.validateProperties(properties.asScala.toMap)
+    properties.setProperty(JDBCOptions.JDBC_BATCH_FETCH_SIZE, Integer.MIN_VALUE.toString)
+    mysqlDialect.validateProperties(properties.asScala.toMap)
+    val e3 = intercept[IllegalArgumentException] {
+      h2Dialect.validateProperties(properties.asScala.toMap)
+    }.getMessage
+    assert(e1.contains("Invalid value `-1` for parameter `fetchsize`"))
+    assert(e2.contains("Invalid value `-1` for parameter `fetchsize`"))
+    assert(e3.contains(s"Invalid value `${Integer.MIN_VALUE.toString}` for parameter `fetchsize`"))
+  }

   test("Missing partition columns") {
     withView("tempPeople") {
       val e = intercept[IllegalArgumentException] {