Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package org.apache.spark.sql.parquet

import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}

import org.apache.hadoop.conf.Configuration

import parquet.common.schema.ColumnPath
import parquet.filter2.compat.FilterCompat
import parquet.filter2.compat.FilterCompat._
import parquet.filter2.predicate.Operators.{Column, SupportsLtGt}
import parquet.filter2.predicate.{FilterApi, FilterPredicate}
import parquet.filter2.predicate.FilterApi._
import parquet.io.api.Binary
Expand All @@ -35,11 +33,9 @@ import com.google.common.io.BaseEncoding

import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.types.decimal.Decimal
import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.parquet.ParquetColumns._

private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
Expand All @@ -64,16 +60,6 @@ private[sql] object ParquetFilters {
name,
literal.value.asInstanceOf[Boolean],
predicate)
case ByteType =>
new ComparisonFilter(
name,
FilterApi.eq(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]),
predicate)
case ShortType =>
new ComparisonFilter(
name,
FilterApi.eq(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]),
predicate)
case IntegerType =>
new ComparisonFilter(
name,
Expand All @@ -99,43 +85,12 @@ private[sql] object ParquetFilters {
name,
literal.value.asInstanceOf[String],
predicate)
case BinaryType =>
ComparisonFilter.createBinaryEqualityFilter(
name,
literal.value.asInstanceOf[Array[Byte]],
predicate)
case DateType =>
new ComparisonFilter(
name,
FilterApi.eq(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])),
predicate)
case TimestampType =>
new ComparisonFilter(
name,
FilterApi.eq(timestampColumn(name),
new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])),
predicate)
case DecimalType.Unlimited =>
new ComparisonFilter(
name,
FilterApi.eq(decimalColumn(name), literal.value.asInstanceOf[Decimal]),
predicate)
}

