Skip to content

Commit ff05e32

Browse files
sarutak authored and
HyukjinKwon committed
[SPARK-33813][SQL][3.0] Fix the issue that JDBC source can't treat MS SQL Server's spatial types
### What changes were proposed in this pull request? This PR backports SPARK-33813 (apache#31283). This PR fixes the issue that reading tables which contain spatial datatypes from MS SQL Server fails. MS SQL Server supports two non-standard spatial JDBC types, `geometry` and `geography`, but Spark SQL can't handle them: ``` java.sql.SQLException: Unrecognized SQL type -157 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:251) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema$1(JdbcUtils.scala:321) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226) at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:366) at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:355) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:355) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240) at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:381) ``` Considering what the [data type mapping](https://docs.microsoft.com/ja-jp/sql/connect/jdbc/using-basic-data-types?view=sql-server-ver15) says, I think those spatial types can be mapped to Catalyst's `BinaryType`. ### Why are the changes needed? To provide better support. ### Does this PR introduce _any_ user-facing change? Yes. MS SQL Server users can use `geometry` and `geography` types in datasource tables. ### How was this patch tested?
New test case added to `MsSqlServerIntegrationSuite`. Closes apache#31290 from sarutak/SPARK-33813-branch-3.0. Authored-by: Kousuke Saruta <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
1 parent 224b7fa commit ff05e32

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,37 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
100100
|'the', 'lazy',
101101
|'dog')
102102
""".stripMargin).executeUpdate()
103+
conn.prepareStatement(
104+
"""
105+
|CREATE TABLE spatials (
106+
|point geometry,
107+
|line geometry,
108+
|circle geometry,
109+
|curve geography,
110+
|polygon geometry,
111+
|curve_polygon geography,
112+
|multi_point geometry,
113+
|multi_line geometry,
114+
|multi_polygon geometry,
115+
|geometry_collection geometry)
116+
""".stripMargin).executeUpdate()
117+
conn.prepareStatement(
118+
"""
119+
|INSERT INTO spatials VALUES (
120+
|'POINT(3 4 7 2.5)',
121+
|'LINESTRING(1 0, 0 1, -1 0)',
122+
|'CIRCULARSTRING(
123+
| -122.358 47.653, -122.348 47.649, -122.348 47.658, -122.358 47.658, -122.358 47.653)',
124+
|'COMPOUNDCURVE(
125+
| CIRCULARSTRING(-122.358 47.653, -122.348 47.649,
126+
| -122.348 47.658, -122.358 47.658, -122.358 47.653))',
127+
|'POLYGON((-20 -20, -20 20, 20 20, 20 -20, -20 -20), (10 0, 0 10, 0 -10, 10 0))',
128+
|'CURVEPOLYGON((-122.3 47, 122.3 47, 125.7 49, 121 38, -122.3 47))',
129+
|'MULTIPOINT((2 3), (7 8 9.5))',
130+
|'MULTILINESTRING((0 2, 1 1), (1 0, 1 1))',
131+
|'MULTIPOLYGON(((2 2, 2 -2, -2 -2, -2 2, 2 2)),((1 1, 3 1, 3 3, 1 3, 1 1)))',
132+
|'GEOMETRYCOLLECTION(LINESTRING(1 1, 3 5),POLYGON((-1 -1, -1 -5, -5 -5, -5 -1, -1 -1)))')
133+
""".stripMargin).executeUpdate()
103134
}
104135

105136
test("Basic test") {
@@ -225,4 +256,94 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
225256
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
226257
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
227258
}
259+
260+
test("SPARK-33813: MsSqlServerDialect should support spatial types") {
  // The `spatials` table (created in the test setup) has ten columns declared with
  // MS SQL Server's non-standard `geometry`/`geography` JDBC types. With the dialect
  // fix, each of them must be read back as Catalyst BinaryType, i.e. Array[Byte].
  val df = spark.read.jdbc(jdbcUrl, "spatials", new Properties)
  val rows = df.collect()
  assert(rows.length == 1)
  val row = rows(0)
  val types = row.toSeq.map(x => x.getClass.toString)
  assert(types.length == 10)
  // Verify the runtime class of ALL ten columns. The previous version only asserted
  // a few indices and re-checked `types(6)` repeatedly by copy-paste, leaving
  // columns 4, 7, 8 and 9 unverified.
  types.zipWithIndex.foreach { case (t, i) =>
    assert(t === "class [B", s"column $i was not read as Array[Byte]")
  }
  // Expected values are the raw serialized bytes MS SQL Server returns for each
  // spatial literal inserted in the setup (one per column, in declaration order).
  assert(row.getAs[Array[Byte]](0) ===
    Array[Byte](0, 0, 0, 0, 1, 15, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0,
      16, 64, 0, 0, 0, 0, 0, 0, 28, 64, 0, 0, 0, 0, 0, 0, 4, 64))
  assert(row.getAs[Array[Byte]](1) ===
    Array[Byte](0, 0, 0, 0, 1, 4, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0,
      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
      -16, 63, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0,
      0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 2))
  assert(row.getAs[Array[Byte]](2) ===
    Array[Byte](0, 0, 0, 0, 2, 4, 5, 0, 0, 0, -12, -3, -44, 120, -23, -106,
      94, -64, -35, 36, 6, -127, -107, -45, 71, 64, -125, -64, -54, -95, 69,
      -106, 94, -64, 80, -115, -105, 110, 18, -45, 71, 64, -125, -64, -54,
      -95, 69, -106, 94, -64, 78, 98, 16, 88, 57, -44, 71, 64, -12, -3, -44,
      120, -23, -106, 94, -64, 78, 98, 16, 88, 57, -44, 71, 64, -12, -3, -44,
      120, -23, -106, 94, -64, -35, 36, 6, -127, -107, -45, 71, 64, 1, 0, 0,
      0, 2, 0, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 8))
  assert(row.getAs[Array[Byte]](3) ===
    Array[Byte](-26, 16, 0, 0, 2, 4, 5, 0, 0, 0, -35, 36, 6, -127, -107, -45,
      71, 64, -12, -3, -44, 120, -23, -106, 94, -64, 80, -115, -105, 110, 18,
      -45, 71, 64, -125, -64, -54, -95, 69, -106, 94, -64, 78, 98, 16, 88, 57,
      -44, 71, 64, -125, -64, -54, -95, 69, -106, 94, -64, 78, 98, 16, 88, 57,
      -44, 71, 64, -12, -3, -44, 120, -23, -106, 94, -64, -35, 36, 6, -127, -107,
      -45, 71, 64, -12, -3, -44, 120, -23, -106, 94, -64, 1, 0, 0, 0, 3, 0, 0,
      0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 9, 2, 0, 0, 0, 3, 1))
  assert(row.getAs[Array[Byte]](4) ===
    Array[Byte](0, 0, 0, 0, 1, 4, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0,
      0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, 64,
      0, 0, 0, 0, 0, 0, 52, 64, 0, 0, 0, 0, 0, 0, 52, 64, 0, 0, 0, 0, 0, 0, 52,
      64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0,
      0, 52, -64, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
      0, 0, 36, -64, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0,
      0, 2, 0, 0, 0, 0, 0, 5, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 3))
  assert(row.getAs[Array[Byte]](5) ===
    Array[Byte](-26, 16, 0, 0, 2, 4, 5, 0, 0, 0, 0, 0, 0, 0, 0, -128, 71, 64, 51,
      51, 51, 51, 51, -109, 94, -64, 0, 0, 0, 0, 0, -128, 71, 64, 51, 51, 51, 51,
      51, -109, 94, 64, 0, 0, 0, 0, 0, -128, 72, 64, -51, -52, -52, -52, -52, 108,
      95, 64, 0, 0, 0, 0, 0, 0, 67, 64, 0, 0, 0, 0, 0, 64, 94, 64, 0, 0, 0, 0, 0,
      -128, 71, 64, 51, 51, 51, 51, 51, -109, 94, -64, 1, 0, 0, 0, 1, 0, 0, 0, 0,
      1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 10))
  assert(row.getAs[Array[Byte]](6) ===
    Array[Byte](0, 0, 0, 0, 1, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0,
      0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 28, 64, 0, 0, 0, 0, 0, 0, 32, 64, 0, 0, 0, 0,
      0, 0, -8, -1, 0, 0, 0, 0, 0, 0, 35, 64, 2, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0,
      0, 0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 1,
      0, 0, 0, 0, 1, 0, 0, 0, 1))
  assert(row.getAs[Array[Byte]](7) ===
    Array[Byte](0, 0, 0, 0, 1, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
      0, 0, 0, 64, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0,
      0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0,
      0, 0, 0, 0, -16, 63, 2, 0, 0, 0, 1, 0, 0, 0, 0, 1, 2, 0, 0, 0, 3, 0, 0, 0,
      -1, -1, -1, -1, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, 0, 2))
  assert(row.getAs[Array[Byte]](8) ===
    Array[Byte](0, 0, 0, 0, 1, 0, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0,
      0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0, 0,
      0, 0, 0, 0, -64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0,
      0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0,
      0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0,
      0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 8, 64, 0,
      0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, -16, 63,
      0, 0, 0, 0, 0, 0, -16, 63, 2, 0, 0, 0, 2, 0, 0, 0, 0, 2, 5, 0, 0, 0, 3, 0,
      0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 1, 0, 0, 0, 3))
  assert(row.getAs[Array[Byte]](9) ===
    Array[Byte](0, 0, 0, 0, 1, 4, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0,
      0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0,
      0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, -16, -65,
      0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, 20,
      -64, 0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0,
      -16, -65, 0, 0, 0, 0, 0, 0, -16, -65, 2, 0, 0, 0, 1, 0, 0, 0, 0, 2, 2, 0, 0,
      0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0,
      0, 0, 0, 1, 0, 0, 0, 3))
}
228349
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ import org.apache.spark.sql.types._
2525

2626
private object MsSqlServerDialect extends JdbcDialect {
2727

28+
/**
 * Non-standard JDBC type codes that the Microsoft SQL Server JDBC driver
 * reports for its spatial data types.
 * See https://github.com/microsoft/mssql-jdbc/blob/v7.2.1/src/main/java/microsoft/sql/Types.java
 */
private object SpecificTypes {
  final val GEOMETRY: Int = -157
  final val GEOGRAPHY: Int = -158
}
34+
2835
override def canHandle(url: String): Boolean =
2936
url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
3037

@@ -40,6 +47,7 @@ private object MsSqlServerDialect extends JdbcDialect {
4047
sqlType match {
4148
case java.sql.Types.SMALLINT => Some(ShortType)
4249
case java.sql.Types.REAL => Some(FloatType)
50+
case SpecificTypes.GEOMETRY | SpecificTypes.GEOGRAPHY => Some(BinaryType)
4351
case _ => None
4452
}
4553
}

0 commit comments

Comments
 (0)