Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
82e2f09
Fix part of undocumented/duplicated arguments warnings by CRAN-check
junyangq Aug 9, 2016
41d9dca
[SPARK-16950] [PYSPARK] fromOffsets parameter support in KafkaUtils.c…
Aug 9, 2016
44115e9
[SPARK-16956] Make ApplicationState.MAX_NUM_RETRY configurable
JoshRosen Aug 9, 2016
2d136db
[SPARK-16905] SQL DDL: MSCK REPAIR TABLE
Aug 9, 2016
901edbb
More fixes of the docs.
junyangq Aug 10, 2016
475ee38
Fixed typo
jupblb Aug 10, 2016
2285de7
[SPARK-16522][MESOS] Spark application throws exception on exit.
sun-rui Aug 10, 2016
20efb79
[SPARK-16324][SQL] regexp_extract should doc that it returns empty st…
srowen Aug 10, 2016
719ac5f
[SPARK-15899][SQL] Fix the construction of the file path with hadoop …
avulanov Aug 10, 2016
15637f7
Revert "[SPARK-15899][SQL] Fix the construction of the file path with…
srowen Aug 10, 2016
977fbbf
[SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level…
viirya Aug 10, 2016
d3a30d2
[SPARK-16579][SPARKR] add install.spark function
junyangq Aug 10, 2016
1e40135
[SPARK-17010][MINOR][DOC] Wrong description in memory management docu…
WangTaoTheTonic Aug 11, 2016
8611bc2
[SPARK-16866][SQL] Infrastructure for file-based SQL end-to-end tests
petermaxlee Aug 10, 2016
51b1016
[SPARK-17008][SPARK-17009][SQL] Normalization and isolation in SQLQue…
petermaxlee Aug 11, 2016
ea8a198
[SPARK-17007][SQL] Move test data files into a test-data folder
petermaxlee Aug 11, 2016
4b434e7
[SPARK-17011][SQL] Support testing exceptions in SQLQueryTestSuite
petermaxlee Aug 11, 2016
0ed6236
Correct example value for spark.ssl.YYY.XXX settings
ash211 Aug 11, 2016
33a213f
[SPARK-15899][SQL] Fix the construction of the file path with hadoop …
avulanov Aug 11, 2016
b87ba8f
Fix remaining undocumented/duplicated warnings
junyangq Aug 11, 2016
6bf20cd
[SPARK-17015][SQL] group-by/order-by ordinal and arithmetic tests
petermaxlee Aug 11, 2016
bc683f0
[SPARK-17018][SQL] literals.sql for testing literal parsing
petermaxlee Aug 11, 2016
0fb0149
[SPARK-17022][YARN] Handle potential deadlock in driver handling mess…
WangTaoTheTonic Aug 11, 2016
d2c1d64
Keep to the convention where we have docs for generic and the function.
junyangq Aug 12, 2016
b4047fc
[SPARK-16975][SQL] Column-partition path starting '_' should be handl…
dongjoon-hyun Aug 12, 2016
bde94cd
[SPARK-17013][SQL] Parse negative numeric literals
petermaxlee Aug 12, 2016
38378f5
[SPARK-12370][DOCUMENTATION] Documentation should link to examples …
jagadeesanas2 Aug 13, 2016
a21ecc9
[SPARK-17023][BUILD] Upgrade to Kafka 0.10.0.1 release
lresende Aug 13, 2016
750f880
[SPARK-16966][SQL][CORE] App Name is a randomUUID even when "spark.ap…
srowen Aug 13, 2016
e02d0d0
[SPARK-17027][ML] Avoid integer overflow in PolynomialExpansion.getPo…
zero323 Aug 14, 2016
8f4cacd
[SPARK-16508][SPARKR] Split docs for arrange and orderBy methods
junyangq Aug 15, 2016
4503632
[SPARK-17065][SQL] Improve the error message when encountering an inc…
zsxwing Aug 15, 2016
e5771a1
Fix docs for window functions
junyangq Aug 16, 2016
2e2c787
[SPARK-16964][SQL] Remove private[hive] from sql.hive.execution package
hvanhovell Aug 16, 2016
237ae54
Revert "[SPARK-16964][SQL] Remove private[hive] from sql.hive.executi…
rxin Aug 16, 2016
1c56971
[SPARK-16964][SQL] Remove private[sql] and private[spark] from sql.ex…
hvanhovell Aug 16, 2016
022230c
[SPARK-16519][SPARKR] Handle SparkR RDD generics that create warnings…
felixcheung Aug 16, 2016
6cb3eab
[SPARK-17089][DOCS] Remove api doc link for mapReduceTriplets operator
phalodi Aug 16, 2016
3e0163b
[SPARK-17084][SQL] Rename ParserUtils.assert to validate
hvanhovell Aug 17, 2016
68a24d3
[MINOR][DOC] Fix the descriptions for `properties` argument in the do…
Aug 17, 2016
22c7660
[SPARK-15285][SQL] Generated SpecificSafeProjection.apply method grow…
kiszk Aug 17, 2016
394d598
[SPARK-17102][SQL] bypass UserDefinedGenerator for json format check
cloud-fan Aug 17, 2016
9406f82
[SPARK-17096][SQL][STREAMING] Improve exception string reported throu…
tdas Aug 17, 2016
585d1d9
[SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceive…
keypointt Aug 17, 2016
91aa532
[SPARK-16995][SQL] TreeNodeException when flat mapping RelationalGrou…
viirya Aug 18, 2016
5735b8b
[SPARK-16391][SQL] Support partial aggregation for reduceGroups
rxin Aug 18, 2016
ec5f157
[SPARK-17117][SQL] 1 / NULL should not fail analysis
petermaxlee Aug 18, 2016
0bc3753
Fix part of undocumented/duplicated arguments warnings by CRAN-check
junyangq Aug 9, 2016
6d5233e
More fixes of the docs.
junyangq Aug 10, 2016
0edfd7d
Fix remaining undocumented/duplicated warnings
junyangq Aug 11, 2016
e72a6aa
Keep to the convention where we have docs for generic and the function.
junyangq Aug 12, 2016
afa69ed
Fix docs for window functions
junyangq Aug 16, 2016
c9cfe43
some fixes of R doc
junyangq Aug 18, 2016
3aafaa7
Move param docs from generic function to method definition.
junyangq Aug 18, 2016
315a0dd
some fixes of R doc
junyangq Aug 18, 2016
aa3d233
Move param docs from generic function to method definition.
junyangq Aug 18, 2016
71170e9
Solve conflicts.
junyangq Aug 18, 2016
2682719
Revert "Fix docs for window functions"
junyangq Aug 18, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level…
… for parquet reader

The base class `SpecificParquetRecordReaderBase` used for vectorized parquet reader will try to get pushed-down filters from the given configuration. This pushed-down filters are used for RowGroups-level filtering. However, we don't set up the filters to push down into the configuration. In other words, the filters are not actually pushed down to do RowGroups-level filtering. This patch is to fix this and tries to set up the filters for pushing down to configuration for the reader.

The benchmark that excludes the time of writing Parquet file:

    test("Benchmark for Parquet") {
      val N = 500 << 12
        withParquetTable((0 until N).map(i => (101, i)), "t") {
          val benchmark = new Benchmark("Parquet reader", N)
          benchmark.addCase("reading Parquet file", 10) { iter =>
            sql("SELECT _1 FROM t where t._1 < 100").collect()
          }
          benchmark.run()
      }
    }

`withParquetTable` in default will run tests for vectorized reader non-vectorized readers. I only let it run vectorized reader.

When we set the block size of parquet as 1024 to have multiple row groups. The benchmark is:

Before this patch:

The retrieved row groups: 8063

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           825 / 1233          2.5         402.6       1.0X

After this patch:

The retrieved row groups: 0

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           306 /  503          6.7         149.6       1.0X

Next, I run the benchmark for non-pushdown case using the same benchmark code but with disabled pushdown configuration. This time the parquet block size is default value.

Before this patch:

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           136 /  238         15.0          66.5       1.0X

After this patch:

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           124 /  193         16.5          60.7       1.0X

For non-pushdown case, from the results, I think this patch doesn't affect normal code path.

I've manually output the `totalRowCount` in `SpecificParquetRecordReaderBase` to see if this patch actually filter the row-groups. When running the above benchmark:

After this patch:
    `totalRowCount = 0`

Before this patch:
    `totalRowCount = 1024000`

Existing tests should be passed.

Author: Liang-Chi Hsieh <[email protected]>

Closes #13701 from viirya/vectorized-reader-push-down-filter2.

(cherry picked from commit 19af298)
Signed-off-by: Davies Liu <[email protected]>
  • Loading branch information
viirya authored and davies committed Aug 10, 2016
commit 977fbbfcae705dbdbf203bd0a6e7c75a12156d3f
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ class TaskMetrics private[spark] () extends Serializable {
}

private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums

/**
* Looks for a registered accumulator by accumulator name.
*/
private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
accumulators.find { acc =>
acc.name.isDefined && acc.name.get == name
}
}
}


Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
import org.apache.spark.scheduler.AccumulableInfo

