diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index 4056be4769d2..e472aaad5bdc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.hive.thriftserver.ui +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 import java.util.Calendar import javax.servlet.http.HttpServletRequest -import scala.xml.Node +import scala.collection.JavaConverters._ +import scala.xml.{Node, Unparsed} import org.apache.commons.text.StringEscapeUtils @@ -29,7 +32,7 @@ import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{ExecutionInfo, import org.apache.spark.sql.hive.thriftserver.ui.ToolTips._ import org.apache.spark.ui._ import org.apache.spark.ui.UIUtils._ - +import org.apache.spark.util.Utils /** Page for Spark Web UI that shows statistics of the thrift server */ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging { @@ -69,45 +72,56 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" /** Generate stats of batch statements of the thrift server program */ private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = { + val numStatement = listener.getExecutionList.size + val table = if (numStatement > 0) { - val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Close Time", - "Execution Time", "Duration", "Statement", "State", "Detail") - val tooltips = Seq(None, None, None, None, Some(THRIFT_SERVER_FINISH_TIME), - Some(THRIFT_SERVER_CLOSE_TIME), Some(THRIFT_SERVER_EXECUTION), - Some(THRIFT_SERVER_DURATION), None, None, None) - assert(headerRow.length == tooltips.length) - val dataRows = listener.getExecutionList.sortBy(_.startTimestamp).reverse - - def generateDataRow(info: ExecutionInfo): Seq[Node] = { - val jobLink = info.jobId.map { id: String => - - [{id}] - + + val sqlTableTag = "sqlstat" + + val parameterOtherTable = request.getParameterMap().asScala + .filterNot(_._1.startsWith(sqlTableTag)) + .map { case (name, vals) => + name + "=" + vals(0) } - val detail = Option(info.detail).filter(!_.isEmpty).getOrElse(info.executePlan) - - {info.userName} - - {jobLink} - - {info.groupId} - {formatDate(info.startTimestamp)} - {if (info.finishTimestamp > 0) formatDate(info.finishTimestamp)} - {if (info.closeTimestamp > 0) formatDate(info.closeTimestamp)} - - {formatDurationOption(Some(info.totalTime(info.finishTimestamp)))} - - {formatDurationOption(Some(info.totalTime(info.closeTimestamp)))} - {info.statement} - {info.state} - {errorMessageCell(detail)} - - } - Some(UIUtils.listingTable(headerRow, generateDataRow, - dataRows, false, None, Seq(null), false, tooltipHeaders = tooltips)) + val parameterSqlTablePage = request.getParameter(s"$sqlTableTag.page") + val parameterSqlTableSortColumn = request.getParameter(s"$sqlTableTag.sort") + val parameterSqlTableSortDesc = request.getParameter(s"$sqlTableTag.desc") + val parameterSqlPageSize = request.getParameter(s"$sqlTableTag.pageSize") + + val sqlTablePage = Option(parameterSqlTablePage).map(_.toInt).getOrElse(1) + val sqlTableSortColumn = Option(parameterSqlTableSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse("Start Time") + val sqlTableSortDesc = Option(parameterSqlTableSortDesc).map(_.toBoolean).getOrElse( + // New executions should be shown above old executions by default. + sqlTableSortColumn == "Start Time" + ) + val sqlTablePageSize = Option(parameterSqlPageSize).map(_.toInt).getOrElse(100) + + try { + Some(new SqlStatsPagedTable( + request, + parent, + listener.getExecutionList, + "sqlserver", + UIUtils.prependBaseUri(request, parent.basePath), + parameterOtherTable, + sqlTableTag, + pageSize = sqlTablePageSize, + sortColumn = sqlTableSortColumn, + desc = sqlTableSortDesc + ).table(sqlTablePage)) + } catch { + case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
+

Error while rendering job table:

+
+              {Utils.exceptionString(e)}
+            
+
) + } } else { None } @@ -123,30 +137,6 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" content } - private def errorMessageCell(errorMessage: String): Seq[Node] = { - val isMultiline = errorMessage.indexOf('\n') >= 0 - val errorSummary = StringEscapeUtils.escapeHtml4( - if (isMultiline) { - errorMessage.substring(0, errorMessage.indexOf('\n')) - } else { - errorMessage - }) - val details = if (isMultiline) { - // scalastyle:off - - + details - ++ - - // scalastyle:on - } else { - "" - } - {errorSummary}{details} - } - /** Generate stats of batch sessions of the thrift server program */ private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = { val sessionList = listener.getSessionList @@ -185,7 +175,6 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" content } - /** * Returns a human-readable string representing a duration such as "5 second 35 ms" */ @@ -202,3 +191,254 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" } } +private[ui] class SqlStatsPagedTable( + request: HttpServletRequest, + parent: ThriftServerTab, + data: Seq[ExecutionInfo], + subPath: String, + basePath: String, + parameterOtherTable: Iterable[String], + sqlStatsTableTag: String, + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedTable[SqlStatsTableRow] { + + override val dataSource = new SqlStatsTableDataSource(data, pageSize, sortColumn, desc) + + private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}" + + override def tableId: String = sqlStatsTableTag + + override def tableCssClass: String = + "table table-bordered table-condensed table-striped " + + "table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$sqlStatsTableTag.sort=$encodedSortColumn" + + s"&$sqlStatsTableTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + } + + override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize" + + override def pageNumberFormField: String = s"$sqlStatsTableTag.page" + + override def goButtonFormPath: String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn&$sqlStatsTableTag.desc=$desc" + } + + override def headers: Seq[Node] = { + val sqlTableHeaders = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", + "Close Time", "Execution Time", "Duration", "Statement", "State", "Detail") + + val tooltips = Seq(None, None, None, None, Some(THRIFT_SERVER_FINISH_TIME), + Some(THRIFT_SERVER_CLOSE_TIME), Some(THRIFT_SERVER_EXECUTION), + Some(THRIFT_SERVER_DURATION), None, None, None) + + assert(sqlTableHeaders.length == tooltips.length) + + val headerRow: Seq[Node] = { + sqlTableHeaders.zip(tooltips).map { case (header, tooltip) => + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$sqlStatsTableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$sqlStatsTableTag.desc=${!desc}" + + s"&$sqlStatsTableTag.pageSize=$pageSize" + + s"#$sqlStatsTableTag") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + if (tooltip.nonEmpty) { + + + + {header} {Unparsed(arrow)} + + + + } else { + + + {header} {Unparsed(arrow)} + + + } + } else { + val headerLink = Unparsed( + parameterPath + + s"&$sqlStatsTableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$sqlStatsTableTag.pageSize=$pageSize" + + s"#$sqlStatsTableTag") + + if(tooltip.nonEmpty) { + + + + {header} + + + + } else { + + + {header} + + + } + } + } + } + + {headerRow} + + } + + override def row(sqlStatsTableRow: SqlStatsTableRow): Seq[Node] = { + val info = sqlStatsTableRow.executionInfo + val startTime = info.startTimestamp + val executionTime = sqlStatsTableRow.executionTime + val duration = sqlStatsTableRow.duration + + def jobLinks(jobData: Seq[String]): Seq[Node] = { + jobData.map { jobId => + [{jobId.toString}] + } + } + + + + {info.userName} + + + {jobLinks(sqlStatsTableRow.jobId)} + + + {info.groupId} + + + {UIUtils.formatDate(startTime)} + + + {if (info.finishTimestamp > 0) formatDate(info.finishTimestamp)} + + + {if (info.closeTimestamp > 0) formatDate(info.closeTimestamp)} + + + {UIUtils.formatDuration(executionTime)} + + + {UIUtils.formatDuration(duration)} + + + {info.statement} + + + {info.state} + + {errorMessageCell(sqlStatsTableRow.detail)} + + } + + + private def errorMessageCell(errorMessage: String): Seq[Node] = { + val isMultiline = errorMessage.indexOf('\n') >= 0 + val errorSummary = StringEscapeUtils.escapeHtml4( + if (isMultiline) { + errorMessage.substring(0, errorMessage.indexOf('\n')) + } else { + errorMessage + }) + val details = if (isMultiline) { + // scalastyle:off + + + details + ++ + + // scalastyle:on + } else { + "" + } + + {errorSummary}{details} + + } + + private def jobURL(request: HttpServletRequest, jobId: String): String = + "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId) +} + + private[ui] class SqlStatsTableRow( + val jobId: Seq[String], + val duration: Long, + val executionTime: Long, + val executionInfo: ExecutionInfo, + val detail: String) + + private[ui] class SqlStatsTableDataSource( + info: Seq[ExecutionInfo], + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedDataSource[SqlStatsTableRow](pageSize) { + + // Convert ExecutionInfo to SqlStatsTableRow which contains the final contents to show in + // the table so that we can avoid creating duplicate contents during sorting the data + private val data = info.map(sqlStatsTableRow).sorted(ordering(sortColumn, desc)) + + private var _slicedStartTime: Set[Long] = null + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[SqlStatsTableRow] = { + val r = data.slice(from, to) + r.map(x => x) + _slicedStartTime = r.map(_.executionInfo.startTimestamp).toSet + r + } + + private def sqlStatsTableRow(executionInfo: ExecutionInfo): SqlStatsTableRow = { + val duration = executionInfo.totalTime(executionInfo.closeTimestamp) + val executionTime = executionInfo.totalTime(executionInfo.finishTimestamp) + val detail = Option(executionInfo.detail).filter(!_.isEmpty) + .getOrElse(executionInfo.executePlan) + val jobId = executionInfo.jobId.toSeq.sorted + + new SqlStatsTableRow(jobId, duration, executionTime, executionInfo, detail) + + } + + /** + * Return Ordering according to sortColumn and desc. + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[SqlStatsTableRow] = { + val ordering: Ordering[SqlStatsTableRow] = sortColumn match { + case "User" => Ordering.by(_.executionInfo.userName) + case "JobID" => Ordering by (_.jobId.headOption) + case "GroupID" => Ordering.by(_.executionInfo.groupId) + case "Start Time" => Ordering.by(_.executionInfo.startTimestamp) + case "Finish Time" => Ordering.by(_.executionInfo.finishTimestamp) + case "Close Time" => Ordering.by(_.executionInfo.closeTimestamp) + case "Execution Time" => Ordering.by(_.executionTime) + case "Duration" => Ordering.by(_.duration) + case "Statement" => Ordering.by(_.executionInfo.statement) + case "State" => Ordering.by(_.executionInfo.state) + case "Detail" => Ordering.by(_.detail) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } + + }