From 11852c75c8b9b81f52346a0d7362d93e783e247a Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Mon, 14 Oct 2019 18:30:22 +0900
Subject: [PATCH 1/6] [SPARK-29461][SQL] Measure records being written for JDBC writer

---
 .../datasources/jdbc/JdbcUtils.scala        | 22 ++++---
 .../spark/sql/jdbc/JDBCWriteSuite.scala     | 57 +++++++++++++++++++
 2 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 86a27b5afc25..e4d09d76020d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -615,7 +615,7 @@ object JdbcUtils extends Logging {
       batchSize: Int,
       dialect: JdbcDialect,
       isolationLevel: Int,
-      options: JDBCOptions): Iterator[Byte] = {
+      options: JDBCOptions): Long = {
     val conn = getConnection()
     var committed = false
 
@@ -643,7 +643,7 @@ object JdbcUtils extends Logging {
       }
     }
     val supportsTransactions = finalIsolationLevel != Connection.TRANSACTION_NONE
-
+    var totalUpdatedRows = 0
     try {
       if (supportsTransactions) {
         conn.setAutoCommit(false) // Everything in the same db transaction.
@@ -673,12 +673,12 @@
           stmt.addBatch()
           rowCount += 1
           if (rowCount % batchSize == 0) {
-            stmt.executeBatch()
+            totalUpdatedRows += stmt.executeBatch().sum
             rowCount = 0
           }
         }
         if (rowCount > 0) {
-          stmt.executeBatch()
+          totalUpdatedRows += stmt.executeBatch().sum
         }
       } finally {
         stmt.close()
@@ -687,7 +687,7 @@
         conn.commit()
       }
       committed = true
-      Iterator.empty
+      totalUpdatedRows
     } catch {
       case e: SQLException =>
         val cause = e.getNextException
@@ -840,10 +840,14 @@
       case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
       case _ => df
     }
-    repartitionedDF.rdd.foreachPartition(iterator => savePartition(
-      getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
-      options)
-    )
+    repartitionedDF.rdd.foreachPartition { iterator =>
+      val outMetrics = TaskContext.get().taskMetrics().outputMetrics
+      val totalUpdatedRows = savePartition(
+        getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
+        options)
+      outMetrics.setRecordsWritten(outMetrics.recordsWritten + totalUpdatedRows)
+      Iterator.empty
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index b28c6531d42b..b46fdbfa7264 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -21,10 +21,12 @@ import java.sql.DriverManager
 import java.util.Properties
 
 import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+import scala.collection.mutable.ArrayBuffer
 
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -543,4 +545,59 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     }.getMessage
     assert(errMsg.contains("Statement was canceled or the session timed out"))
   }
+
+  test("metrics") {
+    JdbcDialects.registerDialect(testH2Dialect)
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+
+    runAndVerifyRecordsWritten(2) {
+      df.write.mode(SaveMode.Append).jdbc(url, "TEST.BASICCREATETEST", new Properties())
+    }
+
+    runAndVerifyRecordsWritten(1) {
+      df2.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.BASICCREATETEST", new Properties())
+    }
+
+    runAndVerifyRecordsWritten(1) {
+      df2.write.mode(SaveMode.Overwrite).option("truncate", true)
+        .jdbc(url, "TEST.BASICCREATETEST", new Properties())
+    }
+
+    runAndVerifyRecordsWritten(0) {
+      intercept[AnalysisException] {
+        df2.write.mode(SaveMode.ErrorIfExists).jdbc(url, "TEST.BASICCREATETEST", new Properties())
+      }
+    }
+
+    runAndVerifyRecordsWritten(0) {
+      df.write.mode(SaveMode.Ignore).jdbc(url, "TEST.BASICCREATETEST", new Properties())
+    }
+  }
+
+  private def runAndVerifyRecordsWritten(expected: Long)(job: => Unit): Unit = {
+    assert(expected === runAndReturnMetrics(job, _.taskMetrics.outputMetrics.recordsWritten))
+  }
+
+  private def runAndReturnMetrics(job: => Unit, collector: (SparkListenerTaskEnd) => Long): Long = {
+    val taskMetrics = new ArrayBuffer[Long]()
+
+    // Avoid receiving earlier taskEnd events
+    sparkContext.listenerBus.waitUntilEmpty()
+
+    val listener = new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        taskMetrics += collector(taskEnd)
+      }
+    }
+    sparkContext.addSparkListener(listener)
+
+    job
+
+    sparkContext.listenerBus.waitUntilEmpty()
+
+    sparkContext.removeSparkListener(listener)
+    taskMetrics.sum
+  }
+
 }

From 298d9686741cb617598472324459d095f9ee726a Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Mon, 14 Oct 2019 20:37:04 +0900
Subject: [PATCH 2/6] Fix compilation

---
 .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index e4d09d76020d..0226a0140af9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -846,7 +846,6 @@ object JdbcUtils extends Logging {
         getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
         options)
       outMetrics.setRecordsWritten(outMetrics.recordsWritten + totalUpdatedRows)
-      Iterator.empty
     }
   }
 

From 5f4c9e6141190f4ca3e01b5efc1234d1da7535e8 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 15 Oct 2019 05:42:22 +0900
Subject: [PATCH 3/6] Fix UT: remove unnecessary registering dialect to fix missing unregister

---
 .../test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index b46fdbfa7264..f760cf8cff9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -547,7 +547,6 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
   }
 
   test("metrics") {
-    JdbcDialects.registerDialect(testH2Dialect)
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
 

From 7bae87eef13e7c139ec0ef66ba65adbf9f043ff9 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 23 Oct 2019 19:46:58 +0900
Subject: [PATCH 4/6] Reflect review comments

---
 .../datasources/jdbc/JdbcUtils.scala        | 21 +++++++++++--------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 0226a0140af9..e3472b1f3d6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -615,7 +615,9 @@ object JdbcUtils extends Logging {
       batchSize: Int,
       dialect: JdbcDialect,
       isolationLevel: Int,
-      options: JDBCOptions): Long = {
+      options: JDBCOptions): Unit = {
+    val outMetrics = TaskContext.get().taskMetrics().outputMetrics
+
     val conn = getConnection()
     var committed = false
 
@@ -643,7 +643,7 @@ object JdbcUtils extends Logging {
       }
     }
     val supportsTransactions = finalIsolationLevel != Connection.TRANSACTION_NONE
-    var totalUpdatedRows = 0
+    var totalRowCount = 0
     try {
       if (supportsTransactions) {
         conn.setAutoCommit(false) // Everything in the same db transaction.
@@ -672,13 +674,14 @@
           }
           stmt.addBatch()
           rowCount += 1
+          totalRowCount += 1
           if (rowCount % batchSize == 0) {
-            totalUpdatedRows += stmt.executeBatch().sum
+            stmt.executeBatch()
             rowCount = 0
           }
         }
         if (rowCount > 0) {
-          totalUpdatedRows += stmt.executeBatch().sum
+          stmt.executeBatch()
         }
       } finally {
         stmt.close()
@@ -687,7 +690,6 @@
         conn.commit()
       }
       committed = true
-      totalUpdatedRows
     } catch {
       case e: SQLException =>
         val cause = e.getNextException
@@ -715,9 +717,13 @@
         // tell the user about another problem.
         if (supportsTransactions) {
           conn.rollback()
+        } else {
+          outMetrics.setRecordsWritten(totalRowCount)
         }
         conn.close()
       } else {
+        outMetrics.setRecordsWritten(totalRowCount)
+
         // The stage must succeed. We cannot propagate any exception close() might throw.
         try {
           conn.close()
@@ -840,12 +846,9 @@ object JdbcUtils extends Logging {
       case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
       case _ => df
     }
-    repartitionedDF.rdd.foreachPartition { iterator =>
-      val outMetrics = TaskContext.get().taskMetrics().outputMetrics
-      val totalUpdatedRows = savePartition(
+    repartitionedDF.rdd.foreachPartition { iterator => savePartition(
       getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
       options)
-      outMetrics.setRecordsWritten(outMetrics.recordsWritten + totalUpdatedRows)
     }
   }
 

From 620d111d91da8ea8ece0cda50d531d50e3ad5020 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 24 Oct 2019 13:10:38 +0900
Subject: [PATCH 5/6] Reflect missing review comment

---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala      | 4 ++--
 .../test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 1 -
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index e3472b1f3d6e..5dcaf43f6be5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -847,8 +847,8 @@ object JdbcUtils extends Logging {
       case _ => df
     }
     repartitionedDF.rdd.foreachPartition { iterator => savePartition(
-        getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
-        options)
+      getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
+      options)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index f760cf8cff9a..8021ef1a17a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -598,5 +598,4 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     sparkContext.removeSparkListener(listener)
     taskMetrics.sum
   }
-
 }

From 6e908d172981c48790a7ac6692cdc19652caaf8c Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 25 Oct 2019 09:56:09 +0900
Subject: [PATCH 6/6] add explanation of policy around recording metrics

---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 5dcaf43f6be5..55ca4e3624bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -605,6 +605,13 @@ object JdbcUtils extends Logging {
    * implementation changes elsewhere might easily render such a closure
    * non-Serializable. Instead, we explicitly close over all variables that
    * are used.
+   *
+   * Note that this method records task output metrics. It assumes the method is
+   * running in a task. For now, we only record the number of rows being written,
+   * because there is no good way to measure the total bytes being written. Only
+   * effective outputs are taken into account: the metric is not updated if the connection
+   * supports transactions and the transaction is rolled back, but it is updated even
+   * on failure when transactions are not supported, since dirty outputs remain.
    */
   def savePartition(
       getConnection: () => Connection,
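
For reference, here is a minimal sketch of how the recordsWritten output metric populated by
these patches could be observed from application code, mirroring the SparkListener-based check
in the new "metrics" test in JDBCWriteSuite. The SparkSession (spark), DataFrame (df), JDBC url
and table name below are placeholders assumed for illustration rather than values taken from
the patches; listener events are also delivered asynchronously, so a real caller should wait
for delivery (the test does this by waiting on the listener bus) before reading the counter.

  import java.util.Properties

  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
  import org.apache.spark.sql.SaveMode

  // Sum the recordsWritten value reported by every finished task.
  var recordsWritten = 0L
  val listener = new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      // taskMetrics can be null for failed tasks, so guard the access.
      Option(taskEnd.taskMetrics).foreach { metrics =>
        recordsWritten += metrics.outputMetrics.recordsWritten
      }
    }
  }
  spark.sparkContext.addSparkListener(listener)

  // Placeholder write: url, the table name and df stand in for real values.
  df.write.mode(SaveMode.Append).jdbc(url, "SOME_SCHEMA.SOME_TABLE", new Properties())

  // Once the task-end events have arrived, the sum reflects the rows the JDBC writer reported.
  spark.sparkContext.removeSparkListener(listener)
  println(s"JDBC writer reported $recordsWritten records written")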