diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js index 4f8409ca2b7c..bb3725650c66 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.js +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js @@ -87,7 +87,8 @@ $(function() { collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages'); collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks'); collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds'); - collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches'); + collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches'); + collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches'); collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches'); collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions'); collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions'); diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 1776d23607f7..69d744d09e1e 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -540,12 +540,13 @@ Here are the details of all the sources in Spark.
fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered the same file, because their filenames, "dataset.txt", are the same:
- maxFileAge: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week) -
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
- "s3a://a/b/c/dataset.txt"
+ "s3a://a/b/c/dataset.txt" +
+ maxFileAge: Maximum age of a file that can be found in this directory before it is ignored. For the first batch all files will be considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, then this parameter will be ignored, because old files that are valid and should be processed may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system. (default: 1 week) +
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.
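As an aside, here is a minimal sketch (not part of this patch) of how the file-source options described above could be combined on a DataStreamReader; the session, the text format, the paths and the option values are illustrative assumptions.

// Illustrative only: option names follow the file source documentation above; the
// session setup, format and paths are assumptions for this sketch.
import org.apache.spark.sql.SparkSession

object FileSourceOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("file-source-options-sketch").getOrCreate()

    val lines = spark.readStream
      .format("text")
      .option("maxFileAge", "7d")                  // ignore files older than 7 days relative to the newest file
      .option("latestFirst", "true")               // process the newest files first
      .option("maxFilesPerTrigger", "100")         // with latestFirst=true this causes maxFileAge to be ignored
      .option("fileNameOnly", "false")             // compare files by full path, not just "dataset.txt"
      .option("cleanSource", "archive")            // archive completed files instead of leaving them in place
      .option("sourceArchiveDir", "/tmp/archive")  // required when cleanSource is "archive"
      .load("/tmp/streaming-input")

    val query = lines.writeStream.format("console").start()
    query.awaitTermination()
  }
}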
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index b611ffa198b1..76ae3e5e8469 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -58,12 +58,15 @@ class Iso8601DateFormatter( try { val localDate = toLocalDate(formatter.parse(s)) localDateToDays(localDate) - } catch checkDiffResult(s, legacyFormatter.parse) + } catch checkParsedDiff(s, legacyFormatter.parse) } } override def format(localDate: LocalDate): String = { - localDate.format(formatter) + try { + localDate.format(formatter) + } catch checkFormattedDiff(toJavaDate(localDateToDays(localDate)), + (d: Date) => format(d)) } override def format(days: Int): String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala index 8e5c8651c8c3..992a2b12a462 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -21,7 +21,7 @@ import java.time._ import java.time.chrono.IsoChronology import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle} import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries} -import java.util.Locale +import java.util.{Date, Locale} import com.google.common.cache.CacheBuilder @@ -109,13 +109,17 @@ trait DateTimeFormatterHelper { formatter } + private def needConvertToSparkUpgradeException(e: Throwable): Boolean = e match { + case _: DateTimeException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION => true + case _ => false + } // When legacy time parser policy set to EXCEPTION, check whether we will get different results // between legacy parser and new parser. If new parser fails but legacy parser works, throw a // SparkUpgradeException. On the contrary, if the legacy policy set to CORRECTED, // DateTimeParseException will address by the caller side. - protected def checkDiffResult[T]( + protected def checkParsedDiff[T]( s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = { - case e: DateTimeException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION => + case e if needConvertToSparkUpgradeException(e) => try { legacyParseFunc(s) } catch { @@ -126,6 +130,25 @@ trait DateTimeFormatterHelper { s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e) } + // When legacy time parser policy set to EXCEPTION, check whether we will get different results + // between legacy formatter and new formatter. If new formatter fails but legacy formatter works, + // throw a SparkUpgradeException. On the contrary, if the legacy policy set to CORRECTED, + // DateTimeParseException will address by the caller side. + protected def checkFormattedDiff[T <: Date]( + d: T, + legacyFormatFunc: T => String): PartialFunction[Throwable, String] = { + case e if needConvertToSparkUpgradeException(e) => + val resultCandidate = try { + legacyFormatFunc(d) + } catch { + case _: Throwable => throw e + } + throw new SparkUpgradeException("3.0", s"Fail to format it to '$resultCandidate' in the new" + + s" formatter. 
You can set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore" + + " the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid" + + " datetime string.", e) + } + /** * When the new DateTimeFormatter failed to initialize because of invalid datetime pattern, it * will throw IllegalArgumentException. If the pattern can be recognized by the legacy formatter @@ -137,7 +160,6 @@ trait DateTimeFormatterHelper { * @param tryLegacyFormatter a func to capture exception, identically which forces a legacy * datetime formatter to be initialized */ - protected def checkLegacyFormatter( pattern: String, tryLegacyFormatter: => Unit): PartialFunction[Throwable, DateTimeFormatter] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 11dcdec7356f..f3b589657b25 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -84,12 +84,15 @@ class Iso8601TimestampFormatter( val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND) Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond) - } catch checkDiffResult(s, legacyFormatter.parse) + } catch checkParsedDiff(s, legacyFormatter.parse) } } override def format(instant: Instant): String = { - formatter.withZone(zoneId).format(instant) + try { + formatter.withZone(zoneId).format(instant) + } catch checkFormattedDiff(toJavaTimestamp(instantToMicros(instant)), + (t: Timestamp) => format(t)) } override def format(us: Long): String = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala index 02333a3eb9fc..e70f805b30f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala @@ -418,4 +418,19 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite { val t5 = f3.parse("AM") assert(t5 === date(1970)) } + + test("check result differences for datetime formatting") { + val formatter = TimestampFormatter("DD", UTC, isParsing = false) + assert(formatter.format(date(1970, 1, 3)) == "03") + assert(formatter.format(date(1970, 4, 9)) == "99") + + if (System.getProperty("java.version").split("\\D+")(0).toInt < 9) { + // https://bugs.openjdk.java.net/browse/JDK-8079628 + intercept[SparkUpgradeException] { + formatter.format(date(1970, 4, 10)) + } + } else { + assert(formatter.format(date(1970, 4, 10)) == "100") + } + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 8944b93d9b69..233e6224a10d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -24,6 +24,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils /** A 
singleton object for the master program. The slaves should not access this. */ @@ -45,6 +46,8 @@ private[hive] object SparkSQLEnv extends Logging { sparkConf .setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}")) + .set(SQLConf.DATETIME_JAVA8API_ENABLED, true) + val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate() sparkContext = sparkSession.sparkContext diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index e10e7ed1a276..806b6146b2db 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager +import org.apache.spark.sql.internal.SQLConf private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) @@ -63,6 +64,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: sqlContext.newSession() } ctx.setConf(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion) + ctx.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, true) val hiveSessionState = session.getSessionState setConfMap(ctx, hiveSessionState.getOverriddenConfigurations) setConfMap(ctx, hiveSessionState.getHiveVariables) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index ea1a371151c3..8546421a8692 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -551,4 +551,10 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { errorResponses = Seq("AnalysisException"))( ("", "Error in query: The second argument of 'date_sub' function needs to be an integer.")) } + + test("SPARK-30808: use Java 8 time API in Thrift SQL CLI by default") { + // If Java 8 time API is enabled via the SQL config `spark.sql.datetime.java8API.enabled`, + // the date formatter for `java.sql.LocalDate` must output negative years with sign. 
+ runCliWithin(1.minute)("SELECT MAKE_DATE(-44, 3, 15);" -> "-0044-03-15") + } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 4e6d4e104021..ff54879cb508 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -862,6 +862,18 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } } } + + test("SPARK-30808: use Java 8 time API and Proleptic Gregorian calendar by default") { + withJdbcStatement() { st => + // Proleptic Gregorian calendar has no gap in the range 1582-10-04..1582-10-15 + val date = "1582-10-10" + val rs = st.executeQuery(s"select date '$date'") + rs.next() + val expected = java.sql.Date.valueOf(date) + assert(rs.getDate(1) === expected) + assert(rs.getString(1) === expected.toString) + } + } } class SingleSessionSuite extends HiveThriftJdbcTest { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 2b806609426a..8df43b785759 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -708,7 +708,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { .map(col => col.getName).toSet def unapply(attr: Attribute): Option[String] = { - if (varcharKeys.contains(attr.name)) { + val resolver = SQLConf.get.resolver + if (varcharKeys.exists(c => resolver(c, attr.name))) { None } else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType) { Some(attr.name) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index e8548fd62ddc..e8cf4ad5d9f2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -2720,4 +2720,14 @@ class HiveDDLSuite checkAnswer(sql("SHOW PARTITIONS ta_part"), Row("ts=10") :: Nil) } } + + test("SPARK-31904: Fix case sensitive problem of char and varchar partition columns") { + withTable("t1", "t2") { + sql("CREATE TABLE t1(a STRING, B VARCHAR(10), C CHAR(10)) STORED AS parquet") + sql("CREATE TABLE t2 USING parquet PARTITIONED BY (b, c) AS SELECT * FROM t1") + // make sure there is no exception + assert(sql("SELECT * FROM t2 WHERE b = 'A'").collect().isEmpty) + assert(sql("SELECT * FROM t2 WHERE c = 'A'").collect().isEmpty) + } + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 1e443f656734..c0eec0e0b0a8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -17,30 +17,41 @@ package org.apache.spark.streaming.ui -import scala.xml.Node - -import org.apache.spark.ui.{UIUtils => SparkUIUtils} +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 +import javax.servlet.http.HttpServletRequest -private[ui] abstract class BatchTableBase(tableId: String, 
batchInterval: Long) { +import scala.xml.Node - protected def columns: Seq[Node] = { - Batch Time - Records - Scheduling Delay - {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")} - - Processing Time - {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")} - } +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils} + +private[ui] class StreamingPagedTable( + request: HttpServletRequest, + tableTag: String, + batches: Seq[BatchUIData], + basePath: String, + subPath: String, + batchInterval: Long) extends PagedTable[BatchUIData] { + + private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time") + private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}" + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + private val firstFailureReason: Option[String] = + if (!tableTag.equals("waitingBatches")) { + getFirstFailureReason(batches) + } else { + None + } /** * Return the first failure reason if finding in the batches. */ - protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { + private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption } - protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { + private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption firstFailureReason.map { failureReason => val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason) @@ -49,147 +60,154 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) }.getOrElse(-) } - protected def baseRow(batch: BatchUIData): Seq[Node] = { - val batchTime = batch.batchTime.milliseconds - val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval) - val numRecords = batch.numRecords - val schedulingDelay = batch.schedulingDelay - val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") - val processingTime = batch.processingDelay - val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-") - val batchTimeId = s"batch-$batchTime" - - - - {formattedBatchTime} - - - {numRecords.toString} records - - {formattedSchedulingDelay} - - - {formattedProcessingTime} - - } - - private def batchTable: Seq[Node] = { - - - {columns} - - - {renderRows} - -
- } - - def toNodeSeq: Seq[Node] = { - batchTable - } - - protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = { + private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = { { - SparkUIUtils.makeProgressBar( - started = batch.numActiveOutputOp, - completed = batch.numCompletedOutputOp, - failed = batch.numFailedOutputOp, - skipped = 0, - reasonToNumKilled = Map.empty, - total = batch.outputOperations.size) + SparkUIUtils.makeProgressBar( + started = batch.numActiveOutputOp, + completed = batch.numCompletedOutputOp, + failed = batch.numFailedOutputOp, + skipped = 0, + reasonToNumKilled = Map.empty, + total = batch.outputOperations.size) } } - /** - * Return HTML for all rows of this table. - */ - protected def renderRows: Seq[Node] -} + override def tableId: String = s"$tableTag-table" -private[ui] class ActiveBatchTable( - runningBatches: Seq[BatchUIData], - waitingBatches: Seq[BatchUIData], - batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) { + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" - private val firstFailureReason = getFirstFailureReason(runningBatches) + override def pageSizeFormField: String = s"$tableTag.pageSize" - override protected def columns: Seq[Node] = super.columns ++ { - Output Ops: Succeeded/Total - Status ++ { - if (firstFailureReason.nonEmpty) { - Error - } else { - Nil - } - } - } + override def pageNumberFormField: String = s"$tableTag.page" - override protected def renderRows: Seq[Node] = { - // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display - // waiting batches before running batches - waitingBatches.flatMap(batch => {waitingBatchRow(batch)}) ++ - runningBatches.flatMap(batch => {runningBatchRow(batch)}) + override def pageLink(page: Int): String = { + parameterPath + + s"&$tableTag.sort=$encodedSortColumn" + + s"&$tableTag.desc=$desc" + + s"&$pageNumberFormField=$page" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableTag" } - private def runningBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ processing ++ { - if (firstFailureReason.nonEmpty) { - getFirstFailureTableCell(batch) - } else { - Nil + override def goButtonFormPath: String = + s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag" + + override def dataSource: PagedDataSource[BatchUIData] = + new StreamingDataSource(batches, pageSize, sortColumn, desc) + + override def headers: Seq[Node] = { + // headers, sortable and tooltips + val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = { + Seq( + ("Batch Time", true, None), + ("Records", true, None), + ("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " + + "of a batch")), + ("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ { + if (tableTag.equals("completedBatches")) { + Seq( + ("Total Delay", true, Some("Total time taken to handle a batch")), + ("Output Ops: Succeeded/Total", false, None)) + } else { + Seq( + ("Output Ops: Succeeded/Total", false, None), + ("Status", false, None)) + } + } ++ { + if (firstFailureReason.nonEmpty) { + Seq(("Error", false, None)) + } else { + Nil + } } } + // check if sort column is a valid sortable column + isSortColumnValid(headersAndCssClasses, sortColumn) + + headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag) } 
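For context, the pageLink and goButtonFormPath overrides above URL-encode the sort column and append per-table query parameters. Below is a small standalone sketch (an assumed helper, not Spark code) of the same URL shape, which also matches the paths exercised in UISeleniumSuite later in this patch; parameterPath and the example values are made up.

// Standalone sketch: mirrors how the paged table assembles its links.
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8

object PageLinkSketch {
  def pageLink(parameterPath: String, tableTag: String, sortColumn: String,
      desc: Boolean, page: Int, pageSize: Int): String = {
    val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
    parameterPath +
      s"&$tableTag.sort=$encodedSortColumn" +
      s"&$tableTag.desc=$desc" +
      s"&$tableTag.page=$page" +
      s"&$tableTag.pageSize=$pageSize" +
      s"#$tableTag"
  }

  def main(args: Array[String]): Unit = {
    // Prints: /streaming/?&completedBatches.sort=Batch+Time&completedBatches.desc=true
    //         &completedBatches.page=1&completedBatches.pageSize=3#completedBatches
    println(pageLink("/streaming/?", "completedBatches", "Batch Time",
      desc = true, page = 1, pageSize = 3))
  }
}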
- private def waitingBatchRow(batch: BatchUIData): Seq[Node] = { - baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ queued++ { - if (firstFailureReason.nonEmpty) { - // Waiting batches have not run yet, so must have no failure reasons. - - - } else { - Nil + override def row(batch: BatchUIData): Seq[Node] = { + val batchTime = batch.batchTime.milliseconds + val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval) + val numRecords = batch.numRecords + val schedulingDelay = batch.schedulingDelay + val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") + val processingTime = batch.processingDelay + val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-") + val batchTimeId = s"batch-$batchTime" + val totalDelay = batch.totalDelay + val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") + + + + + {formattedBatchTime} + + + {numRecords.toString} records + {formattedSchedulingDelay} + {formattedProcessingTime} + { + if (tableTag.equals("completedBatches")) { + {formattedTotalDelay} ++ + createOutputOperationProgressBar(batch) ++ { + if (firstFailureReason.nonEmpty) { + getFirstFailureTableCell(batch) + } else { + Nil + } + } + } else if (tableTag.equals("runningBatches")) { + createOutputOperationProgressBar(batch) ++ + processing ++ { + if (firstFailureReason.nonEmpty) { + getFirstFailureTableCell(batch) + } else { + Nil + } + } + } else { + createOutputOperationProgressBar(batch) ++ + queued ++ { + if (firstFailureReason.nonEmpty) { + // Waiting batches have not run yet, so must have no failure reasons. + - + } else { + Nil + } + } + } } - } + } } -private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) - extends BatchTableBase("completed-batches-table", batchInterval) { +private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String, + desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) { - private val firstFailureReason = getFirstFailureReason(batches) + private val data = info.sorted(ordering(sortColumn, desc)) - override protected def columns: Seq[Node] = super.columns ++ { - Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")} - Output Ops: Succeeded/Total ++ { - if (firstFailureReason.nonEmpty) { - Error - } else { - Nil - } - } - } + override protected def dataSize: Int = data.size - override protected def renderRows: Seq[Node] = { - batches.flatMap(batch => {completedBatchRow(batch)}) - } + override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to) - private def completedBatchRow(batch: BatchUIData): Seq[Node] = { - val totalDelay = batch.totalDelay - val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") - - baseRow(batch) ++ { - - {formattedTotalDelay} - - } ++ createOutputOperationProgressBar(batch)++ { - if (firstFailureReason.nonEmpty) { - getFirstFailureTableCell(batch) - } else { - Nil - } + private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = { + val ordering: Ordering[BatchUIData] = column match { + case "Batch Time" => Ordering.by(_.batchTime.milliseconds) + case "Records" => Ordering.by(_.numRecords) + case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue)) + case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue)) + case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue)) + case 
unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 3bdf009dbce6..42d0e50a068e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -20,10 +20,12 @@ package org.apache.spark.streaming.ui import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletRequest +import scala.collection.mutable import scala.xml.{Node, Unparsed} import org.apache.spark.internal.Logging import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage} +import org.apache.spark.util.Utils /** * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that @@ -86,7 +88,7 @@ private[ui] class StreamingPage(parent: StreamingTab) onClickTimelineFunc ++ basicInfo ++ listener.synchronized { generateStatTable() ++ - generateBatchListTables() + generateBatchListTables(request) } SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent) } @@ -432,50 +434,97 @@ private[ui] class StreamingPage(parent: StreamingTab) } - private def generateBatchListTables(): Seq[Node] = { + private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData], + tableTag: String): Seq[Node] = { + val interval: Long = listener.batchDuration + val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1) + + try { + new StreamingPagedTable( + request, + tableTag, + batches, + SparkUIUtils.prependBaseUri(request, parent.basePath), + "streaming", + interval + ).table(streamingPage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+        <div class="alert alert-error">
+          <p>
+            Error while rendering streaming table:
+          </p>
+          <pre>
+            {Utils.exceptionString(e)}
+          </pre>
+        </div>
+ } + } + + private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = { val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse val completedBatches = listener.retainedCompletedBatches. sortBy(_.batchTime.milliseconds).reverse - val activeBatchesContent = { -
-
- -

- - Active Batches ({runningBatches.size + waitingBatches.size}) -

-
-
- {new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq} + val content = mutable.ListBuffer[Node]() + + if (runningBatches.nonEmpty) { + content ++= +
+
+ +

+ + Running Batches ({runningBatches.size}) +

+
+
+ { streamingTable(request, runningBatches, "runningBatches") } +
-
} - val completedBatchesContent = { -
-
- -

- - Completed Batches (last {completedBatches.size} - out of {listener.numTotalCompletedBatches}) -

-
-
- {new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq} + if (waitingBatches.nonEmpty) { + content ++= +
+
+ +

+ + Waiting Batches ({waitingBatches.size}) +

+
+
+ { streamingTable(request, waitingBatches, "waitingBatches") } +
-
} - activeBatchesContent ++ completedBatchesContent + if (completedBatches.nonEmpty) { + content ++= +
+
+ +

+ + Completed Batches (last {completedBatches.size} + out of {listener.numTotalCompletedBatches}) +

+
+
+ { streamingTable(request, completedBatches, "completedBatches") } +
+
+
+ } + content } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index bdc9e9ee2aed..952ef6c374f3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -63,7 +63,7 @@ class UISeleniumSuite .setMaster("local") .setAppName("test") .set(UI_ENABLED, true) - val ssc = new StreamingContext(conf, Seconds(1)) + val ssc = new StreamingContext(conf, Milliseconds(100)) assert(ssc.sc.ui.isDefined, "Spark UI is not started!") ssc } @@ -104,7 +104,7 @@ class UISeleniumSuite find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None) } - eventually(timeout(10.seconds), interval(50.milliseconds)) { + eventually(timeout(10.seconds), interval(500.milliseconds)) { // check whether streaming page exists go to (sparkUI.webUrl.stripSuffix("/") + "/streaming") val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq @@ -125,24 +125,47 @@ class UISeleniumSuite // Check batch tables val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq - h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true) + h4Text.exists(_.matches("Running Batches \\(\\d+\\)")) should be (true) + h4Text.exists(_.matches("Waiting Batches \\(\\d+\\)")) should be (true) h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true) - findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be { - List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)", + val arrow = 0x25BE.toChar + findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be { + List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time", + "Output Ops: Succeeded/Total", "Status") + } + findAll(cssSelector("""#waitingBatches-table th""")).map(_.text).toList should be { + List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time", "Output Ops: Succeeded/Total", "Status") } - findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be { - List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)", - "Total Delay (?)", "Output Ops: Succeeded/Total") + findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be { + List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time", + "Total Delay", "Output Ops: Succeeded/Total") } - val batchLinks = - findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq + val pageSize = 3 + val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" + + "&completedBatches.desc=true&completedBatches.page=1" + + s"&completedBatches.pageSize=$pageSize#completedBatches" + + go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath) + val completedTableRows = findAll(cssSelector("""#completedBatches-table tr""")) + .map(_.text).toList + // header row + pagesize + completedTableRows.length should be (1 + pageSize) + + val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" + + "&completedBatches.desc=false&completedBatches.pageSize=3#completedBatches" + + // sort batches in ascending order of batch time + go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath) + + val batchLinks = findAll(cssSelector("""#completedBatches-table td a""")) + .flatMap(_.attribute("href")).toSeq batchLinks.size should be >= 1 // Check a normal batch page - 
go to (batchLinks.last) // Last should be the first batch, so it will have some jobs + go to (batchLinks.head) // Head is the first batch, so it will have some jobs val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq summaryText should contain ("Batch Duration:") summaryText should contain ("Input data size:")
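For reference, a self-contained sketch of the column-ordering pattern used by StreamingDataSource earlier in this patch: one ascending Ordering per column name, missing delays pushed to the end via Long.MaxValue, and .reverse applied for descending sort. The case class and sample values are invented for illustration.

// Sketch only: `Batch` is a stand-in for BatchUIData and the sample data is made up.
object OrderingSketch {
  case class Batch(batchTime: Long, numRecords: Long, schedulingDelay: Option[Long])

  def ordering(column: String, desc: Boolean): Ordering[Batch] = {
    val asc: Ordering[Batch] = column match {
      case "Batch Time" => Ordering.by(_.batchTime)
      case "Records" => Ordering.by(_.numRecords)
      case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
      case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
    }
    if (desc) asc.reverse else asc
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Batch(batchTime = 3000L, numRecords = 10L, schedulingDelay = Some(5L)),
      Batch(batchTime = 1000L, numRecords = 30L, schedulingDelay = None),
      Batch(batchTime = 2000L, numRecords = 20L, schedulingDelay = Some(1L)))
    // Default view of the completed-batches table: newest batch first.
    println(batches.sorted(ordering("Batch Time", desc = true)).map(_.batchTime))
    // List(3000, 2000, 1000)
  }
}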