[SPARK-10838][SPARK-11576][SQL][WIP] Incorrect results or exceptions when using self-joins #9548
|
Since this solution requires adding quantifier comparison into the equation of attributeReferences, this will fail a couple test cases in expand. We have already identified the bugs in the expand and submitted pull requests to resolve this issue. #9216 |
|
Test build #45319 has finished for PR 9548 at commit
|
|
To fix these failed cases, I will move the dataFrame's hashCode to the Column class as a dedicated field, instead of directly putting the values to quantifiers. |
|
I can't fix the problem without a major code change. The current design of the dataFrame APIs has a fundamental problem. When using column references, we might hit various strange issues if the dataFrame has columns with the same name and expression ID. Note that this might occur even if we do not have self joins. For example, in the following code:
val df1 = Seq((1, 3), (2, 1)).toDF("keyCol1", "keyCol2")
val df2 = Seq((1, 4, 0), (2, 1, 0)).toDF("keyCol1", "keyCol3", "keyColToDrop")
val df3 = df1.join(df2, df1("keyCol1") === df2("keyCol1"))
val col = df3("keyColToDrop")
val df = df2.drop(col)
df.printSchema()
Above, we can use a column reference of df3 to drop the column in df2. That does not make sense, right? In each column reference, we have to know the data source. @marmbrus @rxin @liancheng @yhuai @cloud-fan |
|
That particular example does not really seem like a problem to me. It's the same column logically that you are dropping. What if we make the change in the Column API only, instead of trying to change |
|
@marmbrus Thank you for your suggestions! That is similar to my initial idea. I tried it last night. Unfortunately, I hit a problem when adding such a field to In the current design, the class When implementing the idea, it becomes more difficult. For example, in the following binary operators,
def === (other: Any): Column = {
  val right = lit(other).expr
  EqualTo(expr, right)
}
That is why I am thinking this could mean a major code change to |
|
I don't think every About the problem that we resolve right tree of self join but miss the join condition, actually it's a known bug, a workaround is aliasing a name to Making every |
|
@cloud-fan Before discussing the solution details, let us first talk about the design issues. IMO, the In the current implementation, each When more components are built on Will answer your design suggestion in a separate post. |
|
@cloud-fan So far, we do not have an easy fix, but I believe we should never return a wrong result for a self join. Let me post the test case I added. This test case returns an incorrect result without any exception:
test("[SPARK-10838] self join - conflicting attributes in condition - incorrect result 2") {
  val df1 = Seq((1, 3), (2, 1)).toDF("keyCol1", "keyCol2")
  val df2 = Seq((1, 4), (2, 1)).toDF("keyCol1", "keyCol3")
  val df3 = df1.join(df2, df1("keyCol1") === df2("keyCol1")).select(df1("keyCol1"), $"keyCol3")
  checkAnswer(
    df3.join(df1, df3("keyCol3") === df1("keyCol1") && df1("keyCol1") === df3("keyCol3")),
    Row(2, 1, 1, 3) :: Nil)
}
Before resolving this problem, what we can do is detect it and let customers use the workaround you mentioned. The detection condition is simple. The incorrect result could happen when the conflicting attributes contain the Do you agree @cloud-fan @marmbrus ? If OK, I will submit another PR for detecting it and issuing an exception with a meaningful message to users. |
|
@gatorsmile we will revisit this in the future. Do you mind closing the pull request for now? |
|
Ok, let me close it. Thank you! |
Spark SQL uses expression IDs to identify column references. For self joins, these IDs might not be unique. When resolving the ambiguity of attributeReferences caused by self joins, the current solution only handles the conflicting attributes. However, this does not work when the join conditions use column names that appear in both analyzed dataFrames, since the columns in join conditions are analyzed before the ambiguity of conflicting attributes is resolved. Currently, we do not update the search condition during ambiguity resolution of attributeReferences. When generating the new expression IDs in the right tree, we must also update the corresponding columns' expression IDs in the search condition. This part is missing now.
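The failure mode described above can be illustrated with a toy model in plain Scala. This is only a sketch under stated assumptions: `Attr`, `EqualTo`, `dedupRight`, and `resolvesToRight` are hypothetical stand-ins written for this illustration, not Spark's analyzer classes or methods.

```scala
// Toy model of the problem, NOT Spark's analyzer. All names are hypothetical.
object ExprIdCollisionSketch {
  // A column reference identified by an expression ID.
  final case class Attr(name: String, exprId: Long)
  // A join condition comparing two column references.
  final case class EqualTo(left: Attr, right: Attr)

  // In a self join both sides come from the same plan, so they share IDs.
  val leftOutput: Seq[Attr] = Seq(Attr("keyCol1", 1L), Attr("keyCol2", 2L))
  val rightOutput: Seq[Attr] = leftOutput

  // The condition is built from column references before deduplication:
  // left.keyCol2 === right.keyCol1
  val condition: EqualTo = EqualTo(leftOutput(1), rightOutput(0))

  // Deduplication: give every attribute of the right side a fresh ID.
  def dedupRight(right: Seq[Attr], firstFreshId: Long): Seq[Attr] =
    right.zipWithIndex.map { case (a, i) => a.copy(exprId = firstFreshId + i) }

  val newRight: Seq[Attr] = dedupRight(rightOutput, 100L)

  // True if the attribute's ID is still produced by the given right side.
  def resolvesToRight(a: Attr, right: Seq[Attr]): Boolean =
    right.exists(_.exprId == a.exprId)
}
```

In this model, the right operand of `condition` keeps its old ID after deduplication, so it no longer matches anything in `newRight` and instead matches an attribute of the left side.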
Here, I am trying to propose a solution to resolve the above issue. When analyzing the columns in the join conditions, we record the dataFrame hashCode of the search-condition columns. By using the information, we can determine which columns are from the right tree, and then update their expression IDs when resolving the ambiguity of conflicting attributes.
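A minimal sketch of this idea in plain Scala follows. It is an illustration under assumptions, not the code in this PR: the `sourceId` field, `dedupRight`, and `rewrite` are hypothetical names, and the source tag stands in for the dataFrame hashCode recorded on each search-condition column.

```scala
// Toy model of the proposed fix, NOT the PR's implementation.
// All names are hypothetical.
object SourceTaggedConditionSketch {
  // sourceId stands in for the hashCode of the dataFrame the column came from.
  final case class Attr(name: String, exprId: Long, sourceId: Option[Int] = None)
  final case class EqualTo(left: Attr, right: Attr)

  // Map each old expression ID of the right tree to a fresh one.
  def dedupRight(right: Seq[Attr], firstFreshId: Long): Map[Long, Long] =
    right.zipWithIndex.map { case (a, i) => a.exprId -> (firstFreshId + i) }.toMap

  // Rewrite only the condition columns that were recorded as coming
  // from the right tree; columns from the left tree are left untouched.
  def rewrite(cond: EqualTo, rightSourceId: Int, idMap: Map[Long, Long]): EqualTo = {
    def fix(a: Attr): Attr =
      if (a.sourceId.contains(rightSourceId))
        a.copy(exprId = idMap.getOrElse(a.exprId, a.exprId))
      else a
    EqualTo(fix(cond.left), fix(cond.right))
  }

  val leftSource = 1
  val rightSource = 2

  // left.keyCol2 === right.keyCol1, with the source recorded on each column.
  val cond: EqualTo =
    EqualTo(Attr("keyCol2", 2L, Some(leftSource)), Attr("keyCol1", 1L, Some(rightSource)))

  val idMap: Map[Long, Long] =
    dedupRight(Seq(Attr("keyCol1", 1L), Attr("keyCol2", 2L)), 100L)

  val fixed: EqualTo = rewrite(cond, rightSource, idMap)
}
```

Here `fixed.right` picks up the fresh ID assigned to the right tree, while `fixed.left` keeps its original ID. The sketch assumes the two sides of the join carry distinct source tags, which may not hold when the very same dataFrame object is joined with itself.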
When designing this PR, I tried to avoid introducing a lot of code changes. Thus, I just use quantifiers to record this information, and only when necessary.
Ideally, I think each column reference always needs to maintain a dedicated identifier for identifying its source dataFrame. The ideal solution requires a lot of code changes, but this can help us further optimize the plan in the future.
Thanks for any suggestion!