def createLessThanFilter(
name: String,
literal: Literal,
predicate: CatalystPredicate) = literal.dataType match {
case ByteType =>
new ComparisonFilter(
name,
FilterApi.lt(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]),
predicate)
case ShortType =>
new ComparisonFilter(
name,
FilterApi.lt(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]),
predicate)
case IntegerType =>
new ComparisonFilter(
name,
Expand All @@ -161,42 +116,11 @@ private[sql] object ParquetFilters {
name,
literal.value.asInstanceOf[String],
predicate)
case BinaryType =>
ComparisonFilter.createBinaryLessThanFilter(
name,
literal.value.asInstanceOf[Array[Byte]],
predicate)
case DateType =>
new ComparisonFilter(
name,
FilterApi.lt(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])),
predicate)
case TimestampType =>
new ComparisonFilter(
name,
FilterApi.lt(timestampColumn(name),
new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])),
predicate)
case DecimalType.Unlimited =>
new ComparisonFilter(
name,
FilterApi.lt(decimalColumn(name), literal.value.asInstanceOf[Decimal]),
predicate)
}
def createLessThanOrEqualFilter(
name: String,
literal: Literal,
predicate: CatalystPredicate) = literal.dataType match {
case ByteType =>
new ComparisonFilter(
name,
FilterApi.ltEq(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]),
predicate)
case ShortType =>
new ComparisonFilter(
name,
FilterApi.ltEq(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]),
predicate)
case IntegerType =>
new ComparisonFilter(
name,
Expand All @@ -222,43 +146,12 @@ private[sql] object ParquetFilters {
name,
literal.value.asInstanceOf[String],
predicate)
case BinaryType =>
ComparisonFilter.createBinaryLessThanOrEqualFilter(
name,
literal.value.asInstanceOf[Array[Byte]],
predicate)
case DateType =>
new ComparisonFilter(
name,
FilterApi.ltEq(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])),
predicate)
case TimestampType =>
new ComparisonFilter(
name,
FilterApi.ltEq(timestampColumn(name),
new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])),
predicate)
case DecimalType.Unlimited =>
new ComparisonFilter(
name,
FilterApi.ltEq(decimalColumn(name), literal.value.asInstanceOf[Decimal]),
predicate)
}
// TODO: combine these two types somehow?
def createGreaterThanFilter(
name: String,
literal: Literal,
predicate: CatalystPredicate) = literal.dataType match {
case ByteType =>
new ComparisonFilter(
name,
FilterApi.gt(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]),
predicate)
case ShortType =>
new ComparisonFilter(
name,
FilterApi.gt(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]),
predicate)
case IntegerType =>
new ComparisonFilter(
name,
Expand All @@ -284,42 +177,11 @@ private[sql] object ParquetFilters {
name,
literal.value.asInstanceOf[String],
predicate)
case BinaryType =>
ComparisonFilter.createBinaryGreaterThanFilter(
name,
literal.value.asInstanceOf[Array[Byte]],
predicate)
case DateType =>
new ComparisonFilter(
name,
FilterApi.gt(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])),
predicate)
case TimestampType =>
new ComparisonFilter(
name,
FilterApi.gt(timestampColumn(name),
new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])),
predicate)
case DecimalType.Unlimited =>
new ComparisonFilter(
name,
FilterApi.gt(decimalColumn(name), literal.value.asInstanceOf[Decimal]),
predicate)
}
def createGreaterThanOrEqualFilter(
name: String,
literal: Literal,
predicate: CatalystPredicate) = literal.dataType match {
case ByteType =>
new ComparisonFilter(
name,
FilterApi.gtEq(byteColumn(name), literal.value.asInstanceOf[java.lang.Byte]),
predicate)
case ShortType =>
new ComparisonFilter(
name,
FilterApi.gtEq(shortColumn(name), literal.value.asInstanceOf[java.lang.Short]),
predicate)
case IntegerType =>
new ComparisonFilter(
name,
Expand All @@ -345,27 +207,6 @@ private[sql] object ParquetFilters {
name,
literal.value.asInstanceOf[String],
predicate)
case BinaryType =>
ComparisonFilter.createBinaryGreaterThanOrEqualFilter(
name,
literal.value.asInstanceOf[Array[Byte]],
predicate)
case DateType =>
new ComparisonFilter(
name,
FilterApi.gtEq(dateColumn(name), new WrappedDate(literal.value.asInstanceOf[Date])),
predicate)
case TimestampType =>
new ComparisonFilter(
name,
FilterApi.gtEq(timestampColumn(name),
new WrappedTimestamp(literal.value.asInstanceOf[Timestamp])),
predicate)
case DecimalType.Unlimited =>
new ComparisonFilter(
name,
FilterApi.gtEq(decimalColumn(name), literal.value.asInstanceOf[Decimal]),
predicate)
}

/**
Expand Down Expand Up @@ -595,102 +436,5 @@ private[parquet] object ComparisonFilter {
columnName,
FilterApi.gtEq(binaryColumn(columnName), Binary.fromString(value)),
predicate)

/**
 * Builds a [[ComparisonFilter]] testing a binary column for equality with a byte array.
 *
 * @param columnName name passed to `binaryColumn` to identify the column, and recorded on
 *                   the resulting filter
 * @param value      bytes to compare against; handed to `Binary.fromByteArray`
 * @param predicate  Catalyst predicate this filter was derived from
 * @return a `ComparisonFilter` wrapping the parquet `eq` predicate
 */
def createBinaryEqualityFilter(
    columnName: String,
    value: Array[Byte],
    predicate: CatalystPredicate): CatalystFilter = {
  val column = binaryColumn(columnName)
  val wrapped = Binary.fromByteArray(value)
  new ComparisonFilter(columnName, FilterApi.eq(column, wrapped), predicate)
}

/**
 * Builds a [[ComparisonFilter]] selecting rows whose binary column is strictly less than
 * the given byte array, using parquet's `lt` predicate.
 *
 * @param columnName name passed to `binaryColumn` to identify the column, and recorded on
 *                   the resulting filter
 * @param value      upper bound (exclusive); handed to `Binary.fromByteArray`
 * @param predicate  Catalyst predicate this filter was derived from
 * @return a `ComparisonFilter` wrapping the parquet `lt` predicate
 */
def createBinaryLessThanFilter(
    columnName: String,
    value: Array[Byte],
    predicate: CatalystPredicate): CatalystFilter = {
  val column = binaryColumn(columnName)
  val bound = Binary.fromByteArray(value)
  new ComparisonFilter(columnName, FilterApi.lt(column, bound), predicate)
}

/**
 * Builds a [[ComparisonFilter]] selecting rows whose binary column is less than or equal to
 * the given byte array, using parquet's `ltEq` predicate.
 *
 * @param columnName name passed to `binaryColumn` to identify the column, and recorded on
 *                   the resulting filter
 * @param value      upper bound (inclusive); handed to `Binary.fromByteArray`
 * @param predicate  Catalyst predicate this filter was derived from
 * @return a `ComparisonFilter` wrapping the parquet `ltEq` predicate
 */
def createBinaryLessThanOrEqualFilter(
    columnName: String,
    value: Array[Byte],
    predicate: CatalystPredicate): CatalystFilter = {
  val column = binaryColumn(columnName)
  val bound = Binary.fromByteArray(value)
  new ComparisonFilter(columnName, FilterApi.ltEq(column, bound), predicate)
}

/**
 * Builds a [[ComparisonFilter]] selecting rows whose binary column is strictly greater than
 * the given byte array, using parquet's `gt` predicate.
 *
 * @param columnName name passed to `binaryColumn` to identify the column, and recorded on
 *                   the resulting filter
 * @param value      lower bound (exclusive); handed to `Binary.fromByteArray`
 * @param predicate  Catalyst predicate this filter was derived from
 * @return a `ComparisonFilter` wrapping the parquet `gt` predicate
 */
def createBinaryGreaterThanFilter(
    columnName: String,
    value: Array[Byte],
    predicate: CatalystPredicate): CatalystFilter = {
  val column = binaryColumn(columnName)
  val bound = Binary.fromByteArray(value)
  new ComparisonFilter(columnName, FilterApi.gt(column, bound), predicate)
}

/**
 * Builds a [[ComparisonFilter]] selecting rows whose binary column is greater than or equal
 * to the given byte array, using parquet's `gtEq` predicate.
 *
 * @param columnName name passed to `binaryColumn` to identify the column, and recorded on
 *                   the resulting filter
 * @param value      lower bound (inclusive); handed to `Binary.fromByteArray`
 * @param predicate  Catalyst predicate this filter was derived from
 * @return a `ComparisonFilter` wrapping the parquet `gtEq` predicate
 */
def createBinaryGreaterThanOrEqualFilter(
    columnName: String,
    value: Array[Byte],
    predicate: CatalystPredicate): CatalystFilter = {
  val column = binaryColumn(columnName)
  val bound = Binary.fromByteArray(value)
  new ComparisonFilter(columnName, FilterApi.gtEq(column, bound), predicate)
}
}

