Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implementation for H2
  • Loading branch information
EnricoMi committed Apr 28, 2025
commit 12c46d19e2a59d5c22c27394e6432f7f57a23231
14 changes: 11 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.{Connection, SQLException, Types}
import java.sql.{Connection, SQLException, Statement, Types}
import java.util
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -33,10 +33,10 @@ import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType}

private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
private[sql] case class H2Dialect() extends JdbcDialect with MergeByTempTable with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")

Expand Down Expand Up @@ -96,6 +96,14 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
functionMap.clear()
}

/**
 * Creates a session-scoped temporary table in H2 used as staging for merge/upsert.
 *
 * @param statement an open JDBC statement on the target connection
 * @param tableName name of the temporary table to create
 * @param strSchema column definitions, already rendered as SQL
 * @param options   JDBC write options (unused by H2's implementation)
 */
override def createTempTable(
    statement: Statement,
    tableName: String,
    strSchema: String,
    options: JdbcOptionsInWrite): Unit = {
  // H2 scopes LOCAL TEMPORARY tables to the current session/connection.
  val ddl = s"CREATE LOCAL TEMPORARY TABLE $tableName ($strSchema)"
  statement.executeUpdate(ddl)
}

// CREATE INDEX syntax
// https://www.h2database.com/html/commands.html#create_index
override def createIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -207,6 +208,31 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
}
}

test("Upsert") {
  val tableName = "upsert"
  // Seed the table with ten rows where val mirrors id: (0,0) .. (9,9).
  spark
    .range(10)
    .select(col("id"), col("id").as("val"))
    .write
    .jdbc(url, tableName, new Properties())
  // Upsert ids 5..14 with val = -1 across 10 partitions:
  // ids 5..9 should be updated in place, ids 10..14 inserted.
  spark
    .range(5, 15, 1, 10)
    .withColumn("val", lit(-1))
    .write
    .options(Map("upsert" -> "true", "upsertKeyColumns" -> "id"))
    .mode(SaveMode.Append)
    .jdbc(url, tableName, new Properties())
  // Read back and count rows by whether the upsert touched them (val == -1).
  val counts = spark.read
    .jdbc(url, tableName, new Properties())
    .select((col("val") === -1).as("updated"))
    .groupBy(col("updated"))
    .count()
    .sort(col("updated"))
    .collect()
  // Expect 5 untouched rows (ids 0..4) and 10 upserted rows (ids 5..14).
  assert(counts === Seq(Row(false, 5), Row(true, 10)))
}

test("Truncate") {
JdbcDialects.unregisterDialect(H2Dialect())
try {
Expand Down