18 changes: 13 additions & 5 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -80,6 +80,7 @@ private[spark] object LogKeys {
case object APP_STATE extends LogKey
case object ARCHIVE_NAME extends LogKey
case object ARGS extends LogKey
case object ARTIFACTS extends LogKey
case object ARTIFACT_ID extends LogKey
case object ATTRIBUTE_MAP extends LogKey
case object AUTH_ENABLED extends LogKey
@@ -282,6 +283,7 @@ private[spark] object LogKeys {
case object FREE_MEMORY_SIZE extends LogKey
case object FROM_OFFSET extends LogKey
case object FROM_TIME extends LogKey
case object FS_DATA_OUTPUT_STREAM extends LogKey
case object FUNCTION_NAME extends LogKey
case object FUNCTION_PARAM extends LogKey
case object GLOBAL_INIT_FILE extends LogKey
@@ -299,9 +301,8 @@
case object HIVE_OPERATION_STATE extends LogKey
case object HIVE_OPERATION_TYPE extends LogKey
case object HOST extends LogKey
case object HOSTS extends LogKey
case object HOST_LOCAL_BLOCKS_SIZE extends LogKey
case object HOST_NAME extends LogKey
Contributor Author: Let's unify HOST_NAME to HOST and remove HOST_NAME.
case object HOST_NAMES extends LogKey
Contributor Author: Let's unify HOST_NAMES to HOSTS and remove HOST_NAMES.
case object HOST_PORT extends LogKey
case object HOST_PORT2 extends LogKey
case object HUGE_METHOD_LIMIT extends LogKey
@@ -337,6 +338,7 @@ private[spark] object LogKeys {
case object KEY2 extends LogKey
case object KEYTAB extends LogKey
case object KEYTAB_FILE extends LogKey
case object KILL_EXECUTORS extends LogKey
case object LABEL_COLUMN extends LogKey
case object LARGEST_CLUSTER_INDEX extends LogKey
case object LAST_ACCESS_TIME extends LogKey
@@ -357,10 +359,10 @@
case object LOCAL_BLOCKS_SIZE extends LogKey
case object LOCAL_SCRATCH_DIR extends LogKey
case object LOCATION extends LogKey
case object LOGICAL_PLAN extends LogKey
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
case object LOG_KEY_FILE extends LogKey
case object LOG_LEVEL extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
@@ -385,6 +387,7 @@
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
case object MAX_SERVICE_NAME_LENGTH extends LogKey
case object MAX_SIZE extends LogKey
case object MAX_SLOTS extends LogKey
case object MAX_SPLIT_BYTES extends LogKey
@@ -395,6 +398,7 @@
case object MEMORY_THRESHOLD_SIZE extends LogKey
case object MERGE_DIR_NAME extends LogKey
case object MESSAGE extends LogKey
case object METADATA extends LogKey
case object METADATA_DIRECTORY extends LogKey
case object METADATA_JSON extends LogKey
case object META_FILE extends LogKey
@@ -541,7 +545,7 @@ private[spark] object LogKeys {
case object OLD_VALUE extends LogKey
case object OPEN_COST_IN_BYTES extends LogKey
case object OPERATION_HANDLE extends LogKey
case object OPERATION_HANDLE_IDENTIFIER extends LogKey
case object OPERATION_ID extends LogKey
case object OPTIMIZED_PLAN_COLUMNS extends LogKey
case object OPTIMIZER_CLASS_NAME extends LogKey
case object OPTIONS extends LogKey
@@ -583,6 +587,7 @@ private[spark] object LogKeys {
case object POST_SCAN_FILTERS extends LogKey
case object PREDICATE extends LogKey
case object PREDICATES extends LogKey
case object PREFERRED_SERVICE_NAME extends LogKey
case object PREFIX extends LogKey
case object PRETTY_ID_STRING extends LogKey
case object PRINCIPAL extends LogKey
@@ -613,6 +618,7 @@ private[spark] object LogKeys {
case object RANGE extends LogKey
case object RATE_LIMIT extends LogKey
case object RATIO extends LogKey
case object RDD extends LogKey
case object RDD_CHECKPOINT_DIR extends LogKey
case object RDD_DEBUG_STRING extends LogKey
case object RDD_DESCRIPTION extends LogKey
@@ -641,6 +647,7 @@ private[spark] object LogKeys {
case object REMOVE_FROM_MASTER extends LogKey
case object REPORT_DETAILS extends LogKey
case object REQUESTER_SIZE extends LogKey
case object REQUEST_EXECUTORS extends LogKey
case object REQUEST_ID extends LogKey
case object RESOURCE extends LogKey
case object RESOURCE_NAME extends LogKey
@@ -679,6 +686,7 @@ private[spark] object LogKeys {
case object SESSION_KEY extends LogKey
case object SET_CLIENT_INFO_REQUEST extends LogKey
case object SHARD_ID extends LogKey
case object SHORTER_SERVICE_NAME extends LogKey
case object SHORT_USER_NAME extends LogKey
case object SHUFFLE_BLOCK_INFO extends LogKey
case object SHUFFLE_DB_BACKEND_KEY extends LogKey
@@ -756,7 +764,6 @@ private[spark] object LogKeys {
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
case object TASK_INDEX extends LogKey
case object TASK_INFO_ID extends LogKey
case object TASK_LOCALITY extends LogKey
case object TASK_NAME extends LogKey
case object TASK_REQUIREMENTS extends LogKey
@@ -835,6 +842,7 @@ private[spark] object LogKeys {
case object WORKER_PORT extends LogKey
case object WORKER_URL extends LogKey
case object WRITE_AHEAD_LOG_INFO extends LogKey
case object WRITE_AHEAD_LOG_RECORD_HANDLE extends LogKey
case object WRITE_JOB_UUID extends LogKey
case object XML_SCHEDULING_MODE extends LogKey
case object XSD_PATH extends LogKey
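The two unification comments above reflect the intent of this file: one canonical key per concept, so call sites reuse an existing key instead of minting a near-duplicate such as HOST_NAME. A minimal sketch of that reuse (hypothetical call sites, not code from this PR; it assumes the classes sit under org.apache.spark so the private[spark] Logging trait and its log interpolator are visible):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical call sites: both reuse LogKeys.HOST, so the structured-log field
// stays consistent across modules instead of splitting into HOST and HOST_NAME.
class BlockFetchReporter extends Logging {
  def warnSlowFetch(host: String): Unit =
    logWarning(log"Slow block fetch from host ${MDC(LogKeys.HOST, host)}")
}

class HeartbeatReporter extends Logging {
  def infoLost(host: String): Unit =
    logInfo(log"Lost heartbeat from host ${MDC(LogKeys.HOST, host)}")
}
```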
@@ -36,7 +36,7 @@ import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.util.ArrayImplicits._

/** Provides utility functions to be used inside SparkSubmit. */
@@ -215,7 +215,7 @@ private[spark] object MavenUtils extends Logging {
if (artifactInfo.getExt == "jar") {
true
} else {
logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID, artifactInfo.getId)}")
false
}
}
@@ -515,8 +515,9 @@ private[spark] object MavenUtils extends Logging {
val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
val failedArtifacts = failedReports.map(r => r.getArtifact)
logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
s"attempt to retry while skipping local-m2-cache.")
logInfo(log"Download failed: " +
log"${MDC(LogKeys.ARTIFACTS, failedArtifacts.mkString("[", ", ", "]"))}, " +
log"attempt to retry while skipping local-m2-cache.")
failedArtifacts.foreach(artifact => {
clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
})
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets.UTF_8

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}

private[spark] trait SparkErrorUtils extends Logging {
/**
@@ -74,7 +74,8 @@ private[spark] trait SparkErrorUtils extends Logging {
} catch {
case t: Throwable if (originalThrowable != null && originalThrowable != t) =>
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: ${t.getMessage}", t)
logWarning(
log"Suppressing exception in finally: ${MDC(LogKeys.MESSAGE, t.getMessage)}", t)
throw originalThrowable
}
}
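The change above also illustrates how the migration handles the two-argument overload: the interpolated message carries the MDC while the throwable is still passed as the second argument. A self-contained sketch of the same before/after shape (hypothetical helper, not code from this PR; assumes the org.apache.spark namespace so the private[spark] Logging trait is visible):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical helper showing the s"..." -> log"..." migration when an
// exception is logged alongside the message.
class QuietCloser extends Logging {
  def closeQuietly(resource: AutoCloseable): Unit = {
    try {
      resource.close()
    } catch {
      case t: Throwable =>
        // Before: logWarning(s"Suppressing exception while closing: ${t.getMessage}", t)
        logWarning(log"Suppressing exception while closing: ${MDC(LogKeys.MESSAGE, t.getMessage)}", t)
    }
  }
}
```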
@@ -20,7 +20,7 @@ import java.io.File
import java.net.{URI, URISyntaxException}
import java.nio.file.Files

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.network.util.JavaUtils

private[spark] trait SparkFileUtils extends Logging {
@@ -77,12 +77,12 @@ private[spark] trait SparkFileUtils extends Logging {
// remove the check when we're sure that Files.createDirectories() would never fail silently.
Files.createDirectories(dir.toPath)
if ( !dir.exists() || !dir.isDirectory) {
logError(s"Failed to create directory " + dir)
logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}")
}
dir.isDirectory
} catch {
case e: Exception =>
logError(s"Failed to create directory " + dir, e)
logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}", e)
false
}
}
@@ -145,13 +145,11 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
case t: QueryTerminatedEvent =>
listener.onQueryTerminated(t)
case _ =>
logWarning(
log"Unknown StreamingQueryListener event: " +
log"${MDC(LogKeys.EVENT, event)}")
logWarning(log"Unknown StreamingQueryListener event: ${MDC(LogKeys.EVENT, event)}")
Contributor Author: Tidy up the code formatting.
}
} catch {
case e: Exception =>
logWarning(s"Listener $listener threw an exception", e)
logWarning(log"Listener ${MDC(LogKeys.LISTENER, listener)} threw an exception", e)
})
}
}
@@ -58,8 +58,10 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
listenerHolder.isServerSideListenerRegistered match {
case true =>
logWarning(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Redundant server side listener added. Exiting.")
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Redundant server side listener added. Exiting.")
return
case false =>
// This transfers sending back the response to the client until
@@ -85,37 +87,35 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
logError(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Error sending listener added response.",
e)
listenerHolder.cleanUp()
return
}
}
logInfo(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
log"${MDC(LogKeys.USER_ID, userId)}][operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
log"Server side listener added. Now blocking until all client side listeners are " +
log"removed or there is error transmitting the event back.")
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Server side listener added. Now blocking until " +
log"all client side listeners are removed or there is error transmitting the event back.")
// Block the handling thread, and have serverListener continuously send back new events
listenerHolder.streamingQueryListenerLatch.await()
logInfo(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
log"${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Server side listener long-running handling thread ended.")
case StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
sessionHolder.streamingServersideListenerHolder.cleanUp()
case false =>
logWarning(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] No active server side listener bus listener " +
s"but received remove listener call. Exiting.")
logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"No active server side listener bus listener but received remove listener call. " +
log"Exiting.")
return
}
case StreamingQueryListenerBusCommand.CommandCase.COMMAND_NOT_SET =>
@@ -103,7 +103,7 @@ private[kinesis] class KinesisCheckpointer(
}
} catch {
case NonFatal(e) =>
logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
logWarning(log"Failed to checkpoint shardId ${MDC(SHARD_ID, shardId)} to DynamoDB.", e)
}
}

@@ -1390,8 +1390,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case _: IOException if !retried =>
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " +
"trying again...")
logWarning(log"Exception occurred while rebuilding log path " +
log"${MDC(LogKeys.PATH, attempt.logPath)} - trying again...")
retried = true

case e: Exception =>
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, Maste
import org.apache.spark.deploy.Utils.addRenderLogHandler
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{HOST_NAMES, NUM_REMOVED_WORKERS}
import org.apache.spark.internal.LogKeys.{HOSTS, NUM_REMOVED_WORKERS}
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
@@ -79,7 +79,7 @@ class MasterWebUI(
} else {
val removedWorkers = masterEndpointRef.askSync[Integer](
DecommissionWorkersOnHosts(hostnames))
logInfo(log"Decommissioning of hosts ${MDC(HOST_NAMES, hostnames)}" +
logInfo(log"Decommissioning of hosts ${MDC(HOSTS, hostnames)}" +
Contributor Author: Let's unify it as HOSTS; there is a detailed explanation below.
log" decommissioned ${MDC(NUM_REMOVED_WORKERS, removedWorkers)} workers")
if (removedWorkers > 0) {
resp.setStatus(HttpServletResponse.SC_OK)
@@ -83,7 +83,7 @@ private[spark] class Executor(
extends Logging {

logInfo(log"Starting executor ID ${LogMDC(LogKeys.EXECUTOR_ID, executorId)}" +
log" on host ${LogMDC(HOST_NAME, executorHostname)}")
log" on host ${LogMDC(HOST, executorHostname)}")
Contributor Author: Let's unify it as HOST. The general rule is:
HOST: only the host name, without the port, e.g. host_name
PORT: just the port, e.g. 10240
HOST_PORT: both host name and port, e.g. host_name:10240
(A sketch of this convention follows this file's diff.)

Contributor Author: log"on host ${MDC(LogKeys.HOST, executorHostname)} for " +
logInfo(log"OS info ${LogMDC(OS_NAME, System.getProperty("os.name"))}," +
log" ${LogMDC(OS_VERSION, System.getProperty("os.version"))}, " +
log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}")
@@ -53,7 +53,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
fsDataOutputStream.close()
} catch {
case NonFatal(e) =>
logWarning(s"Error cancelling write to $path (stream: $fsDataOutputStream)", e)
logWarning(log"Error cancelling write to ${MDC(LogKeys.PATH, path)} " +
log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM, fsDataOutputStream)})", e)
} finally {
terminated = true
}
@@ -71,7 +72,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
fsDataOutputStream.close()
} catch {
case NonFatal(e) =>
logWarning(s"Error closing $path (stream: $fsDataOutputStream)", e)
logWarning(log"Error closing ${MDC(LogKeys.PATH, path)} " +
log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM, fsDataOutputStream)})", e)
} finally {
terminated = true
}
@@ -27,8 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CONFIG, EXECUTOR_ENV_REGEX}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -107,9 +106,11 @@ class KubernetesDriverConf(
} else {
val randomServiceId = KubernetesUtils.uniqueID(clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
logWarning(log"Driver's hostname would preferably be " +
log"${MDC(LogKeys.PREFERRED_SERVICE_NAME, preferredServiceName)}, but this is too long " +
log"(must be <= ${MDC(LogKeys.MAX_SERVICE_NAME_LENGTH, MAX_SERVICE_NAME_LENGTH)} " +
log"characters). Falling back to use " +
log"${MDC(LogKeys.SHORTER_SERVICE_NAME, shorterServiceName)} as the driver service's name.")
shorterServiceName
}
}
@@ -242,10 +243,10 @@ private[spark] class KubernetesExecutorConf(
if (executorEnvRegex.pattern.matcher(key).matches()) {
true
} else {
logWarning(log"Invalid key: ${MDC(CONFIG, key)}, " +
logWarning(log"Invalid key: ${MDC(LogKeys.CONFIG, key)}, " +
log"a valid environment variable name must consist of alphabetic characters, " +
log"digits, '_', '-', or '.', and must not start with a digit. " +
log"Regex used for validation is '${MDC(EXECUTOR_ENV_REGEX, executorEnvRegex)}'")
log"Regex used for validation is '${MDC(LogKeys.EXECUTOR_ENV_REGEX, executorEnvRegex)}'")
false
}
}