/**
 * Extra parquet filter `Column` descriptors for byte, short, date, timestamp and decimal
 * values, together with `Comparable` wrappers for `java.sql.Date` and `java.sql.Timestamp`.
 *
 * Each column class extends parquet's `Column` with a fixed value class and mixes in
 * `SupportsLtGt`. Each `xxxColumn` factory parses a dot-separated path string with
 * `ColumnPath.fromDotString` and wraps it in the matching column class.
 *
 * NOTE(review): whether parquet's filter machinery accepts these custom value classes at
 * runtime is not visible from this object -- confirm against the parquet version in use.
 */
private[spark] object ParquetColumns {

  // ---- Comparable value wrappers ------------------------------------------------------

  /**
   * Wraps a `java.sql.Date` so it can serve as a `Comparable[WrappedDate]` column value.
   * Ordering delegates to the wrapped date's own `compareTo`. `equals`/`hashCode` are not
   * overridden here.
   */
  final class WrappedDate(val date: Date) extends Comparable[WrappedDate] {
    override def compareTo(other: WrappedDate): Int = date.compareTo(other.date)
  }

  /**
   * Wraps a `java.sql.Timestamp` so it can serve as a `Comparable[WrappedTimestamp]` column
   * value. Ordering delegates to the wrapped timestamp's own `compareTo`.
   * `equals`/`hashCode` are not overridden here.
   */
  final class WrappedTimestamp(val timestamp: Timestamp) extends Comparable[WrappedTimestamp] {
    override def compareTo(other: WrappedTimestamp): Int = timestamp.compareTo(other.timestamp)
  }

  // ---- Column descriptor classes ------------------------------------------------------

  /** Column of `java.lang.Byte` values supporting `<`/`>` style comparisons. */
  final class ByteColumn(columnPath: ColumnPath)
    extends Column[java.lang.Byte](columnPath, classOf[java.lang.Byte]) with SupportsLtGt

  /** Column of `java.lang.Short` values supporting `<`/`>` style comparisons. */
  final class ShortColumn(columnPath: ColumnPath)
    extends Column[java.lang.Short](columnPath, classOf[java.lang.Short]) with SupportsLtGt

  /** Column of [[WrappedDate]] values supporting `<`/`>` style comparisons. */
  final class DateColumn(columnPath: ColumnPath)
    extends Column[WrappedDate](columnPath, classOf[WrappedDate]) with SupportsLtGt

  /** Column of [[WrappedTimestamp]] values supporting `<`/`>` style comparisons. */
  final class TimestampColumn(columnPath: ColumnPath)
    extends Column[WrappedTimestamp](columnPath, classOf[WrappedTimestamp]) with SupportsLtGt

  /** Column of Catalyst `Decimal` values supporting `<`/`>` style comparisons. */
  final class DecimalColumn(columnPath: ColumnPath)
    extends Column[Decimal](columnPath, classOf[Decimal]) with SupportsLtGt

  // ---- Factories: dot-separated path string => column descriptor ----------------------

  /** Creates a [[ByteColumn]] for the given dot-separated column path. */
  def byteColumn(columnPath: String): ByteColumn =
    new ByteColumn(ColumnPath.fromDotString(columnPath))

  /** Creates a [[ShortColumn]] for the given dot-separated column path. */
  def shortColumn(columnPath: String): ShortColumn =
    new ShortColumn(ColumnPath.fromDotString(columnPath))

  /** Creates a [[DateColumn]] for the given dot-separated column path. */
  def dateColumn(columnPath: String): DateColumn =
    new DateColumn(ColumnPath.fromDotString(columnPath))

  /** Creates a [[TimestampColumn]] for the given dot-separated column path. */
  def timestampColumn(columnPath: String): TimestampColumn =
    new TimestampColumn(ColumnPath.fromDotString(columnPath))

  /** Creates a [[DecimalColumn]] for the given dot-separated column path. */
  def decimalColumn(columnPath: String): DecimalColumn =
    new DecimalColumn(ColumnPath.fromDotString(columnPath))
}