@@ -213,7 +213,7 @@ private[sql] trait SQLTestUtils
*/
protected def stripSparkFilter(df: DataFrame): DataFrame = {
val schema = df.schema
val withoutFilters = df.queryExecution.sparkPlan transform {
val withoutFilters = df.queryExecution.sparkPlan.transform {
case FilterExec(_, child) => child
}

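A minimal usage sketch of stripSparkFilter, illustrative only (the spark session, path, and predicate are not from this patch): with the Spark-side FilterExec removed, any reduction in the returned rows can only come from filtering done by the data source itself, e.g. an ORC SearchArgument pushed down from the predicate.

// Illustrative sketch inside a test that mixes in SQLTestUtils.
val df = spark.read.orc(path).where("_1 > 2")
// Drop FilterExec from the physical plan; only source-side filtering remains.
val sourceFiltered = stripSparkFilter(df)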
@@ -17,13 +17,12 @@

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.hive.common.`type`.{HiveChar, HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.hadoop.hive.serde2.io.DateWritable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

/**
* Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
@@ -56,29 +55,35 @@ import org.apache.spark.sql.sources._
* known to be convertible.
*/
private[orc] object OrcFilters extends Logging {
def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap

// First, tries to convert each filter individually to see whether it's convertible, and then
// collect all convertible ones to build the final `SearchArgument`.
val convertibleFilters = for {
filter <- filters
_ <- buildSearchArgument(filter, SearchArgumentFactory.newBuilder())
_ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
} yield filter

for {
// Combines all convertible filters using `And` to produce a single conjunction
conjunction <- convertibleFilters.reduceOption(And)
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
} yield builder.build()
}

private def buildSearchArgument(expression: Filter, builder: Builder): Option[Builder] = {
private def buildSearchArgument(
dataTypeMap: Map[String, DataType],
expression: Filter,
builder: Builder): Option[Builder] = {
def newBuilder = SearchArgumentFactory.newBuilder()

def isSearchableLiteral(value: Any): Boolean = value match {
// These are types recognized by the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
case _: String | _: Long | _: Double | _: Byte | _: Short | _: Integer | _: Float => true
case _: DateWritable | _: HiveDecimal | _: HiveChar | _: HiveVarchar => true
def isSearchableType(dataType: DataType): Boolean = dataType match {
// Only the values in the Spark types below can be recognized by
// the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
case ByteType | ShortType | FloatType | DoubleType => true
[Review comment — Contributor (@tedyu)]
What about BooleanType?

[Reply — Member Author @HyukjinKwon, May 7, 2016]
@tedyu Let me test it and make a follow-up or another PR.
case IntegerType | LongType | StringType => true
[Review comment — Member Author @HyukjinKwon, Apr 29, 2016]
Note to myself: this should be okay because CatalystTypeConverters.createToScalaConverter() is always called in DataSourceStrategy for the values in source.Filter. I checked all the cases, and none of them end up as one of DateWritable, HiveDecimal, HiveChar, or HiveVarchar.

Since all the values in source.Filter are converted according to their DataType, this should be fine. I also checked the test code, and ParquetFilters does it the same way.

In addition, I had to switch to a schema-based check because IsNull and IsNotNull carry no value in source.Filter, so there is no way to check their types with the original isSearchableLiteral.

case _ => false
}

@@ -92,55 +97,55 @@ private[orc] object OrcFilters extends Logging {
// Pushing one side of AND down is only safe to do at the top level.
// You can see ParquetRelation's initializeLocalJobFunc method as an example.
for {
_ <- buildSearchArgument(left, newBuilder)
_ <- buildSearchArgument(right, newBuilder)
lhs <- buildSearchArgument(left, builder.startAnd())
rhs <- buildSearchArgument(right, lhs)
_ <- buildSearchArgument(dataTypeMap, left, newBuilder)
_ <- buildSearchArgument(dataTypeMap, right, newBuilder)
lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
rhs <- buildSearchArgument(dataTypeMap, right, lhs)
} yield rhs.end()

case Or(left, right) =>
for {
_ <- buildSearchArgument(left, newBuilder)
_ <- buildSearchArgument(right, newBuilder)
lhs <- buildSearchArgument(left, builder.startOr())
rhs <- buildSearchArgument(right, lhs)
_ <- buildSearchArgument(dataTypeMap, left, newBuilder)
_ <- buildSearchArgument(dataTypeMap, right, newBuilder)
lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
rhs <- buildSearchArgument(dataTypeMap, right, lhs)
} yield rhs.end()

case Not(child) =>
for {
_ <- buildSearchArgument(child, newBuilder)
negate <- buildSearchArgument(child, builder.startNot())
_ <- buildSearchArgument(dataTypeMap, child, newBuilder)
negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
} yield negate.end()

// NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
// call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).

case EqualTo(attribute, value) if isSearchableLiteral(value) =>
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startAnd().equals(attribute, value).end())

case EqualNullSafe(attribute, value) if isSearchableLiteral(value) =>
case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startAnd().nullSafeEquals(attribute, value).end())

case LessThan(attribute, value) if isSearchableLiteral(value) =>
case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startAnd().lessThan(attribute, value).end())

case LessThanOrEqual(attribute, value) if isSearchableLiteral(value) =>
case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startAnd().lessThanEquals(attribute, value).end())

case GreaterThan(attribute, value) if isSearchableLiteral(value) =>
case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startNot().lessThanEquals(attribute, value).end())

case GreaterThanOrEqual(attribute, value) if isSearchableLiteral(value) =>
case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startNot().lessThan(attribute, value).end())

case IsNull(attribute) =>
case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startAnd().isNull(attribute).end())

case IsNotNull(attribute) =>
case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startNot().isNull(attribute).end())

case In(attribute, values) if values.forall(isSearchableLiteral) =>
case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startAnd().in(attribute, values.map(_.asInstanceOf[AnyRef]): _*).end())

case _ => None
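An illustrative sketch of the new createFilter signature (not part of the patch; OrcFilters is private[orc], so code like this only compiles inside that package, as in the suites below). It reflects the reasoning in the author's note above: the attribute's Spark type is looked up in the schema, so a predicate such as IsNotNull on an unsupported column type is dropped even though it carries no literal value to inspect.

import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}
import org.apache.spark.sql.types._

// Hypothetical schema: one searchable column and one non-searchable one.
val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("arr", ArrayType(IntegerType))))

// GreaterThan("a", 10) is convertible because IntegerType is searchable;
// IsNotNull("arr") is rejected because ArrayType is not. Only the first
// leaf ends up in the resulting SearchArgument.
val maybeSarg = OrcFilters.createFilter(schema, Array[Filter](GreaterThan("a", 10), IsNotNull("arr")))
// maybeSarg: Option[SearchArgument] — Some(...) containing only the `a > 10` leaf.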
@@ -118,7 +118,7 @@ private[sql] class DefaultSource
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
if (sparkSession.sessionState.conf.orcFilterPushDown) {
// Sets pushed predicates
OrcFilters.createFilter(filters.toArray).foreach { f =>
OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
@@ -281,21 +281,22 @@ private[orc] case class OrcTableScan(
val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
val conf = job.getConfiguration

// Tries to push down filters if ORC filter push-down is enabled
if (sparkSession.sessionState.conf.orcFilterPushDown) {
OrcFilters.createFilter(filters).foreach { f =>
conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
}

// Figure out the actual schema from the ORC source (without partition columns) so that we
// can pick the correct ordinals. Note that this assumes that all files have the same schema.
val orcFormat = new DefaultSource
val dataSchema =
orcFormat
.inferSchema(sparkSession, Map.empty, inputPaths)
.getOrElse(sys.error("Failed to read schema from target ORC files."))

// Tries to push down filters if ORC filter push-down is enabled
if (sparkSession.sessionState.conf.orcFilterPushDown) {
OrcFilters.createFilter(dataSchema, filters).foreach { f =>
conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
}

// Sets requested columns
OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))

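The push-down branch above only runs when ORC filter push-down is enabled. A minimal sketch of turning it on for a read, assuming the usual spark.sql.orc.filterPushdown key behind SQLConf.ORC_FILTER_PUSHDOWN_ENABLED and an illustrative path:

// Enable ORC predicate push-down (off by default at this point in Spark's history),
// then read; convertible predicates become a SearchArgument that is set on the
// Hadoop configuration as OrcTableScan.SARG_PUSHDOWN.
spark.conf.set("spark.sql.orc.filterPushdown", "true")
val rows = spark.read.orc("/tmp/people.orc").where("intField < 10").collect()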
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.orc

import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters._

@@ -54,7 +55,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
assert(selectedFilters.nonEmpty, "No filter is pushed down")

val maybeFilter = OrcFilters.createFilter(selectedFilters.toArray)
val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters")
checker(maybeFilter.get)
}
@@ -78,10 +79,28 @@ class OrcFilterSuite extends QueryTest with OrcTest {
checkFilterPredicate(df, predicate, checkLogicalOperator)
}

test("filter pushdown - boolean") {
withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
}
private def checkNoFilterPredicate
(predicate: Predicate)
(implicit df: DataFrame): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))

var maybeRelation: Option[HadoopFsRelation] = None
val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _)) =>
maybeRelation = Some(orcRelation)
filters
}.flatten.reduceLeftOption(_ && _)
assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")

val (_, selectedFilters) =
DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
assert(selectedFilters.nonEmpty, "No filter is pushed down")

val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray)
assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters")
}

test("filter pushdown - integer") {
@@ -189,16 +208,6 @@ class OrcFilterSuite extends QueryTest with OrcTest {
}
}

test("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
}

withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
}
}

test("filter pushdown - combinations with logical operators") {
withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
// Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked
@@ -238,4 +247,40 @@ class OrcFilterSuite extends QueryTest with OrcTest {
)
}
}

test("no filter pushdown - non-supported types") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
}
// ArrayType
withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
checkNoFilterPredicate('_1.isNull)
}
// DecimalType
withOrcDataFrame((1 to 4).map(i => Tuple1(BigDecimal.valueOf(i)))) { implicit df =>
checkNoFilterPredicate('_1 <= BigDecimal.valueOf(4))
}
// BinaryType
withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
checkNoFilterPredicate('_1 <=> 1.b)
}
// BooleanType
withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
checkNoFilterPredicate('_1 === true)
}
// TimestampType
val stringTimestamp = "2015-08-20 15:57:00"
withOrcDataFrame(Seq(Tuple1(Timestamp.valueOf(stringTimestamp)))) { implicit df =>
checkNoFilterPredicate('_1 <=> Timestamp.valueOf(stringTimestamp))
}
// DateType
val stringDate = "2015-01-01"
withOrcDataFrame(Seq(Tuple1(Date.valueOf(stringDate)))) { implicit df =>
checkNoFilterPredicate('_1 === Date.valueOf(stringDate))
}
// MapType
withOrcDataFrame((1 to 4).map(i => Tuple1(Map(i -> i)))) { implicit df =>
checkNoFilterPredicate('_1.isNotNull)
}
}
}
@@ -439,4 +439,18 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}
}

test("SPARK-14962 Produce correct results on array type with isnotnull") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val data = (0 until 10).map(i => Tuple1(Array(i)))
withOrcFile(data) { file =>
val actual = sqlContext
.read
.orc(file)
.where("_1 is not null")
val expected = data.toDF()
checkAnswer(actual, expected)
}
}
}
}
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

case class OrcData(intField: Int, stringField: String)

@@ -182,12 +183,16 @@ class OrcSourceSuite extends OrcSuite {

test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
// The `LessThan` should be converted while the `StringContains` shouldn't
val schema = new StructType(
Array(
StructField("a", IntegerType, nullable = true),
StructField("b", StringType, nullable = true)))
assertResult(
"""leaf-0 = (LESS_THAN a 10)
|expr = leaf-0
""".stripMargin.trim
) {
OrcFilters.createFilter(Array(
OrcFilters.createFilter(schema, Array(
LessThan("a", 10),
StringContains("b", "prefix")
)).get.toString
@@ -199,7 +204,7 @@
|expr = leaf-0
""".stripMargin.trim
) {
OrcFilters.createFilter(Array(
OrcFilters.createFilter(schema, Array(
LessThan("a", 10),
Not(And(
GreaterThan("a", 1),