From 7c73c4b5b3d0053d762b0641969e8d080ffbe74e Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Tue, 9 Jul 2024 15:24:29 -0700 Subject: [PATCH 01/11] more structured log migration --- .../org/apache/spark/internal/LogKey.scala | 10 ++++++++ .../org/apache/spark/util/MavenUtils.scala | 5 ++-- .../sql/connect/service/SessionHolder.scala | 22 +++++++++-------- .../SparkConnectListenerBusListener.scala | 12 ++++------ .../org/apache/spark/SecurityManager.scala | 18 +++++++------- .../spark/api/python/PythonRunner.scala | 24 ++++++++++++------- .../org/apache/spark/deploy/SparkSubmit.scala | 6 ++--- .../apache/spark/scheduler/DAGScheduler.scala | 12 ++++++---- .../spark/util/ShutdownHookManager.scala | 2 +- .../spark/util/collection/Spillable.scala | 11 +++++---- .../mllib/clustering/StreamingKMeans.scala | 4 ++-- .../mllib/optimization/GradientDescent.scala | 4 ++-- .../spark/mllib/optimization/LBFGS.scala | 6 ++--- .../sql/hive/client/HiveClientImpl.scala | 6 ++--- .../dstream/DStreamCheckpointData.scala | 3 ++- .../streaming/dstream/FileInputDStream.scala | 3 ++- .../streaming/dstream/RawInputDStream.scala | 2 +- .../streaming/scheduler/JobGenerator.scala | 4 ++-- 18 files changed, 89 insertions(+), 65 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 59e467402aa8..5e1504831d65 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -100,6 +100,7 @@ private[spark] object LogKeys { case object BLOCK_MANAGER_IDS extends LogKey case object BLOCK_TYPE extends LogKey case object BOOT extends LogKey + case object BOOT_TIME extends LogKey case object BOOTSTRAP_TIME extends LogKey case object BROADCAST extends LogKey case object BROADCAST_ID extends LogKey @@ -110,6 +111,8 @@ private[spark] object LogKeys { case object BYTE_SIZE extends LogKey case object 
CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey case object CACHE_AUTO_REMOVED_SIZE extends LogKey + case object CACHE_SIZE extends LogKey + case object CACHE_SIZE_KEY extends LogKey case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey case object CALL_SITE_LONG_FORM extends LogKey @@ -282,6 +285,7 @@ private[spark] object LogKeys { case object FINAL_CONTEXT extends LogKey case object FINAL_OUTPUT_PATH extends LogKey case object FINAL_PATH extends LogKey + case object FINISH_TIME extends LogKey case object FINISH_TRIGGER_DURATION extends LogKey case object FREE_MEMORY_SIZE extends LogKey case object FROM_OFFSET extends LogKey @@ -320,10 +324,12 @@ private[spark] object LogKeys { case object INITIAL_CAPACITY extends LogKey case object INITIAL_HEARTBEAT_INTERVAL extends LogKey case object INIT_MODE extends LogKey + case object INIT_TIME extends LogKey case object INPUT extends LogKey case object INPUT_SPLIT extends LogKey case object INTEGRAL extends LogKey case object INTERVAL extends LogKey + case object INVALID_PARAMS extends LogKey case object ISOLATION_LEVEL extends LogKey case object ISSUE_DATE extends LogKey case object IS_NETWORK_REQUEST_DONE extends LogKey @@ -369,6 +375,7 @@ private[spark] object LogKeys { case object LOG_LEVEL extends LogKey case object LOG_OFFSET extends LogKey case object LOG_TYPE extends LogKey + case object LOSSES extends LogKey case object LOWER_BOUND extends LogKey case object MALFORMATTED_STRING extends LogKey case object MAP_ID extends LogKey @@ -566,6 +573,7 @@ private[spark] object LogKeys { case object OS_NAME extends LogKey case object OS_VERSION extends LogKey case object OUTPUT extends LogKey + case object OUTPUT_BUFFER extends LogKey case object OVERHEAD_MEMORY_SIZE extends LogKey case object PAGE_SIZE extends LogKey case object PARENT_STAGES extends LogKey @@ -611,8 +619,10 @@ private[spark] object LogKeys { case object PUSHED_FILTERS extends LogKey case 
object PUSH_MERGED_LOCAL_BLOCKS_SIZE extends LogKey case object PVC_METADATA_NAME extends LogKey + case object PYTHON_DAEMON_MODULE extends LogKey case object PYTHON_EXEC extends LogKey case object PYTHON_PACKAGES extends LogKey + case object PYTHON_USE_DAEMON extends LogKey case object PYTHON_VERSION extends LogKey case object PYTHON_WORKER_MODULE extends LogKey case object PYTHON_WORKER_RESPONSE extends LogKey diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala index 546981c8b543..42a1d1612aee 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala @@ -650,8 +650,9 @@ private[spark] object MavenUtils extends Logging { val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq if (invalidParams.nonEmpty) { logWarning( - s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " + - s"in Ivy URI query `$uriQuery`.") + log"Invalid parameters `${MDC(LogKeys.INVALID_PARAMS, + invalidParams.sorted.mkString(","))}` " + + log"found in Ivy URI query `${MDC(LogKeys.URI, uriQuery)}`.") } (transitive, exclusionList, repos) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 681f7e29630f..5fd5a1ba0c15 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -33,7 +33,7 @@ import com.google.common.cache.{Cache, CacheBuilder} import org.apache.spark.{SparkEnv, SparkException, SparkSQLException} import org.apache.spark.api.python.PythonFunction.PythonAccumulator import org.apache.spark.connect.proto -import org.apache.spark.internal.{Logging, 
MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession @@ -63,9 +63,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = { if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) { logWarning( - s"Session plan cache is disabled due to non-positive cache size." + - s" Current value of '${Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is" + - s" ${SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE)}.") + log"Session plan cache is disabled due to non-positive cache size." + + log" Current value of '${MDC(LogKeys.CACHE_SIZE_KEY, + Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is" + + log" ${MDC(LogKeys.CACHE_SIZE, + SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}.") None } else { Some( @@ -248,15 +250,15 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private[connect] def updateAccessTime(): Unit = { lastAccessTimeMs = System.currentTimeMillis() logInfo( - log"Session ${MDC(SESSION_KEY, key)} accessed, " + - log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.") + log"Session ${MDC(LogKeys.SESSION_KEY, key)} accessed, " + + log"time ${MDC(LogKeys.LAST_ACCESS_TIME, lastAccessTimeMs)} ms.") } private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = { customInactiveTimeoutMs = newInactiveTimeoutMs logInfo( - log"Session ${MDC(SESSION_KEY, key)} " + - log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.") + log"Session ${MDC(LogKeys.SESSION_KEY, key)} " + + log"inactive timeout set to ${MDC(LogKeys.TIMEOUT, customInactiveTimeoutMs)} ms.") } /** @@ -282,8 +284,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio throw new IllegalStateException(s"Session $key is already 
closed.") } logInfo( - log"Closing session with userId: ${MDC(USER_ID, userId)} and " + - log"sessionId: ${MDC(SESSION_ID, sessionId)}") + log"Closing session with userId: ${MDC(LogKeys.USER_ID, userId)} and " + + log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}") closedTimeMs = Some(System.currentTimeMillis()) if (Utils.isTesting && eventManager.status == SessionStatus.Pending) { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala index 56d0d920e95b..9b1ffcc86388 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala @@ -18,17 +18,14 @@ package org.apache.spark.sql.connect.service import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, CountDownLatch} - import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal - import io.grpc.stub.StreamObserver - import org.apache.spark.connect.proto.ExecutePlanResponse import org.apache.spark.connect.proto.StreamingQueryEventType import org.apache.spark.connect.proto.StreamingQueryListenerEvent import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LogKeys, Logging, MDC} import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.util.ArrayImplicits._ @@ -132,9 +129,10 @@ private[sql] class SparkConnectListenerBusListener( } catch { case NonFatal(e) => logError( - s"[SessionId: ${sessionHolder.sessionId}][UserId: ${sessionHolder.userId}] " + - s"Removing SparkConnectListenerBusListener and terminating the long-running thread " + - s"because of exception: $e") + log"[SessionId: ${MDC(LogKeys.SESSION_ID, 
sessionHolder.sessionId)}]" + + log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " + + log"Removing SparkConnectListenerBusListener and terminating the long-running thread " + + log"because of exception: ${MDC(LogKeys.EXCEPTION, e)}") // This likely means that the client is not responsive even with retry, we should // remove this listener and cleanup resources. serverSideListenerHolder.cleanUp() diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 5a40afc76728..6b4e7cf32953 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -21,12 +21,11 @@ import java.io.File import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.Files import java.util.Base64 - import org.apache.hadoop.io.Text import org.apache.hadoop.security.{Credentials, UserGroupInformation} - import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.{Logging, LogKeys, MDC} +import org.apache.spark.internal.LogKeys.JOB_ID +import org.apache.spark.internal.{LogKeys, Logging, MDC} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.launcher.SparkLauncher @@ -122,7 +121,7 @@ private[spark] class SecurityManager( */ def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = { viewAcls = adminAcls ++ defaultUsers ++ allowedUsers - logInfo("Changing view acls to: " + viewAcls.mkString(",")) + logInfo(log"Changing view acls to: ${MDC(LogKeys.VIEW_ACLS, viewAcls.mkString(","))}") } def setViewAcls(defaultUser: String, allowedUsers: Seq[String]): Unit = { @@ -135,7 +134,7 @@ private[spark] class SecurityManager( */ def setViewAclsGroups(allowedUserGroups: Seq[String]): Unit = { viewAclsGroups = adminAclsGroups ++ allowedUserGroups - logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(",")) + 
logInfo(log"Changing view acls groups to: ${MDC(LogKeys.VIEW_ACLS, viewAclsGroups.mkString(","))}") } /** @@ -163,7 +162,7 @@ private[spark] class SecurityManager( */ def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = { modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers - logInfo("Changing modify acls to: " + modifyAcls.mkString(",")) + logInfo(log"Changing modify acls to: ${MDC(LogKeys.MODIFY_ACLS, modifyAcls.mkString(","))}") } /** @@ -172,7 +171,8 @@ private[spark] class SecurityManager( */ def setModifyAclsGroups(allowedUserGroups: Seq[String]): Unit = { modifyAclsGroups = adminAclsGroups ++ allowedUserGroups - logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(",")) + logInfo(log"Changing modify acls groups to: ${MDC(LogKeys.MODIFY_ACLS, + modifyAclsGroups.mkString(","))}") } /** @@ -200,7 +200,7 @@ private[spark] class SecurityManager( */ def setAdminAcls(adminUsers: Seq[String]): Unit = { adminAcls = adminUsers.toSet - logInfo("Changing admin acls to: " + adminAcls.mkString(",")) + logInfo(log"Changing admin acls to: ${MDC(LogKeys.ADMIN_ACLS, adminAcls.mkString(","))}") } /** @@ -209,7 +209,7 @@ private[spark] class SecurityManager( */ def setAdminAclsGroups(adminUserGroups: Seq[String]): Unit = { adminAclsGroups = adminUserGroups.toSet - logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(",")) + logInfo(log"Changing admin acls groups to: ${MDC(LogKeys.ADMIN_ACLS, adminAclsGroups.mkString(","))}") } def setAcls(aclSetting: Boolean): Unit = { diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index b2571ffddc57..53392b0ef64f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -31,7 +31,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import 
org.apache.spark.api.python.PythonFunction.PythonAccumulator -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.TASK_NAME import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python} import org.apache.spark.internal.config.Python._ @@ -131,9 +131,11 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val daemonModule = conf.get(PYTHON_DAEMON_MODULE).map { value => logInfo( - s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " + - "using this to start the daemon up. Note that this configuration only has an effect when " + - s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.") + log"Python daemon module in PySpark is set to " + + log"[${MDC(LogKeys.VALUE, value)}] in '${MDC(LogKeys.PYTHON_DAEMON_MODULE, + PYTHON_DAEMON_MODULE.key)}', using this to start the daemon up. Note that this " + + log"configuration only has an effect when '${MDC(LogKeys.PYTHON_USE_DAEMON, + PYTHON_USE_DAEMON.key)}' is enabled and the platform is not Windows.") value }.getOrElse("pyspark.daemon") @@ -141,9 +143,11 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val workerModule = conf.get(PYTHON_WORKER_MODULE).map { value => logInfo( - s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " + - "using this to start the worker up. Note that this configuration only has an effect when " + - s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.") + log"Python worker module in PySpark is set to ${MDC(LogKeys.VALUE, value)} " + + log"in ${MDC(LogKeys.PYTHON_WORKER_MODULE, PYTHON_WORKER_MODULE.key)}, " + + log"using this to start the worker up. 
Note that this configuration only has " + + log"an effect when ${MDC(LogKeys.PYTHON_USE_DAEMON, PYTHON_USE_DAEMON.key)} " + + log"is disabled or the platform is Windows.") value }.getOrElse("pyspark.worker") @@ -509,8 +513,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( val init = initTime - bootTime val finish = finishTime - initTime val total = finishTime - startTime - logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot, - init, finish)) + logInfo(log"Times: total = ${MDC(LogKeys.TOTAL_TIME, total)}, " + + log"boot = ${MDC(LogKeys.BOOT_TIME, boot)}, " + + log"init = ${MDC(LogKeys.INIT_TIME, init)}, " + + log"finish = ${MDC(LogKeys.FINISH_TIME, finish)}") val memoryBytesSpilled = stream.readLong() val diskBytesSpilled = stream.readLong() context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 7bb945ab9f14..e9b834712984 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -136,13 +136,13 @@ private[spark] class SparkSubmit extends Logging { /** Print version information to the log. 
*/ private def printVersion(): Unit = { - logInfo("""Welcome to + logInfo(log"""Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version %s + /___/ .__/\_,_/_/ /_/\_\ version ${MDC(LogKeys.SPARK_VERSION, SPARK_VERSION)} /_/ - """.format(SPARK_VERSION)) + """) logInfo(log"Using Scala ${MDC(LogKeys.SCALA_VERSION, Properties.versionString)}," + log" ${MDC(LogKeys.JAVA_VM_NAME, Properties.javaVmName)}," + log" ${MDC(LogKeys.JAVA_VERSION, Properties.javaVersion)}") diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f50e8bd25fec..608f2fc735bb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -36,7 +36,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} -import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.{config, Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED} import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY @@ -998,11 +998,13 @@ private[spark] class DAGScheduler( ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => - logInfo("Job %d finished: %s, took %f s".format - (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) + logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} finished: " + + log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " + + log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e9)} s") case scala.util.Failure(exception) => - logInfo("Job %d failed: %s, took %f s".format - 
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) + logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} failed: " + + log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " + + log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e9)} s") // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index 02556528ce38..ad9c3d3f10cf 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -110,7 +110,7 @@ private[spark] object ShutdownHookManager extends Logging { } } if (retval) { - logInfo("path = " + file + ", already present as root for deletion.") + logInfo(log"path = ${MDC(LogKeys.FILE_NAME, file)}, already present as root for deletion.") } retval } diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index fe488f9cf0da..c5987678e720 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -18,7 +18,7 @@ package org.apache.spark.util.collection import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LogKeys, Logging, MDC} import org.apache.spark.internal.config._ import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager} @@ -143,8 +143,11 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager) */ @inline private def logSpillage(size: Long): Unit = { val threadId = Thread.currentThread().getId - logInfo("Thread %d spilling in-memory 
map of %s to disk (%d time%s so far)" - .format(threadId, org.apache.spark.util.Utils.bytesToString(size), - _spillCount, if (_spillCount > 1) "s" else "")) + logInfo(log"Thread ${MDC(LogKeys.THREAD_ID, threadId)} " + + log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE, + org.apache.spark.util.Utils.bytesToString(size))} to disk " + + log"(${MDC(LogKeys.SPILL_TIMES, _spillCount)} times so far)") + + ${MDC(LogKeys.AUTH_ENABLED, aclsOn)} } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 85a735007810..641b4fa4048a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -21,7 +21,7 @@ import scala.reflect.ClassTag import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaSparkContext._ -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{CLUSTER_CENTROIDS, CLUSTER_LABEL, CLUSTER_WEIGHT, LARGEST_CLUSTER_INDEX, SMALLEST_CLUSTER_INDEX} import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors} import org.apache.spark.rdd.RDD @@ -222,7 +222,7 @@ class StreamingKMeans @Since("1.2.0") ( throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit) } this.decayFactor = math.exp(math.log(0.5) / halfLife) - logInfo("Setting decay factor to: %g ".format (this.decayFactor)) + logInfo(log"Setting decay factor to: ${MDC(LogKeys.VALUE, this.decayFactor)}") this.timeUnit = timeUnit this } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index a288d13e57f7..8da2b0e27fb8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -299,8 +299,8 @@ object GradientDescent extends Logging { i += 1 } - logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format( - stochasticLossHistory.takeRight(10).mkString(", "))) + logInfo(log"GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses " + + log"${MDC(LogKeys.LOSSES, stochasticLossHistory.takeRight(10).mkString(", "))}") (weights, stochasticLossHistory.toArray) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index 4fc297560c08..28c997f5301c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -22,7 +22,7 @@ import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.axpy import org.apache.spark.rdd.RDD @@ -217,8 +217,8 @@ object LBFGS extends Logging { val lossHistoryArray = lossHistory.result() - logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format( - lossHistoryArray.takeRight(10).mkString(", "))) + logInfo(log"LBFGS.runLBFGS finished. 
Last 10 losses ${MDC(LogKeys.LOSSES, + lossHistoryArray.takeRight(10).mkString(", "))}") (weights, lossHistoryArray) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 11e077e891bd..735814c9ae08 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -46,7 +46,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.catalyst.TableIdentifier @@ -906,11 +906,11 @@ private[hive] class HiveClientImpl( } catch { case e: Exception => logError( - s""" + log""" |====================== |HIVE FAILURE OUTPUT |====================== - |${outputBuffer.toString} + |${MDC(LogKeys.OUTPUT_BUFFER, outputBuffer.toString)} |====================== |END HIVE FAILURE OUTPUT |====================== diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 6fd6597d4f14..775da5a71f14 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -117,7 +117,8 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T]) // Create RDDs from the checkpoint data currentCheckpointFiles.foreach { case(time, file) => - logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") + logInfo(log"Restoring 
checkpointed RDD for time ${MDC(LogKeys.TIME, time)} from file " + + log"'${MDC(LogKeys.FILE, file)}'") dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index b39cd43ce1a6..b067c505da0d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -147,7 +147,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( override def compute(validTime: Time): Option[RDD[(K, V)]] = { // Find new files val newFiles = findNewFiles(validTime.milliseconds) - logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) + logInfo(log"New files at time ${MDC(LogKeys.BATCH_TIMESTAMP, validTime)}:\n" + + log"${MDC(LogKeys.FILE_NAME, newFiles.mkString("\n"))}") batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((validTime, newFiles)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index efda751c1e43..de7882032122 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -87,7 +87,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) val dataBuffer = ByteBuffer.allocate(length) readFully(channel, dataBuffer) dataBuffer.flip() - logInfo("Read a block with " + length + " bytes") + logInfo(log"Read a block with ${MDC(LogKeys.BYTE_SIZE, length)} bytes") queue.put(dataBuffer) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 91d15e7956a6..1dde435a913c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -221,8 +221,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val checkpointTime = ssc.initialCheckpoint.checkpointTime val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds)) val downTimes = checkpointTime.until(restartTime, batchDuration) - logInfo("Batches during down time (" + downTimes.size + " batches): " - + downTimes.mkString(", ")) + logInfo(log"Batches during down time (${MDC(LogKeys.NUM_BATCHES, downTimes.size)} batches): " + + log"${MDC(LogKeys.BATCH_TIMES, downTimes.mkString(","))}") // Batches that were unprocessed before failure val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) From ca8360bef6c1c6a6c7a8bde8619455827ad89774 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Tue, 9 Jul 2024 15:43:22 -0700 Subject: [PATCH 02/11] style fixes --- .../org/apache/spark/sql/connect/service/SessionHolder.scala | 1 - .../connect/service/SparkConnectListenerBusListener.scala | 5 ++++- core/src/main/scala/org/apache/spark/SecurityManager.scala | 5 +++-- .../scala/org/apache/spark/util/collection/Spillable.scala | 4 +--- .../spark/streaming/dstream/DStreamCheckpointData.scala | 2 +- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 5fd5a1ba0c15..6ff35b2aa5e7 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -34,7 +34,6 @@ import 
org.apache.spark.{SparkEnv, SparkException, SparkSQLException} import org.apache.spark.api.python.PythonFunction.PythonAccumulator import org.apache.spark.connect.proto import org.apache.spark.internal.{Logging, LogKeys, MDC} -import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala index 9b1ffcc86388..877d9bb3747b 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala @@ -18,14 +18,17 @@ package org.apache.spark.sql.connect.service import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, CountDownLatch} + import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal + import io.grpc.stub.StreamObserver + import org.apache.spark.connect.proto.ExecutePlanResponse import org.apache.spark.connect.proto.StreamingQueryEventType import org.apache.spark.connect.proto.StreamingQueryListenerEvent import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult -import org.apache.spark.internal.{LogKeys, Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.util.ArrayImplicits._ diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 6b4e7cf32953..c951876e6203 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -21,11 
+21,12 @@ import java.io.File import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.Files import java.util.Base64 + import org.apache.hadoop.io.Text import org.apache.hadoop.security.{Credentials, UserGroupInformation} + import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.LogKeys.JOB_ID -import org.apache.spark.internal.{LogKeys, Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.launcher.SparkLauncher diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index c5987678e720..c3d648dccea7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -18,7 +18,7 @@ package org.apache.spark.util.collection import org.apache.spark.SparkEnv -import org.apache.spark.internal.{LogKeys, Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config._ import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager} @@ -147,7 +147,5 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager) log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE, org.apache.spark.util.Utils.bytesToString(size))} to disk " + log"(${MDC(LogKeys.SPILL_TIMES, _spillCount)} times so far)") - - ${MDC(LogKeys.AUTH_ENABLED, aclsOn)} } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 775da5a71f14..128a5fded49a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ 
-118,7 +118,7 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T]) currentCheckpointFiles.foreach { case(time, file) => logInfo(log"Restoring checkpointed RDD for time ${MDC(LogKeys.TIME, time)} from file " + - log"'${MDC(LogKeys.FILE, file)}'") + log"'${MDC(LogKeys.FILE_NAME, file)}'") dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) } } From 025c5d05a3f125183d503ad42850a7778f141c11 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Tue, 9 Jul 2024 16:05:17 -0700 Subject: [PATCH 03/11] revise logging messages --- .../scala/org/apache/spark/internal/LogKey.scala | 1 - .../spark/sql/connect/service/SessionHolder.scala | 12 +++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 5e1504831d65..d41d3461f946 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -112,7 +112,6 @@ private[spark] object LogKeys { case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey case object CACHE_AUTO_REMOVED_SIZE extends LogKey case object CACHE_SIZE extends LogKey - case object CACHE_SIZE_KEY extends LogKey case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey case object CALL_SITE_LONG_FORM extends LogKey diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 6ff35b2aa5e7..7fa72d19dc4b 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -63,7 +63,7 @@ case class SessionHolder(userId: 
String, sessionId: String, session: SparkSessio if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) { logWarning( log"Session plan cache is disabled due to non-positive cache size." + - log" Current value of '${MDC(LogKeys.CACHE_SIZE_KEY, + log" Current value of '${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is" + log"${MDC(LogKeys.CACHE_SIZE, SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") @@ -249,15 +249,17 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private[connect] def updateAccessTime(): Unit = { lastAccessTimeMs = System.currentTimeMillis() logInfo( - log"Session ${MDC(LogKeys.SESSION_KEY, key)} accessed, " + - log"time ${MDC(LogKeys.LAST_ACCESS_TIME, lastAccessTimeMs)} ms.") + log"Session with userId: ${MDC(LogKeys.USER_ID, userId)} and " + + log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)} accessed," + + log"time ${MDC(LogKeys.LAST_ACCESS_TIME, lastAccessTimeMs)} ms.") } private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = { customInactiveTimeoutMs = newInactiveTimeoutMs logInfo( - log"Session ${MDC(LogKeys.SESSION_KEY, key)} " + - log"inactive timeout set to ${MDC(LogKeys.TIMEOUT, customInactiveTimeoutMs)} ms.") + log"Session with userId: ${MDC(LogKeys.USER_ID, userId)} and " + + log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)} inactive timeout set to " + + log"${MDC(LogKeys.TIMEOUT, customInactiveTimeoutMs)} ms") } /** From 16fe384a8c431564f7e3b05cfe46b8029bfdcb08 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Tue, 9 Jul 2024 16:24:20 -0700 Subject: [PATCH 04/11] logkey modifications --- .../scala/org/apache/spark/api/python/PythonRunner.scala | 8 ++++---- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 6 +++--- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 53392b0ef64f..6a67587fbd80 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -132,9 +132,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( conf.get(PYTHON_DAEMON_MODULE).map { value => logInfo( log"Python daemon module in PySpark is set to " + - log"[${MDC(LogKeys.VALUE, value)}] in '${MDC(LogKeys.PYTHON_DAEMON_MODULE, + log"[${MDC(LogKeys.VALUE, value)}] in '${MDC(LogKeys.CONFIG, PYTHON_DAEMON_MODULE.key)}', using this to start the daemon up. Note that this " + - log"configuration only has an effect when '${MDC(LogKeys.PYTHON_USE_DAEMON, + log"configuration only has an effect when '${MDC(LogKeys.CONFIG2, PYTHON_USE_DAEMON.key)}' is enabled and the platform is not Windows.") value }.getOrElse("pyspark.daemon") @@ -144,9 +144,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( conf.get(PYTHON_WORKER_MODULE).map { value => logInfo( log"Python worker module in PySpark is set to ${MDC(LogKeys.VALUE, value)} " + - log"in ${MDC(LogKeys.PYTHON_WORKER_MODULE, PYTHON_WORKER_MODULE.key)}, " + + log"in ${MDC(LogKeys.CONFIG, PYTHON_WORKER_MODULE.key)}, " + log"using this to start the worker up. 
Note that this configuration only has " + - log"an effect when ${MDC(LogKeys.PYTHON_USE_DAEMON, PYTHON_USE_DAEMON.key)} " + + log"an effect when ${MDC(LogKeys.CONFIG2, PYTHON_USE_DAEMON.key)} " + log"is disabled or the platform is Windows.") value }.getOrElse("pyspark.worker") diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e9b834712984..7bb945ab9f14 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -136,13 +136,13 @@ private[spark] class SparkSubmit extends Logging { /** Print version information to the log. */ private def printVersion(): Unit = { - logInfo(log"""Welcome to + logInfo("""Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version ${MDC(LogKeys.SPARK_VERSION, SPARK_VERSION)} + /___/ .__/\_,_/_/ /_/\_\ version %s /_/ - """) + """.format(SPARK_VERSION)) logInfo(log"Using Scala ${MDC(LogKeys.SCALA_VERSION, Properties.versionString)}," + log" ${MDC(LogKeys.JAVA_VM_NAME, Properties.javaVmName)}," + log" ${MDC(LogKeys.JAVA_VERSION, Properties.javaVersion)}") diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 608f2fc735bb..78ae5bd39d23 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1000,11 +1000,11 @@ private[spark] class DAGScheduler( case scala.util.Success(_) => logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} finished: " + log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " + - log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e9)} s") + log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms") case scala.util.Failure(exception) => logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} 
failed: " + log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " + - log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e9)} s") + log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms") // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) From bf6c2e01802e797050fe4dd210c13477fde64fa9 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Tue, 9 Jul 2024 16:26:13 -0700 Subject: [PATCH 05/11] remove unused logkeys --- .../utils/src/main/scala/org/apache/spark/internal/LogKey.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index d41d3461f946..34ec69980b5f 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -618,10 +618,8 @@ private[spark] object LogKeys { case object PUSHED_FILTERS extends LogKey case object PUSH_MERGED_LOCAL_BLOCKS_SIZE extends LogKey case object PVC_METADATA_NAME extends LogKey - case object PYTHON_DAEMON_MODULE extends LogKey case object PYTHON_EXEC extends LogKey case object PYTHON_PACKAGES extends LogKey - case object PYTHON_USE_DAEMON extends LogKey case object PYTHON_VERSION extends LogKey case object PYTHON_WORKER_MODULE extends LogKey case object PYTHON_WORKER_RESPONSE extends LogKey From 44c8172fe9e5ca64d76aec2247c61543f580677f Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 9 Jul 2024 20:04:55 -0700 Subject: [PATCH 06/11] fix --- .../utils/src/main/scala/org/apache/spark/internal/LogKey.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 
34ec69980b5f..51ef112a677d 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -100,8 +100,8 @@ private[spark] object LogKeys { case object BLOCK_MANAGER_IDS extends LogKey case object BLOCK_TYPE extends LogKey case object BOOT extends LogKey - case object BOOT_TIME extends LogKey case object BOOTSTRAP_TIME extends LogKey + case object BOOT_TIME extends LogKey case object BROADCAST extends LogKey case object BROADCAST_ID extends LogKey case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey From ebf39c60659759fe1c07db623f021bafad1926a3 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Wed, 10 Jul 2024 05:33:30 -0700 Subject: [PATCH 07/11] scalafmt --- .../spark/sql/connect/service/SessionHolder.scala | 11 ++++------- .../service/SparkConnectListenerBusListener.scala | 9 ++++----- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 7fa72d19dc4b..78b66dc5a2bc 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -61,12 +61,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio // memorizing `LogicalPlan`s which may be a sub-tree in a subsequent plan. private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = { if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) { - logWarning( - log"Session plan cache is disabled due to non-positive cache size." 
+ - log" Current value of '${MDC(LogKeys.CONFIG, - Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is" + - log"${MDC(LogKeys.CACHE_SIZE, - SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") + logWarning(log"Session plan cache is disabled due to non-positive cache size." + + log" Current value of '${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is" + + log"${MDC(LogKeys.CACHE_SIZE, SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") None } else { Some( @@ -286,7 +283,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio } logInfo( log"Closing session with userId: ${MDC(LogKeys.USER_ID, userId)} and " + - log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}") + log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}") closedTimeMs = Some(System.currentTimeMillis()) if (Utils.isTesting && eventManager.status == SessionStatus.Pending) { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala index 877d9bb3747b..c6baad72ee18 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala @@ -131,11 +131,10 @@ private[sql] class SparkConnectListenerBusListener( .build()) } catch { case NonFatal(e) => - logError( - log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" + - log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " + - log"Removing SparkConnectListenerBusListener and terminating the long-running thread " + - log"because of exception: ${MDC(LogKeys.EXCEPTION, e)}") + logError(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" + + log"[UserId: ${MDC(LogKeys.USER_ID, 
sessionHolder.userId)}] " + + log"Removing SparkConnectListenerBusListener and terminating the long-running thread " + + log"because of exception: ${MDC(LogKeys.EXCEPTION, e)}") // This likely means that the client is not responsive even with retry, we should // remove this listener and cleanup resources. serverSideListenerHolder.cleanUp() From 5b67ffdcf528d595b5a36a54f883791297b8b5a8 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Wed, 10 Jul 2024 10:39:51 -0700 Subject: [PATCH 08/11] style fix --- .../org/apache/spark/sql/connect/service/SessionHolder.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 78b66dc5a2bc..7c86d43e735e 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -62,8 +62,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = { if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) { logWarning(log"Session plan cache is disabled due to non-positive cache size." 
+ - log" Current value of '${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is" + - log"${MDC(LogKeys.CACHE_SIZE, SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") + log" Current value of '${MDC(LogKeys.CONFIG, + Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC(LogKeys.CACHE_SIZE, + SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") None } else { Some( From 6f77d42696da862f53e0d96c800ce8a215205d3d Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Wed, 10 Jul 2024 15:51:22 -0700 Subject: [PATCH 09/11] reformat --- .../apache/spark/sql/connect/service/SessionHolder.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 7c86d43e735e..289e43323e57 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -62,9 +62,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = { if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) { logWarning(log"Session plan cache is disabled due to non-positive cache size." 
+ - log" Current value of '${MDC(LogKeys.CONFIG, - Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC(LogKeys.CACHE_SIZE, - SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") + log" Current value of '${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC( + LogKeys.CACHE_SIZE, + SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") None } else { Some( From 3cb85ba680dbbcf7bbb65a35fb7a460943ad031b Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Thu, 11 Jul 2024 08:46:12 -0700 Subject: [PATCH 10/11] style fix --- .../org/apache/spark/sql/connect/service/SessionHolder.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 289e43323e57..e3cb712d253e 100644 --- a/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -62,8 +62,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = { if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) { logWarning(log"Session plan cache is disabled due to non-positive cache size." 
+ - log" Current value of '${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC( - LogKeys.CACHE_SIZE, + log" Current value of '${MDC(LogKeys.CONFIG, + Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC(LogKeys.CACHE_SIZE, SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") None } else { From 215d149de7143bb4b1d30c30428840bdd5548c22 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Thu, 11 Jul 2024 10:44:26 -0700 Subject: [PATCH 11/11] scalafmt --- .../spark/sql/connect/service/SessionHolder.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index e3cb712d253e..fbae94afc43d 100644 --- a/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -61,10 +61,12 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio // memorizing `LogicalPlan`s which may be a sub-tree in a subsequent plan. private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = { if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) { - logWarning(log"Session plan cache is disabled due to non-positive cache size." + - log" Current value of '${MDC(LogKeys.CONFIG, - Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC(LogKeys.CACHE_SIZE, - SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") + logWarning( + log"Session plan cache is disabled due to non-positive cache size." + + log" Current value of " + + log"'${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC( + LogKeys.CACHE_SIZE, + SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}") None } else { Some(