Apply code review comments
- Use SparkUnsupportedOperationException
- Remove unused string interpolation
- Fix indentation
EnricoMi committed Apr 28, 2025
commit d9b33ead4f055b35d4b46d0c3aea54a3c12fea46
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -6046,6 +6046,11 @@
"Update column nullability for MySQL and MS SQL Server."
]
},
"UPSERT" : {
"message" : [
"Upsert not supported by JDBC dialect <class>."
]
},
"WRITE_FOR_BINARY_SOURCE" : {
"message" : [
"Write for the binary file data source."
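For reference, a minimal sketch (not part of this commit) of how the new UNSUPPORTED_FEATURE.UPSERT condition surfaces to callers: the <class> placeholder in the JSON template above is filled from the messageParameters map carried by the exception. The dialect, columns, and options values are assumed to be in scope.

import org.apache.spark.SparkUnsupportedOperationException

try {
  // A dialect that does not override getUpsertStatement falls back to the default shown further down.
  dialect.getUpsertStatement("target_table", columns, isCaseSensitive = false, options)
} catch {
  case e: SparkUnsupportedOperationException =>
    // The error class matches the new entry in error-conditions.json ...
    assert(e.getErrorClass == "UNSUPPORTED_FEATURE.UPSERT")
    // ... and <class> is resolved from the dialect's simple class name.
    assert(e.getMessageParameters.get("class") == dialect.getClass.getSimpleName)
}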
@@ -31,8 +31,8 @@ trait UpsertTests
def createTableOption: String
def upsertTestOptions: Map[String, String] = Map("createTableOptions" -> createTableOption)

test(s"Upsert existing table") { doTestUpsert(tableExists = true) }
test(s"Upsert non-existing table") { doTestUpsert(tableExists = false) }
test("Upsert existing table") { doTestUpsert(tableExists = true) }
test("Upsert non-existing table") { doTestUpsert(tableExists = false) }

Seq(
Seq("ts", "id", "v1", "v2"),
@@ -80,7 +80,7 @@ trait UpsertTests
assert(actual === expected)
}

test(s"Upsert concurrency") {
test("Upsert concurrency") {
// create a table with 100k rows
val init =
spark.range(100000)
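A hypothetical usage sketch of the write path these tests exercise; it is not taken from this commit, and the "upsert" option name, the connection URL, and the table name are assumptions made for illustration only.

df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test")  // placeholder connection
  .option("dbtable", "events")                        // placeholder table
  .option("upsert", "true")                           // assumed option name
  .mode("append")
  .save()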
@@ -145,12 +145,12 @@ object JdbcUtils extends Logging with SQLConfHelper
}

def getUpsertStatement(
-    table: String,
-    rddSchema: StructType,
-    tableSchema: Option[StructType],
-    isCaseSensitive: Boolean,
-    dialect: JdbcDialect,
-    options: JDBCOptions): String = {
+      table: String,
+      rddSchema: StructType,
+      tableSchema: Option[StructType],
+      isCaseSensitive: Boolean,
+      dialect: JdbcDialect,
+      options: JDBCOptions): String = {
val columns = getInsertColumns(table, rddSchema, tableSchema, isCaseSensitive, dialect)
dialect.getUpsertStatement(table, columns, isCaseSensitive, options)
}
@@ -345,7 +345,10 @@ abstract class JdbcDialect extends Serializable with Logging
columns: Array[StructField],
isCaseSensitive: Boolean,
options: JDBCOptions): String =
-    throw new UnsupportedOperationException("upserts are not supported")
+    throw new SparkUnsupportedOperationException(
+      errorClass = "UNSUPPORTED_FEATURE.UPSERT",
+      messageParameters = Map(
+        "class" -> this.getClass.getSimpleName))

/**
* Override connection specific properties to run before a select is made. This is in place to
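By way of contrast with the default above, an illustrative sketch of what a dialect-specific override of getUpsertStatement might look like; it is not part of this commit, and the PostgreSQL-style ON CONFLICT syntax and the first-column-as-key choice are assumptions made for the example only.

  override def getUpsertStatement(
      table: String,
      columns: Array[StructField],
      isCaseSensitive: Boolean,
      options: JDBCOptions): String = {
    val cols = columns.map(f => quoteIdentifier(f.name))
    val placeholders = cols.map(_ => "?").mkString(", ")
    val key = cols.head                                   // assume the first column is the conflict key
    val updates = cols.tail.map(c => s"$c = EXCLUDED.$c").mkString(", ")
    s"INSERT INTO $table (${cols.mkString(", ")}) VALUES ($placeholders) " +
      s"ON CONFLICT ($key) DO UPDATE SET $updates"
  }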