Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move upsert tests into trait
  • Loading branch information
EnricoMi committed Apr 28, 2025
commit 1bb04571cc607ba249f82eeadb3590bae70789ef
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.tags.DockerTest
* }}}
*/
@DockerTest
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with UpsertTests {
override val db = new MsSQLServerDatabaseOnDocker

override def dataPreparation(conn: Connection): Unit = {
Expand Down Expand Up @@ -146,6 +146,14 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
.executeUpdate()
conn.prepareStatement("""INSERT INTO test_rowversion (myKey, myValue) VALUES (1, 0)""")
.executeUpdate()

conn.prepareStatement("CREATE TABLE upsert (id INT, ts DATETIME, v1 FLOAT, v2 FLOAT, " +
"CONSTRAINT pk_upsert PRIMARY KEY (id, ts))").executeUpdate()
conn.prepareStatement("INSERT INTO upsert VALUES " +
"(1, '1996-01-01 01:23:45', 1.234, 1.234567), " +
"(1, '1996-01-01 01:23:46', 1.235, 1.234568), " +
"(2, '1996-01-01 01:23:45', 2.345, 2.345678), " +
"(2, '1996-01-01 01:23:46', 2.346, 2.345679)").executeUpdate()
}

test("Basic test") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.util.Properties
import scala.util.Using

import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.ShortType
Expand All @@ -40,9 +39,7 @@ import org.apache.spark.tags.DockerTest
* }}}
*/
@DockerTest
class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
import testImplicits._

class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with UpsertTests {
override val db = new MySQLDatabaseOnDocker

override def dataPreparation(conn: Connection): Unit = {
Expand Down Expand Up @@ -370,43 +367,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
.load()
checkAnswer(df, Row("brown ", "fox"))
}

Seq(false, true).foreach { exists =>
test(s"Upsert ${if (exists) "" else "non-"}existing table") {
val df = Seq(
(1, Timestamp.valueOf("1996-01-01 01:23:46"), 1.235, 1.234568), // row unchanged
(2, Timestamp.valueOf("1996-01-01 01:23:45"), 2.346, 2.345678), // updates v1
(2, Timestamp.valueOf("1996-01-01 01:23:46"), 2.347, 2.345680), // updates v1 and v2
(3, Timestamp.valueOf("1996-01-01 01:23:45"), 3.456, 3.456789) // inserts new row
).toDF("id", "ts", "v1", "v2").repartition(10)

val table = if (exists) "upsert" else "new_table"
val options = Map("numPartitions" -> "10", "upsert" -> "true")
df.write.mode(SaveMode.Append).options(options).jdbc(jdbcUrl, table, new Properties)

val actual = spark.read.jdbc(jdbcUrl, table, new Properties).collect.toSet
val existing = if (exists) {
Set((1, Timestamp.valueOf("1996-01-01 01:23:45"), 1.234, 1.234567))
} else {
Set.empty
}
val upsertedRows = Set(
(1, Timestamp.valueOf("1996-01-01 01:23:46"), 1.235, 1.234568),
(2, Timestamp.valueOf("1996-01-01 01:23:45"), 2.346, 2.345678),
(2, Timestamp.valueOf("1996-01-01 01:23:46"), 2.347, 2.345680),
(3, Timestamp.valueOf("1996-01-01 01:23:45"), 3.456, 3.456789)
)
val expected = (existing ++ upsertedRows).map { case (id, ts, v1, v2) =>
Row(Integer.valueOf(id), ts, v1.doubleValue(), v2.doubleValue())
}
assert(actual === expected)
}
}

test("Write with unspecified mode with upsert") { }
test("Write with overwrite mode with upsert") { }
test("Write with error-if-exists mode with upsert") { }
test("Write with ignore mode with upsert") { }
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.tags.DockerTest
* }}}
*/
@DockerTest
class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with UpsertTests {
override val db = new PostgresDatabaseOnDocker

override def dataPreparation(conn: Connection): Unit = {
Expand Down Expand Up @@ -325,30 +325,6 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
DateTimeUtils.toJavaTimestamp(62551949000L)))
}

