more structured log migration
asl3 committed Jul 9, 2024
commit 7c73c4b5b3d0053d762b0641969e8d080ffbe74e
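Every hunk below applies the same structured-logging pattern: plain interpolated strings passed to logInfo/logWarning/logError are rewritten with Spark's log interpolator, and each interpolated value is wrapped in MDC(...) with a key from org.apache.spark.internal.LogKeys. A minimal sketch of that pattern, assuming the Logging, LogKeys, and MDC API exactly as imported in the hunks below; the CacheManager class, its evict method, and the message text are hypothetical and used only for illustration:

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical component, only to show the before/after shape of the migration.
class CacheManager extends Logging {
  def evict(cacheSize: Long): Unit = {
    // Before: free-form interpolation; the value is baked into the message text.
    // logWarning(s"Evicting cache of size $cacheSize.")

    // After: the value is tagged with a LogKey (CACHE_SIZE is added by this commit),
    // so structured log output can expose it as a named field.
    logWarning(log"Evicting cache of size ${MDC(LogKeys.CACHE_SIZE, cacheSize)}.")
  }
}

Each value logged this way needs a matching case object in LogKeys, which is why this commit also extends LogKey.scala with the new keys used in the hunks that follow.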
10 changes: 10 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -100,6 +100,7 @@ private[spark] object LogKeys {
case object BLOCK_MANAGER_IDS extends LogKey
case object BLOCK_TYPE extends LogKey
case object BOOT extends LogKey
case object BOOT_TIME extends LogKey
case object BOOTSTRAP_TIME extends LogKey
case object BROADCAST extends LogKey
case object BROADCAST_ID extends LogKey
@@ -110,6 +111,8 @@ private[spark] object LogKeys {
case object BYTE_SIZE extends LogKey
case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
case object CACHE_AUTO_REMOVED_SIZE extends LogKey
case object CACHE_SIZE extends LogKey
case object CACHE_SIZE_KEY extends LogKey
case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
case object CALL_SITE_LONG_FORM extends LogKey
@@ -282,6 +285,7 @@ private[spark] object LogKeys {
case object FINAL_CONTEXT extends LogKey
case object FINAL_OUTPUT_PATH extends LogKey
case object FINAL_PATH extends LogKey
case object FINISH_TIME extends LogKey
case object FINISH_TRIGGER_DURATION extends LogKey
case object FREE_MEMORY_SIZE extends LogKey
case object FROM_OFFSET extends LogKey
@@ -320,10 +324,12 @@ private[spark] object LogKeys {
case object INITIAL_CAPACITY extends LogKey
case object INITIAL_HEARTBEAT_INTERVAL extends LogKey
case object INIT_MODE extends LogKey
case object INIT_TIME extends LogKey
case object INPUT extends LogKey
case object INPUT_SPLIT extends LogKey
case object INTEGRAL extends LogKey
case object INTERVAL extends LogKey
case object INVALID_PARAMS extends LogKey
case object ISOLATION_LEVEL extends LogKey
case object ISSUE_DATE extends LogKey
case object IS_NETWORK_REQUEST_DONE extends LogKey
@@ -369,6 +375,7 @@ private[spark] object LogKeys {
case object LOG_LEVEL extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
case object LOSSES extends LogKey
case object LOWER_BOUND extends LogKey
case object MALFORMATTED_STRING extends LogKey
case object MAP_ID extends LogKey
@@ -566,6 +573,7 @@ private[spark] object LogKeys {
case object OS_NAME extends LogKey
case object OS_VERSION extends LogKey
case object OUTPUT extends LogKey
case object OUTPUT_BUFFER extends LogKey
case object OVERHEAD_MEMORY_SIZE extends LogKey
case object PAGE_SIZE extends LogKey
case object PARENT_STAGES extends LogKey
@@ -611,8 +619,10 @@ private[spark] object LogKeys {
case object PUSHED_FILTERS extends LogKey
case object PUSH_MERGED_LOCAL_BLOCKS_SIZE extends LogKey
case object PVC_METADATA_NAME extends LogKey
case object PYTHON_DAEMON_MODULE extends LogKey
case object PYTHON_EXEC extends LogKey
case object PYTHON_PACKAGES extends LogKey
case object PYTHON_USE_DAEMON extends LogKey
case object PYTHON_VERSION extends LogKey
case object PYTHON_WORKER_MODULE extends LogKey
case object PYTHON_WORKER_RESPONSE extends LogKey
@@ -650,8 +650,9 @@ private[spark] object MavenUtils extends Logging {
val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
if (invalidParams.nonEmpty) {
logWarning(
s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " +
s"in Ivy URI query `$uriQuery`.")
log"Invalid parameters `${MDC(LogKeys.INVALID_PARAMS,
invalidParams.sorted.mkString(","))}` " +
log"found in Ivy URI query `${MDC(LogKeys.URI, uriQuery)}`.")
}

(transitive, exclusionList, repos)
@@ -33,7 +33,7 @@ import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.spark.{SparkEnv, SparkException, SparkSQLException}
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
import org.apache.spark.connect.proto
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
@@ -63,9 +63,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = {
if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) {
logWarning(
s"Session plan cache is disabled due to non-positive cache size." +
s" Current value of '${Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is" +
s" ${SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE)}.")
log"Session plan cache is disabled due to non-positive cache size." +
log" Current value of '${MDC(LogKeys.CACHE_SIZE_KEY,
Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is" +
log"${MDC(LogKeys.CACHE_SIZE,
SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}")
None
} else {
Some(
@@ -248,15 +250,15 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
private[connect] def updateAccessTime(): Unit = {
lastAccessTimeMs = System.currentTimeMillis()
logInfo(
log"Session ${MDC(SESSION_KEY, key)} accessed, " +
log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.")
log"Session ${MDC(LogKeys.SESSION_KEY, key)} accessed, " +
log"time ${MDC(LogKeys.LAST_ACCESS_TIME, lastAccessTimeMs)} ms.")
}

private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = {
customInactiveTimeoutMs = newInactiveTimeoutMs
logInfo(
log"Session ${MDC(SESSION_KEY, key)} " +
log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.")
log"Session ${MDC(LogKeys.SESSION_KEY, key)} " +
log"inactive timeout set to ${MDC(LogKeys.TIMEOUT, customInactiveTimeoutMs)} ms.")
}

/**
@@ -282,8 +284,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
throw new IllegalStateException(s"Session $key is already closed.")
}
logInfo(
log"Closing session with userId: ${MDC(USER_ID, userId)} and " +
log"sessionId: ${MDC(SESSION_ID, sessionId)}")
log"Closing session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}")
closedTimeMs = Some(System.currentTimeMillis())

if (Utils.isTesting && eventManager.status == SessionStatus.Pending) {
@@ -18,17 +18,14 @@
package org.apache.spark.sql.connect.service

import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, CountDownLatch}

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import io.grpc.stub.StreamObserver

import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.connect.proto.StreamingQueryEventType
import org.apache.spark.connect.proto.StreamingQueryListenerEvent
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{LogKeys, Logging, MDC}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.util.ArrayImplicits._

@@ -132,9 +129,10 @@ private[sql] class SparkConnectListenerBusListener(
} catch {
case NonFatal(e) =>
logError(
s"[SessionId: ${sessionHolder.sessionId}][UserId: ${sessionHolder.userId}] " +
s"Removing SparkConnectListenerBusListener and terminating the long-running thread " +
s"because of exception: $e")
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " +
log"Removing SparkConnectListenerBusListener and terminating the long-running thread " +
log"because of exception: ${MDC(LogKeys.EXCEPTION, e)}")
// This likely means that the client is not responsive even with retry, we should
// remove this listener and cleanup resources.
serverSideListenerHolder.cleanUp()
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -21,12 +21,11 @@ import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.Base64

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.JOB_ID
import org.apache.spark.internal.{LogKeys, Logging, MDC}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.launcher.SparkLauncher
@@ -122,7 +121,7 @@ private[spark] class SecurityManager(
*/
def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
logInfo("Changing view acls to: " + viewAcls.mkString(","))
logInfo(log"Changing view acls to: ${MDC(LogKeys.VIEW_ACLS, viewAcls.mkString(","))}")
}

def setViewAcls(defaultUser: String, allowedUsers: Seq[String]): Unit = {
@@ -135,7 +134,7 @@ private[spark] class SecurityManager(
*/
def setViewAclsGroups(allowedUserGroups: Seq[String]): Unit = {
viewAclsGroups = adminAclsGroups ++ allowedUserGroups
logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
logInfo(log"Changing view acls groups to: ${MDC(LogKeys.VIEW_ACLS, viewAcls.mkString(","))}")
}

/**
@@ -163,7 +162,7 @@ private[spark] class SecurityManager(
*/
def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
logInfo(log"Changing modify acls to: ${MDC(LogKeys.MODIFY_ACLS, modifyAcls.mkString(","))}")
}

/**
@@ -172,7 +171,8 @@ private[spark] class SecurityManager(
*/
def setModifyAclsGroups(allowedUserGroups: Seq[String]): Unit = {
modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
logInfo(log"Changing modify acls groups to: ${MDC(LogKeys.MODIFY_ACLS,
modifyAcls.mkString(","))}")
}

/**
Expand Down Expand Up @@ -200,7 +200,7 @@ private[spark] class SecurityManager(
*/
def setAdminAcls(adminUsers: Seq[String]): Unit = {
adminAcls = adminUsers.toSet
logInfo("Changing admin acls to: " + adminAcls.mkString(","))
logInfo(log"Changing admin acls to: ${MDC(LogKeys.ADMIN_ACLS, adminAcls.mkString(","))}")
}

/**
Expand All @@ -209,7 +209,7 @@ private[spark] class SecurityManager(
*/
def setAdminAclsGroups(adminUserGroups: Seq[String]): Unit = {
adminAclsGroups = adminUserGroups.toSet
logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
logInfo(log"Changing admin acls groups to: ${MDC(LogKeys.ADMIN_ACLS, adminAcls.mkString(","))}")
}

def setAcls(aclSetting: Boolean): Unit = {
24 changes: 15 additions & 9 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal

import org.apache.spark._
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.TASK_NAME
import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python}
import org.apache.spark.internal.config.Python._
@@ -131,19 +131,23 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private val daemonModule =
conf.get(PYTHON_DAEMON_MODULE).map { value =>
logInfo(
s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " +
"using this to start the daemon up. Note that this configuration only has an effect when " +
s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.")
log"Python daemon module in PySpark is set to " +
log"[${MDC(LogKeys.VALUE, value)}] in '${MDC(LogKeys.PYTHON_DAEMON_MODULE,
PYTHON_DAEMON_MODULE.key)}', using this to start the daemon up. Note that this " +
log"configuration only has an effect when '${MDC(LogKeys.PYTHON_USE_DAEMON,
PYTHON_USE_DAEMON.key)}' is enabled and the platform is not Windows.")
value
}.getOrElse("pyspark.daemon")

// This configuration indicates the module to run each Python worker.
private val workerModule =
conf.get(PYTHON_WORKER_MODULE).map { value =>
logInfo(
s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " +
"using this to start the worker up. Note that this configuration only has an effect when " +
s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.")
log"Python worker module in PySpark is set to ${MDC(LogKeys.VALUE, value)} " +
log"in ${MDC(LogKeys.PYTHON_WORKER_MODULE, PYTHON_WORKER_MODULE.key)}, " +
log"using this to start the worker up. Note that this configuration only has " +
log"an effect when ${MDC(LogKeys.PYTHON_USE_DAEMON, PYTHON_USE_DAEMON.key)} " +
log"is disabled or the platform is Windows.")
value
}.getOrElse("pyspark.worker")

@@ -509,8 +513,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
val init = initTime - bootTime
val finish = finishTime - initTime
val total = finishTime - startTime
logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
init, finish))
logInfo(log"Times: total = ${MDC(LogKeys.TOTAL_TIME, total)}, " +
log"boot = ${MDC(LogKeys.BOOT_TIME, boot)}, " +
log"init = ${MDC(LogKeys.INIT_TIME, init)}, " +
log"finish = ${MDC(LogKeys.FINISH_TIME, finish)}")
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -136,13 +136,13 @@ private[spark] class SparkSubmit extends Logging {

/** Print version information to the log. */
private def printVersion(): Unit = {
logInfo("""Welcome to
logInfo(log"""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/___/ .__/\_,_/_/ /_/\_\ version ${MDC(LogKeys.SPARK_VERSION, SPARK_VERSION)}
/_/
""".format(SPARK_VERSION))
""")
logInfo(log"Using Scala ${MDC(LogKeys.SCALA_VERSION, Properties.versionString)}," +
log" ${MDC(LogKeys.JAVA_VM_NAME, Properties.javaVmName)}," +
log" ${MDC(LogKeys.JAVA_VERSION, Properties.javaVersion)}")
@@ -36,7 +36,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED}
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
@@ -998,11 +998,13 @@ private[spark] class DAGScheduler(
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} finished: " +
log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e9)} s")
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} failed: " +
log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e9)} s")
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
@@ -110,7 +110,7 @@ private[spark] object ShutdownHookManager extends Logging {
}
}
if (retval) {
logInfo("path = " + file + ", already present as root for deletion.")
logInfo(log"path = ${MDC(LogKeys.FILE_NAME, file)}, already present as root for deletion.")
}
retval
}
@@ -18,7 +18,7 @@
package org.apache.spark.util.collection

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{LogKeys, Logging, MDC}
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}

@@ -143,8 +143,11 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
*/
@inline private def logSpillage(size: Long): Unit = {
val threadId = Thread.currentThread().getId
logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
.format(threadId, org.apache.spark.util.Utils.bytesToString(size),
_spillCount, if (_spillCount > 1) "s" else ""))
logInfo(log"Thread ${MDC(LogKeys.THREAD_ID, threadId)} " +
log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE,
org.apache.spark.util.Utils.bytesToString(size))} to disk " +
log"(${MDC(LogKeys.SPILL_TIMES, _spillCount)} times so far)")

}
}
@@ -21,7 +21,7 @@ import scala.reflect.ClassTag

import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaSparkContext._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{CLUSTER_CENTROIDS, CLUSTER_LABEL, CLUSTER_WEIGHT, LARGEST_CLUSTER_INDEX, SMALLEST_CLUSTER_INDEX}
import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.rdd.RDD
@@ -222,7 +222,7 @@ class StreamingKMeans @Since("1.2.0") (
throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit)
}
this.decayFactor = math.exp(math.log(0.5) / halfLife)
logInfo("Setting decay factor to: %g ".format (this.decayFactor))
logInfo(log"Setting decay factor to: ${MDC(LogKeys.VALUE, this.decayFactor)}")
this.timeUnit = timeUnit
this
}
@@ -299,8 +299,8 @@ object GradientDescent extends Logging {
i += 1
}

logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))
logInfo(log"GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses " +
log"${MDC(LogKeys.LOSSES, stochasticLossHistory.takeRight(10).mkString(", "))}")

(weights, stochasticLossHistory.toArray)
}