@gatorsmile
Member

What changes were proposed in this pull request?

When FilterExec contains isNotNull predicates, which may be inferred and pushed down or specified by users, we change the involved columns to non-nullable if the top-level expression is null-intolerant. However, this is not correct: if the top-level expression is not a leaf expression, it can still tolerate null when it has null-tolerant child expressions.

For example, consider cast(coalesce(a#5, a#15) as double). Although cast is a null-intolerant expression, coalesce is null-tolerant, so the expression as a whole can absorb a null input.
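
To make this concrete, here is a minimal sketch using a toy expression tree. The `Expr`, `Attr`, `Cast`, and `Coalesce` types and the `topLevelOnly`/`wholeTree` helpers are illustrative stand-ins introduced for this example, not Spark's Catalyst classes or the code in this patch. It only shows that a check on the top-level node alone can miss a null-tolerant child.

```scala
object NullToleranceSketch {
  type Row = Map[String, Option[Int]]

  // Toy expression tree: illustrative stand-ins, NOT Spark's Catalyst classes.
  sealed trait Expr {
    def children: Seq[Expr]
    // Modeled meaning: a null input always produces a null output.
    def nullIntolerant: Boolean
    // None plays the role of SQL NULL.
    def eval(row: Row): Option[Int]
  }

  case class Attr(name: String) extends Expr {
    val children: Seq[Expr] = Nil
    val nullIntolerant = true
    def eval(row: Row): Option[Int] = row.getOrElse(name, None)
  }

  case class Cast(child: Expr) extends Expr {
    val children: Seq[Expr] = Seq(child)
    val nullIntolerant = true
    // Null in, null out.
    def eval(row: Row): Option[Int] = child.eval(row)
  }

  case class Coalesce(children: Seq[Expr]) extends Expr {
    val nullIntolerant = false
    // Returns the first non-null child value, so a null child can be absorbed.
    def eval(row: Row): Option[Int] =
      children.map(_.eval(row)).collectFirst { case Some(v) => v }
  }

  // Looks only at the top-level expression (the behavior described as incorrect).
  def topLevelOnly(e: Expr): Boolean = e.nullIntolerant

  // Also requires every descendant to be null-intolerant.
  def wholeTree(e: Expr): Boolean =
    e.nullIntolerant && e.children.forall(wholeTree)

  def main(args: Array[String]): Unit = {
    val expr = Cast(Coalesce(Seq(Attr("a#5"), Attr("a#15"))))
    val row: Row = Map("a#5" -> None, "a#15" -> Some(3))
    println(expr.eval(row))     // Some(3): the result is non-null although a#5 is null
    println(topLevelOnly(expr)) // true
    println(wholeTree(expr))    // false
  }
}
```

In this toy model, a non-null result of the whole expression says nothing about the nullability of a#5, which is why marking a#5 as non-nullable would be unsafe.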

When the nullability is wrong, we could generate incorrect results in different cases. For example,

    val df1 = Seq((1, 2), (2, 3)).toDF("a", "b")
    val df2 = Seq((2, 5), (3, 4)).toDF("a", "c")
    val joinedDf = df1.join(df2, Seq("a"), "outer").na.fill(0)
    val df3 = Seq((3, 1)).toDF("a", "d")
    joinedDf.join(df3, "a").show

The optimized plan looks like this:

    Project [a#29, b#30, c#31, d#42]
    +- Join Inner, (a#29 = a#41)
       :- Project [cast(coalesce(cast(coalesce(a#5, a#15) as double), 0.0) as int) AS a#29, cast(coalesce(cast(b#6 as double), 0.0) as int) AS b#30, cast(coalesce(cast(c#16 as double), 0.0) as int) AS c#31]
       :  +- Filter isnotnull(cast(coalesce(cast(coalesce(a#5, a#15) as double), 0.0) as int))
       :     +- Join FullOuter, (a#5 = a#15)
       :        :- LocalRelation [a#5, b#6]
       :        +- LocalRelation [a#15, c#16]
       +- LocalRelation [a#41, d#42]

Without the fix, the query returns an empty result. With the fix, it returns the correct answer:

    +---+---+---+---+
    |  a|  b|  c|  d|
    +---+---+---+---+
    |  3|  0|  4|  1|
    +---+---+---+---+

How was this patch tested?

Added test cases to verify the nullability changes in FilterExec. Also added a test case for verifying the reported incorrect result.

gatorsmile changed the title from "[SPARK-17981] [SPARK-17957] [SQL] Incorrectly Set Nullability to False in FilterExec" to "[SPARK-17981] [SPARK-17957] [SQL] Fix Incorrect Nullability Setting to False in FilterExec" on Oct 17, 2016
@SparkQA

SparkQA commented Oct 18, 2016

Test build #67096 has finished for PR 15523 at commit 54c3cc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

cc @cloud-fan @davies @sameeragarwal

// One expression is null intolerant iff it and its children are null intolerant
private def isNullIntolerant(expr: Expression): Boolean = expr match {
  case e: NullIntolerant =>
    if (e.isInstanceOf[LeafExpression]) true else e.children.forall(isNullIntolerant)
Member

Thanks for fixing this!

This change is too conservative. Actually, we only need to consider a non-NullIntolerant expression when it contains attributes in the output. I think we can take a more aggressive approach, e.g.:

// Split out all the IsNotNulls from condition.
private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
  case IsNotNull(a: NullIntolerant) =>
    isNullIntolerant(a) && a.references.subsetOf(child.outputSet)
  case _ => false
}

private def isNullIntolerant(expr: Expression): Boolean = {
  expr.find { e =>
    !e.isInstanceOf[NullIntolerant] && e.references.subsetOf(child.outputSet)
  }.isEmpty
}

Member Author

Just realized the original code was from your PR. In that case, in your code above, why do you still need to keep a.references.subsetOf(child.outputSet)? It looks confusing to me.

Member

Even if a passes the isNullIntolerant check (i.e., it contains no non-NullIntolerant expression that wraps output attributes), we don't need it if it doesn't refer to any output attributes.

Member Author

Could you show me an example?

Member

IsNotNull(Rand() > 0.5)?

Member Author

uh, I see.

First, we definitely need test cases to cover each positive and negative scenario. Previously, we did not have any test case to check the validity of nullability changes. Second, the code needs more comments where the variable and function names cannot explain the code by themselves.

@gatorsmile
Member Author

@viirya I have not changed the algorithm. I just tried to improve the test case coverage.

Thanks to constructIsNotNullConstraints, the existing solution already covers all the cases, right?

Can you help me check whether any scenario is missing? Thanks!

@SparkQA

SparkQA commented Oct 19, 2016

Test build #67212 has finished for PR 15523 at commit ce418f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Oct 20, 2016

@gatorsmile A predicate like IsNotNull(a + b + Rand()) will cause this change to wrongly set the nullability of a and b to true, won't it?

@gatorsmile
Member Author

The parameter name of the verification function is wrong. It should be expectedNonNullableColumns. Please check the test case again.

@SparkQA

SparkQA commented Oct 20, 2016

Test build #67229 has finished for PR 15523 at commit 52cb8fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 24, 2016

Test build #67457 has finished for PR 15523 at commit 4f2101e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

verifyNullabilityInFilterExec(df,
  expr = "_1", expectedNonNullableColumns = Seq("_1"))
verifyNullabilityInFilterExec(df,
  expr = "_2 + Rand()", expectedNonNullableColumns = Seq("_2"))
Member

This should not work with the current approach. It works now only because we infer redundant IsNotNull constraints. E.g., if Filter has a constraint IsNotNull(_2 + Rand()), we will infer another constraint IsNotNull(_2) from it. Your approach relies on IsNotNull(_2), not IsNotNull(_2 + Rand()), to decide that _2 is a non-nullable column.

I submitted another PR, #15653, for the redundant IsNotNull constraints. But I am not sure if we want to fix it, since it doesn't affect correctness. I will leave that to @cloud-fan or @hvanhovell to decide.
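
The interaction described in this comment can be sketched with a toy model. Everything below is an assumption made for illustration: the `Expr`, `Attr`, `Add`, and `Rand` types and the `inferredNotNullAttrs` helper are hypothetical stand-ins, not Spark's `constructIsNotNullConstraints` or any other Catalyst code. The sketch only shows how a strict whole-tree check can reject `_2 + Rand()` while an inferred constraint on `_2` alone still passes it.

```scala
object InferNotNullSketch {
  // Toy expression tree: illustrative stand-ins, NOT Spark's Catalyst classes.
  sealed trait Expr {
    def children: Seq[Expr]
    def nullIntolerant: Boolean
  }
  case class Attr(name: String) extends Expr {
    val children: Seq[Expr] = Nil
    val nullIntolerant = true
  }
  case class Add(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
    val nullIntolerant = true
  }
  case object Rand extends Expr {
    val children: Seq[Expr] = Nil
    val nullIntolerant = false
  }

  // Hypothetical inference step: given IsNotNull(e), collect the attributes that
  // are reachable from e through null-intolerant nodes only.
  def inferredNotNullAttrs(e: Expr): Seq[String] = e match {
    case Attr(name)            => Seq(name)
    case _ if e.nullIntolerant => e.children.flatMap(inferredNotNullAttrs)
    case _                     => Nil
  }

  // Strict check: the node and all of its descendants must be null-intolerant.
  def isNullIntolerant(e: Expr): Boolean =
    e.nullIntolerant && e.children.forall(isNullIntolerant)

  def main(args: Array[String]): Unit = {
    val pred = Add(Attr("_2"), Rand)
    println(isNullIntolerant(pred))       // false: Rand is not null-intolerant
    println(inferredNotNullAttrs(pred))   // List(_2)
    println(isNullIntolerant(Attr("_2"))) // true: the inferred constraint passes
  }
}
```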

Member Author

I already explained why my current solution works in my previous comment. Personally, I like simple code that is easy to understand and maintain, especially when it covers all the cases. Result correctness and code maintainability are always more important.

If constructIsNotNullConstraints is changed by somebody else (i.e., it no longer provides the expected IsNotNull constraints), the test cases added by this PR will fail. Then, we can modify the code.

Member

Yeah, that is why I said I will leave that to @cloud-fan or others to decide...

Member

I agree with the simplicity argument but can you please add a comment here explaining why this particular case is working due to the null inference rule?

@SparkQA

SparkQA commented Oct 28, 2016

Test build #67697 has finished for PR 15523 at commit 49daace.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Oct 28, 2016

LGTM. Waiting for @cloud-fan or others to do a second check.

Member

sameeragarwal left a comment

Thanks, this LGTM too, with just a couple of extremely minor comments. As far as the change is concerned, I tend to side with the simplicity argument for now (especially if we're planning to target 2.1).

// If one expression and its children are null intolerant, it is null intolerant.
private def isNullIntolerant(expr: Expression): Boolean = expr match {
  case e: NullIntolerant =>
    if (e.isInstanceOf[LeafExpression]) true else e.children.forall(isNullIntolerant)
Member

nit: How about something like this for better readability:

  private def isNullIntolerant(expr: Expression): Boolean = expr match {
    case e: NullIntolerant with LeafExpression => true
    case e: NullIntolerant => e.children.forall(isNullIntolerant)
    case _ => false
  }

Member Author

gatorsmile commented Nov 3, 2016

forall returns true when children is empty. Thus, we can remove the first case. Now it becomes simpler. : )

  private def isNullIntolerant(expr: Expression): Boolean = expr match {
    case e: NullIntolerant => e.children.forall(isNullIntolerant)
    case _ => false
  }
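
As a quick sanity check of the `forall` behavior this simplification relies on, here is a small standalone sketch. The `ForallOnEmptySketch` object is introduced only for this example and is not part of the patch.

```scala
object ForallOnEmptySketch {
  def main(args: Array[String]): Unit = {
    val noChildren = Seq.empty[Boolean]
    // forall is vacuously true on an empty collection, so a childless
    // (leaf) node needs no separate case.
    println(noChildren.forall(identity)) // true
    // exists, by contrast, is false on an empty collection.
    println(noChildren.exists(identity)) // false
    // A single failing child is enough to make forall false.
    println(Seq(true, false).forall(identity)) // false
  }
}
```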

Member

nice

@SparkQA

SparkQA commented Nov 3, 2016

Test build #68045 has finished for PR 15523 at commit 2364cc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

Merging to master/2.1. Thanks!

asfgit pushed a commit that referenced this pull request Nov 3, 2016
…False in FilterExec

Author: gatorsmile <[email protected]>

Closes #15523 from gatorsmile/nullabilityFilterExec.

(cherry picked from commit 66a99f4)
Signed-off-by: Herman van Hovell <[email protected]>
@hvanhovell
Contributor

@gatorsmile can you open a PR for 2.0 if we also need to port it to that branch?

@asfgit asfgit closed this in 66a99f4 Nov 3, 2016
@gatorsmile
Member Author

Sure, @hvanhovell, I will do it. Thanks!

asfgit pushed a commit that referenced this pull request Nov 5, 2016
…ty Setting to False in FilterExec

### What changes were proposed in this pull request?

**This PR is to backport the fix #15523 to 2.0.**

Author: gatorsmile <[email protected]>

Closes #15781 from gatorsmile/nullabilityFix.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…False in FilterExec

Author: gatorsmile <[email protected]>

Closes apache#15523 from gatorsmile/nullabilityFilterExec.