From 2c9988eaf31b7ebd97f2c2904ed7ee531eff0d20 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 5 Jun 2020 14:18:16 +0000 Subject: [PATCH 1/5] [SPARK-31910][SQL] Enable Java 8 time API in Thrift server ### What changes were proposed in this pull request? Set `spark.sql.datetime.java8API.enabled` to `true` in: 1. `SparkSQLEnv.init()` of Thrift server, and 2. `SparkSQLSessionManager.openSession()` ### Why are the changes needed? 1. Date and timestamp string literals are parsed by using Java 8 time API and Spark's session time zone. Before the changes, date/timestamp values were collected as legacy types `java.sql.Date`/`java.sql.Timestamp`, and the value of such types didn't respect the config `spark.sql.session.timeZone`. To have consistent view, users had to keep JVM time zone and Spark's session time zone in sync. 2. After the changes, formatting of date values doesn't depend on JVM time zone. 3. While returning dates/timestamps of Java 8 type, we can avoid dates/timestamps rebasing from Proleptic Gregorian calendar to the hybrid calendar (Julian + Gregorian), and the issues related to calendar switching. 4. Properly handle negative years (BCE). 5. Consistent conversion of date/timestamp strings to/from internal Catalyst types in both direction to and from Spark. ### Does this PR introduce any user-facing change? Yes. Before: ```sql spark-sql> select make_date(-44, 3, 15); 0045-03-15 ``` After: ```sql spark-sql> select make_date(-44, 3, 15); -0044-03-15 ``` ### How was this patch tested? Manually via `bin/spark-sql`. Closes #28729 from MaxGekk/enable-java8-time-api-in-thrift-server. Lead-authored-by: Max Gekk Co-authored-by: Maxim Gekk Signed-off-by: Wenchen Fan --- .../spark/sql/hive/thriftserver/SparkSQLEnv.scala | 3 +++ .../hive/thriftserver/SparkSQLSessionManager.scala | 2 ++ .../spark/sql/hive/thriftserver/CliSuite.scala | 6 ++++++ .../hive/thriftserver/HiveThriftServer2Suites.scala | 12 ++++++++++++ 4 files changed, 23 insertions(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 8944b93d9b69..233e6224a10d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -24,6 +24,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils /** A singleton object for the master program. The slaves should not access this. 
*/ @@ -45,6 +46,8 @@ private[hive] object SparkSQLEnv extends Logging { sparkConf .setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}")) + .set(SQLConf.DATETIME_JAVA8API_ENABLED, true) + val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate() sparkContext = sparkSession.sparkContext diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index e10e7ed1a276..806b6146b2db 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager +import org.apache.spark.sql.internal.SQLConf private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) @@ -63,6 +64,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: sqlContext.newSession() } ctx.setConf(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion) + ctx.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, true) val hiveSessionState = session.getSessionState setConfMap(ctx, hiveSessionState.getOverriddenConfigurations) setConfMap(ctx, hiveSessionState.getHiveVariables) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index ea1a371151c3..8546421a8692 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -551,4 +551,10 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { errorResponses = Seq("AnalysisException"))( ("", "Error in query: The second argument of 'date_sub' function needs to be an integer.")) } + + test("SPARK-30808: use Java 8 time API in Thrift SQL CLI by default") { + // If Java 8 time API is enabled via the SQL config `spark.sql.datetime.java8API.enabled`, + // the date formatter for `java.sql.LocalDate` must output negative years with sign. 
+ runCliWithin(1.minute)("SELECT MAKE_DATE(-44, 3, 15);" -> "-0044-03-15") + } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 4e6d4e104021..ff54879cb508 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -862,6 +862,18 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } } } + + test("SPARK-30808: use Java 8 time API and Proleptic Gregorian calendar by default") { + withJdbcStatement() { st => + // Proleptic Gregorian calendar has no gap in the range 1582-10-04..1582-10-15 + val date = "1582-10-10" + val rs = st.executeQuery(s"select date '$date'") + rs.next() + val expected = java.sql.Date.valueOf(date) + assert(rs.getDate(1) === expected) + assert(rs.getString(1) === expected.toString) + } + } } class SingleSessionSuite extends HiveThriftJdbcTest { From fc6af9d900ec6f6a1cbe8f987857a69e6ef600d1 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 5 Jun 2020 16:44:16 +0000 Subject: [PATCH 2/5] [SPARK-31867][SQL][FOLLOWUP] Check result differences for datetime formatting ### What changes were proposed in this pull request? In this PR, we throw `SparkUpgradeException` when getting `DateTimeException` for datetime formatting under the `EXCEPTION` legacy time parser policy. ### Why are the changes needed? `DateTimeException` is also declared by `java.time.format.DateTimeFormatter#format`, but in Spark it can only rarely occur. So far, the only suspected case is due to a JDK bug; see https://bugs.openjdk.java.net/browse/JDK-8079628. Without this change, the `from_unixtime` function would suppress the `DateTimeException` caused by `DD` and return `NULL`, silently changing the result, which should be avoided on Java 8. ### Does this PR introduce _any_ user-facing change? Yes. When running on Java 8 and using the `from_unixtime` function with the pattern `DD` to format datetimes, if the day of year is >= 100, a `SparkUpgradeException` now alerts users instead of silently returning null. For `date_format`, `SparkUpgradeException` takes the place of `DateTimeException`. ### How was this patch tested? Added unit tests. Closes #28736 from yaooqinn/SPARK-31867-F.
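For illustration only (not part of this patch): a minimal `spark-shell` sketch of the behavior change described above, assuming a Java 8 JVM and an active `SparkSession` named `spark`; the sample epoch value and the commented outputs are made-up examples.

```scala
// Hedged sketch: demonstrates the new fail-fast behavior of datetime formatting
// under the EXCEPTION legacy time parser policy, and the LEGACY escape hatch.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "EXCEPTION")

// 12345678 seconds after the epoch falls in late May 1970, i.e. a day-of-year >= 100,
// which hits JDK-8079628 when the 'DD' pattern is formatted on Java 8. With this patch
// the query fails with org.apache.spark.SparkUpgradeException instead of returning NULL.
spark.sql("SELECT from_unixtime(12345678, 'DD')").show()

// Falling back to the pre-3.0 formatter (java.text.SimpleDateFormat) keeps the old output.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.sql("SELECT from_unixtime(12345678, 'DD')").show()   // e.g. 143, depending on time zone
```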
Authored-by: Kent Yao Signed-off-by: Wenchen Fan --- .../sql/catalyst/util/DateFormatter.scala | 7 +++-- .../util/DateTimeFormatterHelper.scala | 30 ++++++++++++++++--- .../catalyst/util/TimestampFormatter.scala | 7 +++-- .../util/TimestampFormatterSuite.scala | 15 ++++++++++ 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index b611ffa198b1..76ae3e5e8469 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -58,12 +58,15 @@ class Iso8601DateFormatter( try { val localDate = toLocalDate(formatter.parse(s)) localDateToDays(localDate) - } catch checkDiffResult(s, legacyFormatter.parse) + } catch checkParsedDiff(s, legacyFormatter.parse) } } override def format(localDate: LocalDate): String = { - localDate.format(formatter) + try { + localDate.format(formatter) + } catch checkFormattedDiff(toJavaDate(localDateToDays(localDate)), + (d: Date) => format(d)) } override def format(days: Int): String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala index 8e5c8651c8c3..992a2b12a462 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -21,7 +21,7 @@ import java.time._ import java.time.chrono.IsoChronology import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle} import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries} -import java.util.Locale +import java.util.{Date, Locale} import com.google.common.cache.CacheBuilder @@ -109,13 +109,17 @@ trait DateTimeFormatterHelper { formatter } + private def needConvertToSparkUpgradeException(e: Throwable): Boolean = e match { + case _: DateTimeException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION => true + case _ => false + } // When legacy time parser policy set to EXCEPTION, check whether we will get different results // between legacy parser and new parser. If new parser fails but legacy parser works, throw a // SparkUpgradeException. On the contrary, if the legacy policy set to CORRECTED, // DateTimeParseException will address by the caller side. - protected def checkDiffResult[T]( + protected def checkParsedDiff[T]( s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = { - case e: DateTimeException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION => + case e if needConvertToSparkUpgradeException(e) => try { legacyParseFunc(s) } catch { @@ -126,6 +130,25 @@ trait DateTimeFormatterHelper { s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e) } + // When legacy time parser policy set to EXCEPTION, check whether we will get different results + // between legacy formatter and new formatter. If new formatter fails but legacy formatter works, + // throw a SparkUpgradeException. On the contrary, if the legacy policy set to CORRECTED, + // DateTimeParseException will address by the caller side. 
+ protected def checkFormattedDiff[T <: Date]( + d: T, + legacyFormatFunc: T => String): PartialFunction[Throwable, String] = { + case e if needConvertToSparkUpgradeException(e) => + val resultCandidate = try { + legacyFormatFunc(d) + } catch { + case _: Throwable => throw e + } + throw new SparkUpgradeException("3.0", s"Fail to format it to '$resultCandidate' in the new" + + s" formatter. You can set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore" + + " the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid" + + " datetime string.", e) + } + /** * When the new DateTimeFormatter failed to initialize because of invalid datetime pattern, it * will throw IllegalArgumentException. If the pattern can be recognized by the legacy formatter @@ -137,7 +160,6 @@ trait DateTimeFormatterHelper { * @param tryLegacyFormatter a func to capture exception, identically which forces a legacy * datetime formatter to be initialized */ - protected def checkLegacyFormatter( pattern: String, tryLegacyFormatter: => Unit): PartialFunction[Throwable, DateTimeFormatter] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 11dcdec7356f..f3b589657b25 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -84,12 +84,15 @@ class Iso8601TimestampFormatter( val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND) Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond) - } catch checkDiffResult(s, legacyFormatter.parse) + } catch checkParsedDiff(s, legacyFormatter.parse) } } override def format(instant: Instant): String = { - formatter.withZone(zoneId).format(instant) + try { + formatter.withZone(zoneId).format(instant) + } catch checkFormattedDiff(toJavaTimestamp(instantToMicros(instant)), + (t: Timestamp) => format(t)) } override def format(us: Long): String = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala index 02333a3eb9fc..e70f805b30f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala @@ -418,4 +418,19 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite { val t5 = f3.parse("AM") assert(t5 === date(1970)) } + + test("check result differences for datetime formatting") { + val formatter = TimestampFormatter("DD", UTC, isParsing = false) + assert(formatter.format(date(1970, 1, 3)) == "03") + assert(formatter.format(date(1970, 4, 9)) == "99") + + if (System.getProperty("java.version").split("\\D+")(0).toInt < 9) { + // https://bugs.openjdk.java.net/browse/JDK-8079628 + intercept[SparkUpgradeException] { + formatter.format(date(1970, 4, 10)) + } + } else { + assert(formatter.format(date(1970, 4, 10)) == "100") + } + } } From 5079831106ba22f81a26b4a8104a253422fa1b6a Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Sat, 6 Jun 2020 07:35:25 +0900 Subject: [PATCH 3/5] [SPARK-31904][SQL] Fix case sensitive problem of char and varchar partition columns ### What changes were proposed in this pull request? 
```sql CREATE TABLE t1(a STRING, B VARCHAR(10), C CHAR(10)) STORED AS parquet; CREATE TABLE t2 USING parquet PARTITIONED BY (b, c) AS SELECT * FROM t1; SELECT * FROM t2 WHERE b = 'A'; ``` Above SQL throws MetaException > Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:810) ... 114 more Caused by: MetaException(message:Filtering is supported only on partition keys of type string, or integral types) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$FilterBuilder.setError(ExpressionTree.java:184) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.getJdoFilterPushdownParam(ExpressionTree.java:439) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilterOverPartitions(ExpressionTree.java:356) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilter(ExpressionTree.java:278) at org.apache.hadoop.hive.metastore.parser.ExpressionTree.generateJDOFilterFragment(ExpressionTree.java:583) at org.apache.hadoop.hive.metastore.ObjectStore.makeQueryFilterString(ObjectStore.java:3315) at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsViaOrmFilter(ObjectStore.java:2768) at org.apache.hadoop.hive.metastore.ObjectStore.access$500(ObjectStore.java:182) at org.apache.hadoop.hive.metastore.ObjectStore$7.getJdoResult(ObjectStore.java:3248) at org.apache.hadoop.hive.metastore.ObjectStore$7.getJdoResult(ObjectStore.java:3232) at org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2974) at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsByFilterInternal(ObjectStore.java:3250) at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsByFilter(ObjectStore.java:2906) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:101) at com.sun.proxy.$Proxy25.getPartitionsByFilter(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_partitions_by_filter(HiveMetaStore.java:5093) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:148) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107) at com.sun.proxy.$Proxy26.get_partitions_by_filter(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(HiveMetaStoreClient.java:1232) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173) at com.sun.proxy.$Proxy27.listPartitionsByFilter(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getPartitionsByFilter(Hive.java:2679) ... 119 more ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add a unit test. Closes #28724 from LantaoJin/SPARK-31904. Authored-by: LantaoJin Signed-off-by: Takeshi Yamamuro --- .../org/apache/spark/sql/hive/client/HiveShim.scala | 3 ++- .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 2b806609426a..8df43b785759 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -708,7 +708,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { .map(col => col.getName).toSet def unapply(attr: Attribute): Option[String] = { - if (varcharKeys.contains(attr.name)) { + val resolver = SQLConf.get.resolver + if (varcharKeys.exists(c => resolver(c, attr.name))) { None } else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType) { Some(attr.name) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index e8548fd62ddc..e8cf4ad5d9f2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -2720,4 +2720,14 @@ class HiveDDLSuite checkAnswer(sql("SHOW PARTITIONS ta_part"), Row("ts=10") :: Nil) } } + + test("SPARK-31904: Fix case sensitive problem of char and varchar partition columns") { + withTable("t1", "t2") { + sql("CREATE TABLE t1(a STRING, B VARCHAR(10), C CHAR(10)) STORED AS parquet") + sql("CREATE TABLE t2 USING parquet PARTITIONED BY (b, c) AS SELECT * FROM t1") + // make sure there is no exception + assert(sql("SELECT * FROM t2 WHERE b = 'A'").collect().isEmpty) + assert(sql("SELECT * FROM t2 WHERE c = 'A'").collect().isEmpty) + } + } } From 04f66bfd4eb863253ac9c30594055b8d5997c321 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Sat, 6 Jun 2020 16:49:48 +0900 Subject: [PATCH 4/5] [MINOR][SS][DOCS] fileNameOnly parameter description re-unite ### What changes were proposed in this pull request? `fileNameOnly` parameter is split to 2 pieces in [this](https://github.com/apache/spark/commit/dbb8143501ab87865d6e202c17297b9a73a0b1c3) commit. This PR re-unites it. ### Why are the changes needed? Parameter description split in doc. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ``` cd docs/ SKIP_API=1 jekyll build ``` Manual webpage check. Closes #28739 from gaborgsomogyi/datasettxtfix. Authored-by: Gabor Somogyi Signed-off-by: HyukjinKwon --- docs/structured-streaming-programming-guide.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 1776d23607f7..69d744d09e1e 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -540,12 +540,13 @@ Here are the details of all the sources in Spark.
fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
- maxFileAge: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week) -
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
- "s3a://a/b/c/dataset.txt"
+ "s3a://a/b/c/dataset.txt" +
+ maxFileAge: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week) +
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.
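To make the options above concrete, here is a minimal sketch (not part of this documentation change) of a streaming text source configured with `fileNameOnly`, `maxFileAge`, `cleanSource` and `sourceArchiveDir`; the paths and option values are made-up examples.

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: a streaming text source using the file-source options documented above.
val spark = SparkSession.builder().appName("file-source-options-sketch").getOrCreate()

val lines = spark.readStream
  .option("fileNameOnly", "true")             // compare files by name only, e.g. "dataset.txt"
  .option("maxFileAge", "7d")                 // ignore files older than the newest file by 7 days
  .option("cleanSource", "archive")           // move completed files out of the input directory...
  .option("sourceArchiveDir", "/tmp/archive") // ...into this directory (required for "archive")
  .text("/tmp/streaming-input")

// Write the stream somewhere so the source actually runs.
val query = lines.writeStream.format("console").start()
query.awaitTermination()
```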
From e9337f505b737f4501d4173baa9c5739626b06a8 Mon Sep 17 00:00:00 2001 From: iRakson Date: Sun, 7 Jun 2020 13:08:50 +0900 Subject: [PATCH 5/5] [SPARK-30119][WEBUI] Add Pagination Support to Streaming Page ### What changes were proposed in this pull request? * Pagination Support is added to all tables of streaming page in spark web UI. For adding pagination support, existing classes from #7399 were used. * Earlier streaming page has two tables `Active Batches` and `Completed Batches`. Now, we will have three tables `Running Batches`, `Waiting Batches` and `Completed Batches`. If we have large number of waiting and running batches then keeping track in a single table is difficult. Also other pages have different table for different type type of data. * Earlier empty tables were shown. Now only non-empty tables will be shown. `Active Batches` table used to show details of waiting batches followed by running batches. ### Why are the changes needed? Pagination will allow users to analyse the table in much better way. All spark web UI pages support pagination apart from streaming pages, so this will add consistency as well. Also it might fix the potential OOM errors that can arise. ### Does this PR introduce _any_ user-facing change? Yes. `Active Batches` table is split into two tables `Running Batches` and `Waiting Batches`. Pagination Support is added to the all the tables. Every other functionality is unchanged. ### How was this patch tested? Manually. Before changes: Screenshot 2020-05-03 at 7 07 14 PM After Changes: Screenshot 2020-05-03 at 6 51 22 PM Closes #28439 from iRakson/streamingPagination. Authored-by: iRakson Signed-off-by: Kousuke Saruta --- .../org/apache/spark/ui/static/webui.js | 3 +- .../spark/streaming/ui/AllBatchesTable.scala | 282 ++++++++++-------- .../spark/streaming/ui/StreamingPage.scala | 113 +++++-- .../spark/streaming/UISeleniumSuite.scala | 45 ++- 4 files changed, 267 insertions(+), 176 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js index 4f8409ca2b7c..bb3725650c66 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.js +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js @@ -87,7 +87,8 @@ $(function() { collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages'); collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks'); collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds'); - collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches'); + collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches'); + collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches'); collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches'); collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions'); collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions'); diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 1e443f656734..c0eec0e0b0a8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -17,30 +17,41 @@ package org.apache.spark.streaming.ui -import scala.xml.Node - -import 
org.apache.spark.ui.{UIUtils => SparkUIUtils} +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 +import javax.servlet.http.HttpServletRequest -private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) { +import scala.xml.Node - protected def columns: Seq[Node] = { - Batch Time - Records - Scheduling Delay - {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")} - - Processing Time - {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")} - } +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils} + +private[ui] class StreamingPagedTable( + request: HttpServletRequest, + tableTag: String, + batches: Seq[BatchUIData], + basePath: String, + subPath: String, + batchInterval: Long) extends PagedTable[BatchUIData] { + + private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time") + private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}" + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + private val firstFailureReason: Option[String] = + if (!tableTag.equals("waitingBatches")) { + getFirstFailureReason(batches) + } else { + None + } /** * Return the first failure reason if finding in the batches. */ - protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { + private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption } - protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { + private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption firstFailureReason.map { failureReason => val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason) @@ -49,147 +60,154 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) }.getOrElse(-) } - protected def baseRow(batch: BatchUIData): Seq[Node] = { - val batchTime = batch.batchTime.milliseconds - val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval) - val numRecords = batch.numRecords - val schedulingDelay = batch.schedulingDelay - val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") - val processingTime = batch.processingDelay - val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-") - val batchTimeId = s"batch-$batchTime" - - - - {formattedBatchTime} - - - {numRecords.toString} records - - {formattedSchedulingDelay} - - - {formattedProcessingTime} - - } - - private def batchTable: Seq[Node] = { - - - {columns} - - - {renderRows} - -
- } - - def toNodeSeq: Seq[Node] = { - batchTable - } - - protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = { + private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = { { - SparkUIUtils.makeProgressBar( - started = batch.numActiveOutputOp, - completed = batch.numCompletedOutputOp, - failed = batch.numFailedOutputOp, - skipped = 0, - reasonToNumKilled = Map.empty, - total = batch.outputOperations.size) + SparkUIUtils.makeProgressBar( + started = batch.numActiveOutputOp, + completed = batch.numCompletedOutputOp, + failed = batch.numFailedOutputOp, + skipped = 0, + reasonToNumKilled = Map.empty, + total = batch.outputOperations.size) } } - /** - * Return HTML for all rows of this table. - */ - protected def renderRows: Seq[Node] -} + override def tableId: String = s"$tableTag-table" -private[ui] class ActiveBatchTable( - runningBatches: Seq[BatchUIData], - waitingBatches: Seq[BatchUIData], - batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) { + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" - private val firstFailureReason = getFirstFailureReason(runningBatches) + override def pageSizeFormField: String = s"$tableTag.pageSize" - override protected def columns: Seq[Node] = super.columns ++ { - Output Ops: Succeeded/Total - Status ++ { - if (firstFailureReason.nonEmpty) { - Error - } else { - Nil - } - } - } + override def pageNumberFormField: String = s"$tableTag.page" - override protected def renderRows: Seq[Node] = { - // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display - // waiting batches before running batches - waitingBatches.flatMap(batch => {waitingBatchRow(batch)}) ++ - runningBatches.flatMap(batch => {runningBatchRow(batch)}) + override def pageLink(page: Int): String = { + parameterPath + + s"&$tableTag.sort=$encodedSortColumn" + + s"&$tableTag.desc=$desc" + + s"&$pageNumberFormField=$page" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableTag" } - private def runningBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ processing ++ { - if (firstFailureReason.nonEmpty) { - getFirstFailureTableCell(batch) - } else { - Nil + override def goButtonFormPath: String = + s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag" + + override def dataSource: PagedDataSource[BatchUIData] = + new StreamingDataSource(batches, pageSize, sortColumn, desc) + + override def headers: Seq[Node] = { + // headers, sortable and tooltips + val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = { + Seq( + ("Batch Time", true, None), + ("Records", true, None), + ("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " + + "of a batch")), + ("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ { + if (tableTag.equals("completedBatches")) { + Seq( + ("Total Delay", true, Some("Total time taken to handle a batch")), + ("Output Ops: Succeeded/Total", false, None)) + } else { + Seq( + ("Output Ops: Succeeded/Total", false, None), + ("Status", false, None)) + } + } ++ { + if (firstFailureReason.nonEmpty) { + Seq(("Error", false, None)) + } else { + Nil + } } } + // check if sort column is a valid sortable column + isSortColumnValid(headersAndCssClasses, sortColumn) + + headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag) } 
- private def waitingBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ queued++ { - if (firstFailureReason.nonEmpty) { - // Waiting batches have not run yet, so must have no failure reasons. - - - } else { - Nil + override def row(batch: BatchUIData): Seq[Node] = { + val batchTime = batch.batchTime.milliseconds + val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval) + val numRecords = batch.numRecords + val schedulingDelay = batch.schedulingDelay + val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") + val processingTime = batch.processingDelay + val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-") + val batchTimeId = s"batch-$batchTime" + val totalDelay = batch.totalDelay + val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") + + + + + {formattedBatchTime} + + + {numRecords.toString} records + {formattedSchedulingDelay} + {formattedProcessingTime} + { + if (tableTag.equals("completedBatches")) { + {formattedTotalDelay} ++ + createOutputOperationProgressBar(batch) ++ { + if (firstFailureReason.nonEmpty) { + getFirstFailureTableCell(batch) + } else { + Nil + } + } + } else if (tableTag.equals("runningBatches")) { + createOutputOperationProgressBar(batch) ++ + processing ++ { + if (firstFailureReason.nonEmpty) { + getFirstFailureTableCell(batch) + } else { + Nil + } + } + } else { + createOutputOperationProgressBar(batch) ++ + queued ++ { + if (firstFailureReason.nonEmpty) { + // Waiting batches have not run yet, so must have no failure reasons. + - + } else { + Nil + } + } + } } - } + } } -private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) - extends BatchTableBase("completed-batches-table", batchInterval) { +private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String, + desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) { - private val firstFailureReason = getFirstFailureReason(batches) + private val data = info.sorted(ordering(sortColumn, desc)) - override protected def columns: Seq[Node] = super.columns ++ { - Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")} - Output Ops: Succeeded/Total ++ { - if (firstFailureReason.nonEmpty) { - Error - } else { - Nil - } - } - } + override protected def dataSize: Int = data.size - override protected def renderRows: Seq[Node] = { - batches.flatMap(batch => {completedBatchRow(batch)}) - } + override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to) - private def completedBatchRow(batch: BatchUIData): Seq[Node] = { - val totalDelay = batch.totalDelay - val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") - - baseRow(batch) ++ { - - {formattedTotalDelay} - - } ++ createOutputOperationProgressBar(batch)++ { - if (firstFailureReason.nonEmpty) { - getFirstFailureTableCell(batch) - } else { - Nil - } + private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = { + val ordering: Ordering[BatchUIData] = column match { + case "Batch Time" => Ordering.by(_.batchTime.milliseconds) + case "Records" => Ordering.by(_.numRecords) + case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue)) + case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue)) + case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue)) + case 
unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 3bdf009dbce6..42d0e50a068e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -20,10 +20,12 @@ package org.apache.spark.streaming.ui import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletRequest +import scala.collection.mutable import scala.xml.{Node, Unparsed} import org.apache.spark.internal.Logging import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage} +import org.apache.spark.util.Utils /** * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that @@ -86,7 +88,7 @@ private[ui] class StreamingPage(parent: StreamingTab) onClickTimelineFunc ++ basicInfo ++ listener.synchronized { generateStatTable() ++ - generateBatchListTables() + generateBatchListTables(request) } SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent) } @@ -432,50 +434,97 @@ private[ui] class StreamingPage(parent: StreamingTab) } - private def generateBatchListTables(): Seq[Node] = { + private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData], + tableTag: String): Seq[Node] = { + val interval: Long = listener.batchDuration + val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1) + + try { + new StreamingPagedTable( + request, + tableTag, + batches, + SparkUIUtils.prependBaseUri(request, parent.basePath), + "streaming", + interval + ).table(streamingPage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+          <div class="alert alert-error">
+            <p>Error while rendering streaming table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>
+ } + } + + private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = { val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse val completedBatches = listener.retainedCompletedBatches. sortBy(_.batchTime.milliseconds).reverse - val activeBatchesContent = { -
-      <div class="row">
-        <div class="col-12">
-          <span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table"
-              onClick="collapseTable('collapse-aggregated-activeBatches','aggregated-activeBatches')">
-            <h4>
-              <span class="collapse-table-arrow arrow-open"></span>
-              <a>Active Batches ({runningBatches.size + waitingBatches.size})</a>
-            </h4>
-          </span>
-          <div class="aggregated-activeBatches collapsible-table">
- {new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq} + val content = mutable.ListBuffer[Node]() + + if (runningBatches.nonEmpty) { + content ++= +
+        <div class="row">
+          <div class="col-12">
+            <span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
+                onClick="collapseTable('collapse-aggregated-runningBatches','aggregated-runningBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Running Batches ({runningBatches.size})</a>
+              </h4>
+            </span>
+            <div class="aggregated-runningBatches collapsible-table">
+ { streamingTable(request, runningBatches, "runningBatches") } +
-
} - val completedBatchesContent = { -
-      <div class="row">
-        <div class="col-12">
-          <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
-              onClick="collapseTable('collapse-aggregated-completedBatches','aggregated-completedBatches')">
-            <h4>
-              <span class="collapse-table-arrow arrow-open"></span>
-              <a>Completed Batches (last {completedBatches.size}
-                out of {listener.numTotalCompletedBatches})</a>
-            </h4>
-          </span>
-          <div class="aggregated-completedBatches collapsible-table">
- {new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq} + if (waitingBatches.nonEmpty) { + content ++= +
+        <div class="row">
+          <div class="col-12">
+            <span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
+                onClick="collapseTable('collapse-aggregated-waitingBatches','aggregated-waitingBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Waiting Batches ({waitingBatches.size})</a>
+              </h4>
+            </span>
+            <div class="aggregated-waitingBatches collapsible-table">
+ { streamingTable(request, waitingBatches, "waitingBatches") } +
-
} - activeBatchesContent ++ completedBatchesContent + if (completedBatches.nonEmpty) { + content ++= +
+        <div class="row">
+          <div class="col-12">
+            <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
+                onClick="collapseTable('collapse-aggregated-completedBatches','aggregated-completedBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Completed Batches (last {completedBatches.size}
+                  out of {listener.numTotalCompletedBatches})</a>
+              </h4>
+            </span>
+            <div class="aggregated-completedBatches collapsible-table">
+ { streamingTable(request, completedBatches, "completedBatches") } +
+            </div>
+          </div>
+ } + content } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index bdc9e9ee2aed..952ef6c374f3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -63,7 +63,7 @@ class UISeleniumSuite .setMaster("local") .setAppName("test") .set(UI_ENABLED, true) - val ssc = new StreamingContext(conf, Seconds(1)) + val ssc = new StreamingContext(conf, Milliseconds(100)) assert(ssc.sc.ui.isDefined, "Spark UI is not started!") ssc } @@ -104,7 +104,7 @@ class UISeleniumSuite find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None) } - eventually(timeout(10.seconds), interval(50.milliseconds)) { + eventually(timeout(10.seconds), interval(500.milliseconds)) { // check whether streaming page exists go to (sparkUI.webUrl.stripSuffix("/") + "/streaming") val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq @@ -125,24 +125,47 @@ class UISeleniumSuite // Check batch tables val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq - h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true) + h4Text.exists(_.matches("Running Batches \\(\\d+\\)")) should be (true) + h4Text.exists(_.matches("Waiting Batches \\(\\d+\\)")) should be (true) h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true) - findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be { - List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)", + val arrow = 0x25BE.toChar + findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be { + List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time", + "Output Ops: Succeeded/Total", "Status") + } + findAll(cssSelector("""#waitingBatches-table th""")).map(_.text).toList should be { + List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time", "Output Ops: Succeeded/Total", "Status") } - findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be { - List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)", - "Total Delay (?)", "Output Ops: Succeeded/Total") + findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be { + List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time", + "Total Delay", "Output Ops: Succeeded/Total") } - val batchLinks = - findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq + val pageSize = 3 + val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" + + "&completedBatches.desc=true&completedBatches.page=1" + + s"&completedBatches.pageSize=$pageSize#completedBatches" + + go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath) + val completedTableRows = findAll(cssSelector("""#completedBatches-table tr""")) + .map(_.text).toList + // header row + pagesize + completedTableRows.length should be (1 + pageSize) + + val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" + + "&completedBatches.desc=false&completedBatches.pageSize=3#completedBatches" + + // sort batches in ascending order of batch time + go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath) + + val batchLinks = findAll(cssSelector("""#completedBatches-table td a""")) + .flatMap(_.attribute("href")).toSeq batchLinks.size should be >= 1 // Check a normal batch page - 
go to (batchLinks.last) // Last should be the first batch, so it will have some jobs + go to (batchLinks.head) // Head is the first batch, so it will have some jobs val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq summaryText should contain ("Batch Duration:") summaryText should contain ("Input data size:")