test(s"Upsert existing table") {
val df = Seq(
(1, Timestamp.valueOf("1996-01-01 01:23:46"), 1.235, 1.234568), // row unchanged
(2, Timestamp.valueOf("1996-01-01 01:23:45"), 2.346, 2.345678), // updates v1
(2, Timestamp.valueOf("1996-01-01 01:23:46"), 2.347, 2.345680), // updates v1 and v2
(3, Timestamp.valueOf("1996-01-01 01:23:45"), 3.456, 3.456789) // inserts new row
).toDF("id", "ts", "v1", "v2").repartition(10)

val options = Map("numPartitions" -> "10", "upsert" -> "true", "upsertKeyColumns" -> "id, ts")
df.write.mode(SaveMode.Append).options(options).jdbc(jdbcUrl, "upsert", new Properties)

val actual = spark.read.jdbc(jdbcUrl, "upsert", new Properties).collect.toSet
val expected = Set(
(1, Timestamp.valueOf("1996-01-01 01:23:45"), 1.234, 1.234567),
(1, Timestamp.valueOf("1996-01-01 01:23:46"), 1.235, 1.234568),
(2, Timestamp.valueOf("1996-01-01 01:23:45"), 2.346, 2.345678),
(2, Timestamp.valueOf("1996-01-01 01:23:46"), 2.347, 2.345680),
(3, Timestamp.valueOf("1996-01-01 01:23:45"), 3.456, 3.456789)
).map { case (id, ts, v1, v2) =>
Row(Integer.valueOf(id), ts, v1.doubleValue(), v2.doubleValue())
}
assert(actual === expected)
}

test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE " +
"should be recognized") {
// When using JDBC to read the columns of TIMESTAMP with TIME ZONE and TIME with TIME ZONE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import java.sql.Timestamp
import java.util.Properties

import org.apache.spark.sql.{Row, SaveMode}

/**
 * Upsert tests shared by the Docker-based JDBC integration suites.
 *
 * The "existing table" test writes into a table named `upsert` with the columns
 * `(id, ts, v1, v2)` and `(id, ts)` as upsert key. It expects that, after the write, the only
 * row left untouched in that table is `(1, '1996-01-01 01:23:45', 1.234, 1.234567)`, so a suite
 * mixing in this trait has to prepare the table accordingly.
 */
trait UpsertTests {
  self: DockerJDBCIntegrationSuite =>

  import testImplicits._

  test("Upsert existing table") { doTestUpsert(tableExists = true) }
  test("Upsert non-existing table") { doTestUpsert(tableExists = false) }

  /**
   * Writes four rows with `upsert=true` in append mode and checks the resulting table content.
   *
   * @param tableExists if true, the rows are written into the table `upsert`; otherwise they
   *                    are written into the table `new_table`
   */
  def doTestUpsert(tableExists: Boolean): Unit = {
    // The trailing comments describe the expected effect on the `upsert` table.
    // NOTE(review): the earlier per-suite versions of this test used `.repartition(10)`;
    // confirm whether writing from a single partition is intentional.
    val df = Seq(
      (1, Timestamp.valueOf("1996-01-01 01:23:46"), 1.235, 1.234568), // row unchanged
      (2, Timestamp.valueOf("1996-01-01 01:23:45"), 2.346, 2.345678), // updates v1
      (2, Timestamp.valueOf("1996-01-01 01:23:46"), 2.347, 2.345680), // updates v1 and v2
      (3, Timestamp.valueOf("1996-01-01 01:23:45"), 3.456, 3.456789) // inserts new row
    ).toDF("id", "ts", "v1", "v2").repartition(1)

    val table = if (tableExists) "upsert" else "new_table"
    val options = Map(
      "numPartitions" -> "10",
      "upsert" -> "true",
      "upsertKeyColumns" -> "id, ts")
    df.write.mode(SaveMode.Append).options(options).jdbc(jdbcUrl, table, new Properties)

    val actual = spark.read.jdbc(jdbcUrl, table, new Properties).collect().toSet

    // Row that is expected to be in the table already and that the written rows do not touch.
    val existing: Set[(Int, Timestamp, Double, Double)] =
      if (tableExists) {
        Set((1, Timestamp.valueOf("1996-01-01 01:23:45"), 1.234, 1.234567))
      } else {
        Set.empty[(Int, Timestamp, Double, Double)]
      }
    val upsertedRows = Set(
      (1, Timestamp.valueOf("1996-01-01 01:23:46"), 1.235, 1.234568),
      (2, Timestamp.valueOf("1996-01-01 01:23:45"), 2.346, 2.345678),
      (2, Timestamp.valueOf("1996-01-01 01:23:46"), 2.347, 2.345680),
      (3, Timestamp.valueOf("1996-01-01 01:23:45"), 3.456, 3.456789)
    )
    // Row.apply takes Any*, so the Int and Double fields are boxed to Integer and Double.
    val expected = (existing ++ upsertedRows).map { case (id, ts, v1, v2) =>
      Row(id, ts, v1, v2)
    }
    assert(actual === expected)
  }

  // Placeholders for tests that are not implemented yet. They are registered as ignored so
  // that they are reported as such instead of passing without verifying anything.
  ignore("Upsert null values") {}
  ignore("Write with unspecified mode with upsert") {}
  ignore("Write with overwrite mode with upsert") {}
  ignore("Write with error-if-exists mode with upsert") {}
  ignore("Write with ignore mode with upsert") {}
}