Expand Down Expand Up @@ -257,6 +259,16 @@ private[spark] object AccumulatorContext {
originals.clear()
}

/**
* Looks for a registered accumulator by accumulator name.
*/
private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
originals.values().asScala.find { ref =>
val acc = ref.get
acc != null && acc.name.isDefined && acc.name.get == name
}.map(_.get)
}

// Identifier for distinguishing SQL metrics from other accumulators
private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Set;

import scala.Option;

import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
Expand Down Expand Up @@ -59,8 +61,12 @@
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;

/**
* Base class for custom RecordReaders for Parquet that directly materialize to `T`.
Expand Down Expand Up @@ -144,6 +150,18 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
}

// For test purpose.
// If the predefined accumulator exists, the row group number to read will be updated
// to the accumulator. So we can check if the row groups are filtered or not in test case.
TaskContext taskContext = TaskContext$.MODULE$.get();
if (taskContext != null) {
Option<AccumulatorV2<?, ?>> accu = (Option<AccumulatorV2<?, ?>>) taskContext.taskMetrics()
.lookForAccumulatorByName("numRowGroups");
if (accu.isDefined()) {
((LongAccumulator)accu.get()).add((long)blocks.size());
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -355,6 +356,11 @@ private[sql] class ParquetFileFormat
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.{AccumulatorContext, LongAccumulator}

/**
* A test suite that tests Parquet filter2 API based filter pushdown optimization.
Expand Down Expand Up @@ -370,73 +371,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
val pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

// If the "c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits. This is a Parquet issue (PARQUET-389).
val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
checkAnswer(
df,
Row(1, "1", null))

// The fields "a" and "c" only exist in one Parquet file.
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathThree = s"${dir.getCanonicalPath}/table3"
df.write.parquet(pathThree)

// We will remove the temporary metadata when writing Parquet file.
val schema = spark.read.parquet(pathThree).schema
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

val pathFour = s"${dir.getCanonicalPath}/table4"
val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
dfStruct.select(struct("a").as("s")).write.parquet(pathFour)

val pathFive = s"${dir.getCanonicalPath}/table5"
val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)

// If the "s.c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits.
val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
.selectExpr("s")
checkAnswer(dfStruct3, Row(Row(null, 1)))

// The fields "s.a" and "s.c" only exist in one Parquet file.
val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathSix = s"${dir.getCanonicalPath}/table6"
dfStruct3.write.parquet(pathSix)

// We will remove the temporary metadata when writing Parquet file.
val forPathSix = spark.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

// sanity test: make sure optional metadata field is not wrongly set.
val pathSeven = s"${dir.getCanonicalPath}/table7"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
val pathEight = s"${dir.getCanonicalPath}/table8"
(4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)

val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
checkAnswer(
df2,
Row(1, "1"))

// The fields "a" and "b" exist in both two Parquet files. No metadata is set.
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
Seq("true", "false").map { vectorized =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
val pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

// If the "c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits. This is a Parquet issue (PARQUET-389).
val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
checkAnswer(
df,
Row(1, "1", null))

// The fields "a" and "c" only exist in one Parquet file.
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathThree = s"${dir.getCanonicalPath}/table3"
df.write.parquet(pathThree)

// We will remove the temporary metadata when writing Parquet file.
val schema = spark.read.parquet(pathThree).schema
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

val pathFour = s"${dir.getCanonicalPath}/table4"
val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
dfStruct.select(struct("a").as("s")).write.parquet(pathFour)

val pathFive = s"${dir.getCanonicalPath}/table5"
val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)

// If the "s.c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits.
val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
.selectExpr("s")
checkAnswer(dfStruct3, Row(Row(null, 1)))

// The fields "s.a" and "s.c" only exist in one Parquet file.
val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathSix = s"${dir.getCanonicalPath}/table6"
dfStruct3.write.parquet(pathSix)

// We will remove the temporary metadata when writing Parquet file.
val forPathSix = spark.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

// sanity test: make sure optional metadata field is not wrongly set.
val pathSeven = s"${dir.getCanonicalPath}/table7"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
val pathEight = s"${dir.getCanonicalPath}/table8"
(4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)

val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
checkAnswer(
df2,
Row(1, "1"))

// The fields "a" and "b" exist in both two Parquet files. No metadata is set.
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
}
}
}
}
Expand Down Expand Up @@ -559,4 +562,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assert(df.filter("_1 IS NOT NULL").count() === 4)
}
}

test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
import testImplicits._

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table"
(1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)

Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
val accu = new LongAccumulator
accu.register(sparkContext, Some("numRowGroups"))

val df = spark.read.parquet(path).filter("a < 100")
df.foreachPartition(_.foreach(v => accu.add(0)))
df.collect

val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
assert(numRowGroups.isDefined)
assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
AccumulatorContext.remove(accu.id)
}
}
}
}
}
}