Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7d78be3
core
andrej-db Oct 23, 2024
7839e1d
revert MsSqlServerDialect build method
andrej-db Oct 23, 2024
0172c49
remove visitCaseWhen, more intuitive predicate wrapping
andrej-db Oct 23, 2024
7ae7397
imports
andrej-db Oct 23, 2024
49d742e
fix
andrej-db Oct 23, 2024
c2001d9
MsSqlServerDialect: comment
andrej-db Oct 24, 2024
8b1b2da
JdbcDialects: move aux here
andrej-db Oct 25, 2024
1f01b77
V2ExpressionBuilder: refactor
andrej-db Oct 25, 2024
7e36029
nit
andrej-db Oct 29, 2024
ab79afe
nit
andrej-db Nov 18, 2024
de38a8c
Update JdbcDialects.scala
cloud-fan Nov 19, 2024
56cea3c
Update MsSqlServerDialect.scala
cloud-fan Nov 19, 2024
21ec622
Update sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects…
cloud-fan Nov 19, 2024
468eb89
Update JdbcDialects.scala
cloud-fan Nov 19, 2024
1bc39ee
Update sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerD…
cloud-fan Nov 19, 2024
6a221c6
Update JdbcDialects.scala
cloud-fan Nov 19, 2024
ef1bcc8
Update MsSqlServerDialect.scala
cloud-fan Nov 19, 2024
5d94fa1
Update JdbcDialects.scala
andrej-db Nov 19, 2024
f94f8e5
Update MsSqlServerDialect.scala
andrej-db Nov 19, 2024
55084a3
Update MsSqlServerIntegrationSuite.scala
andrej-db Nov 19, 2024
7fb6b29
Update MsSqlServerDialect.scala
andrej-db Nov 19, 2024
ca0545a
Update MsSqlServerIntegrationSuite.scala
andrej-db Nov 19, 2024
df2fe41
Apply suggestions from code review
cloud-fan Nov 20, 2024
53a4220
Update connector/docker-integration-tests/src/test/scala/org/apache/s…
cloud-fan Nov 20, 2024
cbef9a5
Update MsSqlServerIntegrationSuite.scala
cloud-fan Nov 20, 2024
c990ec6
Update JdbcDialects.scala
cloud-fan Nov 20, 2024
ee4d4fb
Update sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerD…
cloud-fan Nov 20, 2024
6cff9f5
Update MsSqlServerIntegrationSuite.scala
andrej-db Nov 21, 2024
bca7ce8
Merge branch 'apache:master' into SPARK-50087-CaseWhen
andrej-db Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove visitCaseWhen, more intuitive predicate wrapping
  • Loading branch information
andrej-db committed Oct 23, 2024
commit 0172c49340f1e3240de6d867ba6e89fd158ffcab
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ protected String inputToSQL(Expression input) {
}
}

protected String inputToCaseWhenSQL(Expression input) {
return "CASE WHEN " + inputToSQL(input) + " THEN 1 ELSE 0";
}

protected String visitBinaryComparison(String name, String l, String r) {
if (name.equals("<=>")) {
return "((" + l + " IS NOT NULL AND " + r + " IS NOT NULL AND " + l + " = " + r + ") " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ package org.apache.spark.sql.jdbc

import java.sql.SQLException
import java.util.Locale

import scala.util.control.NonFatal

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.{Expression, NullOrdering, SortDirection}
import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc
import org.apache.spark.sql.connector.expressions.{Expression, Literal, NamedReference, NullOrdering, SortDirection, SortOrder, Transform}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.ExpressionWithToString
import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY}
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -73,36 +73,6 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
}
}

override def visitCaseWhen(children: Array[String]): String = {
// Since MsSqlServer cannot handle boolean expressions inside
// a CASE WHEN, it is necessary to convert those to an IIF
// expression that will return 1 or 0 depending on the result.
// Example:
// In: ... CASE WHEN a = b THEN c = d ...
// Out: ... CASE WHEN a = b THEN IIF(c = d, 1, 0) ...
val sb = new StringBuilder("CASE")
var i = 0
while (i < children.length) {
val c = children(i)
val j = i + 1
if (j < children.length) {
val v = children(j)
sb.append(" WHEN ")
sb.append(c)
sb.append(" THEN ")
sb.append(MsSqlServerDialect.wrapPredicateWithIIF(v))
}
else {
sb.append(" ELSE ")
sb.append(MsSqlServerDialect.wrapPredicateWithIIF(c))
}

i += 2
}
sb.append(" END")
sb.toString
}

override def dialectFunctionName(funcName: String): String = funcName match {
case "VAR_POP" => "VARP"
case "VAR_SAMP" => "VAR"
Expand All @@ -122,7 +92,24 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
case o => inputToSQL(o)
}
visitBinaryComparison(e.name(), l, r)
case "CASE_WHEN" => visitCaseWhen(expressionsToStringArray(e.children())) + " = 1"
case "CASE_WHEN" =>
// Since MsSqlServer cannot handle boolean expressions inside
// a CASE WHEN, it is necessary to convert those to an IIF
// expression that will return 1 or 0 depending on the result.
// Example:
// In: ... CASE WHEN a = b THEN c = d ...
// Out: ... CASE WHEN a = b THEN IIF(c = d, 1, 0) ...

// grouped turns Array[Expression] to Array[Array[Expression]]
// with a len of max 2 (final one will have only one)
val stringArray = e.children().grouped(2).flatMap { arr =>
arr.dropRight(1).map(inputToSQL) :+
(arr.last match {
case p: Predicate => inputToCaseWhenSQL(p)
case p => inputToSQL(p)
})
}
visitCaseWhen(stringArray.toArray) + " = 1"
case _ => super.build(expr)
}
case _ => super.build(expr)
Expand Down