From 749afc38050f8e16937d10c84671f2cffaf58f5a Mon Sep 17 00:00:00 2001
From: Thomas D'Silva
Date: Fri, 28 Dec 2018 16:03:23 -0800
Subject: [PATCH 1/5] JdbcUtils.getCatalystType maps TINYINT to IntegerType
 instead of ByteType

---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 6 +++++-
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index edea549748b4..282c76b55a51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -239,7 +239,7 @@ object JdbcUtils extends Logging {
       case java.sql.Types.TIMESTAMP => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => null
-      case java.sql.Types.TINYINT => IntegerType
+      case java.sql.Types.TINYINT => ByteType
       case java.sql.Types.VARBINARY => BinaryType
       case java.sql.Types.VARCHAR => StringType
       case _ =>
@@ -456,6 +456,10 @@ object JdbcUtils extends Logging {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
 
+    case ByteType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, rs.getByte(pos + 1))
+
     case ArrayType(et, _) =>
       val elementConversion = et match {
         case TimestampType =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 71e83767964a..0c6df3f3ed6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -572,7 +572,7 @@ class JDBCSuite extends QueryTest
     assert(rows.length === 1)
     assert(rows(0).getInt(0) === 1)
     assert(rows(0).getBoolean(1) === false)
-    assert(rows(0).getInt(2) === 3)
+    assert(rows(0).getByte(2) === 3)
     assert(rows(0).getInt(3) === 4)
     assert(rows(0).getLong(4) === 1234567890123L)
   }

From c202053a8297872faf70c17ebe38ef49fdd913ac Mon Sep 17 00:00:00 2001
From: Thomas D'Silva
Date: Sat, 29 Dec 2018 20:54:27 -0800
Subject: [PATCH 2/5] Revert change in getCatalystType that maps TINYINT to
 ByteType. Added test that uses custom JdbcDialect that maps TINYINT to
 ByteType

---
 .../datasources/jdbc/JdbcUtils.scala          |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 25 +++++++++++++++++++++++--
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 282c76b55a51..5b593fd6e775 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -239,7 +239,7 @@ object JdbcUtils extends Logging {
       case java.sql.Types.TIMESTAMP => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => null
-      case java.sql.Types.TINYINT => ByteType
+      case java.sql.Types.TINYINT => IntegerType
       case java.sql.Types.VARBINARY => BinaryType
       case java.sql.Types.VARCHAR => StringType
       case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0c6df3f3ed6a..ba7aa2b02f6f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -23,7 +23,6 @@ import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
-
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -56,6 +55,17 @@ class JDBCSuite extends QueryTest
       Some(StringType)
   }
 
+  val testH2DialectTinyInt = new JdbcDialect {
+    override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
+    override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder) :
+    Option[DataType] = {
+      sqlType match {
+        case java.sql.Types.TINYINT => Some(ByteType)
+        case _ => None
+      }
+    }
+  }
+
   before {
     Utils.classForName("org.h2.Driver")
     // Extra properties that will be specified for our database. We need these to test
@@ -572,7 +582,7 @@ class JDBCSuite extends QueryTest
     assert(rows.length === 1)
     assert(rows(0).getInt(0) === 1)
     assert(rows(0).getBoolean(1) === false)
-    assert(rows(0).getByte(2) === 3)
+    assert(rows(0).getInt(2) === 3)
     assert(rows(0).getInt(3) === 4)
     assert(rows(0).getLong(4) === 1234567890123L)
   }
@@ -693,6 +703,17 @@ class JDBCSuite extends QueryTest
     JdbcDialects.unregisterDialect(testH2Dialect)
   }
 
+  test("Map TINYINT to ByteType via JdbcDialects") {
+    JdbcDialects.registerDialect(testH2DialectTinyInt)
+    val df = spark.read.jdbc(urlWithUserAndPass, "test.inttypes", new Properties())
+    val rows = df.collect()
+    assert(rows.length === 2)
+    assert(rows(0).get(2).isInstanceOf[Byte])
+    assert(rows(0).getByte(2) === 3)
+    assert(rows(1).isNullAt(2))
+    JdbcDialects.unregisterDialect(testH2DialectTinyInt)
+  }
+
   test("Default jdbc dialect registration") {
     assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect)
     assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)

From ae34533c6f85d1441bf0adc5a6f2e0db1a557587 Mon Sep 17 00:00:00 2001
From: Thomas D'Silva
Date: Sat, 29 Dec 2018 20:56:06 -0800
Subject: [PATCH 3/5] fix formatting

---
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index ba7aa2b02f6f..abdb033d15e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -23,6 +23,7 @@ import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

From 1c487b7be152c6235db1a268486fb73c45f7b2d1 Mon Sep 17 00:00:00 2001
From: Thomas D'Silva
Date: Sun, 30 Dec 2018 20:09:32 -0800
Subject: [PATCH 4/5] Fix formatting

---
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index abdb033d15e4..e4641631e607 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -58,8 +58,11 @@ class JDBCSuite extends QueryTest
 
   val testH2DialectTinyInt = new JdbcDialect {
     override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
-    override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder) :
-    Option[DataType] = {
+    override def getCatalystType(
+        sqlType: Int,
+        typeName: String,
+        size: Int,
+        md: MetadataBuilder): Option[DataType] = {
       sqlType match {
         case java.sql.Types.TINYINT => Some(ByteType)
         case _ => None

From a5dedacff6bcb6d0bc0fcacc7dfc2420a12ad0bb Mon Sep 17 00:00:00 2001
From: Thomas D'Silva
Date: Mon, 31 Dec 2018 14:24:42 -0800
Subject: [PATCH 5/5] Formatting change: moved ByteType after ShortType in
 JdbcUtils.makeGetter

---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 5b593fd6e775..922bef284c98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -438,6 +438,10 @@ object JdbcUtils extends Logging {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setShort(pos, rs.getShort(pos + 1))
 
+    case ByteType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, rs.getByte(pos + 1))
+
     case StringType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
@@ -456,10 +460,6 @@ object JdbcUtils extends Logging {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
 
-    case ByteType =>
-      (rs: ResultSet, row: InternalRow, pos: Int) =>
-        row.update(pos, rs.getByte(pos + 1))
-
     case ArrayType(et, _) =>
       val elementConversion = et match {
         case TimestampType =>
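Usage note: the end state of this series keeps the default TINYINT => IntegerType mapping in
JdbcUtils.getCatalystType and leaves the ByteType getter in place in makeGetter, so an application
opts in to reading JDBC TINYINT columns as ByteType by registering a custom JdbcDialect, exactly
as the new test does. Below is a minimal sketch of that pattern; it is not part of the patches,
and the connection URL, app name, and table name are placeholders for any JDBC source with a
TINYINT column.

import java.sql.Types

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.{ByteType, DataType, MetadataBuilder}

object TinyIntAsByteExample {
  // A dialect that overrides only the TINYINT mapping and defers
  // everything else to Spark's default type mapping by returning None.
  val tinyIntDialect: JdbcDialect = new JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
    override def getCatalystType(
        sqlType: Int,
        typeName: String,
        size: Int,
        md: MetadataBuilder): Option[DataType] = sqlType match {
      case Types.TINYINT => Some(ByteType) // map TINYINT to ByteType
      case _ => None                       // fall back to the defaults
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("tinyint-as-byte") // placeholder app name
      .master("local[*]")
      .getOrCreate()
    JdbcDialects.registerDialect(tinyIntDialect)
    try {
      val df = spark.read
        .format("jdbc")
        .option("url", "jdbc:h2:mem:testdb") // placeholder URL
        .option("dbtable", "test.inttypes")  // placeholder table
        .load()
      df.printSchema() // the TINYINT column is now read as ByteType
    } finally {
      // Unregister so other reads in the same session see default mappings,
      // mirroring the register/unregister discipline used in JDBCSuite.
      JdbcDialects.unregisterDialect(tinyIntDialect)
    }
  }
}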