Skip to content

Conversation

@HyukjinKwon
Copy link
Member

@HyukjinKwon HyukjinKwon commented Dec 7, 2016

What changes were proposed in this pull request?

Currently, FileSourceStrategy does not handle the case when the pushed-down filter is Literal(null) and removes it at the post-filter in Spark-side.

For example, the codes below:

val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)

shows it keeps null properly.

== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)       << Here `null` is there
+- LocalTableScan [_1#17]

However, when we read it back from Parquet,

val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)

null is removed at the post-filter.

== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)       << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>

This PR fixes it to keep it properly. In more details,

val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))

This keeps this null in partitionKeyFilters as Literal always don't have children and references is being empty which is always the subset of partitionSet.

And then in

val afterScanFilters = filterSet -- partitionKeyFilters

null is always removed from the post filter. So, if the referenced fields are empty, it should be applied into data columns too.

After this PR, it becomes as below:

== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>

How was this patch tested?

Unit test in FileSourceStrategySuite

@HyukjinKwon
Copy link
Member Author

cc @liancheng and @zsxwing. Could you take a look and see if it makes sense please?

@SparkQA
Copy link

SparkQA commented Dec 7, 2016

Test build #69767 has finished for PR 16184 at commit c6fe345.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Copy link
Contributor

LGTM, thanks for working on this!

I think the problem is that we generate a foldable predicate (lit(null)) during optimization phase but fail to fold it. Ideally, the optimizer should fold IsNotNull($"_1") && lit(null) into a lit(false). But it's still good to check for foldable predicates at planning phase.

Merging this to master and branch-2.1.

asfgit pushed a commit that referenced this pull request Dec 14, 2016
…-side post-filter for FileFormat datasources

## What changes were proposed in this pull request?

Currently, `FileSourceStrategy` does not handle the case when the pushed-down filter is `Literal(null)` and removes it at the post-filter in Spark-side.

For example, the codes below:

```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```

shows it keeps `null` properly.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)       << Here `null` is there
+- LocalTableScan [_1#17]
```

However, when we read it back from Parquet,

```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```

`null` is removed at the post-filter.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)       << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

This PR fixes it to keep it properly. In more details,

```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```

This keeps this `null` in `partitionKeyFilters` as `Literal` always don't have `children` and `references` is being empty  which is always the subset of `partitionSet`.

And then in

```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```

`null` is always removed from the post filter. So, if the referenced fields are empty, it should be applied into data columns too.

After this PR, it becomes as below:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

## How was this patch tested?

Unit test in `FileSourceStrategySuite`

Author: hyukjinkwon <[email protected]>

Closes #16184 from HyukjinKwon/SPARK-18753.

(cherry picked from commit 89ae26d)
Signed-off-by: Cheng Lian <[email protected]>
@asfgit asfgit closed this in 89ae26d Dec 14, 2016
@HyukjinKwon
Copy link
Member Author

Thank you for the detailed explanation!!

robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
…-side post-filter for FileFormat datasources

## What changes were proposed in this pull request?

Currently, `FileSourceStrategy` does not handle the case when the pushed-down filter is `Literal(null)` and removes it at the post-filter in Spark-side.

For example, the codes below:

```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```

shows it keeps `null` properly.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)       << Here `null` is there
+- LocalTableScan [_1#17]
```

However, when we read it back from Parquet,

```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```

`null` is removed at the post-filter.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)       << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

This PR fixes it to keep it properly. In more details,

```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```

This keeps this `null` in `partitionKeyFilters` as `Literal` always don't have `children` and `references` is being empty  which is always the subset of `partitionSet`.

And then in

```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```

`null` is always removed from the post filter. So, if the referenced fields are empty, it should be applied into data columns too.

After this PR, it becomes as below:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

## How was this patch tested?

Unit test in `FileSourceStrategySuite`

Author: hyukjinkwon <[email protected]>

Closes apache#16184 from HyukjinKwon/SPARK-18753.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…-side post-filter for FileFormat datasources

## What changes were proposed in this pull request?

Currently, `FileSourceStrategy` does not handle the case when the pushed-down filter is `Literal(null)` and removes it at the post-filter in Spark-side.

For example, the codes below:

```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```

shows it keeps `null` properly.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)       << Here `null` is there
+- LocalTableScan [_1#17]
```

However, when we read it back from Parquet,

```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```

`null` is removed at the post-filter.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)       << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

This PR fixes it to keep it properly. In more details,

```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```

This keeps this `null` in `partitionKeyFilters` as `Literal` always don't have `children` and `references` is being empty  which is always the subset of `partitionSet`.

And then in

```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```

`null` is always removed from the post filter. So, if the referenced fields are empty, it should be applied into data columns too.

After this PR, it becomes as below:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

## How was this patch tested?

Unit test in `FileSourceStrategySuite`

Author: hyukjinkwon <[email protected]>

Closes apache#16184 from HyukjinKwon/SPARK-18753.
@HyukjinKwon HyukjinKwon deleted the SPARK-18753 branch January 2, 2018 03:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants