Skip to content

Commit c21c6b3

Browse files
uros-dbhuangxiaopingRD
authored andcommitted
[SPARK-53922][GEO][SQL] Introduce physical Geometry and Geography types
### What changes were proposed in this pull request? Introduce two new physical types to Spark: - `PhysicalGeographyType` - `PhysicalGeometryType` This PR also adds appropriate mapping from the logical geospatial types (introduced in: apache#52491) to the new physical types. ### Why are the changes needed? Extending the implementation of GEOMETRY and GEOGRAPHY types in Spark, laying the groundwork for full geospatial data type support. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added new tests to: - `GeographyValSuite` - `GeometryValSuite` Also, added appropriate test cases to: - `GeographyTypeSuite` - `GeographyTypeSuite` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52629 from uros-db/geo-physical-types. Authored-by: Uros Bojanic <uros.bojanic@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 5a09025 commit c21c6b3

File tree

7 files changed

+260
-2
lines changed

7 files changed

+260
-2
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.types;
19+
20+
import java.io.Serializable;
21+
22+
// This class represents the physical type for the GEOGRAPHY data type.
23+
public final class GeographyVal implements Comparable<GeographyVal>, Serializable {
24+
25+
// The GEOGRAPHY type is implemented as a byte array. We provide `getBytes` and `fromBytes`
26+
// methods for readers and writers to access this underlying array of bytes.
27+
private final byte[] value;
28+
29+
// We make the constructor private. We should use `fromBytes` to create new instances.
30+
private GeographyVal(byte[] value) {
31+
this.value = value;
32+
}
33+
34+
public byte[] getBytes() {
35+
return value;
36+
}
37+
38+
public static GeographyVal fromBytes(byte[] bytes) {
39+
if (bytes == null) {
40+
return null;
41+
} else {
42+
return new GeographyVal(bytes);
43+
}
44+
}
45+
46+
// Comparison is not yet supported for GEOGRAPHY.
47+
public int compareTo(GeographyVal g) {
48+
throw new UnsupportedOperationException();
49+
}
50+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.types;
19+
20+
import java.io.Serializable;
21+
22+
// This class represents the physical type for the GEOMETRY data type.
23+
public final class GeometryVal implements Comparable<GeometryVal>, Serializable {
24+
25+
// The GEOMETRY type is implemented as a byte array. We provide `getBytes` and `fromBytes`
26+
// methods for readers and writers to access this underlying array of bytes.
27+
private final byte[] value;
28+
29+
// We make the constructor private. We should use `fromBytes` to create new instances.
30+
private GeometryVal(byte[] value) {
31+
this.value = value;
32+
}
33+
34+
public byte[] getBytes() {
35+
return value;
36+
}
37+
38+
public static GeometryVal fromBytes(byte[] bytes) {
39+
if (bytes == null) {
40+
return null;
41+
} else {
42+
return new GeometryVal(bytes);
43+
}
44+
}
45+
46+
// Comparison is not yet supported for GEOMETRY.
47+
public int compareTo(GeometryVal g) {
48+
throw new UnsupportedOperationException();
49+
}
50+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.types;
19+
20+
import org.junit.jupiter.api.Test;
21+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
22+
import static org.junit.jupiter.api.Assertions.assertNotNull;
23+
import static org.junit.jupiter.api.Assertions.assertNull;
24+
25+
public class GeographyValSuite {
26+
27+
@Test
28+
public void roundTripBytes() {
29+
// A simple byte array to test the round trip (`fromBytes` -> `getBytes`).
30+
byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6 };
31+
GeographyVal geographyVal = GeographyVal.fromBytes(bytes);
32+
assertNotNull(geographyVal);
33+
assertArrayEquals(bytes, geographyVal.getBytes());
34+
}
35+
36+
@Test
37+
public void roundNullHandling() {
38+
// A simple null byte array to test null handling for GEOGRAPHY.
39+
byte[] bytes = null;
40+
GeographyVal geographyVal = GeographyVal.fromBytes(bytes);
41+
assertNull(geographyVal);
42+
}
43+
44+
@Test
45+
public void testCompareTo() {
46+
// Comparison is not yet supported for GEOGRAPHY.
47+
byte[] bytes1 = new byte[] { 1, 2, 3 };
48+
byte[] bytes2 = new byte[] { 4, 5, 6 };
49+
GeographyVal geographyVal1 = GeographyVal.fromBytes(bytes1);
50+
GeographyVal geographyVal2 = GeographyVal.fromBytes(bytes2);
51+
try {
52+
geographyVal1.compareTo(geographyVal2);
53+
} catch (UnsupportedOperationException e) {
54+
assert(e.toString().equals("java.lang.UnsupportedOperationException"));
55+
}
56+
}
57+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.types;
19+
20+
import org.junit.jupiter.api.Test;
21+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
22+
import static org.junit.jupiter.api.Assertions.assertNotNull;
23+
import static org.junit.jupiter.api.Assertions.assertNull;
24+
25+
public class GeometryValSuite {
26+
27+
@Test
28+
public void roundTripBytes() {
29+
// A simple byte array to test the round trip (`fromBytes` -> `getBytes`).
30+
byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6 };
31+
GeometryVal geometryVal = GeometryVal.fromBytes(bytes);
32+
assertNotNull(geometryVal);
33+
assertArrayEquals(bytes, geometryVal.getBytes());
34+
}
35+
36+
@Test
37+
public void roundNullHandling() {
38+
// A simple null byte array to test null handling for GEOMETRY.
39+
byte[] bytes = null;
40+
GeometryVal geometryVal = GeometryVal.fromBytes(bytes);
41+
assertNull(geometryVal);
42+
}
43+
44+
@Test
45+
public void testCompareTo() {
46+
// Comparison is not yet supported for GEOMETRY.
47+
byte[] bytes1 = new byte[] { 1, 2, 3 };
48+
byte[] bytes2 = new byte[] { 4, 5, 6 };
49+
GeometryVal geometryVal1 = GeometryVal.fromBytes(bytes1);
50+
GeometryVal geometryVal2 = GeometryVal.fromBytes(bytes2);
51+
try {
52+
geometryVal1.compareTo(geometryVal2);
53+
} catch (UnsupportedOperationException e) {
54+
assert(e.toString().equals("java.lang.UnsupportedOperationException"));
55+
}
56+
}
57+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import scala.reflect.runtime.universe.typeTag
2323
import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, InterpretedOrdering, SortOrder}
2424
import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, MapData, SQLOrderingUtil}
2525
import org.apache.spark.sql.errors.QueryExecutionErrors
26-
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType}
27-
import org.apache.spark.unsafe.types.{ByteArray, UTF8String, VariantVal}
26+
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType}
27+
import org.apache.spark.unsafe.types.{ByteArray, GeographyVal, GeometryVal, UTF8String, VariantVal}
2828
import org.apache.spark.util.ArrayImplicits._
2929

