diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java index 83be2db5d0b7..31c454f63a92 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java @@ -183,12 +183,12 @@ private void transferAllOutstanding() { if (numRetries > 0) { logger.error("Exception while beginning {} of {} outstanding blocks (after {} retries)", e, MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()), - MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length), + MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, blockIdsToTransfer.length), MDC.of(LogKeys.NUM_RETRY$.MODULE$, numRetries)); } else { logger.error("Exception while beginning {} of {} outstanding blocks", e, MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()), - MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length)); + MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, blockIdsToTransfer.length)); } if (shouldRetry(e) && initiateRetry(e)) { // successfully initiated a retry @@ -219,7 +219,7 @@ synchronized boolean initiateRetry(Throwable e) { MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()), MDC.of(LogKeys.NUM_RETRY$.MODULE$, retryCount), MDC.of(LogKeys.MAX_ATTEMPTS$.MODULE$, maxRetries), - MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, outstandingBlocksIds.size()), + MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, outstandingBlocksIds.size()), MDC.of(LogKeys.RETRY_WAIT_TIME$.MODULE$, retryWaitTime)); try { diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 99fc58b03503..534f00911922 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -72,6 +72,7 @@ object LogKeys { case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey case object CALL_SITE_LONG_FORM extends LogKey case object CALL_SITE_SHORT_FORM extends LogKey + case object CANCEL_FUTURE_JOBS extends LogKey case object CATALOG_NAME extends LogKey case object CATEGORICAL_FEATURES extends LogKey case object CHECKPOINT_FILE extends LogKey @@ -118,10 +119,10 @@ object LogKeys { case object CONTAINER_ID extends LogKey case object CONTAINER_STATE extends LogKey case object CONTEXT extends LogKey - case object CONTEXT_CREATION_SITE extends LogKey case object COST extends LogKey case object COUNT extends LogKey case object CREATED_POOL_NAME extends LogKey + case object CREATION_SITE extends LogKey case object CREDENTIALS_RENEWAL_INTERVAL_RATIO extends LogKey case object CROSS_VALIDATION_METRIC extends LogKey case object CROSS_VALIDATION_METRICS extends LogKey @@ -132,8 +133,9 @@ object LogKeys { case object CSV_SCHEMA_FIELD_NAMES extends LogKey case object CSV_SOURCE extends LogKey case object CURRENT_BATCH_ID extends LogKey + case object CURRENT_DISK_SIZE extends LogKey case object CURRENT_FILE extends LogKey - case object CURRENT_MEMORY_BYTES extends LogKey + case object CURRENT_MEMORY_SIZE extends LogKey case object CURRENT_PATH extends LogKey case object CURRENT_TIME extends LogKey case object DATA extends LogKey @@ -146,7 +148,6 @@ object LogKeys { case object DEFAULT_COMPACT_INTERVAL extends LogKey case object DEFAULT_ISOLATION_LEVEL extends LogKey case object 
DEFAULT_NAME extends LogKey - case object DEFAULT_SCHEDULING_MODE extends LogKey case object DEFAULT_VALUE extends LogKey case object DELAY extends LogKey case object DELEGATE extends LogKey @@ -207,6 +208,8 @@ object LogKeys { case object EXPR extends LogKey case object EXPR_TERMS extends LogKey case object EXTENDED_EXPLAIN_GENERATOR extends LogKey + case object FAILED_STAGE extends LogKey + case object FAILED_STAGE_NAME extends LogKey case object FAILURES extends LogKey case object FALLBACK_VERSION extends LogKey case object FEATURE_COLUMN extends LogKey @@ -231,6 +234,7 @@ object LogKeys { case object FINAL_OUTPUT_PATH extends LogKey case object FINAL_PATH extends LogKey case object FINISH_TRIGGER_DURATION extends LogKey + case object FREE_MEMORY_SIZE extends LogKey case object FROM_OFFSET extends LogKey case object FROM_TIME extends LogKey case object FUNCTION_NAME extends LogKey @@ -250,6 +254,7 @@ object LogKeys { case object HIVE_OPERATION_STATE extends LogKey case object HIVE_OPERATION_TYPE extends LogKey case object HOST extends LogKey + case object HOST_LOCAL_BLOCKS_SIZE extends LogKey case object HOST_NAMES extends LogKey case object HOST_PORT extends LogKey case object HOST_PORT2 extends LogKey @@ -265,6 +270,7 @@ object LogKeys { case object INITIAL_HEARTBEAT_INTERVAL extends LogKey case object INIT_MODE extends LogKey case object INPUT extends LogKey + case object INPUT_SPLIT extends LogKey case object INTERVAL extends LogKey case object ISOLATION_LEVEL extends LogKey case object ISSUE_DATE extends LogKey @@ -299,11 +305,14 @@ object LogKeys { case object LOAD_FACTOR extends LogKey case object LOAD_TIME extends LogKey case object LOCALE extends LogKey + case object LOCAL_BLOCKS_SIZE extends LogKey case object LOCAL_SCRATCH_DIR extends LogKey case object LOCATION extends LogKey case object LOGICAL_PLAN_COLUMNS extends LogKey case object LOGICAL_PLAN_LEAVES extends LogKey case object LOG_ID extends LogKey + case object LOG_KEY_FILE extends LogKey + case object LOG_LEVEL extends LogKey case object LOG_OFFSET extends LogKey case object LOG_TYPE extends LogKey case object LOWER_BOUND extends LogKey @@ -351,6 +360,7 @@ object LogKeys { case object MIN_SIZE extends LogKey case object MIN_TIME extends LogKey case object MIN_VERSION_NUM extends LogKey + case object MISSING_PARENT_STAGES extends LogKey case object MODEL_WEIGHTS extends LogKey case object MODULE_NAME extends LogKey case object NAMESPACE extends LogKey @@ -368,8 +378,9 @@ object LogKeys { case object NORM extends LogKey case object NUM_ADDED_PARTITIONS extends LogKey case object NUM_APPS extends LogKey + case object NUM_ATTEMPT extends LogKey case object NUM_BIN extends LogKey - case object NUM_BLOCK_IDS extends LogKey + case object NUM_BLOCKS extends LogKey case object NUM_BROADCAST_BLOCK extends LogKey case object NUM_BYTES extends LogKey case object NUM_BYTES_CURRENT extends LogKey @@ -388,12 +399,16 @@ object LogKeys { case object NUM_CORES extends LogKey case object NUM_DATA_FILE extends LogKey case object NUM_DATA_FILES extends LogKey + case object NUM_DECOMMISSIONED extends LogKey case object NUM_DRIVERS extends LogKey case object NUM_DROPPED_PARTITIONS extends LogKey case object NUM_EFFECTIVE_RULE_OF_RUNS extends LogKey case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey case object NUM_EVENTS extends LogKey case object NUM_EXAMPLES extends LogKey + case object NUM_EXECUTORS extends LogKey + case object NUM_EXECUTORS_EXITED extends LogKey + case object NUM_EXECUTORS_KILLED extends LogKey case object 
NUM_EXECUTOR_CORES extends LogKey case object NUM_EXECUTOR_CORES_REMAINING extends LogKey case object NUM_EXECUTOR_CORES_TOTAL extends LogKey @@ -407,6 +422,7 @@ object LogKeys { case object NUM_FILES_FAILED_TO_DELETE extends LogKey case object NUM_FILES_REUSED extends LogKey case object NUM_FREQUENT_ITEMS extends LogKey + case object NUM_HOST_LOCAL_BLOCKS extends LogKey case object NUM_INDEX_FILE extends LogKey case object NUM_INDEX_FILES extends LogKey case object NUM_ITERATIONS extends LogKey @@ -415,8 +431,10 @@ object LogKeys { case object NUM_LEADING_SINGULAR_VALUES extends LogKey case object NUM_LEFT_PARTITION_VALUES extends LogKey case object NUM_LOADED_ENTRIES extends LogKey + case object NUM_LOCAL_BLOCKS extends LogKey case object NUM_LOCAL_DIRS extends LogKey case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey + case object NUM_MERGERS extends LogKey case object NUM_MERGER_LOCATIONS extends LogKey case object NUM_META_FILES extends LogKey case object NUM_NODES extends LogKey @@ -433,7 +451,10 @@ object LogKeys { case object NUM_POINT extends LogKey case object NUM_PREFIXES extends LogKey case object NUM_PRUNED extends LogKey + case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey case object NUM_RECORDS_READ extends LogKey + case object NUM_REMAINED extends LogKey + case object NUM_REMOTE_BLOCKS extends LogKey case object NUM_REMOVED_WORKERS extends LogKey case object NUM_REPLICAS extends LogKey case object NUM_REQUESTS extends LogKey @@ -449,11 +470,14 @@ object LogKeys { case object NUM_SPILL_INFOS extends LogKey case object NUM_SPILL_WRITERS extends LogKey case object NUM_SUB_DIRS extends LogKey + case object NUM_SUCCESSFUL_TASKS extends LogKey case object NUM_TASKS extends LogKey case object NUM_TASK_CPUS extends LogKey case object NUM_TRAIN_WORD extends LogKey + case object NUM_UNFINISHED_DECOMMISSIONED extends LogKey case object NUM_VERSIONS_RETAIN extends LogKey case object NUM_WEIGHTED_EXAMPLES extends LogKey + case object NUM_WORKERS extends LogKey case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey case object OBJECT_ID extends LogKey case object OFFSET extends LogKey @@ -470,16 +494,20 @@ object LogKeys { case object OPTIONS extends LogKey case object OP_ID extends LogKey case object OP_TYPE extends LogKey + case object ORIGINAL_DISK_SIZE extends LogKey + case object ORIGINAL_MEMORY_SIZE extends LogKey case object OS_ARCH extends LogKey case object OS_NAME extends LogKey case object OS_VERSION extends LogKey case object OUTPUT extends LogKey case object OVERHEAD_MEMORY_SIZE extends LogKey case object PAGE_SIZE extends LogKey + case object PARENT_STAGES extends LogKey case object PARSE_MODE extends LogKey case object PARTITIONED_FILE_READER extends LogKey case object PARTITIONER extends LogKey case object PARTITION_ID extends LogKey + case object PARTITION_IDS extends LogKey case object PARTITION_SPECIFICATION extends LogKey case object PARTITION_SPECS extends LogKey case object PATH extends LogKey @@ -511,12 +539,14 @@ object LogKeys { case object PROTOCOL_VERSION extends LogKey case object PROVIDER extends LogKey case object PUSHED_FILTERS extends LogKey + case object PUSH_MERGED_LOCAL_BLOCKS_SIZE extends LogKey case object PVC_METADATA_NAME extends LogKey case object PYTHON_EXEC extends LogKey case object PYTHON_PACKAGES extends LogKey case object PYTHON_VERSION extends LogKey case object PYTHON_WORKER_MODULE extends LogKey case object PYTHON_WORKER_RESPONSE extends LogKey + case object QUANTILES extends LogKey case object QUERY_CACHE_VALUE 
extends LogKey case object QUERY_HINT extends LogKey case object QUERY_ID extends LogKey @@ -542,11 +572,13 @@ object LogKeys { case object REDACTED_STATEMENT extends LogKey case object REDUCE_ID extends LogKey case object REGISTERED_EXECUTOR_FILE extends LogKey + case object REGISTER_MERGE_RESULTS extends LogKey case object RELATION_NAME extends LogKey case object RELATION_OUTPUT extends LogKey case object RELATIVE_TOLERANCE extends LogKey case object REMAINING_PARTITIONS extends LogKey case object REMOTE_ADDRESS extends LogKey + case object REMOTE_BLOCKS_SIZE extends LogKey case object REMOVE_FROM_MASTER extends LogKey case object REPORT_DETAILS extends LogKey case object REQUESTER_SIZE extends LogKey @@ -574,6 +606,7 @@ object LogKeys { case object RUN_ID extends LogKey case object SCALA_VERSION extends LogKey case object SCHEDULER_POOL_NAME extends LogKey + case object SCHEDULING_MODE extends LogKey case object SCHEMA extends LogKey case object SCHEMA2 extends LogKey case object SERVER_NAME extends LogKey @@ -615,13 +648,17 @@ object LogKeys { case object SPILL_TIMES extends LogKey case object SQL_TEXT extends LogKey case object SRC_PATH extends LogKey + case object STAGE extends LogKey + case object STAGES extends LogKey case object STAGE_ATTEMPT extends LogKey case object STAGE_ID extends LogKey + case object STAGE_NAME extends LogKey case object START_INDEX extends LogKey case object STATEMENT_ID extends LogKey case object STATE_STORE_ID extends LogKey case object STATE_STORE_PROVIDER extends LogKey case object STATE_STORE_VERSION extends LogKey + case object STATS extends LogKey case object STATUS extends LogKey case object STDERR extends LogKey case object STOP_SITE_SHORT_FORM extends LogKey @@ -647,13 +684,18 @@ object LogKeys { case object TABLE_NAME extends LogKey case object TABLE_TYPE extends LogKey case object TABLE_TYPES extends LogKey + case object TAG extends LogKey case object TARGET_NUM_EXECUTOR extends LogKey case object TARGET_NUM_EXECUTOR_DELTA extends LogKey case object TARGET_PATH extends LogKey case object TASK_ATTEMPT_ID extends LogKey case object TASK_ID extends LogKey + case object TASK_LOCALITY extends LogKey case object TASK_NAME extends LogKey case object TASK_REQUIREMENTS extends LogKey + case object TASK_RESOURCE_ASSIGNMENTS extends LogKey + case object TASK_SET_ID extends LogKey + case object TASK_SET_MANAGER extends LogKey case object TASK_SET_NAME extends LogKey case object TASK_STATE extends LogKey case object TEMP_FILE extends LogKey @@ -685,6 +727,7 @@ object LogKeys { case object TOPIC_PARTITION_OFFSET_RANGE extends LogKey case object TOTAL extends LogKey case object TOTAL_EFFECTIVE_TIME extends LogKey + case object TOTAL_SIZE extends LogKey case object TOTAL_TIME extends LogKey case object TOTAL_TIME_READ extends LogKey case object TO_TIME extends LogKey diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6018c87b0122..c70576b8adc1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2930,7 +2930,7 @@ object SparkContext extends Logging { log" constructor). This may indicate an error, since only one SparkContext should be" + log" running in this JVM (see SPARK-2243)." 
+ log" The other SparkContext was created at:\n" + - log"${MDC(LogKeys.CONTEXT_CREATION_SITE, otherContextCreationSite)}" + log"${MDC(LogKeys.CREATION_SITE, otherContextCreationSite)}" logWarning(warnMsg) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 9c57269b28f4..263b1a233b80 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.Worker -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, LogKeys, MDC} import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.Utils @@ -51,7 +51,8 @@ class LocalSparkCluster private ( private val workerDirs = ArrayBuffer[String]() def start(): Array[String] = { - logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") + logInfo(log"Starting a local Spark cluster with " + + log"${MDC(LogKeys.NUM_WORKERS, numWorkers)} workers.") // Disable REST server on Master in this mode unless otherwise specified val _conf = conf.clone() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 2edd80db2637..ca932ef5dc05 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -142,8 +142,9 @@ private[spark] class SparkHadoopUtil extends Logging { if (!new File(keytabFilename).exists()) { throw new SparkException(s"Keytab file: ${keytabFilename} does not exist") } else { - logInfo("Attempting to login to Kerberos " + - s"using principal: ${principalName} and keytab: ${keytabFilename}") + logInfo(log"Attempting to login to Kerberos using principal: " + + log"${MDC(LogKeys.PRINCIPAL, principalName)} and keytab: " + + log"${MDC(LogKeys.KEYTAB, keytabFilename)}") UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index ec231610b857..b34e5c408c3b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -29,7 +29,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ @@ -105,7 +105,8 @@ private[spark] class 
StandaloneAppClient( if (registered.get) { return } - logInfo("Connecting to master " + masterAddress.toSparkURL + "...") + logInfo( + log"Connecting to master ${MDC(LogKeys.MASTER_URL, masterAddress.toSparkURL)}...") val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) masterRef.send(RegisterApplication(appDescription, self)) } catch { @@ -175,14 +176,16 @@ private[spark] class StandaloneAppClient( case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) => val fullId = s"$appId/$id" - logInfo("Executor added: %s on %s (%s) with %d core(s)".format(fullId, workerId, hostPort, - cores)) + logInfo(log"Executor added: ${MDC(LogKeys.EXECUTOR_ID, fullId)} on " + + log"${MDC(LogKeys.WORKER_ID, workerId)} (${MDC(LogKeys.HOST_PORT, hostPort)}) " + + log"with ${MDC(LogKeys.NUM_CORES, cores)} core(s)") listener.executorAdded(fullId, workerId, hostPort, cores, memory) case ExecutorUpdated(id, state, message, exitStatus, workerHost) => val fullId = s"$appId/$id" val messageText = message.map(s => " (" + s + ")").getOrElse("") - logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) + logInfo(log"Executor updated: ${MDC(LogKeys.EXECUTOR_ID, fullId)} is now " + + log"${MDC(LogKeys.EXECUTOR_STATE, state)}${MDC(LogKeys.MESSAGE, messageText)}") if (ExecutorState.isFinished(state)) { listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost) } else if (state == ExecutorState.DECOMMISSIONED) { @@ -191,11 +194,13 @@ private[spark] class StandaloneAppClient( } case WorkerRemoved(id, host, message) => - logInfo("Master removed worker %s: %s".format(id, message)) + logInfo(log"Master removed worker ${MDC(LogKeys.WORKER_ID, id)}: " + + log"${MDC(LogKeys.MESSAGE, message)}") listener.workerRemoved(id, host, message) case MasterChanged(masterRef, masterWebUiUrl) => - logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) + logInfo(log"Master has changed, new master is at " + + log"${MDC(LogKeys.MASTER_URL, masterRef.address.toSparkURL)}") master = Some(masterRef) alreadyDisconnected = false masterRef.send(MasterChangeAcknowledged(appId.get)) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala index 106acc9a7944..964b115865ae 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.master import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config.Deploy.{RECOVERY_COMPRESSION_CODEC, RECOVERY_DIRECTORY} import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.Serializer @@ -57,7 +57,7 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: val recoveryDir = conf.get(RECOVERY_DIRECTORY) def createPersistenceEngine(): PersistenceEngine = { - logInfo("Persisting recovery state to directory: " + recoveryDir) + logInfo(log"Persisting recovery state to directory: ${MDC(LogKeys.PATH, recoveryDir)}") val codec = conf.get(RECOVERY_COMPRESSION_CODEC).map(c => CompressionCodec.createCodec(conf, c)) new FileSystemPersistenceEngine(recoveryDir, serializer, codec) } @@ -76,7 +76,8 @@ private[master] class 
RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Se def createPersistenceEngine(): PersistenceEngine = { val recoveryDir = conf.get(RECOVERY_DIRECTORY) - logInfo("Persisting recovery state to directory: " + recoveryDir) + logInfo(log"Persisting recovery state to directory: " + + log"${MDC(LogKeys.PATH, recoveryDir)}") new RocksDBPersistenceEngine(recoveryDir, serializer) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 247504f5ebbb..4fb95033cece 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -79,7 +79,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { * it to the user. Otherwise, report the error message provided by the server. */ def createSubmission(request: CreateSubmissionRequest): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to launch an application in $master.") + logInfo(log"Submitting a request to launch an application in ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -109,7 +109,9 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Request that the server kill the specified submission. */ def killSubmission(submissionId: String): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to kill submission $submissionId in $master.") + logInfo(log"Submitting a request to kill submission " + + log"${MDC(SUBMISSION_ID, submissionId)} in " + + log"${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -138,7 +140,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Request that the server kill all submissions. */ def killAllSubmissions(): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to kill all submissions in $master.") + logInfo(log"Submitting a request to kill all submissions in ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -167,7 +169,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Request that the server clears all submissions and applications. */ def clear(): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to clear $master.") + logInfo(log"Submitting a request to clear ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null for (m <- masters if !handled) { @@ -196,7 +198,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Check the readiness of Master. 
*/ def readyz(): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request to check the status of $master.") + logInfo(log"Submitting a request to check the status of ${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = new ErrorResponse for (m <- masters if !handled) { @@ -227,7 +229,9 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { def requestSubmissionStatus( submissionId: String, quiet: Boolean = false): SubmitRestProtocolResponse = { - logInfo(s"Submitting a request for the status of submission $submissionId in $master.") + logInfo(log"Submitting a request for the status of submission " + + log"${MDC(SUBMISSION_ID, submissionId)} in " + + log"${MDC(MASTER_URL, master)}.") var handled: Boolean = false var response: SubmitRestProtocolResponse = null @@ -440,7 +444,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { if (submitResponse.success) { val submissionId = submitResponse.submissionId if (submissionId != null) { - logInfo(s"Submission successfully created as $submissionId. Polling submission state...") + logInfo(log"Submission successfully created as ${MDC(SUBMISSION_ID, submissionId)}. " + + log"Polling submission state...") pollSubmissionStatus(submissionId) } else { // should never happen @@ -470,13 +475,17 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { val exception = Option(statusResponse.message) // Log driver state, if present driverState match { - case Some(state) => logInfo(s"State of driver $submissionId is now $state.") + case Some(state) => + logInfo(log"State of driver ${MDC(SUBMISSION_ID, submissionId)} is now " + + log"${MDC(DRIVER_STATE, state)}.") case _ => logError(log"State of driver ${MDC(SUBMISSION_ID, submissionId)} was not found!") } // Log worker node, if present (workerId, workerHostPort) match { - case (Some(id), Some(hp)) => logInfo(s"Driver is running on worker $id at $hp.") + case (Some(id), Some(hp)) => + logInfo( + log"Driver is running on worker ${MDC(WORKER_ID, id)} at ${MDC(HOST_PORT, hp)}.") case _ => } // Log exception stack trace, if present @@ -490,7 +499,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { /** Log the response sent by the server in the REST application submission protocol. */ private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = { - logInfo(s"Server responded with ${response.messageType}:\n${response.toJson}") + logInfo(log"Server responded with ${MDC(CLASS_NAME, response.messageType)}:\n" + + log"${MDC(RESULT, response.toJson)}") } /** Log an appropriate error if the response sent by the server is not of the expected type. 
*/ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index d1190ca46c2a..a3e7276fc83e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.{SecurityManager, SSLOptions} import org.apache.spark.deploy.Command -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.launcher.WorkerCommandBuilder import org.apache.spark.util.Utils @@ -120,7 +120,8 @@ object CommandUtils extends Logging { Utils.copyStream(in, out, true) } catch { case e: IOException => - logInfo("Redirection to " + file + " closed: " + e.getMessage) + logInfo(log"Redirection to ${MDC(LogKeys.FILE_NAME, file)} closed: " + + log"${MDC(LogKeys.ERROR, e.getMessage)}") } } }.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 7bb8b74eb021..233cfbfe5d5c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -204,7 +204,7 @@ private[deploy] class ExecutorRunner( worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))) } catch { case interrupted: InterruptedException => - logInfo("Runner thread for executor " + fullId + " interrupted") + logInfo(log"Runner thread for executor ${MDC(EXECUTOR_ID, fullId)} interrupted") state = ExecutorState.KILLED killProcess(None) case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f030475131d2..7ff7974ab59f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -38,7 +38,6 @@ import org.apache.spark.deploy.StandaloneResourceUtils._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKeys import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ @@ -74,8 +73,8 @@ private[deploy] class Worker( // If worker decommissioning is enabled register a handler on the configured signal to shutdown. 
if (conf.get(config.DECOMMISSION_ENABLED)) { val signal = conf.get(config.Worker.WORKER_DECOMMISSION_SIGNAL) - logInfo(s"Registering SIG$signal handler to trigger decommissioning.") - SignalUtils.register(signal, log"Failed to register SIG${MDC(LogKeys.SIGNAL, signal)} " + + logInfo(log"Registering SIG${MDC(SIGNAL, signal)} handler to trigger decommissioning.") + SignalUtils.register(signal, log"Failed to register SIG${MDC(SIGNAL, signal)} " + log"handler - disabling worker decommission feature.") { self.send(WorkerDecommissionSigReceived) true @@ -106,8 +105,12 @@ private[deploy] class Worker( private val INITIAL_REGISTRATION_RETRIES = conf.get(WORKER_INITIAL_REGISTRATION_RETRIES) private val TOTAL_REGISTRATION_RETRIES = conf.get(WORKER_MAX_REGISTRATION_RETRIES) if (INITIAL_REGISTRATION_RETRIES > TOTAL_REGISTRATION_RETRIES) { - logInfo(s"${WORKER_INITIAL_REGISTRATION_RETRIES.key} ($INITIAL_REGISTRATION_RETRIES) is " + - s"capped by ${WORKER_MAX_REGISTRATION_RETRIES.key} ($TOTAL_REGISTRATION_RETRIES)") + logInfo( + log"${MDC(CONFIG, WORKER_INITIAL_REGISTRATION_RETRIES.key)} " + + log"(${MDC(VALUE, INITIAL_REGISTRATION_RETRIES)}) is capped by " + + log"${MDC(CONFIG2, WORKER_MAX_REGISTRATION_RETRIES.key)} " + + log"(${MDC(MAX_ATTEMPTS, TOTAL_REGISTRATION_RETRIES)})" + ) } private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500 private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = { @@ -236,10 +239,11 @@ private[deploy] class Worker( override def onStart(): Unit = { assert(!registered) - logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( - host, port, cores, Utils.megabytesToString(memory))) - logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") - logInfo("Spark home: " + sparkHome) + logInfo(log"Starting Spark worker ${MDC(HOST, host)}:${MDC(PORT, port)} " + + log"with ${MDC(NUM_CORES, cores)} cores, " + + log"${MDC(MEMORY_SIZE, Utils.megabytesToString(memory))} RAM") + logInfo(log"Running Spark version ${MDC(SPARK_VERSION, org.apache.spark.SPARK_VERSION)}") + logInfo(log"Spark home: ${MDC(PATH, sparkHome)}") createWorkDir() startExternalShuffleService() setupWorkerResources() @@ -300,8 +304,9 @@ private[deploy] class Worker( master = Some(masterRef) connected = true if (reverseProxy) { - logInfo("WorkerWebUI is available at %s/proxy/%s".format( - activeMasterWebUiUrl.stripSuffix("/"), workerId)) + logInfo( + log"WorkerWebUI is available at ${MDC(WEB_URL, activeMasterWebUiUrl.stripSuffix("/"))}" + + log"/proxy/${MDC(WORKER_ID, workerId)}") // if reverseProxyUrl is not set, then we continue to generate relative URLs // starting with "/" throughout the UI and do not use activeMasterWebUiUrl val proxyUrl = conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") @@ -318,7 +323,7 @@ private[deploy] class Worker( registerMasterThreadPool.submit(new Runnable { override def run(): Unit = { try { - logInfo("Connecting to master " + masterAddress + "...") + logInfo(log"Connecting to master ${MDC(MASTER_URL, masterAddress)}...") val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) sendRegisterMessageToMaster(masterEndpoint) } catch { @@ -342,7 +347,8 @@ private[deploy] class Worker( if (registered) { cancelLastRegistrationRetry() } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { - logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)") + logInfo(log"Retrying connection to master (attempt # " + + log"${MDC(NUM_ATTEMPT, connectionAttemptCount)})") /** * Re-register with the active master this worker has been 
communicating with. If there * is none, then it means this worker is still bootstrapping and hasn't established a @@ -376,7 +382,7 @@ private[deploy] class Worker( registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable { override def run(): Unit = { try { - logInfo("Connecting to master " + masterAddress + "...") + logInfo(log"Connecting to master ${MDC(MASTER_URL, masterAddress)}...") val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) sendRegisterMessageToMaster(masterEndpoint) } catch { @@ -483,7 +489,7 @@ private[deploy] class Worker( log"${MDC(MASTER_URL, preferredMasterAddress)}") } - logInfo(s"Successfully registered with master $preferredMasterAddress") + logInfo(log"Successfully registered with master ${MDC(MASTER_URL, preferredMasterAddress)}") registered = true changeMaster(masterRef, masterWebUiUrl, masterAddress) forwardMessageScheduler.scheduleAtFixedRate( @@ -491,7 +497,8 @@ private[deploy] class Worker( 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) if (CLEANUP_ENABLED) { logInfo( - s"Worker cleanup enabled; old application directories will be deleted in: $workDir") + log"Worker cleanup enabled; old application directories will be deleted in: " + + log"${MDC(PATH, workDir)}") forwardMessageScheduler.scheduleAtFixedRate( () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) @@ -539,7 +546,7 @@ private[deploy] class Worker( dir.isDirectory && !isAppStillRunning && !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS) }.foreach { dir => - logInfo(s"Removing directory: ${dir.getPath}") + logInfo(log"Removing directory: ${MDC(PATH, dir.getPath)}") Utils.deleteRecursively(dir) // Remove some registeredExecutors information of DB in external shuffle service when @@ -562,7 +569,8 @@ private[deploy] class Worker( } case MasterChanged(masterRef, masterWebUiUrl) => - logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) + logInfo(log"Master has changed, new master is at " + + log"${MDC(MASTER_URL, masterRef.address.toSparkURL)}") changeMaster(masterRef, masterWebUiUrl, masterRef.address) val executorResponses = executors.values.map { e => @@ -575,7 +583,8 @@ private[deploy] class Worker( workerId, executorResponses.toList, driverResponses.toSeq)) case ReconnectWorker(masterUrl) => - logInfo(s"Master with url $masterUrl requested this worker to reconnect.") + logInfo( + log"Master with url ${MDC(MASTER_URL, masterUrl)} requested this worker to reconnect.") registerWithMaster() case LaunchExecutor(masterUrl, appId, execId, rpId, appDesc, cores_, memory_, resources_) => @@ -586,7 +595,8 @@ private[deploy] class Worker( logWarning("Asked to launch an executor while decommissioned. 
Not launching executor.") } else { try { - logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + logInfo(log"Asked to launch executor ${MDC(APP_ID, appId)}/${MDC(EXECUTOR_ID, execId)}" + + log" for ${MDC(APP_DESC, appDesc.name)}") // Create the executor's working directory val executorDir = new File(workDir, appId + "/" + execId) @@ -645,8 +655,8 @@ private[deploy] class Worker( } catch { case e: Exception => logError( - log"Failed to launch executor ${MDC(APP_ID, appId)}/${MDC(EXECUTOR_ID, execId)} " + - log"for ${MDC(APP_DESC, appDesc.name)}.", e) + log"Failed to launch executor ${MDC(APP_ID, appId)}/" + + log"${MDC(EXECUTOR_ID, execId)} for ${MDC(APP_DESC, appDesc.name)}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId @@ -667,15 +677,15 @@ private[deploy] class Worker( val fullId = appId + "/" + execId executors.get(fullId) match { case Some(executor) => - logInfo("Asked to kill executor " + fullId) + logInfo(log"Asked to kill executor ${MDC(EXECUTOR_ID, fullId)}") executor.kill() case None => - logInfo("Asked to kill unknown executor " + fullId) + logInfo(log"Asked to kill unknown executor ${MDC(EXECUTOR_ID, fullId)}") } } case LaunchDriver(driverId, driverDesc, resources_) => - logInfo(s"Asked to launch driver $driverId") + logInfo(log"Asked to launch driver ${MDC(DRIVER_ID, driverId)}") val driver = new DriverRunner( conf, driverId, @@ -695,7 +705,7 @@ private[deploy] class Worker( addResourcesUsed(resources_) case KillDriver(driverId) => - logInfo(s"Asked to kill driver $driverId") + logInfo(log"Asked to kill driver ${MDC(DRIVER_ID, driverId)}") drivers.get(driverId) match { case Some(runner) => runner.kill() @@ -735,7 +745,7 @@ private[deploy] class Worker( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (master.exists(_.address == remoteAddress) || masterAddressToConnect.contains(remoteAddress)) { - logInfo(s"$remoteAddress Disassociated !") + logInfo(log"${MDC(REMOTE_ADDRESS, remoteAddress)} Disassociated !") masterDisconnected() } } @@ -753,7 +763,7 @@ private[deploy] class Worker( try { appDirectories.remove(id).foreach { dirList => concurrent.Future { - logInfo(s"Cleaning up local directories for application $id") + logInfo(log"Cleaning up local directories for application ${MDC(APP_ID, id)}") dirList.foreach { dir => Utils.deleteRecursively(new File(dir)) } @@ -874,7 +884,7 @@ private[deploy] class Worker( private[deploy] def decommissionSelf(): Unit = { if (conf.get(config.DECOMMISSION_ENABLED) && !decommissioned) { decommissioned = true - logInfo(s"Decommission worker $workerId.") + logInfo(log"Decommission worker ${MDC(WORKER_ID, workerId)}.") } else if (decommissioned) { logWarning(log"Worker ${MDC(WORKER_ID, workerId)} already started decommissioning.") } else { @@ -898,10 +908,10 @@ private[deploy] class Worker( logWarning(log"Driver ${MDC(DRIVER_ID, driverId)} " + log"exited successfully while master is disconnected.") case _ => - logInfo(s"Driver $driverId exited successfully") + logInfo(log"Driver ${MDC(DRIVER_ID, driverId)} exited successfully") } case DriverState.KILLED => - logInfo(s"Driver $driverId was killed by user") + logInfo(log"Driver ${MDC(DRIVER_ID, driverId)} was killed by user") case _ => logDebug(s"Driver $driverId changed state to $state") } @@ -921,13 +931,22 @@ private[deploy] class Worker( if (ExecutorState.isFinished(state)) { val appId = executorStateChanged.appId val fullId = appId + "/" + 
executorStateChanged.execId - val message = executorStateChanged.message - val exitStatus = executorStateChanged.exitStatus + val message = executorStateChanged.message match { + case Some(msg) => + log" message ${MDC(MESSAGE, msg)}" + case None => + log"" + } + val exitStatus = executorStateChanged.exitStatus match { + case Some(status) => + log" exitStatus ${MDC(EXIT_CODE, status)}" + case None => + log"" + } executors.get(fullId) match { case Some(executor) => - logInfo("Executor " + fullId + " finished with state " + state + - message.map(" message " + _).getOrElse("") + - exitStatus.map(" exitStatus " + _).getOrElse("")) + logInfo(log"Executor ${MDC(EXECUTOR_ID, fullId)} finished with state " + + log"${MDC(EXECUTOR_STATE, state)}" + message + exitStatus) executors -= fullId finishedExecutors(fullId) = executor trimFinishedExecutorsIfNecessary() @@ -939,9 +958,8 @@ private[deploy] class Worker( shuffleService.executorRemoved(executorStateChanged.execId.toString, appId) } case None => - logInfo("Unknown Executor " + fullId + " finished with state " + state + - message.map(" message " + _).getOrElse("") + - exitStatus.map(" exitStatus " + _).getOrElse("")) + logInfo(log"Unknown Executor ${MDC(EXECUTOR_ID, fullId)} finished with state " + + log"${MDC(EXECUTOR_STATE, state)}" + message + exitStatus) } maybeCleanupApplication(appId) } diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala index 0158dd6ba775..7098961d1649 100644 --- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala @@ -156,7 +156,7 @@ private[memory] class ExecutionMemoryPool( val memoryToFree = if (curMem < numBytes) { logWarning( log"Internal error: release called on ${MDC(NUM_BYTES, numBytes)} " + - log"bytes but task only has ${MDC(CURRENT_MEMORY_BYTES, curMem)} bytes " + + log"bytes but task only has ${MDC(CURRENT_MEMORY_SIZE, curMem)} bytes " + log"of memory from the ${MDC(MEMORY_POOL_NAME, poolName)} pool") curMem } else { diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index ff899a2e56dc..cbfce378879e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -270,7 +270,7 @@ class HadoopRDD[K, V]( val iter = new NextIterator[(K, V)] { private val split = theSplit.asInstanceOf[HadoopPartition] - logInfo("Input split: " + split.inputSplit) + logInfo(log"Input split: ${MDC(INPUT_SPLIT, split.inputSplit)}") private val jobConf = getJobConf() private val inputMetrics = context.taskMetrics().inputMetrics diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index a87d02287302..3a1ce4bd1dfd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -197,7 +197,7 @@ class NewHadoopRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new Iterator[(K, V)] { private val split = theSplit.asInstanceOf[NewHadoopPartition] - logInfo("Input split: " + split.serializableHadoopSplit) + logInfo(log"Input split: ${MDC(INPUT_SPLIT, split.serializableHadoopSplit)}") private val conf = getConf private val inputMetrics = context.taskMetrics().inputMetrics diff 
--git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1f44f7e782c4..ac93abf3fe7a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -211,7 +211,7 @@ abstract class RDD[T: ClassTag]( * @return This RDD. */ def unpersist(blocking: Boolean = false): this.type = { - logInfo(s"Removing RDD $id from persistence list") + logInfo(log"Removing RDD ${MDC(RDD_ID, id)} from persistence list") sc.unpersistRDD(id, blocking) storageLevel = StorageLevel.NONE this diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 2f6ff0acdf02..118660ef6947 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} /** * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile, @@ -58,8 +58,9 @@ class SequenceFileRDDFunctions[K: IsWritable: ClassTag, V: IsWritable: ClassTag] val convertKey = self.keyClass != _keyWritableClass val convertValue = self.valueClass != _valueWritableClass - logInfo("Saving as sequence file of type " + - s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" ) + logInfo(log"Saving as sequence file of type " + + log"(${MDC(LogKeys.KEY, _keyWritableClass.getSimpleName)}," + + log"${MDC(LogKeys.VALUE, _valueWritableClass.getSimpleName)})") val format = classOf[SequenceFileOutputFormat[Writable, Writable]] val jobConf = new JobConf(self.context.hadoopConfiguration) if (!convertKey && !convertValue) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cc9ae5eb1ebe..7c096dd110e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -535,8 +535,9 @@ private[spark] class DAGScheduler( if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown - logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " + - s"shuffle ${shuffleDep.shuffleId}") + logInfo(log"Registering RDD ${MDC(RDD_ID, rdd.id)} " + + log"(${MDC(CREATION_SITE, rdd.getCreationSite)}) as input to " + + log"shuffle ${MDC(SHUFFLE_ID, shuffleDep.shuffleId)}") mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length, shuffleDep.partitioner.numPartitions) } @@ -1097,7 +1098,7 @@ private[spark] class DAGScheduler( * Cancel a job that is running or waiting in the queue. 
*/ def cancelJob(jobId: Int, reason: Option[String]): Unit = { - logInfo("Asked to cancel job " + jobId) + logInfo(log"Asked to cancel job ${MDC(JOB_ID, jobId)}") eventProcessLoop.post(JobCancelled(jobId, reason)) } @@ -1106,7 +1107,8 @@ private[spark] class DAGScheduler( * @param cancelFutureJobs if true, future submitted jobs in this job group will be cancelled */ def cancelJobGroup(groupId: String, cancelFutureJobs: Boolean = false): Unit = { - logInfo(s"Asked to cancel job group $groupId with cancelFutureJobs=$cancelFutureJobs") + logInfo(log"Asked to cancel job group ${MDC(GROUP_ID, groupId)} with " + + log"cancelFutureJobs=${MDC(CANCEL_FUTURE_JOBS, cancelFutureJobs)}") eventProcessLoop.post(JobGroupCancelled(groupId, cancelFutureJobs)) } @@ -1115,7 +1117,7 @@ private[spark] class DAGScheduler( */ def cancelJobsWithTag(tag: String): Unit = { SparkContext.throwIfInvalidTag(tag) - logInfo(s"Asked to cancel jobs with tag $tag") + logInfo(log"Asked to cancel jobs with tag ${MDC(TAG, tag)}") eventProcessLoop.post(JobTagCancelled(tag)) } @@ -1209,7 +1211,7 @@ private[spark] class DAGScheduler( // If cancelFutureJobs is true, store the cancelled job group id into internal states. // When a job belonging to this job group is submitted, skip running it. if (cancelFutureJobs) { - logInfo(s"Add job group $groupId into cancelled job groups") + logInfo(log"Add job group ${MDC(GROUP_ID, groupId)} into cancelled job groups") cancelledJobGroups.add(groupId) } @@ -1314,7 +1316,7 @@ private[spark] class DAGScheduler( if (jobGroupIdOpt.exists(cancelledJobGroups.contains(_))) { listener.jobFailed( SparkCoreErrors.sparkJobCancelledAsPartOfJobGroupError(jobId, jobGroupIdOpt.get)) - logInfo(s"Skip running a job that belongs to the cancelled job group ${jobGroupIdOpt.get}.") + logInfo(log"Skip running a job that belongs to the cancelled job group ${MDC(GROUP_ID, jobGroupIdOpt.get)}") return } @@ -1362,11 +1364,13 @@ private[spark] class DAGScheduler( val job = new ActiveJob(jobId, finalStage, callSite, listener, artifacts, properties) clearCacheLocs() - logInfo("Got job %s (%s) with %d output partitions".format( - job.jobId, callSite.shortForm, partitions.length)) - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) + logInfo( + log"Got job ${MDC(JOB_ID, job.jobId)} (${MDC(CALL_SITE_SHORT_FORM, callSite.shortForm)}) " + + log"with ${MDC(NUM_PARTITIONS, partitions.length)} output partitions") + logInfo(log"Final stage: ${MDC(STAGE_ID, finalStage)} " + + log"(${MDC(STAGE_NAME, finalStage.name)})") + logInfo(log"Parents of final stage: ${MDC(STAGE_ID, finalStage.parents)}") + logInfo(log"Missing parents: ${MDC(MISSING_PARENT_STAGES, getMissingParentStages(finalStage))}") val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job @@ -1403,11 +1407,13 @@ private[spark] class DAGScheduler( val job = new ActiveJob(jobId, finalStage, callSite, listener, artifacts, properties) clearCacheLocs() - logInfo("Got map stage job %s (%s) with %d output partitions".format( - jobId, callSite.shortForm, dependency.rdd.partitions.length)) - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) + logInfo(log"Got map stage job ${MDC(JOB_ID, jobId)} " + + log"(${MDC(CALL_SITE_SHORT_FORM, callSite.shortForm)}) with " + + 
log"${MDC(NUM_PARTITIONS, dependency.rdd.partitions.length)} output partitions") + logInfo(log"Final stage: ${MDC(STAGE_ID, finalStage)} " + + log"(${MDC(STAGE_NAME, finalStage.name)})") + logInfo(log"Parents of final stage: ${MDC(PARENT_STAGES, finalStage.parents.toString)}") + logInfo(log"Missing parents: ${MDC(MISSING_PARENT_STAGES, getMissingParentStages(finalStage))}") val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job @@ -1444,7 +1450,8 @@ private[spark] class DAGScheduler( val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { - logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") + logInfo(log"Submitting ${MDC(STAGE_ID, stage)} (${MDC(RDD_ID, stage.rdd)}), " + + log"which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { @@ -1495,13 +1502,16 @@ private[spark] class DAGScheduler( val shuffleId = stage.shuffleDep.shuffleId val shuffleMergeId = stage.shuffleDep.shuffleMergeId if (stage.shuffleDep.shuffleMergeEnabled) { - logInfo(s"Shuffle merge enabled before starting the stage for $stage with shuffle" + - s" $shuffleId and shuffle merge $shuffleMergeId with" + - s" ${stage.shuffleDep.getMergerLocs.size} merger locations") + logInfo(log"Shuffle merge enabled before starting the stage for ${MDC(STAGE_ID, stage)}" + + log" with shuffle ${MDC(SHUFFLE_ID, shuffleId)} and shuffle merge" + + log" ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)} with" + + log" ${MDC(NUM_MERGER_LOCATIONS, stage.shuffleDep.getMergerLocs.size.toString)} merger locations") } else { - logInfo(s"Shuffle merge disabled for $stage with shuffle $shuffleId" + - s" and shuffle merge $shuffleMergeId, but can get enabled later adaptively" + - s" once enough mergers are available") + logInfo(log"Shuffle merge disabled for ${MDC(STAGE_ID, stage)} with " + + log"shuffle ${MDC(SHUFFLE_ID, shuffleId)} and " + + log"shuffle merge ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)}, " + + log"but can get enabled later adaptively once enough " + + log"mergers are available") } } @@ -1558,8 +1568,8 @@ private[spark] class DAGScheduler( // merger locations but the corresponding shuffle map stage did not complete // successfully, we would still enable push for its retry. 
s.shuffleDep.setShuffleMergeAllowed(false) - logInfo(s"Push-based shuffle disabled for $stage (${stage.name}) since it" + - " is already shuffle merge finalized") + logInfo(log"Push-based shuffle disabled for ${MDC(STAGE_ID, stage)} " + + log"(${MDC(STAGE_NAME, stage.name)}) since it is already shuffle merge finalized") } } case s: ResultStage => @@ -1681,8 +1691,9 @@ private[spark] class DAGScheduler( } if (tasks.nonEmpty) { - logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + - s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") + logInfo(log"Submitting ${MDC(NUM_TASKS, tasks.size)} missing tasks from " + + log"${MDC(STAGE_ID, stage)} (${MDC(RDD_ID, stage.rdd)}) (first 15 tasks are " + + log"for partitions ${MDC(PARTITION_IDS, tasks.take(15).map(_.partitionId))})") val shuffleId = stage match { case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId) case _: ResultStage => None @@ -1751,9 +1762,10 @@ private[spark] class DAGScheduler( case Some(accum) => accum.getClass.getName case None => "Unknown class" } - logError( - log"Failed to update accumulator ${MDC(ACCUMULATOR_ID, id)} (${MDC(CLASS_NAME, accumClassName)}) " + - log"for task ${MDC(PARTITION_ID, task.partitionId)}", e) + logError( + log"Failed to update accumulator ${MDC(ACCUMULATOR_ID, id)} " + + log"(${MDC(CLASS_NAME, accumClassName)}) for task " + + log"${MDC(PARTITION_ID, task.partitionId)}", e) } } } @@ -1926,8 +1938,8 @@ private[spark] class DAGScheduler( try { // killAllTaskAttempts will fail if a SchedulerBackend does not implement // killTask. - logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " + - "or zombie tasks for this job") + logInfo(log"Job ${MDC(JOB_ID, job.jobId)} is finished. Cancelling " + + log"potential speculative or zombie tasks for this job") // ResultStage is only used by this job. It's safe to kill speculative or // zombie tasks in this stage. 
taskScheduler.killAllTaskAttempts( @@ -1954,7 +1966,7 @@ private[spark] class DAGScheduler( } } case None => - logInfo("Ignoring result from " + rt + " because its job has finished") + logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished") } case smt: ShuffleMapTask => @@ -1969,7 +1981,8 @@ private[spark] class DAGScheduler( logDebug("ShuffleMapTask finished on " + execId) if (executorFailureEpoch.contains(execId) && smt.epoch <= executorFailureEpoch(execId)) { - logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") + logInfo(log"Ignoring possibly bogus ${MDC(STAGE_ID, smt)} completion from " + + log"executor ${MDC(EXECUTOR_ID, execId)}") } else { // The epoch of the task is acceptable (i.e., the task was launched after the most // recent failure we're aware of for the executor), so mark the task's output as @@ -1978,7 +1991,7 @@ private[spark] class DAGScheduler( shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) } } else { - logInfo(s"Ignoring $smt completion from an older attempt of indeterminate stage") + logInfo(log"Ignoring ${MDC(TASK_NAME, smt)} completion from an older attempt of indeterminate stage") } if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { @@ -1996,17 +2009,22 @@ private[spark] class DAGScheduler( val mapStage = shuffleIdToMapStage(shuffleId) if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) { - logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + - s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + - s"(attempt ${failedStage.latestInfo.attemptNumber()}) running") + logInfo(log"Ignoring fetch failure from " + + log"${MDC(TASK_ID, task)} as it's from " + + log"${MDC(STAGE_ID, failedStage)} attempt " + + log"${MDC(STAGE_ATTEMPT, task.stageAttemptId)} and there is a more recent attempt for " + + log"that stage (attempt " + + log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) running") } else { val ignoreStageFailure = ignoreDecommissionFetchFailure && isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress) if (ignoreStageFailure) { - logInfo(s"Ignoring fetch failure from $task of $failedStage attempt " + - s"${task.stageAttemptId} when count ${config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key} " + - s"as executor ${bmAddress.executorId} is decommissioned and " + - s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true") + logInfo(log"Ignoring fetch failure from ${MDC(TASK_ID, task)} of " + + log"${MDC(STAGE, failedStage)} attempt " + + log"${MDC(STAGE_ATTEMPT, task.stageAttemptId)} when count " + + log"${MDC(MAX_ATTEMPTS, config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key)} " + + log"as executor ${MDC(EXECUTOR_ID, bmAddress.executorId)} is decommissioned and " + + log"${MDC(CONFIG, config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key)}=true") } else { failedStage.failedAttemptIds.add(task.stageAttemptId) } @@ -2019,8 +2037,10 @@ private[spark] class DAGScheduler( // multiple tasks running concurrently on different executors). In that case, it is // possible the fetch failure has already been handled by the scheduler. 
if (runningStages.contains(failedStage)) { - logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + - s"due to a fetch failure from $mapStage (${mapStage.name})") + logInfo(log"Marking ${MDC(FAILED_STAGE, failedStage)} " + + log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) as failed " + + log"due to a fetch failure from ${MDC(STAGE, mapStage)} " + + log"(${MDC(STAGE_NAME, mapStage.name)})") markStageAsFinished(failedStage, errorMessage = Some(failureMessage), willRetry = !shouldAbortStage) } else { @@ -2148,9 +2168,9 @@ private[spark] class DAGScheduler( case _ => } - logInfo(s"The shuffle map stage $mapStage with indeterminate output was failed, " + - s"we will roll back and rerun below stages which include itself and all its " + - s"indeterminate child stages: $rollingBackStages") + logInfo(log"The shuffle map stage ${MDC(STAGE, mapStage)} with indeterminate output was failed, " + + log"we will roll back and rerun below stages which include itself and all its " + + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") } // We expect one executor failure to trigger many FetchFailures in rapid succession, @@ -2162,9 +2182,9 @@ private[spark] class DAGScheduler( // producing a resubmit for each failed stage makes debugging and logging a little // simpler while not producing an overwhelming number of scheduler events. logInfo( - s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure" - ) + log"Resubmitting ${MDC(STAGE, mapStage)} " + + log"(${MDC(STAGE_NAME, mapStage.name)}) and ${MDC(FAILED_STAGE, failedStage)} " + + log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to fetch failure") messageScheduler.schedule( new Runnable { override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) @@ -2223,12 +2243,13 @@ private[spark] class DAGScheduler( // Always fail the current stage and retry all the tasks when a barrier task fail.
val failedStage = stageIdToStage(task.stageId) if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) { - logInfo(s"Ignoring task failure from $task as it's from $failedStage attempt" + - s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + - s"(attempt ${failedStage.latestInfo.attemptNumber()}) running") + logInfo(log"Ignoring task failure from ${MDC(TASK_ID, task)} as it's from " + + log"${MDC(FAILED_STAGE, failedStage)} attempt ${MDC(STAGE_ATTEMPT, task.stageAttemptId)} " + + log"and there is a more recent attempt for that stage (attempt " + + log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) running") } else { - logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + - "failed.") + logInfo(log"Marking ${MDC(STAGE_ID, failedStage.id)} (${MDC(STAGE_NAME, failedStage.name)}) " + + log"as failed due to a barrier task failed.") val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" + failure.toErrorString try { @@ -2283,8 +2304,8 @@ private[spark] class DAGScheduler( val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage if (noResubmitEnqueued) { - logInfo(s"Resubmitting $failedStage (${failedStage.name}) due to barrier stage " + - "failure.") + logInfo(log"Resubmitting ${MDC(FAILED_STAGE, failedStage)} " + + log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to barrier stage failure.") messageScheduler.schedule(new Runnable { override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) @@ -2361,8 +2382,8 @@ private[spark] class DAGScheduler( // delay should be 0 and registerMergeResults should be true. assert(delay == 0 && registerMergeResults) if (task.getDelay(TimeUnit.NANOSECONDS) > 0 && task.cancel(false)) { - logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle merge immediately " + - s"after cancelling previously scheduled task.") + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) scheduled " + + log"for finalizing shuffle merge immediately after cancelling previously scheduled task.") shuffleDep.setFinalizeTask( shuffleMergeFinalizeScheduler.schedule( new Runnable { @@ -2373,13 +2394,15 @@ private[spark] class DAGScheduler( ) ) } else { - logInfo(s"$stage (${stage.name}) existing scheduled task for finalizing shuffle merge" + - s"would either be in-progress or finished. No need to schedule shuffle merge" + - s" finalization again.") + logInfo( + log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) existing scheduled task " + + log"for finalizing shuffle merge would either be in-progress or finished. " + + log"No need to schedule shuffle merge finalization again.") } case None => // If no previous finalization task is scheduled, schedule the finalization task. 
- logInfo(s"$stage (${stage.name}) scheduled for finalizing shuffle merge in $delay s") + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) scheduled for " + + log"finalizing shuffle merge in ${MDC(DELAY, delay * 1000L)} ms") shuffleDep.setFinalizeTask( shuffleMergeFinalizeScheduler.schedule( new Runnable { @@ -2408,8 +2431,9 @@ private[spark] class DAGScheduler( private[scheduler] def finalizeShuffleMerge( stage: ShuffleMapStage, registerMergeResults: Boolean = true): Unit = { - logInfo(s"$stage (${stage.name}) finalizing the shuffle merge with registering merge " + - s"results set to $registerMergeResults") + logInfo( + log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) finalizing the shuffle merge with" + + log" registering merge results set to ${MDC(REGISTER_MERGE_RESULTS, registerMergeResults)}") val shuffleId = stage.shuffleDep.shuffleId val shuffleMergeId = stage.shuffleDep.shuffleMergeId val numMergers = stage.shuffleDep.getMergerLocs.length @@ -2479,8 +2503,9 @@ private[spark] class DAGScheduler( } catch { case _: TimeoutException => timedOut = true - logInfo(s"Timed out on waiting for merge results from all " + - s"$numMergers mergers for shuffle $shuffleId") + logInfo(log"Timed out on waiting for merge results from all " + + log"${MDC(NUM_MERGERS, numMergers)} mergers for " + + log"shuffle ${MDC(SHUFFLE_ID, shuffleId)}") } finally { if (timedOut || !registerMergeResults) { cancelFinalizeShuffleMergeFutures(scheduledFutures, @@ -2511,9 +2536,9 @@ private[spark] class DAGScheduler( private def processShuffleMapStageCompletion(shuffleStage: ShuffleMapStage): Unit = { markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") - logInfo("running: " + runningStages) - logInfo("waiting: " + waitingStages) - logInfo("failed: " + failedStages) + logInfo(log"running: ${MDC(STAGES, runningStages)}") + logInfo(log"waiting: ${MDC(STAGES, waitingStages)}") + logInfo(log"failed: ${MDC(STAGES, failedStages)}") // This call to increment the epoch may not be strictly necessary, but it is retained // for now in order to minimize the changes in behavior from an earlier version of the @@ -2529,9 +2554,10 @@ private[spark] class DAGScheduler( if (!shuffleStage.isAvailable) { // Some tasks had failed; let's resubmit this shuffleStage. 
// TODO: Lower-level scheduler should also deal with this - logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + - ") because some of its tasks had failed: " + - shuffleStage.findMissingPartitions().mkString(", ")) + logInfo(log"Resubmitting ${MDC(STAGE, shuffleStage)} " + + log"(${MDC(STAGE_NAME, shuffleStage.name)}) " + + log"because some of its tasks had failed: " + + log"${MDC(PARTITION_IDS, shuffleStage.findMissingPartitions().mkString(", "))}") submitStage(shuffleStage) } else { markMapStageJobsAsFinished(shuffleStage) @@ -2603,7 +2629,7 @@ private[spark] class DAGScheduler( } private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = { - logInfo(s"Resubmitted $task, so marking it as still running.") + logInfo(log"Resubmitted ${MDC(TASK_ID, task)}, so marking it as still running.") stage match { case sms: ShuffleMapStage => sms.pendingPartitions += task.partitionId @@ -2679,7 +2705,7 @@ private[spark] class DAGScheduler( if (!isShuffleMerger && (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch)) { executorFailureEpoch(execId) = currentEpoch - logInfo(s"Executor lost: $execId (epoch $currentEpoch)") + logInfo(log"Executor lost: ${MDC(EXECUTOR_ID, execId)} (epoch ${MDC(EPOCH, currentEpoch)})") if (pushBasedShuffleEnabled) { // Remove fetchFailed host in the shuffle push merger list for push based shuffle hostToUnregisterOutputs.foreach( @@ -2703,10 +2729,12 @@ private[spark] class DAGScheduler( if (remove) { hostToUnregisterOutputs match { case Some(host) => - logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)") + logInfo(log"Shuffle files lost for host: ${MDC(HOST, host)} (epoch " + + log"${MDC(EPOCH, currentEpoch)})") mapOutputTracker.removeOutputsOnHost(host) case None => - logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)") + logInfo(log"Shuffle files lost for executor: ${MDC(EXECUTOR_ID, execId)} " + + log"(epoch ${MDC(EPOCH, currentEpoch)})") mapOutputTracker.removeOutputsOnExecutor(execId) } } @@ -2728,7 +2756,8 @@ private[spark] class DAGScheduler( workerId: String, host: String, message: String): Unit = { - logInfo("Shuffle files lost for worker %s on host %s".format(workerId, host)) + logInfo(log"Shuffle files lost for worker ${MDC(WORKER_ID, workerId)} " + + log"on host ${MDC(HOST, host)}") mapOutputTracker.removeOutputsOnHost(host) clearCacheLocs() } @@ -2736,7 +2765,7 @@ private[spark] class DAGScheduler( private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = { // remove from executorFailureEpoch(execId) ?
if (executorFailureEpoch.contains(execId)) { - logInfo("Host added was in lost list earlier: " + host) + logInfo(log"Host added was in lost list earlier: ${MDC(HOST, host)}") executorFailureEpoch -= execId } shuffleFileLostEpoch -= execId @@ -2749,10 +2778,10 @@ private[spark] class DAGScheduler( }.foreach { case (_, stage: ShuffleMapStage) => configureShufflePushMergerLocations(stage) if (stage.shuffleDep.getMergerLocs.nonEmpty) { - logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + - s" ${stage.shuffleDep.shuffleId} and shuffle merge" + - s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + - s" merger locations") + logInfo(log"Shuffle merge enabled adaptively for ${MDC(STAGE, stage)} with shuffle" + + log" ${MDC(SHUFFLE_ID, stage.shuffleDep.shuffleId)} and shuffle merge" + + log" ${MDC(SHUFFLE_MERGE_ID, stage.shuffleDep.shuffleMergeId)} with " + + log"${MDC(NUM_MERGER_LOCATIONS, stage.shuffleDep.getMergerLocs.size)} merger locations") } } } @@ -2772,7 +2801,7 @@ private[spark] class DAGScheduler( handleJobCancellation(jobId, Option(reasonStr)) } case None => - logInfo("No active jobs to kill for Stage " + stageId) + logInfo(log"No active jobs to kill for Stage ${MDC(STAGE_ID, stageId)}") } } @@ -2795,11 +2824,12 @@ private[spark] class DAGScheduler( errorMessage: Option[String] = None, willRetry: Boolean = false): Unit = { val serviceTime = stage.latestInfo.submissionTime match { - case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0) + case Some(t) => clock.getTimeMillis() - t case _ => "Unknown" } if (errorMessage.isEmpty) { - logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) " + + log"finished in ${MDC(TIME_UNITS, serviceTime)} ms") stage.latestInfo.completionTime = Some(clock.getTimeMillis()) // Clear failure count for this stage, now that it's succeeded. @@ -2809,7 +2839,8 @@ private[spark] class DAGScheduler( stage.clearFailures() } else { stage.latestInfo.stageFailed(errorMessage.get) - logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}") + logInfo(log"${MDC(STAGE, stage)} (${MDC(STAGE_NAME, stage.name)}) failed in " + + log"${MDC(TIME_UNITS, serviceTime)} ms due to ${MDC(ERROR, errorMessage.get)}") } updateStageInfoForPushBasedShuffle(stage) if (!willRetry) { @@ -2855,7 +2886,8 @@ private[spark] class DAGScheduler( failJobAndIndependentStages(job, finalException) } if (dependentJobs.isEmpty) { - logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") + logInfo(log"Ignoring failure of ${MDC(FAILED_STAGE, failedStage)} because all jobs " + + log"depending on it are done") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala index cecf5d498ac4..160607215390 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala @@ -111,8 +111,8 @@ private[scheduler] class HealthTracker ( val execsToInclude = executorIdToExcludedStatus.filter(_._2.expiryTime < now).keys if (execsToInclude.nonEmpty) { // Include any executors that have been excluded longer than the excludeOnFailure timeout. 
- logInfo(s"Removing executors $execsToInclude from exclude list because the " + - s"the executors have reached the timed out") + logInfo(log"Removing executors ${MDC(EXECUTOR_IDS, execsToInclude)} from " + + log"exclude list because the executors have reached the timed out") execsToInclude.foreach { exec => val status = executorIdToExcludedStatus.remove(exec).get val failedExecsOnNode = nodeToExcludedExecs(status.node) @@ -128,8 +128,8 @@ private[scheduler] class HealthTracker ( val nodesToInclude = nodeIdToExcludedExpiryTime.filter(_._2 < now).keys if (nodesToInclude.nonEmpty) { // Include any nodes that have been excluded longer than the excludeOnFailure timeout. - logInfo(s"Removing nodes $nodesToInclude from exclude list because the " + - s"nodes have reached has timed out") + logInfo(log"Removing nodes ${MDC(NODES, nodesToInclude)} from exclude list because the " + + log"nodes have reached has timed out") nodesToInclude.foreach { node => nodeIdToExcludedExpiryTime.remove(node) // post both to keep backwards compatibility @@ -173,8 +173,8 @@ private[scheduler] class HealthTracker ( force = true) } case None => - logInfo(s"Not attempting to kill excluded executor id $exec " + - s"since allocation client is not defined.") + logInfo(log"Not attempting to kill excluded executor id ${MDC(EXECUTOR_ID, exec)}" + + log" since allocation client is not defined.") } } @@ -196,14 +196,15 @@ private[scheduler] class HealthTracker ( allocationClient match { case Some(a) => if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) { - logInfo(s"Decommissioning all executors on excluded host $node " + - s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.") + logInfo(log"Decommissioning all executors on excluded host ${MDC(HOST, node)} " + + log"since ${MDC(CONFIG, config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key)} " + + log"is set.") if (!a.decommissionExecutorsOnHost(node)) { logError(log"Decommissioning executors on ${MDC(HOST, node)} failed.") } } else { - logInfo(s"Killing all executors on excluded host $node " + - s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.") + logInfo(log"Killing all executors on excluded host ${MDC(HOST, node)} " + + log"since ${MDC(CONFIG, config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key)} is set.") if (!a.killExecutorsOnHost(node)) { logError(log"Killing executors on node ${MDC(HOST, node)} failed.") } @@ -231,7 +232,8 @@ private[scheduler] class HealthTracker ( if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) { if (!nodeIdToExcludedExpiryTime.contains(host)) { - logInfo(s"excluding node $host due to fetch failure of external shuffle service") + logInfo(log"excluding node ${MDC(HOST, host)} due to fetch failure of " + + log"external shuffle service") nodeIdToExcludedExpiryTime.put(host, expiryTimeForNewExcludes) // post both to keep backwards compatibility @@ -242,7 +244,7 @@ private[scheduler] class HealthTracker ( updateNextExpiryTime() } } else if (!executorIdToExcludedStatus.contains(exec)) { - logInfo(s"Excluding executor $exec due to fetch failure") + logInfo(log"Excluding executor ${MDC(EXECUTOR_ID, exec)} due to fetch failure") executorIdToExcludedStatus.put(exec, ExcludedExecutor(host, expiryTimeForNewExcludes)) // We hardcoded number of failure tasks to 1 for fetch failure, because there's no @@ -280,8 +282,8 @@ private[scheduler] class HealthTracker ( // some of the logic around expiry times a little more confusing. But it also wouldn't be a // problem to re-exclude, with a later expiry time. 
if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToExcludedStatus.contains(exec)) { - logInfo(s"Excluding executor id: $exec because it has $newTotal" + - s" task failures in successful task sets") + logInfo(log"Excluding executor id: ${MDC(EXECUTOR_ID, exec)} because it has " + + log"${MDC(TOTAL, newTotal)} task failures in successful task sets") val node = failuresInTaskSet.node executorIdToExcludedStatus.put(exec, ExcludedExecutor(node, expiryTimeForNewExcludes)) // post both to keep backwards compatibility @@ -299,8 +301,9 @@ private[scheduler] class HealthTracker ( // time. if (excludedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE && !nodeIdToExcludedExpiryTime.contains(node)) { - logInfo(s"Excluding node $node because it has ${excludedExecsOnNode.size} " + - s"executors excluded: ${excludedExecsOnNode}") + logInfo(log"Excluding node ${MDC(HOST, node)} because it has " + + log"${MDC(NUM_EXECUTORS, excludedExecsOnNode.size)} executors " + + log"excluded: ${MDC(EXECUTOR_IDS, excludedExecsOnNode)}") nodeIdToExcludedExpiryTime.put(node, expiryTimeForNewExcludes) // post both to keep backwards compatibility listenerBus.post(SparkListenerNodeBlacklisted(now, node, excludedExecsOnNode.size)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index cd5d6b8f9c90..d9020da4cdcb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler import scala.collection.mutable import org.apache.spark._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.util.{RpcUtils, ThreadUtils} @@ -124,7 +124,7 @@ private[spark] class OutputCommitCoordinator( stageStates.get(stage) match { case Some(state) => require(state.authorizedCommitters.length == maxPartitionId + 1) - logInfo(s"Reusing state from previous attempt of stage $stage.") + logInfo(log"Reusing state from previous attempt of stage ${MDC(LogKeys.STAGE_ID, stage)}") case _ => stageStates(stage) = new StageState(maxPartitionId + 1) @@ -151,8 +151,10 @@ private[spark] class OutputCommitCoordinator( case Success => // The task output has been committed successfully case _: TaskCommitDenied => - logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " + - s"partition: $partition, attempt: $attemptNumber") + logInfo(log"Task was denied committing, stage: ${MDC(LogKeys.STAGE_ID, stage)}." + + log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)}, " + + log"partition: ${MDC(LogKeys.PARTITION_ID, partition)}, " + + log"attempt: ${MDC(LogKeys.NUM_ATTEMPT, attemptNumber)}") case _ => // Mark the attempt as failed to exclude from future commit protocol val taskId = TaskIdentifier(stageAttempt, attemptNumber) @@ -182,8 +184,10 @@ private[spark] class OutputCommitCoordinator( attemptNumber: Int): Boolean = synchronized { stageStates.get(stage) match { case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) => - logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + - s"task attempt $attemptNumber already marked as failed.") + logInfo(log"Commit denied for stage=${MDC(LogKeys.STAGE_ID, stage)}." 
+ + log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)}, partition=" + + log"${MDC(LogKeys.PARTITION_ID, partition)}: task attempt " + + log"${MDC(LogKeys.NUM_ATTEMPT, attemptNumber)} already marked as failed.") false case Some(state) => val existing = state.authorizedCommitters(partition) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index b5cc6261cea3..6f64dff3f39d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -80,20 +80,23 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext fileData = schedulerAllocFile.map { f => val filePath = new Path(f) val fis = filePath.getFileSystem(sc.hadoopConfiguration).open(filePath) - logInfo(s"Creating Fair Scheduler pools from $f") + logInfo(log"Creating Fair Scheduler pools from ${MDC(LogKeys.FILE_NAME, f)}") Some((fis, f)) }.getOrElse { val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) if (is != null) { - logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE") + logInfo(log"Creating Fair Scheduler pools from default file: " + + log"${MDC(LogKeys.FILE_NAME, DEFAULT_SCHEDULER_FILE)}") Some((is, DEFAULT_SCHEDULER_FILE)) } else { val schedulingMode = SchedulingMode.withName(sc.conf.get(SCHEDULER_MODE)) rootPool.addSchedulable(new Pool( DEFAULT_POOL_NAME, schedulingMode, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) - logInfo("Fair scheduler configuration not found, created default pool: " + - "%s, schedulingMode: %s, minShare: %d, weight: %d".format( - DEFAULT_POOL_NAME, schedulingMode, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + logInfo(log"Fair scheduler configuration not found, created default pool: " + + log"${MDC(LogKeys.DEFAULT_NAME, DEFAULT_POOL_NAME)}, " + + log"schedulingMode: ${MDC(LogKeys.SCHEDULING_MODE, schedulingMode)}, " + + log"minShare: ${MDC(LogKeys.MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + + log"weight: ${MDC(LogKeys.WEIGHT, DEFAULT_WEIGHT)}") None } } @@ -122,8 +125,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(pool) - logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format( - DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + logInfo(log"Created default pool: ${MDC(LogKeys.POOL_NAME, DEFAULT_POOL_NAME)}, " + + log"schedulingMode: ${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + + log"minShare: ${MDC(LogKeys.MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + + log"weight: ${MDC(LogKeys.WEIGHT, DEFAULT_WEIGHT)}") } } @@ -142,8 +147,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) - logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format( - poolName, schedulingMode, minShare, weight)) + logInfo(log"Created pool: ${MDC(LogKeys.POOL_NAME, poolName)}, " + + log"schedulingMode: ${MDC(LogKeys.SCHEDULING_MODE, schedulingMode)}, " + + log"minShare: ${MDC(LogKeys.MIN_SHARE, minShare)}, " + + log"weight: ${MDC(LogKeys.WEIGHT, weight)}") } } @@ -159,7 +166,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext log"${MDC(XML_SCHEDULING_MODE, 
xmlSchedulingMode)} found in " + log"Fair Scheduler configuration file: ${MDC(FILE_NAME, fileName)}, using " + log"the default schedulingMode: " + - log"${MDC(LogKeys.DEFAULT_SCHEDULING_MODE, defaultValue)} for pool: " + + log"${MDC(LogKeys.SCHEDULING_MODE, defaultValue)} for pool: " + log"${MDC(POOL_NAME, poolName)}" try { if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) { @@ -215,11 +222,12 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. " + log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " + log"configuration (schedulingMode: " + - log"${MDC(LogKeys.DEFAULT_SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + + log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}") } parentPool.addSchedulable(manager) - logInfo("Added task set " + manager.name + " tasks to pool " + poolName) + logInfo(log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " + + log"${MDC(LogKeys.POOL_NAME, poolName)}") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala index 1f12b46412bc..e46dde5561a2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.util.{Distribution, Utils} @@ -46,7 +46,8 @@ class StatsReportListener extends SparkListener with Logging { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { implicit val sc = stageCompleted - this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}") + this.logInfo( + log"Finished stage: ${MDC(LogKeys.STAGE, getStatusDetail(stageCompleted.stageInfo))}") showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics.toSeq) // Shuffle write @@ -111,9 +112,9 @@ private[spark] object StatsReportListener extends Logging { def showDistribution(heading: String, d: Distribution, formatNumber: Double => String): Unit = { val stats = d.statCounter val quantiles = d.getQuantiles(probabilities).map(formatNumber) - logInfo(heading + stats) + logInfo(log"${MDC(LogKeys.DESCRIPTION, heading)}${MDC(LogKeys.STATS, stats)}") logInfo(percentilesHeader) - logInfo("\t" + quantiles.mkString("\t")) + logInfo(log"\t" + log"${MDC(LogKeys.QUANTILES, quantiles.mkString("\t"))}") } def showDistribution( diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 15bdd58288f1..ad0e0ddb687e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -250,8 +250,9 @@ private[spark] class TaskSchedulerImpl( override def submitTasks(taskSet: TaskSet): Unit = { val tasks = taskSet.tasks - logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks " - + "resource profile " + taskSet.resourceProfileId) + logInfo(log"Adding task set ${MDC(LogKeys.TASK_SET_ID, 
taskSet.id)} with " + + log"${MDC(LogKeys.NUM_TASKS, tasks.length)} tasks resource profile " + + log"${MDC(LogKeys.RESOURCE_PROFILE_ID, taskSet.resourceProfileId)}") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId @@ -306,9 +307,10 @@ private[spark] class TaskSchedulerImpl( stageId: Int, interruptThread: Boolean, reason: String): Unit = synchronized { - logInfo("Cancelling stage " + stageId) + logInfo(log"Canceling stage ${MDC(LogKeys.STAGE_ID, stageId)}") // Kill all running tasks for the stage. - logInfo(s"Killing all running tasks in stage $stageId: $reason") + logInfo(log"Killing all running tasks in stage ${MDC(LogKeys.STAGE_ID, stageId)}: " + + log"${MDC(LogKeys.REASON, reason)}") taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts => attempts.foreach { case (_, tsm) => // There are two possible cases here: @@ -322,7 +324,8 @@ private[spark] class TaskSchedulerImpl( } } tsm.suspend() - logInfo("Stage %s.%s was cancelled".format(stageId, tsm.taskSet.stageAttemptId)) + logInfo(log"Stage ${MDC(LogKeys.STAGE_ID, stageId)}." + + log"${MDC(LogKeys.STAGE_ATTEMPT, tsm.taskSet.stageAttemptId)} was cancelled") } } } @@ -331,7 +334,7 @@ private[spark] class TaskSchedulerImpl( taskId: Long, interruptThread: Boolean, reason: String): Boolean = synchronized { - logInfo(s"Killing task $taskId: $reason") + logInfo(log"Killing task ${MDC(LogKeys.TASK_ID, taskId)}: ${MDC(LogKeys.REASON, reason)}") val execId = taskIdToExecutorId.get(taskId) if (execId.isDefined) { backend.killTask(taskId, execId.get, interruptThread, reason) @@ -361,8 +364,8 @@ private[spark] class TaskSchedulerImpl( } noRejectsSinceLastReset -= manager.taskSet manager.parent.removeSchedulable(manager) - logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" + - s" ${manager.parent.name}") + logInfo(log"Removed TaskSet ${MDC(LogKeys.TASK_SET_NAME, manager.taskSet.id)}, whose tasks " + + log"have all completed, from pool ${MDC(LogKeys.POOL_NAME, manager.parent.name)}") } /** @@ -559,9 +562,10 @@ private[spark] class TaskSchedulerImpl( // Skip the launch process. // TODO SPARK-24819 If the job requires more slots than available (both busy and free // slots), fail the job on submit. 
- logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + - s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + - s"number of available slots is $numBarrierSlotsAvailable.") + logInfo(log"Skip current round of resource offers for barrier stage " + + log"${MDC(LogKeys.STAGE_ID, taskSet.stageId)} because the barrier taskSet requires " + + log"${MDC(LogKeys.TASK_SET_NAME, taskSet.numTasks)} slots, while the total " + + log"number of available slots is ${MDC(LogKeys.NUM_SLOTS, numBarrierSlotsAvailable)}.") } else { var launchedAnyTask = false var noDelaySchedulingRejects = true @@ -619,18 +623,18 @@ private[spark] class TaskSchedulerImpl( // in order to provision more executors to make them schedulable if (Utils.isDynamicAllocationEnabled(conf)) { if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { - logInfo("Notifying ExecutorAllocationManager to allocate more executors to" + - " schedule the unschedulable task before aborting" + - s" stage ${taskSet.stageId}.") + logInfo(log"Notifying ExecutorAllocationManager to allocate more executors to" + + log" schedule the unschedulable task before aborting" + + log" stage ${MDC(LogKeys.STAGE_ID, taskSet.stageId)}.") dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId, taskSet.taskSet.stageAttemptId) updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex) } } else { // Abort Immediately - logInfo("Cannot schedule any task because all executors excluded from " + - "failures. No idle executors can be found to kill. Aborting stage " + - s"${taskSet.stageId}.") + logInfo(log"Cannot schedule any task because all executors excluded from " + + log"failures. No idle executors can be found to kill. Aborting stage " + + log"${MDC(LogKeys.STAGE_ID, taskSet.stageId)}.") taskSet.abortSinceCompletelyExcludedOnFailure(taskIndex) } } @@ -643,8 +647,8 @@ private[spark] class TaskSchedulerImpl( // non-excluded executor and the abort timer doesn't kick in because of a constant // submission of new TaskSets. See the PR for more details. if (unschedulableTaskSetToExpiryTime.nonEmpty) { - logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " + - "recently scheduled.") + logInfo(log"Clearing the expiry times for all unschedulable taskSets as a task " + + log"was recently scheduled.") // Notify ExecutorAllocationManager as well as other subscribers that a task now // recently becomes schedulable dagScheduler.unschedulableTaskSetRemoved(taskSet.taskSet.stageId, @@ -679,8 +683,8 @@ private[spark] class TaskSchedulerImpl( val curTime = clock.getTimeMillis() if (curTime - taskSet.lastResourceOfferFailLogTime > TaskSetManager.BARRIER_LOGGING_INTERVAL) { - logInfo("Releasing the assigned resource offers since only partial tasks can " + - "be launched. Waiting for later round resource offers.") + logInfo(log"Releasing the assigned resource offers since only partial tasks can " + + log"be launched. 
Waiting for later round resource offers.") taskSet.lastResourceOfferFailLogTime = curTime } barrierPendingLaunchTasks.foreach { task => @@ -722,8 +726,8 @@ private[spark] class TaskSchedulerImpl( .mkString(",") addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) - logInfo(s"Successfully scheduled all the ${addressesWithDescs.length} tasks for " + - s"barrier stage ${taskSet.stageId}.") + logInfo(log"Successfully scheduled all the ${MDC(LogKeys.NUM_TASKS, addressesWithDescs.length)} " + + log"tasks for barrier stage ${MDC(LogKeys.STAGE_ID, taskSet.stageId)}.") } taskSet.barrierPendingLaunchTasks.clear() } @@ -743,8 +747,8 @@ private[spark] class TaskSchedulerImpl( taskIndex: Int): Unit = { val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout - logInfo(s"Waiting for $timeout ms for completely " + - s"excluded task to be schedulable again before aborting stage ${taskSet.stageId}.") + logInfo(log"Waiting for ${MDC(LogKeys.TIMEOUT, timeout)} ms for completely " + + log"excluded task to be schedulable again before aborting stage ${MDC(LogKeys.STAGE_ID, taskSet.stageId)}.") abortTimer.schedule( createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout, TimeUnit.MILLISECONDS) } @@ -756,8 +760,8 @@ private[spark] class TaskSchedulerImpl( override def run(): Unit = TaskSchedulerImpl.this.synchronized { if (unschedulableTaskSetToExpiryTime.contains(taskSet) && unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()) { - logInfo("Cannot schedule any task because all executors excluded due to failures. " + - s"Wait time for scheduling expired. Aborting stage ${taskSet.stageId}.") + logInfo(log"Cannot schedule any task because all executors excluded due to failures. " + + log"Wait time for scheduling expired. 
Aborting stage ${MDC(LogKeys.STAGE_ID, taskSet.stageId)}.") taskSet.abortSinceCompletelyExcludedOnFailure(taskIndex) } else { this.cancel() @@ -1043,7 +1047,8 @@ private[spark] class TaskSchedulerImpl( } override def workerRemoved(workerId: String, host: String, message: String): Unit = { - logInfo(s"Handle removed worker $workerId: $message") + logInfo(log"Handle removed worker ${MDC(LogKeys.WORKER_ID, workerId)}: " + + log"${MDC(LogKeys.MESSAGE, message)}") dagScheduler.workerRemoved(workerId, host, message) } @@ -1054,10 +1059,12 @@ private[spark] class TaskSchedulerImpl( case LossReasonPending => logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.") case ExecutorKilled => - logInfo(s"Executor $executorId on $hostPort killed by driver.") + logInfo(log"Executor ${MDC(LogKeys.EXECUTOR_ID, executorId)} on " + + log"${MDC(LogKeys.HOST_PORT, hostPort)} killed by driver.") case _: ExecutorDecommission => - logInfo(s"Executor $executorId on $hostPort is decommissioned" + - s"${getDecommissionDuration(executorId)}.") + logInfo(log"Executor ${MDC(LogKeys.EXECUTOR_ID, executorId)} on " + + log"${MDC(LogKeys.HOST_PORT, hostPort)} is decommissioned" + + log"${MDC(LogKeys.DURATION, getDecommissionDuration(executorId))}.") case _ => logError(log"Lost executor ${MDC(LogKeys.EXECUTOR_ID, executorId)} on " + log"${MDC(LogKeys.HOST, hostPort)}: ${MDC(LogKeys.REASON, reason)}") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala index f479e5e32bc2..c9aa74e0852b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala @@ -19,8 +19,7 @@ package org.apache.spark.scheduler import scala.collection.mutable.{HashMap, HashSet} import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, Logging, LogKeys, MDC} import org.apache.spark.util.Clock /** @@ -134,7 +133,8 @@ private[scheduler] class TaskSetExcludelist( val numFailures = execFailures.numUniqueTasksWithFailures if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) { if (excludedExecs.add(exec)) { - logInfo(s"Excluding executor ${exec} for stage $stageId") + logInfo(log"Excluding executor ${MDC(LogKeys.EXECUTOR_ID, exec)} for stage " + + log"${MDC(LogKeys.STAGE_ID, stageId)}") // This executor has been excluded for this stage. Let's check if it // the whole node should be excluded.
val excludedExecutorsOnNode = @@ -149,7 +149,8 @@ private[scheduler] class TaskSetExcludelist( val numFailExec = excludedExecutorsOnNode.size if (numFailExec >= MAX_FAILED_EXEC_PER_NODE_STAGE) { if (excludedNodes.add(host)) { - logInfo(s"Excluding ${host} for stage $stageId") + logInfo(log"Excluding ${MDC(LogKeys.HOST, host)} for " + + log"stage ${MDC(LogKeys.STAGE_ID, stageId)}") // SparkListenerNodeBlacklistedForStage is deprecated but post both events // to keep backward compatibility listenerBus.post( diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b7ff443231f3..6573ab2f23d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -30,8 +30,7 @@ import org.apache.spark.InternalAccumulator import org.apache.spark.InternalAccumulator.{input, shuffleRead} import org.apache.spark.TaskState.TaskState import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKeys +import org.apache.spark.internal.{config, Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ import org.apache.spark.scheduler.SchedulingMode._ @@ -280,8 +279,9 @@ private[spark] class TaskSetManager( for (e <- set) { pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index } - logInfo(s"Pending task $index has a cached location at ${e.host} " + - ", where there are executors " + set.mkString(",")) + logInfo(log"Pending task ${MDC(INDEX, index)} has a cached location at " + + log"${MDC(HOST, e.host)}, where there are executors " + + log"${MDC(EXECUTOR_IDS, set.mkString(","))}") case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + ", but there are no executors alive there.") } @@ -554,10 +554,16 @@ private[spark] class TaskSetManager( // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime val tName = taskName(taskId) - logInfo(s"Starting $tName ($host, executor ${info.executorId}, " + - s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes) " + - (if (taskResourceAssignments.nonEmpty) s"taskResourceAssignments ${taskResourceAssignments}" - else "")) + logInfo(log"Starting ${MDC(TASK_NAME, tName)} (${MDC(HOST, host)}, " + + log"executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}, " + + log"partition ${MDC(PARTITION_ID, task.partitionId)}, " + + log"${MDC(TASK_LOCALITY, taskLocality)}, " + + log"${MDC(SIZE, serializedTask.limit())} bytes) " + + (if (taskResourceAssignments.nonEmpty) { + log"taskResourceAssignments ${MDC(TASK_RESOURCE_ASSIGNMENTS, taskResourceAssignments)}" + } else { + log"" + })) sched.dagScheduler.taskStarted(task, info) new TaskDescription( @@ -829,8 +835,11 @@ private[spark] class TaskSetManager( // Kill any other attempts for the same task (since those are unnecessary now that one // attempt completed successfully).
for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { - logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for ${taskName(attemptInfo.taskId)}" + - s" on ${attemptInfo.host} as the attempt ${info.attemptNumber} succeeded on ${info.host}") + logInfo(log"Killing attempt ${MDC(NUM_ATTEMPT, attemptInfo.attemptNumber)} for " + + log"${MDC(TASK_NAME, taskName(attemptInfo.taskId))} on " + + log"${MDC(HOST, attemptInfo.host)} as the attempt " + + log"${MDC(TASK_ATTEMPT_ID, info.attemptNumber)} succeeded on " + + log"${MDC(HOST, info.host)}") killedByOtherAttempt += attemptInfo.taskId sched.backend.killTask( attemptInfo.taskId, @@ -840,8 +849,10 @@ private[spark] class TaskSetManager( } if (!successful(index)) { tasksSuccessful += 1 - logInfo(s"Finished ${taskName(info.taskId)} in ${info.duration} ms " + - s"on ${info.host} (executor ${info.executorId}) ($tasksSuccessful/$numTasks)") + logInfo(log"Finished ${MDC(TASK_NAME, taskName(info.taskId))} in " + + log"${MDC(DURATION, info.duration)} ms on ${MDC(HOST, info.host)} " + + log"(executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}) " + + log"(${MDC(NUM_SUCCESSFUL_TASKS, tasksSuccessful)}/${MDC(NUM_TASKS, numTasks)})") // Mark successful and stop if all the tasks have succeeded. successful(index) = true numFailures(index) = 0 @@ -849,8 +860,9 @@ private[spark] class TaskSetManager( isZombie = true } } else { - logInfo(s"Ignoring task-finished event for ${taskName(info.taskId)} " + - s"because it has already completed successfully") + logInfo(log"Ignoring task-finished event for " + + log"${MDC(TASK_NAME, taskName(info.taskId))} " + + log"because it has already completed successfully") } // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not @@ -1007,8 +1019,10 @@ private[spark] class TaskSetManager( logWarning(failureReason) } else { logInfo( - s"Lost $task on ${info.host}, executor ${info.executorId}: " + - s"${ef.className} (${ef.description}) [duplicate $dupCount]") + log"Lost ${MDC(TASK_NAME, task)} on ${MDC(HOST, info.host)}, " + + log"executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}: " + + log"${MDC(CLASS_NAME, ef.className)} " + + log"(${MDC(DESCRIPTION, ef.description)}) [duplicate ${MDC(COUNT, dupCount)}]") } ef.exception @@ -1020,9 +1034,9 @@ private[spark] class TaskSetManager( None case e: ExecutorLostFailure if !e.exitCausedByApp => - logInfo(s"${taskName(tid)} failed because while it was being computed, its executor " + - "exited for a reason unrelated to the task. Not counting this failure towards the " + - "maximum number of failures for the task.") + logInfo(log"${MDC(TASK_NAME, taskName(tid))} failed because while it was being computed," + + log" its executor exited for a reason unrelated to the task. 
" + + log"Not counting this failure towards the maximum number of failures for the task.") None case _: TaskFailedReason => // TaskResultLost and others @@ -1052,10 +1066,10 @@ private[spark] class TaskSetManager( } if (successful(index)) { - logInfo(s"${taskName(info.taskId)} failed, but the task will not" + - " be re-executed (either because the task failed with a shuffle data fetch failure," + - " so the previous stage needs to be re-run, or because a different copy of the task" + - " has already succeeded).") + logInfo(log"${MDC(LogKeys.TASK_NAME, taskName(info.taskId))} failed, but the task will not" + + log" be re-executed (either because the task failed with a shuffle data fetch failure," + + log" so the previous stage needs to be re-run, or because a different copy of the task" + + log" has already succeeded).") } else { addPendingTask(index) } @@ -1238,9 +1252,10 @@ private[spark] class TaskSetManager( if (speculated) { addPendingTask(index, speculatable = true) logInfo( - ("Marking task %d in stage %s (on %s) as speculatable because it ran more" + - " than %.0f ms(%d speculatable tasks in this taskset now)") - .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1)) + log"Marking task ${MDC(INDEX, index)} in stage ${MDC(STAGE_ID, taskSet.id)} (on " + + log"${MDC(HOST, info.host)}) as speculatable because it ran more than " + + log"${MDC(TIMEOUT, threshold)} ms(${MDC(NUM_TASKS, speculatableTasks.size + 1)}" + + log"speculatable tasks in this taskset now)") speculatableTasks += index sched.dagScheduler.speculativeTaskSubmitted(tasks(index), index) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d359b65caa93..deaa1b4e4790 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -42,6 +42,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME +import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS import org.apache.spark.status.api.v1.ThreadStackTrace import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -258,7 +259,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // If the cluster manager gives us an executor on an excluded node (because it // already started allocating those resources before we informed it of our exclusion, // or if it ignored our exclusion), then we reject that executor immediately. 
- logInfo(s"Rejecting $executorId as it has been excluded.") + logInfo(log"Rejecting ${MDC(LogKeys.EXECUTOR_ID, executorId)} as it has been excluded.") context.sendFailure( new IllegalStateException(s"Executor is excluded due to failures: $executorId")) } else { @@ -269,8 +270,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } else { context.senderAddress } - logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId, " + - s" ResourceProfileId $resourceProfileId") + logInfo(log"Registered executor ${MDC(LogKeys.RPC_ENDPOINT_REF, executorRef)} " + + log"(${MDC(LogKeys.RPC_ADDRESS, executorAddress)}) " + + log"with ID ${MDC(LogKeys.EXECUTOR_ID, executorId)}, " + + log"ResourceProfileId ${MDC(LogKeys.RESOURCE_PROFILE_ID, resourceProfileId)}") addressToExecutorId(executorAddress) = executorId totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) @@ -324,7 +327,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case UpdateExecutorsLogLevel(logLevel) => currentLogLevel = Some(logLevel) - logInfo(s"Asking each executor to refresh the log level to $logLevel") + logInfo(log"Asking each executor to refresh the log level to " + + log"${MDC(LogKeys.LOG_LEVEL, logLevel)}") for ((_, executorData) <- executorDataMap) { executorData.executorEndpoint.send(UpdateExecutorLogLevel(logLevel)) } @@ -497,7 +501,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // forever. Therefore, we should also post `SparkListenerExecutorRemoved` here. listenerBus.post(SparkListenerExecutorRemoved( System.currentTimeMillis(), executorId, reason.toString)) - logInfo(s"Asked to remove non-existent executor $executorId") + logInfo( + log"Asked to remove non-existent executor ${MDC(LogKeys.EXECUTOR_ID, executorId)}") } } @@ -526,7 +531,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } if (shouldDisable) { - logInfo(s"Disabling executor $executorId.") + logInfo(log"Disabling executor ${MDC(LogKeys.EXECUTOR_ID, executorId)}.") scheduler.executorLost(executorId, LossReasonPending) } @@ -570,7 +575,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp return executorsToDecommission.toImmutableArraySeq } - logInfo(s"Decommission executors: ${executorsToDecommission.mkString(", ")}") + logInfo(log"Decommission executors: " + + log"${MDC(LogKeys.EXECUTOR_IDS, executorsToDecommission.mkString(", "))}") // If we don't want to replace the executors we are decommissioning if (adjustTargetNumExecutors) { @@ -589,7 +595,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (!triggeredByExecutor) { executorsToDecommission.foreach { executorId => - logInfo(s"Notify executor $executorId to decommission.") + logInfo(log"Notify executor ${MDC(LogKeys.EXECUTOR_ID, executorId)} to decommission.") executorDataMap(executorId).executorEndpoint.send(DecommissionExecutor) } } @@ -601,7 +607,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorsToDecommission.filter(executorsPendingDecommission.contains) } if (stragglers.nonEmpty) { - logInfo(s"${stragglers.toList} failed to decommission in ${cleanupInterval}, killing.") + logInfo( + log"${MDC(LogKeys.EXECUTOR_IDS, stragglers.toList)} failed to decommission in " + + log"${MDC(LogKeys.INTERVAL, cleanupInterval)}, killing.") killExecutors(stragglers.toImmutableArraySeq, false, false, true) } } @@ -718,13 +726,14 @@ 
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def isReady(): Boolean = { if (sufficientResourcesRegistered()) { - logInfo("SchedulerBackend is ready for scheduling beginning after " + - s"reached minRegisteredResourcesRatio: $minRegisteredRatio") + logInfo(log"SchedulerBackend is ready for scheduling beginning after " + + log"reached minRegisteredResourcesRatio: ${MDC(LogKeys.MIN_SIZE, minRegisteredRatio)}") return true } if ((System.nanoTime() - createTimeNs) >= maxRegisteredWaitingTimeNs) { - logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + - s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeNs(ns)") + logInfo(log"SchedulerBackend is ready for scheduling beginning after waiting " + + log"maxRegisteredResourcesWaitingTime: " + + log"${MDC(LogKeys.TIMEOUT, maxRegisteredWaitingTimeNs / NANOS_PER_MILLIS.toDouble)}(ms)") return true } false @@ -801,7 +810,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp "Attempted to request a negative number of additional executor(s) " + s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!") } - logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") + logInfo(log"Requesting ${MDC(LogKeys.NUM_EXECUTORS, numAdditionalExecutors)} additional " + + log"executor(s) from the cluster manager") val response = synchronized { val defaultProf = scheduler.sc.resourceProfileManager.defaultResourceProfile @@ -951,7 +961,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp adjustTargetNumExecutors: Boolean, countFailures: Boolean, force: Boolean): Seq[String] = { - logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") + logInfo( + log"Requesting to kill executor(s) ${MDC(LogKeys.EXECUTOR_IDS, executorIds.mkString(", "))}") val response = withLock { val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains) @@ -966,7 +977,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => force || !scheduler.isExecutorBusy(id) } executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures } - logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}") + logInfo(log"Actual list of executor(s) to be killed is " + + log"${MDC(LogKeys.EXECUTOR_IDS, executorsToKill.mkString(", "))}") // If we do not wish to replace the executors we kill, sync the target number of executors // with the cluster manager to avoid allocating new ones. When computing the new target, @@ -1007,7 +1019,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the decommission request is acknowledged. */ final override def decommissionExecutorsOnHost(host: String): Boolean = { - logInfo(s"Requesting to kill any and all executors on host $host") + logInfo(log"Requesting to kill any and all executors on host ${MDC(LogKeys.HOST, host)}") // A potential race exists if a new executor attempts to register on a host // that is on the exclude list and is no longer valid. To avoid this race, // all executor registration and decommissioning happens in the event loop. This way, either @@ -1023,7 +1035,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the kill request is acknowledged. 
*/ final override def killExecutorsOnHost(host: String): Boolean = { - logInfo(s"Requesting to kill any and all executors on host $host") + logInfo(log"Requesting to kill any and all executors on host ${MDC(LogKeys.HOST, host)}") // A potential race exists if a new executor attempts to register on a host // that is on the exclude list and is no longer valid. To avoid this race, // all executor registration and killing happens in the event loop. This way, either diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 8f15dec6739a..f4caecd7d674 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -27,8 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKeys.REASON +import org.apache.spark.internal.{config, Logging, LogKeys, MDC} import org.apache.spark.internal.config.EXECUTOR_REMOVE_DELAY import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} @@ -145,7 +144,7 @@ private[spark] class StandaloneSchedulerBackend( } override def connected(appId: String): Unit = { - logInfo("Connected to Spark cluster with app ID " + appId) + logInfo(log"Connected to Spark cluster with app ID ${MDC(LogKeys.APP_ID, appId)}") this.appId = appId notifyContext() launcherBackend.setAppId(appId) @@ -162,7 +161,7 @@ private[spark] class StandaloneSchedulerBackend( notifyContext() if (!stopping.get) { launcherBackend.setState(SparkAppHandle.State.KILLED) - logError(log"Application has been killed. Reason: ${MDC(REASON, reason)}") + logError(log"Application has been killed. 
Reason: ${MDC(LogKeys.REASON, reason)}") try { scheduler.error(reason) } finally { @@ -174,8 +173,9 @@ private[spark] class StandaloneSchedulerBackend( override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit = { - logInfo("Granted executor ID %s on hostPort %s with %d core(s), %s RAM".format( - fullId, hostPort, cores, Utils.megabytesToString(memory))) + logInfo(log"Granted executor ID ${MDC(LogKeys.EXECUTOR_ID, fullId)} on hostPort " + + log"${MDC(LogKeys.HOST_PORT, hostPort)} with ${MDC(LogKeys.NUM_CORES, cores)} core(s), " + + log"${MDC(LogKeys.MEMORY_SIZE, Utils.megabytesToString(memory))} RAM") } override def executorRemoved( @@ -192,23 +192,28 @@ private[spark] class StandaloneSchedulerBackend( case Some(code) => ExecutorExited(code, exitCausedByApp = true, message) case None => ExecutorProcessLost(message, workerHost, causedByApp = workerHost.isEmpty) } - logInfo("Executor %s removed: %s".format(fullId, message)) + logInfo( + log"Executor ${MDC(LogKeys.EXECUTOR_ID, fullId)} removed: ${MDC(LogKeys.MESSAGE, message)}") removeExecutor(fullId.split("/")(1), reason) } override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { - logInfo(s"Asked to decommission executor $fullId") + logInfo(log"Asked to decommission executor ${MDC(LogKeys.EXECUTOR_ID, fullId)}") val execId = fullId.split("/")(1) decommissionExecutors( Array((execId, decommissionInfo)), adjustTargetNumExecutors = false, triggeredByExecutor = false) - logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) + logInfo( + log"Executor ${MDC(LogKeys.EXECUTOR_ID, fullId)} " + + log"decommissioned: ${MDC(LogKeys.DESCRIPTION, decommissionInfo)}" + ) } override def workerRemoved(workerId: String, host: String, message: String): Unit = { - logInfo("Worker %s removed: %s".format(workerId, message)) + logInfo(log"Worker ${MDC(LogKeys.WORKER_ID, workerId)} removed: " + + log"${MDC(LogKeys.MESSAGE, message)}") removeWorker(workerId, host, message) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index c389b0c988f4..57505c87f879 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ @@ -342,7 +342,8 @@ private[spark] class ExecutorMonitor( override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { val exec = ensureExecutorIsTracked(event.executorId, event.executorInfo.resourceProfileId) exec.updateRunningTasks(0) - logInfo(s"New executor ${event.executorId} has registered (new total is ${executors.size()})") + logInfo(log"New executor ${MDC(LogKeys.EXECUTOR_ID, event.executorId)} has registered " + + log"(new total is ${MDC(LogKeys.COUNT, executors.size())})") } private def decrementExecResourceProfileCount(rpId: Int): Unit = { @@ -365,11 +366,14 @@ private[spark] class ExecutorMonitor( } else { metrics.exitedUnexpectedly.inc() } - logInfo(s"Executor ${event.executorId} is removed. 
Remove reason statistics: (" + - s"gracefully decommissioned: ${metrics.gracefullyDecommissioned.getCount()}, " + - s"decommision unfinished: ${metrics.decommissionUnfinished.getCount()}, " + - s"driver killed: ${metrics.driverKilled.getCount()}, " + - s"unexpectedly exited: ${metrics.exitedUnexpectedly.getCount()}).") + // scalastyle:off line.size.limit + logInfo(log"Executor ${MDC(LogKeys.EXECUTOR_ID, event.executorId)} is removed. " + + log"Remove reason statistics: (gracefully decommissioned: " + + log"${MDC(LogKeys.NUM_DECOMMISSIONED, metrics.gracefullyDecommissioned.getCount())}, " + + log"decommission unfinished: ${MDC(LogKeys.NUM_UNFINISHED_DECOMMISSIONED, metrics.decommissionUnfinished.getCount())}, " + + log"driver killed: ${MDC(LogKeys.NUM_EXECUTORS_KILLED, metrics.driverKilled.getCount())}, " + + log"unexpectedly exited: ${MDC(LogKeys.NUM_EXECUTORS_EXITED, metrics.exitedUnexpectedly.getCount())}).") + // scalastyle:on line.size.limit if (!removed.pendingRemoval || !removed.decommissioning) { nextTimeout.set(Long.MinValue) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 26e2acf4392c..8655b7231079 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -535,7 +535,7 @@ private[spark] class BlockManager( val priorityClass = conf.get(config.STORAGE_REPLICATION_POLICY) val clazz = Utils.classForName(priorityClass) val ret = clazz.getConstructor().newInstance().asInstanceOf[BlockReplicationPolicy] - logInfo(s"Using $priorityClass for block replication policy") + logInfo(log"Using ${MDC(CLASS_NAME, priorityClass)} for block replication policy") ret } @@ -547,7 +547,7 @@ private[spark] class BlockManager( // the registration with the ESS. Therefore, this registration should be prior to // the BlockManager registration. See SPARK-39647. if (externalShuffleServiceEnabled) { - logInfo(s"external shuffle service port = $externalShuffleServicePort") + logInfo(log"external shuffle service port = ${MDC(PORT, externalShuffleServicePort)}") shuffleServerId = BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) if (!isDriver && !(Utils.isTesting && conf.get(Tests.TEST_SKIP_ESS_REGISTER))) { @@ -585,7 +585,7 @@ private[spark] class BlockManager( } } - logInfo(s"Initialized BlockManager: $blockManagerId") + logInfo(log"Initialized BlockManager: ${MDC(BLOCK_MANAGER_ID, blockManagerId)}") } def shuffleMetricsSource: Source = { @@ -646,7 +646,7 @@ private[spark] class BlockManager( * will be made then. */ private def reportAllBlocks(): Unit = { - logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.") + logInfo(log"Reporting ${MDC(NUM_BLOCKS, blockInfoManager.size)} blocks to the master.") for ((blockId, info) <- blockInfoManager.entries) { val status = getCurrentBlockStatus(blockId, info) if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) { @@ -664,7 +664,7 @@ private[spark] class BlockManager( */ def reregister(): Unit = { // TODO: We might need to rate limit re-registering. 
- logInfo(s"BlockManager $blockManagerId re-registering with master") + logInfo(log"BlockManager ${MDC(BLOCK_MANAGER_ID, blockManagerId)} re-registering with master") val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true) if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) { @@ -875,7 +875,7 @@ private[spark] class BlockManager( droppedMemorySize: Long = 0L): Unit = { val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize) if (needReregister) { - logInfo(s"Got told to re-register updating block $blockId") + logInfo(log"Got told to re-register updating block ${MDC(BLOCK_ID, blockId)}") // Re-registering will report our new block for free. asyncReregister() } @@ -1139,8 +1139,9 @@ private[spark] class BlockManager( None } } - logInfo(s"Read $blockId from the disk of a same host executor is " + - (if (res.isDefined) "successful." else "failed.")) + logInfo( + log"Read ${MDC(BLOCK_ID, blockId)} from the disk of a same host executor is " + + log"${MDC(STATUS, if (res.isDefined) "successful." else "failed.")}") res }.orElse { fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer) @@ -1308,12 +1309,12 @@ private[spark] class BlockManager( def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = { val local = getLocalValues(blockId) if (local.isDefined) { - logInfo(s"Found block $blockId locally") + logInfo(log"Found block ${MDC(BLOCK_ID, blockId)} locally") return local } val remote = getRemoteValues[T](blockId) if (remote.isDefined) { - logInfo(s"Found block $blockId remotely") + logInfo(log"Found block ${MDC(BLOCK_ID, blockId)} remotely") return remote } None @@ -1820,7 +1821,8 @@ private[spark] class BlockManager( existingReplicas: Set[BlockManagerId], maxReplicas: Int, maxReplicationFailures: Option[Int] = None): Boolean = { - logInfo(s"Using $blockManagerId to pro-actively replicate $blockId") + logInfo(log"Using ${MDC(BLOCK_MANAGER_ID, blockManagerId)} to pro-actively replicate " + + log"${MDC(BLOCK_ID, blockId)}") blockInfoManager.lockForReading(blockId).forall { info => val data = doGetLocalBytes(blockId, info) val storageLevel = StorageLevel( @@ -1977,14 +1979,14 @@ private[spark] class BlockManager( private[storage] override def dropFromMemory[T: ClassTag]( blockId: BlockId, data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { - logInfo(s"Dropping block $blockId from memory") + logInfo(log"Dropping block ${MDC(BLOCK_ID, blockId)} from memory") val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) var blockIsUpdated = false val level = info.level // Drop to disk, if storage level requires if (level.useDisk && !diskStore.contains(blockId)) { - logInfo(s"Writing block $blockId to disk") + logInfo(log"Writing block ${MDC(BLOCK_ID, blockId)} to disk") data() match { case Left(elements) => diskStore.put(blockId) { channel => @@ -2028,7 +2030,7 @@ private[spark] class BlockManager( */ def removeRdd(rddId: Int): Int = { // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks. 
- logInfo(s"Removing RDD $rddId") + logInfo(log"Removing RDD ${MDC(RDD_ID, rddId)}") val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId) blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) } blocksToRemove.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index fc98fbf6e18b..19807453ee28 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -73,14 +73,15 @@ private[storage] class BlockManagerDecommissioner( private def allowRetry(shuffleBlock: ShuffleBlockInfo, failureNum: Int): Boolean = { if (failureNum < maxReplicationFailuresForDecommission) { - logInfo(s"Add $shuffleBlock back to migration queue for " + - s"retry ($failureNum / $maxReplicationFailuresForDecommission)") + logInfo(log"Add ${MDC(SHUFFLE_BLOCK_INFO, shuffleBlock)} back to migration queue for " + + log" retry (${MDC(FAILURES, failureNum)} / " + + log"${MDC(MAX_ATTEMPTS, maxReplicationFailuresForDecommission)})") // The block needs to retry so we should not mark it as finished shufflesToMigrate.add((shuffleBlock, failureNum)) } else { logWarning(log"Give up migrating ${MDC(SHUFFLE_BLOCK_INFO, shuffleBlock)} " + log"since it's been failed for " + - log"${MDC(NUM_FAILURES, maxReplicationFailuresForDecommission)} times") + log"${MDC(MAX_ATTEMPTS, maxReplicationFailuresForDecommission)} times") false } } @@ -98,7 +99,7 @@ private[storage] class BlockManagerDecommissioner( } override def run(): Unit = { - logInfo(s"Starting shuffle block migration thread for $peer") + logInfo(log"Starting shuffle block migration thread for ${MDC(PEER, peer)}") // Once a block fails to transfer to an executor stop trying to transfer more blocks while (keepRunning) { try { @@ -107,10 +108,12 @@ private[storage] class BlockManagerDecommissioner( var isTargetDecommissioned = false // We only migrate a shuffle block when both index file and data file exist. if (blocks.isEmpty) { - logInfo(s"Ignore deleted shuffle block $shuffleBlockInfo") + logInfo(log"Ignore deleted shuffle block ${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}") } else { - logInfo(s"Got migration sub-blocks $blocks. Trying to migrate $shuffleBlockInfo " + - s"to $peer ($retryCount / $maxReplicationFailuresForDecommission)") + logInfo(log"Got migration sub-blocks ${MDC(BLOCK_IDS, blocks)}. Trying to migrate " + + log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)} to ${MDC(PEER, peer)} " + + log"(${MDC(NUM_RETRY, retryCount)} / " + + log"${MDC(MAX_ATTEMPTS, maxReplicationFailuresForDecommission)}") // Migrate the components of the blocks. 
try { val startTime = System.currentTimeMillis() @@ -130,9 +133,10 @@ private[storage] class BlockManagerDecommissioner( logDebug(s"Migrated sub-block $blockId") } } - logInfo(s"Migrated $shuffleBlockInfo (" + - s"size: ${Utils.bytesToString(blocks.map(b => b._2.size()).sum)}) to $peer " + - s"in ${System.currentTimeMillis() - startTime} ms") + logInfo(log"Migrated ${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)} (" + + log"size: ${MDC(SIZE, Utils.bytesToString(blocks.map(b => b._2.size()).sum))}) " + + log"to ${MDC(PEER, peer)} in " + + log"${MDC(DURATION, System.currentTimeMillis() - startTime)} ms") } catch { case e @ ( _ : IOException | _ : SparkException) => // If a block got deleted before netty opened the file handle, then trying to @@ -181,7 +185,11 @@ private[storage] class BlockManagerDecommissioner( } } catch { case _: InterruptedException => - logInfo(s"Stop shuffle block migration${if (keepRunning) " unexpectedly"}.") + if (keepRunning) { + logInfo("Stop shuffle block migration unexpectedly.") + } else { + logInfo("Stop shuffle block migration.") + } keepRunning = false case NonFatal(e) => keepRunning = false @@ -234,12 +242,16 @@ private[storage] class BlockManagerDecommissioner( logInfo("Attempting to migrate all cached RDD blocks") rddBlocksLeft = decommissionRddCacheBlocks() lastRDDMigrationTime = startTime - logInfo(s"Finished current round RDD blocks migration, " + - s"waiting for ${sleepInterval}ms before the next round migration.") + logInfo(log"Finished current round RDD blocks migration, " + + log"waiting for ${MDC(SLEEP_TIME, sleepInterval)}ms before the next round migration.") Thread.sleep(sleepInterval) } catch { case _: InterruptedException => - logInfo(s"Stop RDD blocks migration${if (!stopped && !stoppedRDD) " unexpectedly"}.") + if (!stopped && !stoppedRDD) { + logInfo("Stop RDD blocks migration unexpectedly.") + } else { + logInfo("Stop RDD blocks migration.") + } stoppedRDD = true case NonFatal(e) => logError("Error occurred during RDD blocks migration.", e) @@ -265,8 +277,9 @@ private[storage] class BlockManagerDecommissioner( val startTime = System.nanoTime() shuffleBlocksLeft = refreshMigratableShuffleBlocks() lastShuffleMigrationTime = startTime - logInfo(s"Finished current round refreshing migratable shuffle blocks, " + - s"waiting for ${sleepInterval}ms before the next round refreshing.") + logInfo(log"Finished current round refreshing migratable shuffle blocks, " + + log"waiting for ${MDC(SLEEP_TIME, sleepInterval)}ms before the " + + log"next round refreshing.") Thread.sleep(sleepInterval) } catch { case _: InterruptedException if stopped => @@ -302,8 +315,9 @@ private[storage] class BlockManagerDecommissioner( shufflesToMigrate.addAll(newShufflesToMigrate.map(x => (x, 0)).asJava) migratingShuffles ++= newShufflesToMigrate val remainedShuffles = migratingShuffles.size - numMigratedShuffles.get() - logInfo(s"${newShufflesToMigrate.size} of ${localShuffles.size} local shuffles " + - s"are added. In total, $remainedShuffles shuffles are remained.") + logInfo(log"${MDC(COUNT, newShufflesToMigrate.size)} of " + + log"${MDC(TOTAL, localShuffles.size)} local shuffles are added. " + + log"In total, ${MDC(NUM_REMAINED, remainedShuffles)} shuffles are remained.") // Update the threads doing migrations val livePeerSet = bm.getPeers(false).toSet @@ -350,8 +364,9 @@ private[storage] class BlockManagerDecommissioner( // Refresh peers and validate we have somewhere to move blocks. 
if (replicateBlocksInfo.nonEmpty) { - logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " + - "for block manager decommissioning") + logInfo( + log"Need to replicate ${MDC(NUM_REPLICAS, replicateBlocksInfo.size)} RDD blocks " + + log"for block manager decommissioning") } else { logWarning("Asked to decommission RDD cache blocks, but no blocks to migrate") return false @@ -378,9 +393,10 @@ private[storage] class BlockManagerDecommissioner( blockToReplicate.maxReplicas, maxReplicationFailures = Some(maxReplicationFailuresForDecommission)) if (replicatedSuccessfully) { - logInfo(s"Block ${blockToReplicate.blockId} migrated successfully, Removing block now") + logInfo(log"Block ${MDC(BLOCK_ID, blockToReplicate.blockId)} migrated " + + log"successfully, Removing block now") bm.removeBlock(blockToReplicate.blockId) - logInfo(s"Block ${blockToReplicate.blockId} removed") + logInfo(log"Block ${MDC(BLOCK_ID, blockToReplicate.blockId)} removed") } else { logWarning(log"Failed to migrate block ${MDC(BLOCK_ID, blockToReplicate.blockId)}") } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 95af44deef93..276bd63e1423 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -42,7 +42,7 @@ class BlockManagerMaster( /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */ def removeExecutor(execId: String): Unit = { tell(RemoveExecutor(execId)) - logInfo("Removed " + execId + " successfully in removeExecutor") + logInfo(log"Removed ${MDC(EXECUTOR_ID, execId)} successfully in removeExecutor") } /** Decommission block managers corresponding to given set of executors @@ -62,7 +62,7 @@ class BlockManagerMaster( */ def removeExecutorAsync(execId: String): Unit = { driverEndpoint.ask[Boolean](RemoveExecutor(execId)) - logInfo("Removal of executor " + execId + " requested") + logInfo(log"Removal of executor ${MDC(EXECUTOR_ID, execId)} requested") } /** @@ -77,7 +77,7 @@ class BlockManagerMaster( maxOffHeapMemSize: Long, storageEndpoint: RpcEndpointRef, isReRegister: Boolean = false): BlockManagerId = { - logInfo(s"Registering BlockManager $id") + logInfo(log"Registering BlockManager ${MDC(BLOCK_MANAGER_ID, id)}") val updatedId = driverEndpoint.askSync[BlockManagerId]( RegisterBlockManager( id, @@ -90,9 +90,9 @@ class BlockManagerMaster( ) if (updatedId.executorId == BlockManagerId.INVALID_EXECUTOR_ID) { assert(isReRegister, "Got invalid executor id from non re-register case") - logInfo(s"Re-register BlockManager $id failed") + logInfo(log"Re-register BlockManager ${MDC(BLOCK_MANAGER_ID, id)} failed") } else { - logInfo(s"Registered BlockManager $updatedId") + logInfo(log"Registered BlockManager ${MDC(BLOCK_MANAGER_ID, updatedId)}") } updatedId } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index be7082807182..73f89ea0e86e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -110,7 +110,7 @@ class BlockManagerMasterEndpoint( val clazz = Utils.classForName(topologyMapperClassName) val mapper = clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper] - logInfo(s"Using 
$topologyMapperClassName for getting topology information") + logInfo(log"Using ${MDC(CLASS_NAME, topologyMapperClassName)} for getting topology information") mapper } @@ -218,7 +218,8 @@ class BlockManagerMasterEndpoint( // executor is notified(see BlockManager.decommissionSelf), so we don't need to send the // notification here. val bms = executorIds.flatMap(blockManagerIdByExecutor.get) - logInfo(s"Mark BlockManagers (${bms.mkString(", ")}) as being decommissioning.") + logInfo(log"Mark BlockManagers (${MDC(BLOCK_MANAGER_IDS, bms.mkString(", "))}) as " + + log"being decommissioning.") decommissioningBlockManagerSet ++= bms context.reply(true) @@ -535,7 +536,7 @@ class BlockManagerMasterEndpoint( } listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId)) - logInfo(s"Removing block manager $blockManagerId") + logInfo(log"Removing block manager ${MDC(BLOCK_MANAGER_ID, blockManagerId)}") } @@ -551,7 +552,7 @@ class BlockManagerMasterEndpoint( } private def removeExecutor(execId: String): Unit = { - logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") + logInfo(log"Trying to remove executor ${MDC(EXECUTOR_ID, execId)} from BlockManagerMaster.") blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } @@ -707,8 +708,9 @@ class BlockManagerMasterEndpoint( removeExecutor(id.executorId) case None => } - logInfo("Registering block manager %s with %s RAM, %s".format( - id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id)) + logInfo(log"Registering block manager ${MDC(HOST_PORT, id.hostPort)} with " + + log"${MDC(MEMORY_SIZE, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize))} RAM, " + + log"${MDC(BLOCK_MANAGER_ID, id)}") blockManagerIdByExecutor(id.executorId) = id @@ -738,8 +740,8 @@ class BlockManagerMasterEndpoint( assert(!blockManagerInfo.contains(id), "BlockManager re-registration shouldn't succeed when the executor is lost") - logInfo(s"BlockManager ($id) re-registration is rejected since " + - s"the executor (${id.executorId}) has been lost") + logInfo(log"BlockManager (${MDC(BLOCK_MANAGER_ID, id)}) re-registration is rejected since " + + log"the executor (${MDC(EXECUTOR_ID, id.executorId)}) has been lost") // Use "invalid" as the return executor id to indicate the block manager that // re-registration failed. 
It's a bit hacky but fine since the returned block @@ -1057,26 +1059,30 @@ private[spark] class BlockManagerInfo( _blocks.put(blockId, blockStatus) _remainingMem -= memSize if (blockExists) { - logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" + - s" (current size: ${Utils.bytesToString(memSize)}," + - s" original size: ${Utils.bytesToString(originalMemSize)}," + - s" free: ${Utils.bytesToString(_remainingMem)})") + logInfo(log"Updated ${MDC(BLOCK_ID, blockId)} in memory on " + + log"${MDC(HOST_PORT, blockManagerId.hostPort)} (current size: " + + log"${MDC(CURRENT_MEMORY_SIZE, Utils.bytesToString(memSize))}, original " + + log"size: ${MDC(ORIGINAL_MEMORY_SIZE, Utils.bytesToString(originalMemSize))}, " + + log"free: ${MDC(FREE_MEMORY_SIZE, Utils.bytesToString(_remainingMem))})") } else { - logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" + - s" (size: ${Utils.bytesToString(memSize)}," + - s" free: ${Utils.bytesToString(_remainingMem)})") + logInfo(log"Added ${MDC(BLOCK_ID, blockId)} in memory on " + + log"${MDC(HOST_PORT, blockManagerId.hostPort)} " + + log"(size: ${MDC(CURRENT_MEMORY_SIZE, Utils.bytesToString(memSize))}, " + + log"free: ${MDC(FREE_MEMORY_SIZE, Utils.bytesToString(_remainingMem))})") } } if (storageLevel.useDisk) { blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize) _blocks.put(blockId, blockStatus) if (blockExists) { - logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" + - s" (current size: ${Utils.bytesToString(diskSize)}," + - s" original size: ${Utils.bytesToString(originalDiskSize)})") + logInfo(log"Updated ${MDC(BLOCK_ID, blockId)} on disk on " + + log"${MDC(HOST_PORT, blockManagerId.hostPort)} " + + log"(current size: ${MDC(CURRENT_DISK_SIZE, Utils.bytesToString(diskSize))}," + + log" original size: ${MDC(ORIGINAL_DISK_SIZE, Utils.bytesToString(originalDiskSize))})") } else { - logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" + - s" (size: ${Utils.bytesToString(diskSize)})") + logInfo(log"Added ${MDC(BLOCK_ID, blockId)} on disk on " + + log"${MDC(HOST_PORT, blockManagerId.hostPort)} (size: " + + log"${MDC(CURRENT_DISK_SIZE, Utils.bytesToString(diskSize))})") } } @@ -1092,13 +1098,15 @@ private[spark] class BlockManagerInfo( blockStatus.remove(blockId) } if (originalLevel.useMemory) { - logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" + - s" (size: ${Utils.bytesToString(originalMemSize)}," + - s" free: ${Utils.bytesToString(_remainingMem)})") + logInfo(log"Removed ${MDC(BLOCK_ID, blockId)} on " + + log"${MDC(HOST_PORT, blockManagerId.hostPort)} in memory " + + log"(size: ${MDC(ORIGINAL_MEMORY_SIZE, Utils.bytesToString(originalMemSize))}, " + + log"free: ${MDC(FREE_MEMORY_SIZE, Utils.bytesToString(_remainingMem))})") } if (originalLevel.useDisk) { - logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" + - s" (size: ${Utils.bytesToString(originalDiskSize)})") + logInfo(log"Removed ${MDC(BLOCK_ID, blockId)} on " + + log"${MDC(HOST_PORT, blockManagerId.hostPort)} on disk" + + log" (size: ${MDC(ORIGINAL_DISK_SIZE, Utils.bytesToString(originalDiskSize))})") } } } diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala index 059bb5271410..8a3ca3066961 100644 --- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala +++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala @@ -342,7 +342,8 @@ private class 
PushBasedFetchHelper( // Fallback for all the pending fetch requests val pendingShuffleChunks = iterator.removePendingChunks(shuffleChunkId, address) pendingShuffleChunks.foreach { pendingBlockId => - logInfo(s"Falling back immediately for shuffle chunk $pendingBlockId") + logInfo( + log"Falling back immediately for shuffle chunk ${MDC(BLOCK_ID, pendingBlockId)}") shuffleMetrics.incMergedFetchFallbackCount(1) val bitmapOfPendingChunk: RoaringBitmap = chunksMetaMap.remove(pendingBlockId).get chunkBitmap.or(bitmapOfPendingChunk) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index b89d24f4c6dd..ff1799d8ff3e 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -342,8 +342,8 @@ final class ShuffleBlockFetcherIterator( if (isNettyOOMOnShuffle.compareAndSet(false, true)) { // The fetcher can fail remaining blocks in batch for the same error. So we only // log the warning once to avoid flooding the logs. - logInfo(s"Block $blockId has failed $failureTimes times " + - s"due to Netty OOM, will retry") + logInfo(log"Block ${MDC(BLOCK_ID, blockId)} has failed " + + log"${MDC(FAILURES, failureTimes)} times due to Netty OOM, will retry") } remainingBlocks -= blockId deferredBlocks += blockId @@ -448,14 +448,17 @@ final class ShuffleBlockFetcherIterator( s"the number of host-local blocks ${numHostLocalBlocks} " + s"the number of push-merged-local blocks ${pushMergedLocalBlocks.size} " + s"+ the number of remote blocks ${numRemoteBlocks} ") - logInfo(s"Getting $blocksToFetchCurrentIteration " + - s"(${Utils.bytesToString(totalBytes)}) non-empty blocks including " + - s"${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local and " + - s"${numHostLocalBlocks} (${Utils.bytesToString(hostLocalBlockBytes)}) " + - s"host-local and ${pushMergedLocalBlocks.size} " + - s"(${Utils.bytesToString(pushMergedLocalBlockBytes)}) " + - s"push-merged-local and $numRemoteBlocks (${Utils.bytesToString(remoteBlockBytes)}) " + - s"remote blocks") + logInfo( + log"Getting ${MDC(NUM_BLOCKS, blocksToFetchCurrentIteration)} " + + log"(${MDC(TOTAL_SIZE, Utils.bytesToString(totalBytes))}) non-empty blocks including " + + log"${MDC(NUM_LOCAL_BLOCKS, localBlocks.size)} " + + log"(${MDC(LOCAL_BLOCKS_SIZE, Utils.bytesToString(localBlockBytes))}) local and " + + log"${MDC(NUM_HOST_LOCAL_BLOCKS, numHostLocalBlocks)} " + + log"(${MDC(HOST_LOCAL_BLOCKS_SIZE, Utils.bytesToString(hostLocalBlockBytes))}) " + + log"host-local and ${MDC(NUM_PUSH_MERGED_LOCAL_BLOCKS, pushMergedLocalBlocks.size)} " + + log"(${MDC(PUSH_MERGED_LOCAL_BLOCKS_SIZE, Utils.bytesToString(pushMergedLocalBlockBytes))})" + + log" push-merged-local and ${MDC(NUM_REMOTE_BLOCKS, numRemoteBlocks)} " + + log"(${MDC(REMOTE_BLOCKS_SIZE, Utils.bytesToString(remoteBlockBytes))}) remote blocks") this.hostLocalBlocks ++= hostLocalBlocksByExecutor.values .flatMap { infos => infos.map(info => (info._1, info._3)) } collectedRemoteRequests @@ -719,8 +722,10 @@ final class ShuffleBlockFetcherIterator( val numDeferredRequest = deferredFetchRequests.values.map(_.size).sum val numFetches = remoteRequests.size - fetchRequests.size - numDeferredRequest - logInfo(s"Started $numFetches remote fetches in ${Utils.getUsedTimeNs(startTimeNs)}" + - (if (numDeferredRequest > 0 ) s", deferred $numDeferredRequest requests" else "")) + 
logInfo(log"Started ${MDC(COUNT, numFetches)} remote fetches in " + + log"${MDC(DURATION, Utils.getUsedTimeNs(startTimeNs))}" + + (if (numDeferredRequest > 0) log", deferred ${MDC(NUM_REQUESTS, numDeferredRequest)} requests" + else log"")) // Get Local Blocks fetchLocalBlocks(localBlocks) @@ -1141,7 +1146,8 @@ final class ShuffleBlockFetcherIterator( case otherCause => s"Block $blockId is corrupted due to $otherCause" } - logInfo(s"Finished corruption diagnosis in $duration ms. $diagnosisResponse") + logInfo(log"Finished corruption diagnosis in ${MDC(DURATION, duration)} ms. " + + log"${MDC(STATUS, diagnosisResponse)}") diagnosisResponse case shuffleBlockChunk: ShuffleBlockChunkId => // TODO SPARK-36284 Add shuffle checksum support for push-based shuffle @@ -1277,7 +1283,8 @@ final class ShuffleBlockFetcherIterator( originalLocalBlocks, originalHostLocalBlocksByExecutor, originalMergedLocalBlocks) // Add the remote requests into our queue in a random order fetchRequests ++= Utils.randomize(originalRemoteReqs) - logInfo(s"Created ${originalRemoteReqs.size} fallback remote requests for push-merged") + logInfo(log"Created ${MDC(NUM_REQUESTS, originalRemoteReqs.size)} fallback remote requests " + + log"for push-merged") // fetch all the fallback blocks that are local. fetchLocalBlocks(originalLocalBlocks) // Merged local blocks should be empty during fallback diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 1283b9340a45..6746bbd490c4 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -117,7 +117,8 @@ private[spark] class MemoryStore( log"needed to store a block in memory. Please configure Spark with more memory.") } - logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) + logInfo(log"MemoryStore started with capacity " + + log"${MDC(MEMORY_SIZE, Utils.bytesToString(maxMemory))}") /** Total storage memory used including unroll memory, in bytes. */ private def memoryUsed: Long = memoryManager.storageMemoryUsed @@ -158,8 +159,9 @@ private[spark] class MemoryStore( entries.synchronized { entries.put(blockId, entry) } - logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) + logInfo(log"Block ${MDC(BLOCK_ID, blockId)} stored as bytes in memory " + + log"(estimated size ${MDC(SIZE, Utils.bytesToString(size))}, " + + log"free ${MDC(MEMORY_SIZE, Utils.bytesToString(maxMemory - blocksMemoryUsed))})") true } else { false @@ -250,7 +252,8 @@ private[spark] class MemoryStore( // SPARK-45025 - if a thread interrupt was received, we log a warning and return used memory // to avoid getting killed by task reaper eventually. if (shouldCheckThreadInterruption && Thread.currentThread().isInterrupted) { - logInfo(s"Failed to unroll block=$blockId since thread interrupt was received") + logInfo( + log"Failed to unroll block=${MDC(BLOCK_ID, blockId)} since thread interrupt was received") Left(unrollMemoryUsedByThisBlock) } else if (keepUnrolling) { // Make sure that we have enough memory to store the block. 
By this point, it is possible that @@ -279,8 +282,9 @@ private[spark] class MemoryStore( entries.put(blockId, entry) } - logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId, - Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) + logInfo(log"Block ${MDC(BLOCK_ID, blockId)} stored as values in memory " + + log"(estimated size ${MDC(MEMORY_SIZE, Utils.bytesToString(entry.size))}, free " + + log"${MDC(FREE_MEMORY_SIZE, Utils.bytesToString(maxMemory - blocksMemoryUsed))})") Right(entry.size) } else { // We ran out of space while unrolling the values for this block @@ -521,8 +525,8 @@ private[spark] class MemoryStore( if (freedMemory >= space) { var lastSuccessfulBlock = -1 try { - logInfo(s"${selectedBlocks.size} blocks selected for dropping " + - s"(${Utils.bytesToString(freedMemory)} bytes)") + logInfo(log"${MDC(NUM_BLOCKS, selectedBlocks.size)} blocks selected for dropping " + + log"(${MDC(MEMORY_SIZE, Utils.bytesToString(freedMemory))} bytes)") selectedBlocks.indices.foreach { idx => val blockId = selectedBlocks(idx) val entry = entries.synchronized { @@ -537,8 +541,9 @@ private[spark] class MemoryStore( } lastSuccessfulBlock = idx } - logInfo(s"After dropping ${selectedBlocks.size} blocks, " + - s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}") + logInfo( + log"After dropping ${MDC(NUM_BLOCKS, selectedBlocks.size)} blocks, free memory is " + + log"${MDC(FREE_MEMORY_SIZE, Utils.bytesToString(maxMemory - blocksMemoryUsed))}") freedMemory } finally { // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal @@ -553,7 +558,7 @@ private[spark] class MemoryStore( } } else { blockId.foreach { id => - logInfo(s"Will not store $id") + logInfo(log"Will not store ${MDC(BLOCK_ID, id)}") } selectedBlocks.foreach { id => blockInfoManager.unlock(id) @@ -649,11 +654,11 @@ private[spark] class MemoryStore( */ private def logMemoryUsage(): Unit = { logInfo( - s"Memory use = ${Utils.bytesToString(blocksMemoryUsed)} (blocks) + " + - s"${Utils.bytesToString(currentUnrollMemory)} (scratch space shared across " + - s"$numTasksUnrolling tasks(s)) = ${Utils.bytesToString(memoryUsed)}. " + - s"Storage limit = ${Utils.bytesToString(maxMemory)}." - ) + log"Memory use = ${MDC(CURRENT_MEMORY_SIZE, Utils.bytesToString(blocksMemoryUsed))} " + + log"(blocks) + ${MDC(FREE_MEMORY_SIZE, Utils.bytesToString(currentUnrollMemory))} " + + log"(scratch space shared across ${MDC(NUM_TASKS, numTasksUnrolling)} " + + log"task(s)) = ${MDC(STORAGE_MEMORY_SIZE, Utils.bytesToString(memoryUsed))}. " + + log"Storage limit = ${MDC(MAX_MEMORY_SIZE, Utils.bytesToString(maxMemory))}.") } /**
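For reference, the migration pattern applied throughout these hunks is roughly the following sketch. It assumes only the org.apache.spark.internal.{Logging, LogKeys, MDC} API that the patch itself imports; BlockEventLogger and onBlockStored are hypothetical names used purely for illustration and are not code from this patch.

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Illustrative sketch (hypothetical class, not patch code): the log"..." interpolator
// builds the message, and each MDC(key, value) both renders the value in the plain-text
// message and tags it with a LogKey so it can be emitted as a structured field when
// structured logging is enabled.
private class BlockEventLogger extends Logging {
  def onBlockStored(blockId: String, sizeBytes: Long): Unit = {
    logInfo(log"Stored block ${MDC(LogKeys.BLOCK_ID, blockId)} " +
      log"(size: ${MDC(LogKeys.SIZE, sizeBytes)} bytes)")
  }
}

Compared with the old s-interpolated messages, the rendered text stays the same while each value carries a stable LogKey that downstream log tooling can query.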