diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 40d3f67a48a7..f90eb4a77071 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -65,7 +65,10 @@ private[spark] object LogKeys { case object ADDED_JARS extends LogKey case object ADMIN_ACLS extends LogKey case object ADMIN_ACL_GROUPS extends LogKey + case object ADVISORY_TARGET_SIZE extends LogKey case object AGGREGATE_FUNCTIONS extends LogKey + case object ALIGNED_FROM_TIME extends LogKey + case object ALIGNED_TO_TIME extends LogKey case object ALPHA extends LogKey case object ANALYSIS_ERROR extends LogKey case object APP_ATTEMPT_ID extends LogKey @@ -77,8 +80,10 @@ private[spark] object LogKeys { case object APP_STATE extends LogKey case object ARCHIVE_NAME extends LogKey case object ARGS extends LogKey + case object ARTIFACT_ID extends LogKey case object ATTRIBUTE_MAP extends LogKey case object AUTH_ENABLED extends LogKey + case object AVG_BATCH_PROC_TIME extends LogKey case object BACKUP_FILE extends LogKey case object BARRIER_EPOCH extends LogKey case object BARRIER_ID extends LogKey @@ -99,6 +104,8 @@ private[spark] object LogKeys { case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object BUCKET extends LogKey case object BYTECODE_SIZE extends LogKey + case object BYTE_BUFFER extends LogKey + case object BYTE_SIZE extends LogKey case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey case object CACHE_AUTO_REMOVED_SIZE extends LogKey case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey @@ -109,6 +116,7 @@ private[spark] object LogKeys { case object CATALOG_NAME extends LogKey case object CATEGORICAL_FEATURES extends LogKey case object CHECKPOINT_FILE extends LogKey + case object CHECKPOINT_INTERVAL extends LogKey case object CHECKPOINT_LOCATION extends LogKey case object CHECKPOINT_PATH extends LogKey case object CHECKPOINT_ROOT extends LogKey @@ -186,6 +194,7 @@ private[spark] object LogKeys { case object DELEGATE extends LogKey case object DELTA extends LogKey case object DEPRECATED_KEY extends LogKey + case object DERIVATIVE extends LogKey case object DESCRIPTION extends LogKey case object DESIRED_NUM_PARTITIONS extends LogKey case object DESIRED_TREE_DEPTH extends LogKey @@ -197,6 +206,7 @@ private[spark] object LogKeys { case object DRIVER_MEMORY_SIZE extends LogKey case object DRIVER_STATE extends LogKey case object DROPPED_PARTITIONS extends LogKey + case object DSTREAM extends LogKey case object DURATION extends LogKey case object EARLIEST_LOADED_VERSION extends LogKey case object EFFECTIVE_STORAGE_LEVEL extends LogKey @@ -251,6 +261,7 @@ private[spark] object LogKeys { case object FEATURE_NAME extends LogKey case object FETCH_SIZE extends LogKey case object FIELD_NAME extends LogKey + case object FILES extends LogKey case object FILE_ABSOLUTE_PATH extends LogKey case object FILE_END_OFFSET extends LogKey case object FILE_FORMAT extends LogKey @@ -307,6 +318,7 @@ private[spark] object LogKeys { case object INIT_MODE extends LogKey case object INPUT extends LogKey case object INPUT_SPLIT extends LogKey + case object INTEGRAL extends LogKey case object INTERVAL extends LogKey case object ISOLATION_LEVEL extends LogKey case object ISSUE_DATE extends LogKey @@ -394,6 +406,7 @@ private[spark] object LogKeys { case object MIN_COMPACTION_BATCH_ID extends LogKey case object MIN_NUM_FREQUENT_PATTERN 
extends LogKey case object MIN_POINT_PER_CLUSTER extends LogKey + case object MIN_RATE extends LogKey case object MIN_SHARE extends LogKey case object MIN_SIZE extends LogKey case object MIN_TIME extends LogKey @@ -490,6 +503,7 @@ private[spark] object LogKeys { case object NUM_PREFIXES extends LogKey case object NUM_PRUNED extends LogKey case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey + case object NUM_RECEIVERS extends LogKey case object NUM_RECORDS_READ extends LogKey case object NUM_RELEASED_LOCKS extends LogKey case object NUM_REMAINED extends LogKey @@ -547,6 +561,7 @@ private[spark] object LogKeys { case object PARTITIONER extends LogKey case object PARTITION_ID extends LogKey case object PARTITION_IDS extends LogKey + case object PARTITION_SIZE extends LogKey case object PARTITION_SPECIFICATION extends LogKey case object PARTITION_SPECS extends LogKey case object PATH extends LogKey @@ -575,6 +590,7 @@ private[spark] object LogKeys { case object PROCESSING_TIME extends LogKey case object PRODUCER_ID extends LogKey case object PROPERTY_NAME extends LogKey + case object PROPORTIONAL extends LogKey case object PROTOCOL_VERSION extends LogKey case object PROVIDER extends LogKey case object PUSHED_FILTERS extends LogKey @@ -595,6 +611,8 @@ private[spark] object LogKeys { case object QUERY_PLAN_LENGTH_MAX extends LogKey case object QUERY_RUN_ID extends LogKey case object RANGE extends LogKey + case object RATE_LIMIT extends LogKey + case object RATIO extends LogKey case object RDD_CHECKPOINT_DIR extends LogKey case object RDD_DEBUG_STRING extends LogKey case object RDD_DESCRIPTION extends LogKey @@ -646,6 +664,8 @@ private[spark] object LogKeys { case object RULE_NAME extends LogKey case object RUN_ID extends LogKey case object SCALA_VERSION extends LogKey + case object SCALING_DOWN_RATIO extends LogKey + case object SCALING_UP_RATIO extends LogKey case object SCHEDULER_POOL_NAME extends LogKey case object SCHEDULING_MODE extends LogKey case object SCHEMA extends LogKey @@ -671,12 +691,14 @@ private[spark] object LogKeys { case object SHUFFLE_SERVICE_NAME extends LogKey case object SIGMAS_LENGTH extends LogKey case object SIGNAL extends LogKey + case object SINK extends LogKey case object SIZE extends LogKey case object SLEEP_TIME extends LogKey case object SLIDE_DURATION extends LogKey case object SMALLEST_CLUSTER_INDEX extends LogKey case object SNAPSHOT_VERSION extends LogKey case object SOCKET_ADDRESS extends LogKey + case object SOURCE extends LogKey case object SOURCE_PATH extends LogKey case object SPARK_BRANCH extends LogKey case object SPARK_BUILD_DATE extends LogKey @@ -708,6 +730,7 @@ private[spark] object LogKeys { case object STORAGE_LEVEL_REPLICATION extends LogKey case object STORAGE_MEMORY_SIZE extends LogKey case object STORE_ID extends LogKey + case object STREAMING_CONTEXT extends LogKey case object STREAMING_DATA_SOURCE_DESCRIPTION extends LogKey case object STREAMING_DATA_SOURCE_NAME extends LogKey case object STREAMING_OFFSETS_END extends LogKey @@ -729,6 +752,7 @@ private[spark] object LogKeys { case object TARGET_NUM_EXECUTOR extends LogKey case object TARGET_NUM_EXECUTOR_DELTA extends LogKey case object TARGET_PATH extends LogKey + case object TARGET_SIZE extends LogKey case object TASK_ATTEMPT_ID extends LogKey case object TASK_ID extends LogKey case object TASK_INDEX extends LogKey @@ -752,6 +776,7 @@ private[spark] object LogKeys { case object THREAD_POOL_SIZE extends LogKey case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey case object THRESHOLD 
extends LogKey + case object THRESH_TIME extends LogKey case object TIME extends LogKey case object TIMEOUT extends LogKey case object TIMER extends LogKey @@ -814,4 +839,5 @@ private[spark] object LogKeys { case object XML_SCHEDULING_MODE extends LogKey case object XSD_PATH extends LogKey case object YOUNG_GENERATION_GC extends LogKey + case object ZERO_TIME extends LogKey } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala index 56a9e19a1b78..c3c23740e2fe 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala @@ -22,7 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList import scala.jdk.CollectionConverters._ import org.apache.spark.connect.proto.{Command, ExecutePlanResponse, Plan, StreamingQueryEventType} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.client.CloseableIterator import org.apache.spark.sql.streaming.StreamingQueryListener.{Event, QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent} @@ -115,7 +115,7 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging { case StreamingQueryEventType.QUERY_TERMINATED_EVENT => postToAll(QueryTerminatedEvent.fromJson(event.getEventJson)) case _ => - logWarning(s"Unknown StreamingQueryListener event: $event") + logWarning(log"Unknown StreamingQueryListener event: ${MDC(LogKeys.EVENT, event)}") } }) } @@ -144,7 +144,10 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging { listener.onQueryIdle(t) case t: QueryTerminatedEvent => listener.onQueryTerminated(t) - case _ => logWarning(s"Unknown StreamingQueryListener event: $event") + case _ => + logWarning( + log"Unknown StreamingQueryListener event: " + + log"${MDC(LogKeys.EVENT, event)}") } } catch { case e: Exception => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala index 94f01026b7a5..d072b56e022a 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala @@ -24,7 +24,7 @@ import io.grpc.stub.StreamObserver import org.apache.spark.connect.proto.ExecutePlanResponse import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.connect.service.ExecuteHolder /** @@ -83,20 +83,30 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex } catch { case NonFatal(e) => logError( - s"[SessionId: $sessionId][UserId: $userId][operationId: " + - s"${executeHolder.operationId}] Error sending listener added response.", + log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" + + log"[UserId: 
${MDC(LogKeys.USER_ID, userId)}]" + + log"[operationId: " + + log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " + + log"Error sending listener added response.", e) listenerHolder.cleanUp() return } } - logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " + - s"${executeHolder.operationId}] Server side listener added. Now blocking until " + - "all client side listeners are removed or there is error transmitting the event back.") + logInfo( + log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " + + log"${MDC(LogKeys.USER_ID, userId)}][operationId: " + + log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " + + log"Server side listener added. Now blocking until all client side listeners are " + + log"removed or there is error transmitting the event back.") // Block the handling thread, and have serverListener continuously send back new events listenerHolder.streamingQueryListenerLatch.await() - logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " + - s"${executeHolder.operationId}] Server side listener long-running handling thread ended.") + logInfo( + log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " + + log"${MDC(LogKeys.USER_ID, userId)}]" + + log"[operationId: " + + log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " + + log"Server side listener long-running handling thread ended.") case StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER => listenerHolder.isServerSideListenerRegistered match { case true => diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala index 445b7d4d7aa0..3adb540a7ad1 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala @@ -22,7 +22,7 @@ import java.io.Serializable import scala.reflect.ClassTag import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.util.Utils /** @@ -106,7 +106,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Lo assertValid() _isValid = false _destroySite = Utils.getCallSite().shortForm - logInfo("Destroying %s (from %s)".format(toString, _destroySite)) + logInfo(log"Destroying ${MDC(LogKeys.BROADCAST, toString)} " + + log"(from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, _destroySite)})") doDestroy(blocking) } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 3ce5e2d62b6a..851fb453fd09 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -71,8 +71,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana if (localDirs.length >= 1) { new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName) } else { - logWarning(s"'spark.local.dir' should be set first when we use db in " + - s"ExternalShuffleService. Note that this only affects standalone mode.") + logWarning("'spark.local.dir' should be set first when we use db in " + + "ExternalShuffleService. 
Note that this only affects standalone mode.") null } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala index be11c23f306e..bd07a0ade523 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker import java.util.concurrent.atomic.AtomicBoolean -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.WORKER_URL import org.apache.spark.rpc._ @@ -64,7 +64,7 @@ private[spark] class WorkerWatcher( } override def receive: PartialFunction[Any, Unit] = { - case e => logWarning(s"Received unexpected message: $e") + case e => logWarning(log"Received unexpected message: ${MDC(LogKeys.ERROR, e)}") } override def onConnected(remoteAddress: RpcAddress): Unit = { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index d0a202cb7951..dde9b541b62f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -28,7 +28,7 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException} import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.{config, Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.io.NioBufferedFileInputStream import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} @@ -436,8 +436,8 @@ private[spark] class IndexShuffleBlockResolver( if (checksumTmp.exists()) { try { if (!checksumTmp.delete()) { - logError(s"Failed to delete temporary checksum file " + - s"at ${checksumTmp.getAbsolutePath}") + logError(log"Failed to delete temporary checksum file at " + + log"${MDC(LogKeys.PATH, checksumTmp.getAbsolutePath)}") } } catch { case e: Exception => diff --git a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala index 2afab01ec7b0..20b3d9444884 100644 --- a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala +++ b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala @@ -24,7 +24,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream @@ -36,7 +36,7 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur s" an fs (path: $path) with abortable stream support") } - logInfo(s"Writing atomically to $path based on abortable stream") + logInfo(log"Writing atomically to ${MDC(LogKeys.PATH, path)} based on abortable stream") class 
AbortableStreamBasedFSDataOutputStream( fsDataOutputStream: FSDataOutputStream, diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index b523bd750836..b3c48f13591f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -503,8 +503,8 @@ class LogisticRegression @Since("1.2.0") ( tol, fitIntercept, maxBlockSizeInMB) if (dataset.storageLevel != StorageLevel.NONE) { - instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + - s"then cached during training. Be careful of double caching!") + instr.logWarning("Input instances will be standardized, blockified to blocks, and " + + "then cached during training. Be careful of double caching!") } val instances = dataset.select( @@ -569,8 +569,8 @@ class LogisticRegression @Since("1.2.0") ( val isConstantLabel = histogram.count(_ != 0.0) == 1 if ($(fitIntercept) && isConstantLabel && !usingBoundConstrainedOptimization) { - instr.logWarning(s"All labels are the same value and fitIntercept=true, so the " + - s"coefficients will be zeros. Training is not needed.") + instr.logWarning("All labels are the same value and fitIntercept=true, so the " + + "coefficients will be zeros. Training is not needed.") val constantLabelIndex = Vectors.dense(histogram).argmax val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures, new Array[Int](numCoefficientSets + 1), Array.emptyIntArray, Array.emptyDoubleArray, @@ -584,8 +584,8 @@ class LogisticRegression @Since("1.2.0") ( } if (!$(fitIntercept) && isConstantLabel) { - instr.logWarning(s"All labels belong to a single class and fitIntercept=false. It's a " + - s"dangerous ground, so the algorithm may not converge.") + instr.logWarning("All labels belong to a single class and fitIntercept=false. It's a " + + "dangerous ground, so the algorithm may not converge.") } val featuresMean = summarizer.mean.toArray diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4b5f9be3193f..2523941e0bff 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -528,8 +528,9 @@ private[spark] class ApplicationMaster( } catch { case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => logError( - s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + - "Please check earlier log output for errors. Failing the application.") + log"""SparkContext did not initialize after waiting for + |${MDC(LogKeys.TIMEOUT, totalWaitTime)} ms. + | Please check earlier log output for errors. 
Failing the application.""".stripMargin) finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_SC_NOT_INITED, "Timed out waiting for SparkContext.") @@ -690,7 +691,7 @@ private[spark] class ApplicationMaster( } } catch { case ioe: IOException => - logError("Failed to cleanup staging dir " + stagingDirPath, ioe) + logError(log"Failed to cleanup staging dir ${MDC(LogKeys.PATH, stagingDirPath)}", ioe) } } @@ -736,7 +737,8 @@ private[spark] class ApplicationMaster( override def run(): Unit = { try { if (!Modifier.isStatic(mainMethod.getModifiers)) { - logError(s"Could not find static main method in object ${args.userClass}") + logError(log"Could not find static main method in object " + + log"${MDC(LogKeys.CLASS_NAME, args.userClass)}") finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS) } else { mainMethod.invoke(null, userArgs.toArray) @@ -866,7 +868,8 @@ private[spark] class ApplicationMaster( finish(FinalApplicationStatus.FAILED, exitCode) } } else { - logError(s"Application Master lost connection with driver! Shutting down. $remoteAddress") + logError(log"Application Master lost connection with driver! Shutting down. " + + log"${MDC(LogKeys.REMOTE_ADDRESS, remoteAddress)}") finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0567d8efb85e..bf31e03ba9a8 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1385,7 +1385,7 @@ private[spark] class Client( val YarnAppReport(appState, finalState, diags) = monitorApplication() if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) { diags.foreach { err => - logError(s"Application diagnostics message: $err") + logError(log"Application diagnostics message: ${MDC(LogKeys.ERROR, err)}") } throw new SparkException(s"Application $appId finished with failed status") } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 26be1ff89314..8032d782cf4f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicati import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.{config, Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{APP_ID, APP_STATE} import org.apache.spark.launcher.SparkAppHandle import org.apache.spark.scheduler.TaskSchedulerImpl @@ -120,7 +120,7 @@ private[spark] class YarnClientSchedulerBackend( logError(log"YARN application has exited unexpectedly with state " + log"${MDC(APP_STATE, state)}! 
Check the YARN application logs for more details.") diags.foreach { err => - logError(s"Diagnostics message: $err") + logError(log"Diagnostics message: ${MDC(LogKeys.ERROR, err)}") } allowInterrupt = false sc.stop() diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala index 42c8c783e54c..9219c1d139b9 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.types import scala.collection.mutable import org.apache.spark.annotation.{DeveloperApi, Since} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.errors.DataTypeErrors import org.apache.spark.util.SparkClassUtils @@ -58,7 +58,8 @@ object UDTRegistration extends Serializable with Logging { */ def register(userClass: String, udtClass: String): Unit = { if (udtMap.contains(userClass)) { - logWarning(s"Cannot register UDT for ${userClass}, which is already registered.") + logWarning(log"Cannot register UDT for ${MDC(LogKeys.CLASS_NAME, userClass)}, " + + log"which is already registered.") } else { // When register UDT with class name, we can't check if the UDT class is an UserDefinedType, // or not. The check is deferred. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index ba3f1df22bc4..a39c10866984 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -32,9 +32,8 @@ import org.codehaus.janino.util.ClassFile import org.apache.spark.{SparkException, SparkIllegalArgumentException, TaskContext, TaskKilledException} import org.apache.spark.executor.InputMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.MDC import org.apache.spark.metrics.source.CodegenMetrics import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.HashableWeakReference @@ -1593,9 +1592,10 @@ object CodeGenerator extends Logging { CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize) if (byteCodeSize > DEFAULT_JVM_HUGE_METHOD_LIMIT) { - logInfo("Generated method too long to be JIT compiled: " + - log"${MDC(CLASS_NAME, cf.getThisClassName)}.${MDC(METHOD_NAME, method.getName)} " + - log"is ${MDC(BYTECODE_SIZE, byteCodeSize)} bytes") + logInfo(log"Generated method too long to be JIT compiled: " + + log"${MDC(LogKeys.CLASS_NAME, cf.getThisClassName)}." 
+ + log"${MDC(LogKeys.METHOD_NAME, method.getName)} is " + + log"${MDC(LogKeys.BYTECODE_SIZE, byteCodeSize)} bytes") } byteCodeSize diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index 9370b3d8d1d7..bb7d904402de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.adaptive import scala.collection.mutable.ArrayBuffer import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec} object ShufflePartitionsUtil extends Logging { @@ -61,8 +61,10 @@ object ShufflePartitionsUtil extends Logging { val targetSize = maxTargetSize.min(advisoryTargetSize).max(minPartitionSize) val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ") - logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " + - s"actual target size $targetSize, minimum partition size: $minPartitionSize") + logInfo(log"For shuffle(${MDC(LogKeys.SHUFFLE_ID, shuffleIds)}, advisory target size: " + + log"${MDC(LogKeys.ADVISORY_TARGET_SIZE, advisoryTargetSize)}, actual target size " + + log"${MDC(LogKeys.TARGET_SIZE, targetSize)}, minimum partition size: " + + log"${MDC(LogKeys.PARTITION_SIZE, minPartitionSize)}") // If `inputPartitionSpecs` are all empty, it means skew join optimization is not applied. 
if (inputPartitionSpecs.forall(_.isEmpty)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala index 75a38b8ea622..7d80cc272810 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.python import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.metric.CustomTaskMetric import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} @@ -52,7 +52,8 @@ class PythonStreamingPartitionReaderFactory( val block = SparkEnv.get.blockManager.get[InternalRow](part.blockId.get) .map(_.data.asInstanceOf[Iterator[InternalRow]]) if (block.isEmpty) { - logWarning(s"Prefetched block ${part.blockId} for Python data source not found.") + logWarning(log"Prefetched block ${MDC(LogKeys.BLOCK_ID, part.blockId)} " + + log"for Python data source not found.") } block } else None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala index a512b34db345..33612b6947f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala @@ -27,7 +27,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader import org.apache.spark.SparkEnv import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.PYTHON_EXEC import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT @@ -214,7 +214,8 @@ class PythonStreamingSourceRunner( * Stop the python worker process and invoke stop() on stream reader. 
*/ def stop(): Unit = { - logInfo(s"Stopping streaming runner for module: $workerModule.") + logInfo(log"Stopping streaming runner for module: " + + log"${MDC(LogKeys.MODULE_NAME, workerModule)}.") try { pythonWorkerFactory.foreach { factory => pythonWorker.foreach { worker => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 3842ed574355..c440ec451b72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -352,8 +352,9 @@ abstract class ProgressContext( metrics = sourceMetrics ) } - logInfo(s"Extracting source progress metrics for source=${source.toString} took " + - s"duration_ms=$duration") + logInfo(log"Extracting source progress metrics for source=" + + log"${MDC(LogKeys.SOURCE, source.toString)} " + + log"took duration_ms=${MDC(LogKeys.DURATION, duration)}") result } } @@ -368,8 +369,8 @@ abstract class ProgressContext( SinkProgress(sink.toString, sinkOutput, sinkMetrics) } - logInfo(s"Extracting sink progress metrics for sink=${sink.toString} took " + - s"duration_ms=$duration") + logInfo(log"Extracting sink progress metrics for sink=${MDC(LogKeys.SINK, sink.toString)} " + + log"took duration_ms=${MDC(LogKeys.DURATION, duration)}") result } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 420deda3e017..4198d7367fe2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -32,7 +32,7 @@ import com.google.common.util.concurrent.UncheckedExecutionException import org.apache.hadoop.fs.Path import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, SparkThrowable} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT, PATH, PRETTY_ID_STRING, SPARK_DATA_STREAM} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -322,7 +322,8 @@ abstract class StreamExecution( if (state.compareAndSet(INITIALIZING, ACTIVE)) { // Log logical plan at the start of the query to help debug issues related to // plan changes. 
- logInfo(s"Finish initializing with logical plan:\n$logicalPlan") + logInfo(log"Finish initializing with logical plan:\n" + + log"${MDC(LogKeys.QUERY_PLAN, logicalPlan)}") // Unblock `awaitInitialization` initializationLatch.countDown() @@ -372,7 +373,8 @@ abstract class StreamExecution( case _ => None } - logError(s"Query $prettyIdString terminated with error", e) + logError(log"Query ${MDC(LogKeys.PRETTY_ID_STRING, prettyIdString)} " + + log"terminated with error", e) getLatestExecutionContext().updateStatusMessage(s"Terminated with exception: $message") // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to // handle them diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index fa49da5feeed..633aaf2682db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -445,7 +445,8 @@ class ContinuousExecution( */ def stopInNewThread(error: Throwable): Unit = { if (failure.compareAndSet(null, error)) { - logError(s"Query $prettyIdString received exception $error") + logError(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} received exception " + + log"${MDC(ERROR, error)}") stopInNewThread() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala index 5d54b5754915..d5daa9a875f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.execution.streaming.continuous import org.apache.spark.{Partition, SparkEnv, TaskContext} +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.write.DataWriter @@ -89,9 +89,11 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writerFactory: StreamingDat })(catchBlock = { // If there is an error, abort this writer. We enter this callback in the middle of // rethrowing an exception, so compute() will stop executing at this point. 
- logError(s"Writer for partition ${context.partitionId()} is aborting.") + logError(log"Writer for partition ${MDC(LogKeys.PARTITION_ID, context.partitionId())} " + + log"is aborting.") if (dataWriter != null) dataWriter.abort() - logError(s"Writer for partition ${context.partitionId()} aborted.") + logError(log"Writer for partition ${MDC(LogKeys.PARTITION_ID, context.partitionId())} " + + log"aborted.") }, finallyBlock = { if (dataWriter != null) dataWriter.close() }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala index 23f867d3e6c0..20df67b25bfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala @@ -25,7 +25,7 @@ import com.google.common.io.ByteStreams import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.{FSError, Path} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.errors.QueryExecutionErrors @@ -176,7 +176,8 @@ class StateStoreChangelogWriterV1( } catch { case e: Throwable => abort() - logError(s"Fail to commit changelog file $file because of exception $e") + logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME, file)} " + + log"because of exception ${MDC(LogKeys.EXCEPTION, e)}") throw e } finally { backingFileStream = null @@ -255,7 +256,8 @@ class StateStoreChangelogWriterV2( } catch { case e: Throwable => abort() - logError(s"Fail to commit changelog file $file because of exception $e") + logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME, file)} " + + log"because of exception ${MDC(LogKeys.EXCEPTION, e)}") throw e } finally { backingFileStream = null diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index b86e996a3408..51a5e88aa633 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -30,7 +30,7 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId, TTypeQualifiers, TTypeQualifierValue} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.catalyst.util.CharVarcharUtils @@ -83,7 +83,7 @@ private[hive] class SparkExecuteStatementOperation( val sparkType = new StructType().add("Result", "string") SparkExecuteStatementOperation.toTTableSchema(sparkType) } else { - logInfo(s"Result Schema: ${result.schema.sql}") + logInfo(log"Result Schema: ${MDC(LogKeys.SCHEMA, result.schema.sql)}") SparkExecuteStatementOperation.toTTableSchema(result.schema) } } @@ -126,8 +126,8 @@ 
private[hive] class SparkExecuteStatementOperation( override def runInternal(): Unit = { setState(OperationState.PENDING) logInfo( - log"Submitting query '${MDC(REDACTED_STATEMENT, redactedStatement)}' with " + - log"${MDC(STATEMENT_ID, statementId)}") + log"Submitting query '${MDC(LogKeys.REDACTED_STATEMENT, redactedStatement)}' with " + + log"${MDC(LogKeys.STATEMENT_ID, statementId)}") HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, @@ -215,8 +215,8 @@ private[hive] class SparkExecuteStatementOperation( synchronized { if (getStatus.getState.isTerminal) { logInfo( - log"Query with ${MDC(STATEMENT_ID, statementId)} in terminal state " + - log"before it started running") + log"Query with ${MDC(LogKeys.STATEMENT_ID, statementId)} in terminal state " + + log"before it started running") return } else { logInfo(log"Running query with ${MDC(STATEMENT_ID, statementId)}") @@ -289,8 +289,8 @@ private[hive] class SparkExecuteStatementOperation( synchronized { if (!getStatus.getState.isTerminal) { logInfo( - log"Query with ${MDC(STATEMENT_ID, statementId)} timed out after " + - log"${MDC(TIMEOUT, timeout)} seconds") + log"Query with ${MDC(LogKeys.STATEMENT_ID, statementId)} timed out " + + log"after ${MDC(LogKeys.TIMEOUT, timeout)} seconds") setState(OperationState.TIMEDOUT) cleanup() HiveThriftServer2.eventManager.onStatementTimeout(statementId) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index 53a94a128c0e..9cf31d99ccfa 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -27,7 +27,7 @@ import org.apache.hive.service.cli.operation.GetFunctionsOperation import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG import org.apache.hive.service.cli.session.HiveSession -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.SQLContext /** @@ -53,7 +53,7 @@ private[hive] class SparkGetFunctionsOperation( // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'" - logInfo(s"$logMsg with $statementId") + logInfo(log"${MDC(LogKeys.MESSAGE, logMsg)} with ${MDC(LogKeys.STATEMENT_ID, statementId)}") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 6cbc74a75a06..f09f9caf129b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{BACKUP_FILE, CHECKPOINT_FILE, CHECKPOINT_TIME, NUM_RETRY, PATH, TEMP_FILE} import org.apache.spark.internal.config.UI._ import org.apache.spark.io.CompressionCodec @@ -102,7 +102,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) assert(framework != null, "Checkpoint.framework is null") assert(graph != null, "Checkpoint.graph is null") assert(checkpointTime != null, "Checkpoint.checkpointTime is null") - logInfo(s"Checkpoint for time $checkpointTime validated") + logInfo(log"Checkpoint for time ${MDC(LogKeys.CHECKPOINT_TIME, checkpointTime)} validated") } } @@ -242,7 +242,8 @@ class CheckpointWriter( while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 try { - logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'") + logInfo(log"Saving checkpoint for time ${MDC(LogKeys.CHECKPOINT_TIME, checkpointTime)} " + + log"to file '${MDC(LogKeys.CHECKPOINT_FILE, checkpointFile)}'") if (fs == null) { fs = new Path(checkpointDir).getFileSystem(hadoopConf) } @@ -275,15 +276,19 @@ class CheckpointWriter( val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)) if (allCheckpointFiles.size > 10) { allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file => - logInfo(s"Deleting $file") + logInfo(log"Deleting ${MDC(LogKeys.FILE_NAME, file)}") fs.delete(file, true) } } // All done, print success - logInfo(s"Checkpoint for time $checkpointTime saved to file '$checkpointFile'" + - s", took ${bytes.length} bytes and " + - s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms") + logInfo( + log"Checkpoint for time ${MDC(LogKeys.CHECKPOINT_TIME, checkpointTime)} " + + log"saved to file " + + log"'${MDC(LogKeys.CHECKPOINT_FILE, checkpointFile)}', took " + + log"${MDC(LogKeys.BYTE_SIZE, bytes.length)} bytes and " + + log"${MDC(LogKeys.TIME, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() + - startTimeNs))} ms") jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater) return } catch { @@ -304,7 +309,8 @@ class CheckpointWriter( val bytes = Checkpoint.serialize(checkpoint, conf) executor.execute(new CheckpointWriteHandler( checkpoint.checkpointTime, bytes, clearCheckpointDataLater)) - logInfo(s"Submitted checkpoint of time ${checkpoint.checkpointTime} to writer queue") + logInfo(log"Submitted checkpoint of time ${MDC(LogKeys.CHECKPOINT_TIME, + checkpoint.checkpointTime)} to writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) @@ -316,8 +322,10 @@ class CheckpointWriter( val startTimeNs = System.nanoTime() ThreadUtils.shutdown(executor, FiniteDuration(10, TimeUnit.SECONDS)) - logInfo(s"CheckpointWriter executor terminated? 
${executor.isTerminated}," + - s" waited for ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms.") + logInfo(log"CheckpointWriter executor terminated? " + + log"${MDC(LogKeys.EXECUTOR_STATE, executor.isTerminated)}, waited for " + + log"${MDC(LogKeys.DURATION, TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTimeNs))} ms.") stopped = true } } @@ -357,15 +365,17 @@ object CheckpointReader extends Logging { } // Try to read the checkpoint files in the order - logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}") + logInfo(log"Checkpoint files found: " + + log"${MDC(LogKeys.CHECKPOINT_FILE, checkpointFiles.mkString(","))}") var readError: Exception = null checkpointFiles.foreach { file => - logInfo(s"Attempting to load checkpoint from file $file") + logInfo(log"Attempting to load checkpoint from file ${MDC(LogKeys.FILE_NAME, file)}") try { val fis = fs.open(file) val cp = Checkpoint.deserialize(fis, conf) - logInfo(s"Checkpoint successfully loaded from file $file") - logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}") + logInfo(log"Checkpoint successfully loaded from file ${MDC(LogKeys.FILE_NAME, file)}") + logInfo(log"Checkpoint was generated at time " + + log"${MDC(LogKeys.CHECKPOINT_TIME, cp.checkpointTime)}") return Some(cp) } catch { case e: Exception => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 30bd30329283..94b695e6452e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -36,7 +36,7 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.input.FixedLengthBinaryInputFormat -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.rdd.{RDD, RDDOperationScope} import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.SerializationDebugger @@ -725,7 +725,8 @@ class StreamingContext private[streaming] ( private def stopOnShutdown(): Unit = { val stopGracefully = conf.get(STOP_GRACEFULLY_ON_SHUTDOWN) - logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook") + logInfo(log"Invoking stop(stopGracefully=" + + log"${MDC(LogKeys.VALUE, stopGracefully)}) from shutdown hook") // Do not stop SparkContext, let its own shutdown hook stop it stop(stopSparkContext = false, stopGracefully = stopGracefully) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 38f55f80657b..87d6a4909fdd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import scala.util.matching.Regex import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{FROM_TIME, SLIDE_DURATION, TO_TIME} import org.apache.spark.internal.io.SparkHadoopWriterUtils import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope} @@ -201,7 +201,8 @@ abstract class DStream[T: ClassTag] ( // Set the checkpoint interval to be 
slideDuration or 10 seconds, which ever is larger if (mustCheckpoint && checkpointDuration == null) { checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt - logInfo(s"Checkpoint interval automatically set to $checkpointDuration") + logInfo(log"Checkpoint interval automatically set to " + + log"${MDC(LogKeys.CHECKPOINT_INTERVAL, checkpointDuration)}") } // Set the minimum value of the rememberDuration if not already set @@ -277,11 +278,11 @@ abstract class DStream[T: ClassTag] ( dependencies.foreach(_.validateAtStart()) - logInfo(s"Slide time = $slideDuration") - logInfo(s"Storage level = ${storageLevel.description}") - logInfo(s"Checkpoint interval = $checkpointDuration") - logInfo(s"Remember interval = $rememberDuration") - logInfo(s"Initialized and validated $this") + logInfo(log"Slide time = ${MDC(LogKeys.SLIDE_DURATION, slideDuration)}") + logInfo(log"Storage level = ${MDC(LogKeys.STORAGE_LEVEL, storageLevel.description)}") + logInfo(log"Checkpoint interval = ${MDC(LogKeys.CHECKPOINT_INTERVAL, checkpointDuration)}") + logInfo(log"Remember interval = ${MDC(LogKeys.INTERVAL, rememberDuration)}") + logInfo(log"Initialized and validated ${MDC(LogKeys.DSTREAM, this)}") } private[streaming] def setContext(s: StreamingContext): Unit = { @@ -289,7 +290,7 @@ abstract class DStream[T: ClassTag] ( throw new SparkException(s"Context must not be set again for $this") } ssc = s - logInfo(s"Set context for $this") + logInfo(log"Set context for ${MDC(LogKeys.STREAMING_CONTEXT, this)}") dependencies.foreach(_.setContext(ssc)) } @@ -304,7 +305,9 @@ abstract class DStream[T: ClassTag] ( private[streaming] def remember(duration: Duration): Unit = { if (duration != null && (rememberDuration == null || duration > rememberDuration)) { rememberDuration = duration - logInfo(s"Duration for remembering RDDs set to $rememberDuration for $this") + logInfo(log"Duration for remembering RDDs set to " + + log"${MDC(LogKeys.DURATION, rememberDuration)} for " + + log"${MDC(LogKeys.DSTREAM, this.toString)}") } dependencies.foreach(_.remember(parentRememberDuration)) } @@ -314,8 +317,10 @@ abstract class DStream[T: ClassTag] ( if (!isInitialized) { throw new SparkException (this.toString + " has not been initialized") } else if (time <= zeroTime || ! 
(time - zeroTime).isMultipleOf(slideDuration)) { - logInfo(s"Time $time is invalid as zeroTime is $zeroTime" + - s" , slideDuration is $slideDuration and difference is ${time - zeroTime}") + logInfo(log"Time ${MDC(LogKeys.TIME, time)} is invalid as zeroTime is " + + log"${MDC(LogKeys.ZERO_TIME, zeroTime)}, slideDuration is " + + log"${MDC(LogKeys.SLIDE_DURATION, slideDuration)} and difference is " + + log"${MDC(LogKeys.DURATION, time - zeroTime)}") false } else { logDebug(s"Time $time is valid") @@ -353,7 +358,8 @@ abstract class DStream[T: ClassTag] ( } if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { newRDD.checkpoint() - logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing") + logInfo(log"Marking RDD ${MDC(LogKeys.RDD_ID, newRDD.id)} for time " + + log"${MDC(LogKeys.TIME, time)} for checkpointing") } generatedRDDs.put(time, newRDD) } @@ -461,7 +467,8 @@ abstract class DStream[T: ClassTag] ( // Explicitly remove blocks of BlockRDD rdd match { case b: BlockRDD[_] => - logInfo(s"Removing blocks of RDD $b of time $time") + logInfo(log"Removing blocks of RDD ${MDC(LogKeys.RDD_ID, b)} " + + log"of time ${MDC(LogKeys.TIME, time)}") b.removeBlocks() case _ => } @@ -898,8 +905,10 @@ abstract class DStream[T: ClassTag] ( fromTime.floor(slideDuration, zeroTime) } - logInfo(s"Slicing from $fromTime to $toTime" + - s" (aligned to $alignedFromTime and $alignedToTime)") + logInfo(log"Slicing from ${MDC(LogKeys.FROM_TIME, fromTime)} to " + + log"${MDC(LogKeys.TO_TIME, toTime)}" + + log" (aligned to ${MDC(LogKeys.ALIGNED_FROM_TIME, alignedFromTime)} and " + + log"${MDC(LogKeys.ALIGNED_TO_TIME, alignedToTime)})") alignedFromTime.to(alignedToTime, slideDuration).flatMap { time => if (time >= zeroTime) getOrCompute(time) else None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 883d56c012f6..34b079219c99 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets import scala.reflect.ClassTag import scala.util.control.NonFatal -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.receiver.Receiver @@ -56,7 +56,7 @@ class SocketReceiver[T: ClassTag]( def onStart(): Unit = { - logInfo(s"Connecting to $host:$port") + logInfo(log"Connecting to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT, port)}") try { socket = new Socket(host, port) } catch { @@ -64,7 +64,7 @@ class SocketReceiver[T: ClassTag]( restart(s"Error connecting to $host:$port", e) return } - logInfo(s"Connected to $host:$port") + logInfo(log"Connected to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT, port)}") // Start the thread that receives data over a connection new Thread("Socket Receiver") { @@ -79,7 +79,7 @@ class SocketReceiver[T: ClassTag]( if (socket != null) { socket.close() socket = null - logInfo(s"Closed socket to $host:$port") + logInfo(log"Closed socket to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT, port)}") } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 79bfd8674b44..7cc08b421f78 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -25,7 +25,7 @@ import scala.concurrent._ import scala.util.control.NonFatal import org.apache.spark.SparkConf -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{DELAY, ERROR, MESSAGE, STREAM_ID} import org.apache.spark.storage.StreamBlockId import org.apache.spark.util.{ThreadUtils, Utils} @@ -145,10 +145,10 @@ private[streaming] abstract class ReceiverSupervisor( def startReceiver(): Unit = synchronized { try { if (onReceiverStart()) { - logInfo(s"Starting receiver $streamId") + logInfo(log"Starting receiver ${MDC(LogKeys.STREAM_ID, streamId)}") receiverState = Started receiver.onStart() - logInfo(s"Called receiver $streamId onStart") + logInfo(log"Called receiver ${MDC(LogKeys.STREAM_ID, streamId)} onStart") } else { // The driver refused us stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None) @@ -162,7 +162,8 @@ private[streaming] abstract class ReceiverSupervisor( /** Stop receiver */ def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized { try { - logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse("")) + logInfo(log"Stopping receiver with message: ${MDC(LogKeys.MESSAGE, message)}: " + + log"${MDC(LogKeys.ERROR, error.getOrElse(""))}") receiverState match { case Initialized => logWarning("Skip stopping receiver because it has not yet stared") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 47beb4521950..aafa99bd5285 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -28,7 +28,7 @@ import com.google.common.base.Throwables import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkEnv, SparkException} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{ERROR, MESSAGE} import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.storage.StreamBlockId @@ -85,7 +85,7 @@ private[streaming] class ReceiverSupervisorImpl( logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) case UpdateRateLimit(eps) => - logInfo(s"Received a new rate limit: $eps.") + logInfo(log"Received a new rate limit: ${MDC(LogKeys.RATE_LIMIT, eps)}.") registeredBlockGenerators.asScala.foreach { bg => bg.updateRate(eps) } @@ -195,10 +195,10 @@ private[streaming] class ReceiverSupervisorImpl( } override protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = { - logInfo("Deregistering receiver " + streamId) + logInfo(log"Deregistering receiver ${MDC(LogKeys.STREAM_ID, streamId)}") val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString)) - logInfo("Stopped receiver " + streamId) + logInfo(log"Stopped receiver ${MDC(LogKeys.STREAM_ID, streamId)}") } 
override def createBlockGenerator( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 5aa2a9df3ba8..903cde8082db 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -21,7 +21,7 @@ package org.apache.spark.streaming.scheduler import scala.util.Random import org.apache.spark.{ExecutorAllocationClient, SparkConf} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config.DECOMMISSION_ENABLED import org.apache.spark.internal.config.Streaming._ import org.apache.spark.resource.ResourceProfile @@ -75,8 +75,10 @@ private[streaming] class ExecutorAllocationManager( def start(): Unit = { timer.start() - logInfo(s"ExecutorAllocationManager started with " + - s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec") + logInfo(log"ExecutorAllocationManager started with ratios = " + + log"[${MDC(LogKeys.SCALING_UP_RATIO, scalingUpRatio)}, " + + log"${MDC(LogKeys.SCALING_DOWN_RATIO, scalingDownRatio)}] and interval = " + + log"${MDC(LogKeys.INTERVAL, scalingIntervalSecs)} sec") } def stop(): Unit = { @@ -89,11 +91,14 @@ private[streaming] class ExecutorAllocationManager( * batch statistics. */ private def manageAllocation(): Unit = synchronized { - logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]") + logInfo(log"Managing executor allocation with ratios = [" + + log"${MDC(LogKeys.SCALING_UP_RATIO, scalingUpRatio)}, " + + log"${MDC(LogKeys.SCALING_DOWN_RATIO, scalingDownRatio)}]") if (batchProcTimeCount > 0) { val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount val ratio = averageBatchProcTime.toDouble / batchDurationMs - logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" ) + logInfo(log"Average: ${MDC(LogKeys.AVG_BATCH_PROC_TIME, averageBatchProcTime)}, " + + log"ratio = ${MDC(LogKeys.RATIO, ratio)}") if (ratio >= scalingUpRatio) { logDebug("Requesting executors") val numNewExecutors = math.max(math.round(ratio).toInt, 1) @@ -119,7 +124,8 @@ private[streaming] class ExecutorAllocationManager( Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> targetTotalExecutors), Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0), Map.empty) - logInfo(s"Requested total $targetTotalExecutors executors") + logInfo(log"Requested total ${MDC(LogKeys.NUM_EXECUTORS, + targetTotalExecutors)} executors") } /** Kill an executor that is not running any receiver, if possible */ @@ -129,7 +135,9 @@ private[streaming] class ExecutorAllocationManager( if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) { val execIdsWithReceivers = receiverTracker.allocatedExecutors().values.flatten.toSeq - logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}") + logInfo(log"Executors with receivers (${MDC(LogKeys.NUM_EXECUTORS, + execIdsWithReceivers.size)}): " + + log"${MDC(LogKeys.EXECUTOR_IDS, execIdsWithReceivers)}") val removableExecIds = allExecIds.diff(execIdsWithReceivers) logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}") @@ -142,7 +150,7 @@ private[streaming] class ExecutorAllocationManager( } else { client.killExecutor(execIdToRemove) } - logInfo(s"Requested to kill executor 
$execIdToRemove") + logInfo(log"Requested to kill executor ${MDC(LogKeys.EXECUTOR_ID, execIdToRemove)}") } else { logInfo(s"No non-receiver executors to kill") } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala index 639ac6de4f5d..bd9ea7b5a268 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.streaming.{StreamingContext, Time} /** @@ -82,7 +82,8 @@ private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging /** Cleanup the tracked input information older than threshold batch time */ def cleanup(batchThreshTime: Time): Unit = synchronized { val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime) - logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}") + logInfo(log"remove old batch metadata: " + + log"${MDC(LogKeys.DURATION, timesToCleanup.mkString(" "))}") batchTimeToInputInfos --= timesToCleanup } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index f2700737384b..7fb35a04be6d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{RECEIVED_BLOCK_INFO, RECEIVED_BLOCK_TRACKER_LOG_EVENT} import org.apache.spark.network.util.JavaUtils import org.apache.spark.streaming.Time @@ -127,7 +127,9 @@ private[streaming] class ReceivedBlockTracker( timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } else { - logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery") + logInfo(log"Possibly processed batch ${MDC(LogKeys.BATCH_TIMESTAMP, + batchTime)} needs to be " + + log"processed again in WAL recovery") } } else { // This situation occurs when: @@ -137,7 +139,9 @@ private[streaming] class ReceivedBlockTracker( // 2. Slow checkpointing makes recovered batch time older than WAL recovered // lastAllocatedBatchTime. // This situation will only occurs in recovery time. 
- logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery") + logInfo(log"Possibly processed batch ${MDC(LogKeys.BATCH_TIMESTAMP, + batchTime)} needs to be processed " + + log"again in WAL recovery") } } @@ -175,7 +179,7 @@ private[streaming] class ReceivedBlockTracker( def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized { require(cleanupThreshTime.milliseconds < clock.getTimeMillis()) val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq - logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}") + logInfo(log"Deleting batches: ${MDC(LogKeys.DURATION, timesToCleanup.mkString(" "))}") if (writeToLog(BatchCleanupEvent(timesToCleanup))) { timeToAllocatedBlocks --= timesToCleanup writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion)) @@ -221,9 +225,10 @@ private[streaming] class ReceivedBlockTracker( } writeAheadLogOption.foreach { writeAheadLog => - logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}") + logInfo(log"Recovering from write ahead logs in " + + log"${MDC(LogKeys.PATH, checkpointDirOption.get)}") writeAheadLog.readAll().asScala.foreach { byteBuffer => - logInfo("Recovering record " + byteBuffer) + logInfo(log"Recovering record ${MDC(LogKeys.BYTE_BUFFER, byteBuffer)}") Utils.deserialize[ReceivedBlockTrackerLogEvent]( JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader) match { case BlockAdditionEvent(receivedBlockInfo) => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 48273b3b593c..a37ba04c1012 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext import scala.util.{Failure, Success} import org.apache.spark._ -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{ERROR, MESSAGE, RECEIVER_ID, RECEIVER_IDS, STREAM_ID} import org.apache.spark.rdd.RDD import org.apache.spark.rpc._ @@ -232,7 +232,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Signal the receivers to delete old block data if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { - logInfo(s"Cleanup old received batch data: $cleanupThreshTime") + logInfo(log"Cleanup old received batch data: " + + log"${MDC(LogKeys.CLEANUP_LOCAL_DIRS, cleanupThreshTime)}") synchronized { if (isTrackerStarted) { endpoint.send(CleanupOldBlocks(cleanupThreshTime)) @@ -306,7 +307,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false endpoint = Some(receiverEndpoint)) receiverTrackingInfos.put(streamId, receiverTrackingInfo) listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo)) - logInfo("Registered receiver for stream " + streamId + " from " + senderAddress) + logInfo(log"Registered receiver for stream ${MDC(LogKeys.STREAM_ID, streamId)} " + + log"from ${MDC(LogKeys.RPC_ADDRESS, senderAddress)}") true } } @@ -447,7 +449,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false runDummySparkJob() - logInfo("Starting " + receivers.length + " receivers") + logInfo(log"Starting 
${MDC(LogKeys.NUM_RECEIVERS, receivers.length)} receivers") endpoint.send(StartAllReceivers(receivers.toImmutableArraySeq)) } @@ -625,7 +627,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { - logInfo(s"Restarting Receiver $receiverId") + logInfo(log"Restarting Receiver ${MDC(LogKeys.STREAM_ID, receiverId)}") self.send(RestartReceiver(receiver)) } case Failure(e) => @@ -633,11 +635,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false onReceiverJobFinish(receiverId) } else { logError("Receiver has been stopped. Try to restart it.", e) - logInfo(s"Restarting Receiver $receiverId") + logInfo(log"Restarting Receiver ${MDC(LogKeys.STREAM_ID, receiverId)}") self.send(RestartReceiver(receiver)) } }(ThreadUtils.sameThread) - logInfo(s"Receiver ${receiver.streamId} started") + logInfo(log"Receiver ${MDC(LogKeys.STREAM_ID, receiver.streamId)} started") } override def onStop(): Unit = { @@ -660,7 +662,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Send stop signal to the receivers. */ private def stopReceivers(): Unit = { receiverTrackingInfos.values.flatMap(_.endpoint).foreach { _.send(StopReceiver) } - logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers") + logInfo(log"Sent stop signal to all " + + log"${MDC(LogKeys.NUM_RECEIVERS, receiverTrackingInfos.size)} receivers") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala index dc02062b9eb4..1b05a6ac30cc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.scheduler.rate -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} /** * Implements a proportional-integral-derivative (PID) controller which acts on @@ -74,8 +74,11 @@ private[streaming] class PIDRateEstimator( minRate > 0, s"Minimum rate in PIDRateEstimator should be > 0") - logInfo(s"Created PIDRateEstimator with proportional = $proportional, integral = $integral, " + - s"derivative = $derivative, min rate = $minRate") + logInfo(log"Created PIDRateEstimator with proportional = " + + log"${MDC(LogKeys.PROPORTIONAL, proportional)}, integral = " + + log"${MDC(LogKeys.INTEGRAL, integral)}, derivative = " + + log"${MDC(LogKeys.DERIVATIVE, derivative)}, min rate = " + + log"${MDC(LogKeys.MIN_RATE, minRate)}") def compute( time: Long, // in milliseconds diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala index 5dcdd573c744..8befe53efffa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal import org.apache.spark.SparkConf -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.RECORDS import org.apache.spark.network.util.JavaUtils import 
org.apache.spark.util.{ThreadUtils, Utils} @@ -122,7 +122,8 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp * Stop the batched writer thread, fulfill promises with failures and close the wrapped WAL. */ override def close(): Unit = { - logInfo(s"BatchedWriteAheadLog shutting down at time: ${System.currentTimeMillis()}.") + logInfo(log"BatchedWriteAheadLog shutting down at time: " + + log"${MDC(LogKeys.TIME, System.currentTimeMillis())}.") if (!active.getAndSet(false)) return batchedWriterThread.interrupt() batchedWriterThread.join() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 58a6b929a81f..d90095c73785 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{NUM_RETRY, WRITE_AHEAD_LOG_INFO} import org.apache.spark.util.{CompletionIterator, ThreadUtils} import org.apache.spark.util.ArrayImplicits._ @@ -137,7 +137,8 @@ private[streaming] class FileBasedWriteAheadLog( */ def readAll(): JIterator[ByteBuffer] = synchronized { val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath - logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n")) + logInfo(log"Reading from the logs:\n" + + log"${MDC(LogKeys.PATHS, logFilesToRead.mkString("\n"))}") def readFile(file: String): Iterator[ByteBuffer] = { logDebug(s"Creating log reader with $file") val reader = new FileBasedWriteAheadLogReader(file, hadoopConf) @@ -170,8 +171,11 @@ private[streaming] class FileBasedWriteAheadLog( pastLogs --= expiredLogs expiredLogs } - logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " + - s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}") + logInfo(log"Attempting to clear ${MDC(LogKeys.NUM_RECORDS_READ, oldLogFiles.size)} " + + log"old log files in " + + log"${MDC(LogKeys.PATH, logDirectory)} older than " + + log"${MDC(LogKeys.THRESHOLD, threshTime)}: " + + log"${MDC(LogKeys.FILES, oldLogFiles.map(_.path).mkString("\n"))}") def deleteFile(walInfo: LogInfo): Unit = { try { @@ -184,7 +188,8 @@ private[streaming] class FileBasedWriteAheadLog( logWarning(log"Error clearing write ahead log file " + log"${MDC(WRITE_AHEAD_LOG_INFO, walInfo)}", ex) } - logInfo(s"Cleared log files in $logDirectory older than $threshTime") + logInfo(log"Cleared log files in ${MDC(LogKeys.PATH, logDirectory)} older than " + + log"${MDC(LogKeys.THRESH_TIME, threshTime)}") } oldLogFiles.foreach { logInfo => if (!executionContext.isShutdown) { @@ -252,7 +257,9 @@ private[streaming] class FileBasedWriteAheadLog( fileSystem.listStatus(logDirectoryPath).map { _.getPath }.toImmutableArraySeq) pastLogs.clear() pastLogs ++= logFileInfo - logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory") + logInfo(log"Recovered ${MDC(LogKeys.NUM_FILES, logFileInfo.size)} " + + log"write ahead log files from " + + log"${MDC(LogKeys.PATH, logDirectory)}") logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}") } } catch {
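To make the FileBasedWriteAheadLog clean-up messages above easier to interpret: the expiredLogs/oldLogFiles selection picks the write-ahead-log segments that fall entirely before the threshold time, logs their count and paths, and then deletes them. A hedged sketch of that rule, using a hypothetical WalSegment type rather than the internal LogInfo bookkeeping class, and assuming "older than threshTime" means the segment's time range ended before the threshold:

  import java.nio.file.{Files, Paths}

  // Hypothetical stand-in for the per-segment metadata the write-ahead log keeps.
  final case class WalSegment(startTimeMs: Long, endTimeMs: Long, path: String)

  object WalCleanupSketch {
    // Segments whose time range ended before the threshold are treated as expired.
    def expired(segments: Seq[WalSegment], threshTimeMs: Long): Seq[WalSegment] =
      segments.filter(_.endTimeMs < threshTimeMs)

    // Delete expired segment files (best effort) and return the still-live segments.
    def clean(segments: Seq[WalSegment], threshTimeMs: Long): Seq[WalSegment] = {
      val toDelete = expired(segments, threshTimeMs)
      toDelete.foreach(seg => Files.deleteIfExists(Paths.get(seg.path)))
      segments.filterNot(toDelete.contains)
    }
  }

In the actual class the deletions run on an execution context and are skipped once it is shut down, which is why the patch keeps the "Cleared log files ... older than ..." message separate from the per-file warnings.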