3030
sealed abstract class PhysicalDataType {
@@ -59,6 +59,8 @@ object PhysicalDataType {
5959
case StructType(fields) => PhysicalStructType(fields)
6060
case MapType(keyType, valueType, valueContainsNull) =>
6161
PhysicalMapType(keyType, valueType, valueContainsNull)
62+
case _: GeometryType => PhysicalGeometryType
63+
case _: GeographyType => PhysicalGeographyType
6264
case VariantType => PhysicalVariantType
6365
case _ => UninitializedPhysicalType
6466
}
@@ -411,3 +413,19 @@ object UninitializedPhysicalType extends PhysicalDataType {
411413
override private[sql] type InternalType = Any
412414
@transient private[sql] lazy val tag = typeTag[InternalType]
413415
}
416+
417+
case class PhysicalGeographyType() extends PhysicalDataType {
418+
private[sql] type InternalType = GeographyVal
419+
@transient private[sql] lazy val tag = typeTag[InternalType]
420+
private[sql] val ordering = implicitly[Ordering[InternalType]]
421+
}
422+
423+
object PhysicalGeographyType extends PhysicalGeographyType
424+
425+
case class PhysicalGeometryType() extends PhysicalDataType {
426+
private[sql] type InternalType = GeometryVal
427+
@transient private[sql] lazy val tag = typeTag[InternalType]
428+
private[sql] val ordering = implicitly[Ordering[InternalType]]
429+
}
430+
431+
object PhysicalGeometryType extends PhysicalGeometryType

sql/core/src/test/scala/org/apache/spark/sql/types/GeographyTypeSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.json4s.JsonAST.JString
2323

2424
import org.apache.spark.SparkFunSuite
2525
import org.apache.spark.SparkIllegalArgumentException
26+
import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalGeographyType}
2627

2728
class GeographyTypeSuite extends SparkFunSuite {
2829

@@ -216,4 +217,15 @@ class GeographyTypeSuite extends SparkFunSuite {
216217
}
217218
}
218219
}
220+
221+
test("PhysicalDataType maps GeographyType to PhysicalGeographyType") {
222+
val geometryTypes: Seq[DataType] = Seq(
223+
GeographyType(4326),
224+
GeographyType("ANY")
225+
)
226+
geometryTypes.foreach { geometryType =>
227+
val pdt = PhysicalDataType(geometryType)
228+
assert(pdt.isInstanceOf[PhysicalGeographyType])
229+
}
230+
}
219231
}

sql/core/src/test/scala/org/apache/spark/sql/types/GeometryTypeSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.json4s.JsonAST.JString
2323

2424
import org.apache.spark.SparkFunSuite
2525
import org.apache.spark.SparkIllegalArgumentException
26+
import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalGeometryType}
2627

2728
class GeometryTypeSuite extends SparkFunSuite {
2829

@@ -200,4 +201,17 @@ class GeometryTypeSuite extends SparkFunSuite {
200201
}
201202
}
202203
}
204+
205+
test("PhysicalDataType maps GeometryType to PhysicalGeometryType") {
206+
val geometryTypes: Seq[DataType] = Seq(
207+
GeometryType(0),
208+
GeometryType(3857),
209+
GeometryType(4326),
210+
GeometryType("ANY")
211+
)
212+
geometryTypes.foreach { geometryType =>
213+
val pdt = PhysicalDataType(geometryType)
214+
assert(pdt.isInstanceOf[PhysicalGeometryType])
215+
}
216+
}
203217
}

0 commit comments

Comments
 (0)