[SPARK-24327][SQL] Verify and normalize a partition column name based on the JDBC resolved schema #21379
Conversation
Test build #90874 has finished for PR 21379 at commit
retest this please
Test build #90882 has finished for PR 21379 at commit
If the input partition.column is already quoted, can we avoid adding the quotes again?
Are you pointing out the case where users explicitly add quotes in partitionColumn? e.g.,
```scala
val df = spark.read.format("jdbc")
  ...
  .option("partitionColumn", """"THEID"""")
  ...
  .option("quotePartitionColumnName", quotePartitionColumnName)
  .load()
```
Yeah
ok, I will
The latest fix changes an existing behaviour (when quoting non-partition column names), so I'm not sure this fix is acceptable. Any suggestions?
Test build #91213 has finished for PR 21379 at commit
Sorry, I just realized this is the wrong direction. Instead of trusting the user input, we should verify and normalize the user-specified partition columns using the already-fetched table schema info.
ok. If the schema does not have the column, throw an exception?
Yeah, we should do it.
ok, I will re-check.
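To make the agreed direction concrete, here is a hedged sketch of the intended behavior (the `url` value and the `TEST.PEOPLE` table with its upper-case `THEID` column are assumptions modeled on `JDBCSuite`, not taken from this thread):

```scala
// The user-specified partitionColumn is matched against the resolved schema
// with the session resolver and rewritten to the schema's spelling (THEID).
val df = spark.read.format("jdbc")
  .option("url", url)                   // assumed JDBC URL, defined elsewhere
  .option("dbtable", "TEST.PEOPLE")     // assumed table with column THEID
  .option("partitionColumn", "theid")   // normalized to THEID under the default resolver
  .option("lowerBound", "1")
  .option("upperBound", "4")
  .option("numPartitions", "3")
  .load()

// A column absent from the resolved schema should fail analysis instead of
// silently producing broken partition predicates:
//   .option("partitionColumn", "NoExistingColumn")  // => AnalysisException
```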
Force-pushed a9b0306 to d76bc7f.
```scala
  }
  testIncorrectJdbcPartitionColumn("NoExistingColumn")
  withSQLConf("spark.sql.caseSensitive" -> "true") {
```
SQLConf.CASE_SENSITIVE.key
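That is, presumably something like the following, using the predefined config constant instead of the raw string:

```scala
import org.apache.spark.sql.internal.SQLConf

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
  // test body unchanged
}
```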
```scala
    ans.toArray
  }

  def getSchema(jdbcOptions: JDBCOptions, resolver: Resolver): StructType = {
```
Add the function description?
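For instance, a possible description (my wording, not necessarily the merged text):

```scala
/**
 * Resolves the schema of the table referenced by the given JDBC options so
 * that user-specified column names (e.g. partitionColumn) can be verified
 * and normalized against it; `resolver` controls how column names are
 * compared with respect to case sensitivity.
 */
def getSchema(jdbcOptions: JDBCOptions, resolver: Resolver): StructType = {
  ??? // existing implementation unchanged
}
```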
```scala
 * Null value predicate is added to the first partition where clause to include
 * the rows with null value for the partitions column.
 *
 * @param partitioning partition information to generate the where clause for each partition
```
Add the other two @param for the new parameters?
ok
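Assuming the two new parameters are the resolved schema and the resolver (consistent with the other hunks in this review), the added tags might read:

```scala
/**
 * ...
 * @param schema resolved schema of the JDBC table (assumed parameter name)
 * @param resolver function used to compare column names with respect to
 *                 case sensitivity (assumed parameter name)
 * @param partitioning partition information to generate the where clause for each partition
 */
```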
```scala
}.map(dialect.quoteIdentifier).getOrElse {
  throw new AnalysisException(s"User-defined partition column ${partitioning.column} not " +
    s"found in the JDBC relation: ${schema.simpleString(Utils.maxNumToStringFields)}")
}
```
Create a new private function for the above resolution and checking logic?
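For illustration, the extracted helper could look roughly like this; the name `resolvePartitionColumn` and the exact signature are assumptions, not necessarily what was merged:

```scala
// Assumed context: a private helper inside object JDBCRelation, reusing the
// resolution and error-reporting logic from the hunk above.
private def resolvePartitionColumn(
    schema: StructType,
    columnName: String,
    resolver: Resolver,
    dialect: JdbcDialect): String = {
  schema.fields.find(f => resolver(f.name, columnName))
    .map(f => dialect.quoteIdentifier(f.name))
    .getOrElse {
      throw new AnalysisException(s"User-defined partition column $columnName not " +
        s"found in the JDBC relation: ${schema.simpleString(Utils.maxNumToStringFields)}")
    }
}
```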
@maropu The fix looks good to me. Thanks for changing the solution. Could you update the PR title and description?
oh, I forgot to update the title... yeah, I'll do it soon.
Test build #91813 has finished for PR 21379 at commit
Test build #91812 has finished for PR 21379 at commit
Test build #91814 has finished for PR 21379 at commit
Test build #91835 has finished for PR 21379 at commit
Test build #91838 has finished for PR 21379 at commit
```diff
   }
-  val relation = JDBCRelation(parts, options)(sparkSession)
+  val schema = JDBCRelation.getSchema(sparkSession.sessionState.conf.resolver, options)
+  val relation = JDBCRelation(schema, parts, options)(sparkSession)
```
We do not need to change this. Add an apply function to object JDBCRelation (in JDBCRelation.scala) instead:

```scala
def apply(parts: Array[Partition], jdbcOptions: JDBCOptions)(
    sparkSession: SparkSession): JDBCRelation = {
  val schema = getSchema(jdbcOptions, sparkSession.sessionState.conf.resolver)
  JDBCRelation(schema, parts, jdbcOptions)(sparkSession)
}
```
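Keeping such an overload means existing call sites of the form `JDBCRelation(parts, options)(sparkSession)` keep compiling unchanged, while the resolver-based schema lookup stays encapsulated in one place.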
ok
LGTM except one minor comment.
Test build #91867 has finished for PR 21379 at commit
ping
retest this please
The build passed. The tests passed in the previous run; the current tests will be killed at midnight. LGTM. Thanks! Merged to master.
Test build #92285 has finished for PR 21379 at commit
What changes were proposed in this pull request?
This PR modifies the JDBC datasource code to verify and normalize a partition column based on the JDBC resolved schema before building JDBCRelation.

Closes #20370
How was this patch tested?
Added tests in JDBCSuite.