diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index 1e67799e8399..e0b73e2e52f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -18,14 +18,12 @@ package org.apache.spark.sql.parquet import java.nio.ByteBuffer -import java.sql.{Date, Timestamp} import org.apache.hadoop.conf.Configuration import parquet.common.schema.ColumnPath import parquet.filter2.compat.FilterCompat import parquet.filter2.compat.FilterCompat._ -import parquet.filter2.predicate.Operators.{Column, SupportsLtGt} import parquet.filter2.predicate.{FilterApi, FilterPredicate} import parquet.filter2.predicate.FilterApi._ import parquet.io.api.Binary @@ -35,11 +33,9 @@ import com.google.common.io.BaseEncoding import org.apache.spark.SparkEnv import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.catalyst.types.decimal.Decimal import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkSqlSerializer -import org.apache.spark.sql.parquet.ParquetColumns._ private[sql] object ParquetFilters { val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter" @@ -64,16 +60,6 @@ private[sql] object ParquetFilters { name, literal.value.asInstanceOf[Boolean], predicate) - case ByteType => - new ComparisonFilter( - name, - FilterApi.eq(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]), - predicate) - case ShortType => - new ComparisonFilter( - name, - FilterApi.eq(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]), - predicate) case IntegerType => new ComparisonFilter( name, @@ -99,43 +85,12 @@ private[sql] object ParquetFilters { name, literal.value.asInstanceOf[String], predicate) - case BinaryType => - ComparisonFilter.createBinaryEqualityFilter( - name, - literal.value.asInstanceOf[Array[Byte]], - predicate) - case DateType => - new ComparisonFilter( - name, - FilterApi.eq(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])), - predicate) - case TimestampType => - new ComparisonFilter( - name, - FilterApi.eq(timestampColumn(name), - new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])), - predicate) - case DecimalType.Unlimited => - new ComparisonFilter( - name, - FilterApi.eq(decimalColumn(name), literal.value.asInstanceOf[Decimal]), - predicate) } def createLessThanFilter( name: String, literal: Literal, predicate: CatalystPredicate) = literal.dataType match { - case ByteType => - new ComparisonFilter( - name, - FilterApi.lt(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]), - predicate) - case ShortType => - new ComparisonFilter( - name, - FilterApi.lt(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]), - predicate) case IntegerType => new ComparisonFilter( name, @@ -161,42 +116,11 @@ private[sql] object ParquetFilters { name, literal.value.asInstanceOf[String], predicate) - case BinaryType => - ComparisonFilter.createBinaryLessThanFilter( - name, - literal.value.asInstanceOf[Array[Byte]], - predicate) - case DateType => - new ComparisonFilter( - name, - FilterApi.lt(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])), - predicate) - case TimestampType => - new ComparisonFilter( - name, - FilterApi.lt(timestampColumn(name), - new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])), - predicate) - case DecimalType.Unlimited => - new ComparisonFilter( - name, - FilterApi.lt(decimalColumn(name), literal.value.asInstanceOf[Decimal]), - predicate) } def createLessThanOrEqualFilter( name: String, literal: Literal, predicate: CatalystPredicate) = literal.dataType match { - case ByteType => - new ComparisonFilter( - name, - FilterApi.ltEq(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]), - predicate) - case ShortType => - new ComparisonFilter( - name, - FilterApi.ltEq(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]), - predicate) case IntegerType => new ComparisonFilter( name, @@ -222,43 +146,12 @@ private[sql] object ParquetFilters { name, literal.value.asInstanceOf[String], predicate) - case BinaryType => - ComparisonFilter.createBinaryLessThanOrEqualFilter( - name, - literal.value.asInstanceOf[Array[Byte]], - predicate) - case DateType => - new ComparisonFilter( - name, - FilterApi.ltEq(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])), - predicate) - case TimestampType => - new ComparisonFilter( - name, - FilterApi.ltEq(timestampColumn(name), - new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])), - predicate) - case DecimalType.Unlimited => - new ComparisonFilter( - name, - FilterApi.ltEq(decimalColumn(name), literal.value.asInstanceOf[Decimal]), - predicate) } // TODO: combine these two types somehow? def createGreaterThanFilter( name: String, literal: Literal, predicate: CatalystPredicate) = literal.dataType match { - case ByteType => - new ComparisonFilter( - name, - FilterApi.gt(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]), - predicate) - case ShortType => - new ComparisonFilter( - name, - FilterApi.gt(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]), - predicate) case IntegerType => new ComparisonFilter( name, @@ -284,42 +177,11 @@ private[sql] object ParquetFilters { name, literal.value.asInstanceOf[String], predicate) - case BinaryType => - ComparisonFilter.createBinaryGreaterThanFilter( - name, - literal.value.asInstanceOf[Array[Byte]], - predicate) - case DateType => - new ComparisonFilter( - name, - FilterApi.gt(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])), - predicate) - case TimestampType => - new ComparisonFilter( - name, - FilterApi.gt(timestampColumn(name), - new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])), - predicate) - case DecimalType.Unlimited => - new ComparisonFilter( - name, - FilterApi.gt(decimalColumn(name), literal.value.asInstanceOf[Decimal]), - predicate) } def createGreaterThanOrEqualFilter( name: String, literal: Literal, predicate: CatalystPredicate) = literal.dataType match { - case ByteType => - new ComparisonFilter( - name, - FilterApi.gtEq(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]), - predicate) - case ShortType => - new ComparisonFilter( - name, - FilterApi.gtEq(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]), - predicate) case IntegerType => new ComparisonFilter( name, @@ -345,27 +207,6 @@ private[sql] object ParquetFilters { name, literal.value.asInstanceOf[String], predicate) - case BinaryType => - ComparisonFilter.createBinaryGreaterThanOrEqualFilter( - name, - literal.value.asInstanceOf[Array[Byte]], - predicate) - case DateType => - new ComparisonFilter( - name, - FilterApi.gtEq(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])), - predicate) - case TimestampType => - new ComparisonFilter( - name, - FilterApi.gtEq(timestampColumn(name), - new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])), - predicate) - case DecimalType.Unlimited => - new ComparisonFilter( - name, - FilterApi.gtEq(decimalColumn(name), literal.value.asInstanceOf[Decimal]), - predicate) } /** @@ -595,102 +436,5 @@ private[parquet] object ComparisonFilter { columnName, FilterApi.gtEq(binaryColumn(columnName), Binary.fromString(value)), predicate) - - def createBinaryEqualityFilter( - columnName: String, - value: Array[Byte], - predicate: CatalystPredicate): CatalystFilter = - new ComparisonFilter( - columnName, - FilterApi.eq(binaryColumn(columnName), Binary.fromByteArray(value)), - predicate) - - def createBinaryLessThanFilter( - columnName: String, - value: Array[Byte], - predicate: CatalystPredicate): CatalystFilter = - new ComparisonFilter( - columnName, - FilterApi.lt(binaryColumn(columnName), Binary.fromByteArray(value)), - predicate) - - def createBinaryLessThanOrEqualFilter( - columnName: String, - value: Array[Byte], - predicate: CatalystPredicate): CatalystFilter = - new ComparisonFilter( - columnName, - FilterApi.ltEq(binaryColumn(columnName), Binary.fromByteArray(value)), - predicate) - - def createBinaryGreaterThanFilter( - columnName: String, - value: Array[Byte], - predicate: CatalystPredicate): CatalystFilter = - new ComparisonFilter( - columnName, - FilterApi.gt(binaryColumn(columnName), Binary.fromByteArray(value)), - predicate) - - def createBinaryGreaterThanOrEqualFilter( - columnName: String, - value: Array[Byte], - predicate: CatalystPredicate): CatalystFilter = - new ComparisonFilter( - columnName, - FilterApi.gtEq(binaryColumn(columnName), Binary.fromByteArray(value)), - predicate) } -private[spark] object ParquetColumns { - - def byteColumn(columnPath: String): ByteColumn = { - new ByteColumn(ColumnPath.fromDotString(columnPath)) - } - - final class ByteColumn(columnPath: ColumnPath) - extends Column[java.lang.Byte](columnPath, classOf[java.lang.Byte]) with SupportsLtGt - - def shortColumn(columnPath: String): ShortColumn = { - new ShortColumn(ColumnPath.fromDotString(columnPath)) - } - - final class ShortColumn(columnPath: ColumnPath) - extends Column[java.lang.Short](columnPath, classOf[java.lang.Short]) with SupportsLtGt - - - def dateColumn(columnPath: String): DateColumn = { - new DateColumn(ColumnPath.fromDotString(columnPath)) - } - - final class DateColumn(columnPath: ColumnPath) - extends Column[WrappedDate](columnPath, classOf[WrappedDate]) with SupportsLtGt - - def timestampColumn(columnPath: String): TimestampColumn = { - new TimestampColumn(ColumnPath.fromDotString(columnPath)) - } - - final class TimestampColumn(columnPath: ColumnPath) - extends Column[WrappedTimestamp](columnPath, classOf[WrappedTimestamp]) with SupportsLtGt - - def decimalColumn(columnPath: String): DecimalColumn = { - new DecimalColumn(ColumnPath.fromDotString(columnPath)) - } - - final class DecimalColumn(columnPath: ColumnPath) - extends Column[Decimal](columnPath, classOf[Decimal]) with SupportsLtGt - - final class WrappedDate(val date: Date) extends Comparable[WrappedDate] { - - override def compareTo(other: WrappedDate): Int = { - date.compareTo(other.date) - } - } - - final class WrappedTimestamp(val timestamp: Timestamp) extends Comparable[WrappedTimestamp] { - - override def compareTo(other: WrappedTimestamp): Int = { - timestamp.compareTo(other.timestamp) - } - } -}