-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-12409][SPARK-12387][SPARK-12391][SQL] Refactor filter pushdown for JDBCRDD and add few filters #10470
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
1e3adbc
05188b7
b0da6fc
a7ef79e
cb7ce21
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -174,6 +174,7 @@ private[sql] object JDBCRDD extends Logging { | |
| case stringValue: String => s"'${escapeSql(stringValue)}'" | ||
| case timestampValue: Timestamp => "'" + timestampValue + "'" | ||
| case dateValue: Date => "'" + dateValue + "'" | ||
| case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") | ||
| case _ => value | ||
| } | ||
|
|
||
|
|
@@ -182,18 +183,40 @@ private[sql] object JDBCRDD extends Logging { | |
|
|
||
| /** | ||
| * Turns a single Filter into a String representing a SQL expression. | ||
| * Returns null for an unhandled filter. | ||
| * Returns None for an unhandled filter. | ||
| */ | ||
| private def compileFilter(f: Filter): String = f match { | ||
| case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" | ||
| case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}" | ||
| case LessThan(attr, value) => s"$attr < ${compileValue(value)}" | ||
| case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" | ||
| case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" | ||
| case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" | ||
| case IsNull(attr) => s"$attr IS NULL" | ||
| case IsNotNull(attr) => s"$attr IS NOT NULL" | ||
| case _ => null | ||
| private def compileFilter(f: Filter): Option[String] = { | ||
| Option(f match { | ||
| case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" | ||
| case LessThan(attr, value) => s"$attr < ${compileValue(value)}" | ||
| case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" | ||
| case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" | ||
| case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" | ||
| case IsNull(attr) => s"$attr IS NULL" | ||
| case IsNotNull(attr) => s"$attr IS NOT NULL" | ||
| case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'" | ||
| case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'" | ||
| case StringContains(attr, value) => s"${attr} LIKE '%${value}%'" | ||
| case In(attr, value) => s"$attr IN (${compileValue(value)})" | ||
| case Not(f) => compileFilter(f).map(p => s"(NOT ($p))").getOrElse(null) | ||
| case Or(f1, f2) => | ||
| // We can't compile Or filter unless both sub-filters are compiled successfully. | ||
| // It applies too for the following And filter. | ||
| val or = Seq(f1, f2).map(compileFilter(_)).flatten | ||
| if (or.size == 2) { | ||
| or.map(p => s"($p)").mkString(" OR ") | ||
| } else { | ||
| null | ||
| } | ||
| case And(f1, f2) => | ||
| val and = Seq(f1, f2).map(compileFilter(_)).flatten | ||
| if (and.size == 2) { | ||
| and.map(p => s"($p)").mkString(" AND ") | ||
| } else { | ||
| null | ||
| } | ||
| case _ => null | ||
| }) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -295,25 +318,17 @@ private[sql] class JDBCRDD( | |
| /** | ||
| * `filters`, but as a WHERE clause suitable for injection into a SQL query. | ||
| */ | ||
| private val filterWhereClause: String = { | ||
| val filterStrings = filters.map(JDBCRDD.compileFilter).filter(_ != null) | ||
| if (filterStrings.size > 0) { | ||
| val sb = new StringBuilder("WHERE ") | ||
| filterStrings.foreach(x => sb.append(x).append(" AND ")) | ||
| sb.substring(0, sb.length - 5) | ||
| } else "" | ||
| } | ||
| private val filterWhereClause: Seq[Option[String]] = filters.map(JDBCRDD.compileFilter) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this just be a Seq[String], by adding a flatMap to filters.map(JDBCRDD.compileFilter)?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes. I added flatten and mkString to it, so it returns a string like before. |
||
|
|
||
| /** | ||
| * A WHERE clause representing both `filters`, if any, and the current partition. | ||
| */ | ||
| private def getWhereClause(part: JDBCPartition): String = { | ||
| if (part.whereClause != null && filterWhereClause.length > 0) { | ||
| filterWhereClause + " AND " + part.whereClause | ||
| } else if (part.whereClause != null) { | ||
| "WHERE " + part.whereClause | ||
| val w = filterWhereClause.+:(Option(part.whereClause)).flatten.mkString(" AND ") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the old code here is a lot easier to understand than the new version.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 and the old one is better to me.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK. I reverted it to the old code, with a small difference to accommodate the new filterWhereClause. |
||
| if (!w.isEmpty) { | ||
| "WHERE " + w | ||
| } else { | ||
| filterWhereClause | ||
| w | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfter | |
| import org.scalatest.PrivateMethodTester | ||
|
|
||
| import org.apache.spark.SparkFunSuite | ||
| import org.apache.spark.sql.Row | ||
| import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD | ||
| import org.apache.spark.sql.test.SharedSQLContext | ||
| import org.apache.spark.sql.types._ | ||
|
|
@@ -186,8 +187,26 @@ class JDBCSuite extends SparkFunSuite | |
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) | ||
| .collect().size == 2) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use two == to be consistent
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
| .collect().size === 2) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) | ||
| .collect().size == 2) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " | ||
| + "AND THEID = 2")).collect().size == 2) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) | ||
| assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) | ||
| assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) | ||
| assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) | ||
|
|
||
| // This is a test to reflect discussion in SPARK-12218. | ||
| // The older versions of spark have this kind of bugs in parquet data source. | ||
| val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')") | ||
| val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')") | ||
| assert(df1.collect.toSet === Set(Row("mary", 2))) | ||
| assert(df2.collect.toSet === Set(Row("mary", 2))) | ||
| } | ||
|
|
||
| test("SELECT * WHERE (quoted strings)") { | ||
|
|
@@ -434,17 +453,24 @@ class JDBCSuite extends SparkFunSuite | |
| } | ||
|
|
||
| test("compile filters") { | ||
| val compileFilter = PrivateMethod[String]('compileFilter) | ||
| def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) | ||
| val compileFilter = PrivateMethod[Option[String]]('compileFilter) | ||
| def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) getOrElse("") | ||
| assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") | ||
| assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'") | ||
| assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))") | ||
| assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def"))) | ||
| === "(col0 = 0) AND (col1 = 'def')") | ||
| assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi"))) | ||
| === "(col0 = 2) OR (col1 = 'ghi')") | ||
| assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5") | ||
| assert(doCompileFilter(LessThan("col3", | ||
| Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'") | ||
| assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04"))) === "col4 < '1983-08-04'") | ||
| assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5") | ||
| assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3") | ||
| assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3") | ||
| assert(doCompileFilter(In("col1", Array("jkl"))) === "col1 IN ('jkl')") | ||
| assert(doCompileFilter(Not(In("col1", Array("mno", "pqr")))) | ||
| === "(NOT (col1 IN ('mno', 'pqr')))") | ||
| assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") | ||
| assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add tests for the case where compiling sub-filters fails.
Also, what is a concrete example of that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we already support all filters here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this PR includes all the filters implemented in
o.a.s.sources.filters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a test for that using EqualNullSafe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad; I understand now.
Actually, I'm not sure how to handle
EqualNullSafe. Most recent databases support IS NOT DISTINCT FROM, which corresponds to EqualNullSafe in SQL99, though older databases do not. ISTM that follow-up PRs should consider this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, actually I already opened a PR for
EqualNullSafe here a few months ago: #8743
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh great. If #8743 is merged, I think it could simplify the
And/Or entries in compileFilters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know that.