Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update
  • Loading branch information
panbingkun committed Jun 24, 2024
commit 89538d0fc728e8768c34c2f343fbfd3b947d6d9f
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ private[spark] object LogKeys {
case object OLD_VALUE extends LogKey
case object OPEN_COST_IN_BYTES extends LogKey
case object OPERATION_HANDLE extends LogKey
case object OPERATION_HANDLE_ID extends LogKey
case object OPERATION_ID extends LogKey
case object OPTIMIZED_PLAN_COLUMNS extends LogKey
case object OPTIMIZER_CLASS_NAME extends LogKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
case StreamingQueryListenerBusCommand.CommandCase.ADD_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
logWarning(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Redundant server side listener added. Exiting.")
logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"Redundant server side listener added. Exiting.")
return
case false =>
// This transfers sending back the response to the client until
Expand All @@ -87,7 +86,8 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
logError(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"[operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"Error sending listener added response.",
e)
listenerHolder.cleanUp()
Expand All @@ -96,15 +96,15 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
}
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"Server side listener added. Now blocking until " +
log"all client side listeners are removed or there is error transmitting the event back.")
// Block the handling thread, and have serverListener continuously send back new events
listenerHolder.streamingQueryListenerLatch.await()
logInfo(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"Server side listener long-running handling thread ended.")
case StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
Expand All @@ -113,7 +113,7 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
case false =>
logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"No active server side listener bus listener but received remove listener call. " +
log"Exiting.")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{APP_ID, APP_STATE}
import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
Expand Down Expand Up @@ -98,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend(
throw new SparkException(exceptionMsg)
}
if (state == YarnApplicationState.RUNNING) {
logInfo(log"Application ${MDC(LogKeys.APP_ID, appId.get)} has started running.")
logInfo(log"Application ${MDC(APP_ID, appId.get)} has started running.")
}
}

Expand All @@ -117,7 +118,7 @@ private[spark] class YarnClientSchedulerBackend(
val YarnAppReport(_, state, diags) =
client.monitorApplication(logApplicationReport = false)
logError(log"YARN application has exited unexpectedly with state " +
log"${MDC(LogKeys.APP_STATE, state)}! Check the YARN application logs for more details.")
log"${MDC(APP_STATE, state)}! Check the YARN application logs for more details.")
diags.foreach { err =>
logError(log"Diagnostics message: ${MDC(LogKeys.ERROR, err)}")
}
Expand All @@ -127,7 +128,7 @@ private[spark] class YarnClientSchedulerBackend(
case FinalApplicationStatus.FAILED | FinalApplicationStatus.KILLED
if conf.get(AM_CLIENT_MODE_EXIT_ON_ERROR) =>
logWarning(log"ApplicationMaster finished with status " +
log"${MDC(LogKeys.APP_STATE, state)}, SparkContext should exit with code 1.")
log"${MDC(APP_STATE, state)}, SparkContext should exit with code 1.")
System.exit(1)
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ abstract class StreamExecution(
case _ => None
}

logError(log"Query ${MDC(QUERY_ID, prettyIdString)} terminated with error", e)
logError(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} terminated with error", e)
getLatestExecutionContext().updateStatusMessage(s"Terminated with exception: $message")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
// handle them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,8 @@ class ContinuousExecution(
*/
def stopInNewThread(error: Throwable): Unit = {
if (failure.compareAndSet(null, error)) {
logError(log"Query ${MDC(QUERY_ID, prettyIdString)} received exception", error)
logError(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} received exception " +
log"${MDC(ERROR, error)}")
stopInNewThread()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected void cleanupOperationLog() {
if (operationLog == null) {
LOG.error("Operation [ {} ] logging is enabled, " +
"but its OperationLog object cannot be found.",
MDC.of(LogKeys.OPERATION_ID$.MODULE$, opHandle.getHandleIdentifier()));
MDC.of(LogKeys.OPERATION_HANDLE_ID$.MODULE$, opHandle.getHandleIdentifier()));
} else {
operationLog.close();
}
Expand Down