26 changes: 26 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -65,7 +65,10 @@ private[spark] object LogKeys {
case object ADDED_JARS extends LogKey
case object ADMIN_ACLS extends LogKey
case object ADMIN_ACL_GROUPS extends LogKey
case object ADVISORY_TARGET_SIZE extends LogKey
case object AGGREGATE_FUNCTIONS extends LogKey
case object ALIGNED_FROM_TIME extends LogKey
case object ALIGNED_TO_TIME extends LogKey
case object ALPHA extends LogKey
case object ANALYSIS_ERROR extends LogKey
case object APP_ATTEMPT_ID extends LogKey
@@ -77,8 +80,10 @@ private[spark] object LogKeys {
case object APP_STATE extends LogKey
case object ARCHIVE_NAME extends LogKey
case object ARGS extends LogKey
case object ARTIFACT_ID extends LogKey
case object ATTRIBUTE_MAP extends LogKey
case object AUTH_ENABLED extends LogKey
case object AVG_BATCH_PROC_TIME extends LogKey
case object BACKUP_FILE extends LogKey
case object BARRIER_EPOCH extends LogKey
case object BARRIER_ID extends LogKey
@@ -98,6 +103,8 @@ private[spark] object LogKeys {
case object BROADCAST_ID extends LogKey
case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object BUCKET extends LogKey
case object BYTE_BUFFER extends LogKey
case object BYTE_SIZE extends LogKey
case object BYTECODE_SIZE extends LogKey
case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
case object CACHE_AUTO_REMOVED_SIZE extends LogKey
@@ -109,6 +116,7 @@ private[spark] object LogKeys {
case object CATALOG_NAME extends LogKey
case object CATEGORICAL_FEATURES extends LogKey
case object CHECKPOINT_FILE extends LogKey
case object CHECKPOINT_INTERVAL extends LogKey
case object CHECKPOINT_LOCATION extends LogKey
case object CHECKPOINT_PATH extends LogKey
case object CHECKPOINT_ROOT extends LogKey
@@ -186,6 +194,7 @@ private[spark] object LogKeys {
case object DELEGATE extends LogKey
case object DELTA extends LogKey
case object DEPRECATED_KEY extends LogKey
case object DERIVATIVE extends LogKey
case object DESCRIPTION extends LogKey
case object DESIRED_NUM_PARTITIONS extends LogKey
case object DESIRED_TREE_DEPTH extends LogKey
@@ -197,6 +206,7 @@ private[spark] object LogKeys {
case object DRIVER_MEMORY_SIZE extends LogKey
case object DRIVER_STATE extends LogKey
case object DROPPED_PARTITIONS extends LogKey
case object DSTREAM extends LogKey
case object DURATION extends LogKey
case object EARLIEST_LOADED_VERSION extends LogKey
case object EFFECTIVE_STORAGE_LEVEL extends LogKey
@@ -251,6 +261,7 @@ private[spark] object LogKeys {
case object FEATURE_NAME extends LogKey
case object FETCH_SIZE extends LogKey
case object FIELD_NAME extends LogKey
case object FILES extends LogKey
case object FILE_ABSOLUTE_PATH extends LogKey
case object FILE_END_OFFSET extends LogKey
case object FILE_FORMAT extends LogKey
@@ -307,6 +318,7 @@
case object INIT_MODE extends LogKey
case object INPUT extends LogKey
case object INPUT_SPLIT extends LogKey
case object INTEGRAL extends LogKey
case object INTERVAL extends LogKey
case object ISOLATION_LEVEL extends LogKey
case object ISSUE_DATE extends LogKey
@@ -394,6 +406,7 @@
case object MIN_COMPACTION_BATCH_ID extends LogKey
case object MIN_NUM_FREQUENT_PATTERN extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
case object MIN_RATE extends LogKey
case object MIN_SHARE extends LogKey
case object MIN_SIZE extends LogKey
case object MIN_TIME extends LogKey
@@ -490,6 +503,7 @@
case object NUM_PREFIXES extends LogKey
case object NUM_PRUNED extends LogKey
case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey
case object NUM_RECEIVERS extends LogKey
case object NUM_RECORDS_READ extends LogKey
case object NUM_RELEASED_LOCKS extends LogKey
case object NUM_REMAINED extends LogKey
@@ -548,6 +562,7 @@
case object PARTITION_ID extends LogKey
case object PARTITION_IDS extends LogKey
      case object PARTITION_SIZE extends LogKey
      case object PARTITION_SPECIFICATION extends LogKey
      case object PARTITION_SPECS extends LogKey
case object PATH extends LogKey
case object PATHS extends LogKey
@@ -575,6 +590,7 @@
case object PROCESSING_TIME extends LogKey
case object PRODUCER_ID extends LogKey
case object PROPERTY_NAME extends LogKey
case object PROPORTIONAL extends LogKey
case object PROTOCOL_VERSION extends LogKey
case object PROVIDER extends LogKey
case object PUSHED_FILTERS extends LogKey
@@ -595,6 +611,8 @@
case object QUERY_PLAN_LENGTH_MAX extends LogKey
case object QUERY_RUN_ID extends LogKey
case object RANGE extends LogKey
case object RATE_LIMIT extends LogKey
case object RATIO extends LogKey
case object RDD_CHECKPOINT_DIR extends LogKey
case object RDD_DEBUG_STRING extends LogKey
case object RDD_DESCRIPTION extends LogKey
@@ -646,6 +664,8 @@
case object RULE_NAME extends LogKey
case object RUN_ID extends LogKey
case object SCALA_VERSION extends LogKey
      case object SCALING_DOWN_RATIO extends LogKey
      case object SCALING_UP_RATIO extends LogKey
case object SCHEDULER_POOL_NAME extends LogKey
case object SCHEDULING_MODE extends LogKey
case object SCHEMA extends LogKey
@@ -671,12 +691,14 @@
case object SHUFFLE_SERVICE_NAME extends LogKey
case object SIGMAS_LENGTH extends LogKey
case object SIGNAL extends LogKey
case object SINK extends LogKey
case object SIZE extends LogKey
case object SLEEP_TIME extends LogKey
case object SLIDE_DURATION extends LogKey
case object SMALLEST_CLUSTER_INDEX extends LogKey
case object SNAPSHOT_VERSION extends LogKey
case object SOCKET_ADDRESS extends LogKey
case object SOURCE extends LogKey
case object SOURCE_PATH extends LogKey
case object SPARK_BRANCH extends LogKey
case object SPARK_BUILD_DATE extends LogKey
@@ -708,6 +730,7 @@
case object STORAGE_LEVEL_REPLICATION extends LogKey
case object STORAGE_MEMORY_SIZE extends LogKey
case object STORE_ID extends LogKey
case object STREAMING_CONTEXT extends LogKey
case object STREAMING_DATA_SOURCE_DESCRIPTION extends LogKey
case object STREAMING_DATA_SOURCE_NAME extends LogKey
case object STREAMING_OFFSETS_END extends LogKey
@@ -729,6 +752,7 @@
case object TARGET_NUM_EXECUTOR extends LogKey
case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
case object TARGET_PATH extends LogKey
case object TARGET_SIZE extends LogKey
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
case object TASK_INDEX extends LogKey
@@ -752,6 +776,7 @@
case object THREAD_POOL_SIZE extends LogKey
case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey
case object THRESHOLD extends LogKey
case object THRESH_TIME extends LogKey
case object TIME extends LogKey
case object TIMEOUT extends LogKey
case object TIMER extends LogKey
@@ -814,4 +839,5 @@ private[spark] object LogKeys {
case object XML_SCHEDULING_MODE extends LogKey
case object XSD_PATH extends LogKey
case object YOUNG_GENERATION_GC extends LogKey
case object ZERO_TIME extends LogKey
}
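
The keys added above are consumed through Spark's structured-logging MDC interpolator, which the remaining files in this diff migrate to. A minimal sketch of the pattern follows (the class and method names are hypothetical and only for illustration; the import line, the AVG_BATCH_PROC_TIME key, and the log/MDC calls mirror the changes below):

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical reporter class, shown only to illustrate attaching a LogKey to a value.
class BatchTimingReporter extends Logging {
  def report(avgBatchProcTimeMs: Double): Unit = {
    // MDC tags the value with its LogKey, so structured log backends can filter
    // on the key instead of parsing the interpolated message text.
    logInfo(log"Average batch processing time: " +
      log"${MDC(LogKeys.AVG_BATCH_PROC_TIME, avgBatchProcTimeMs)} ms")
  }
}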
@@ -22,7 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList
import scala.jdk.CollectionConverters._

import org.apache.spark.connect.proto.{Command, ExecutePlanResponse, Plan, StreamingQueryEventType}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.CloseableIterator
import org.apache.spark.sql.streaming.StreamingQueryListener.{Event, QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
@@ -115,7 +115,7 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
case StreamingQueryEventType.QUERY_TERMINATED_EVENT =>
postToAll(QueryTerminatedEvent.fromJson(event.getEventJson))
case _ =>
logWarning(s"Unknown StreamingQueryListener event: $event")
logWarning(log"Unknown StreamingQueryListener event: ${MDC(LogKeys.EVENT, event)}")
}
})
}
@@ -144,7 +144,8 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
listener.onQueryIdle(t)
case t: QueryTerminatedEvent =>
listener.onQueryTerminated(t)
case _ => logWarning(s"Unknown StreamingQueryListener event: $event")
case _ => logWarning(log"Unknown StreamingQueryListener event: " +
log"${MDC(LogKeys.EVENT, event)}")
}
} catch {
case e: Exception =>
@@ -24,7 +24,7 @@ import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.connect.service.ExecuteHolder

/**
@@ -83,20 +83,28 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
} catch {
case NonFatal(e) =>
logError(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Error sending listener added response.",
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
log"Error sending listener added response.",
e)
listenerHolder.cleanUp()
return
}
}
logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Server side listener added. Now blocking until " +
"all client side listeners are removed or there is error transmitting the event back.")
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
log"${MDC(LogKeys.USER_ID, userId)}][operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
log"Server side listener added. Now blocking until all client side listeners are " +
log"removed or there is error transmitting the event back.")
// Block the handling thread, and have serverListener continuously send back new events
listenerHolder.streamingQueryListenerLatch.await()
logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Server side listener long-running handling thread ended.")
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
log"${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
log"Server side listener long-running handling thread ended.")
case StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
@@ -22,7 +22,7 @@ import java.io.Serializable
import scala.reflect.ClassTag

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.util.Utils

/**
@@ -106,7 +106,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Lo
assertValid()
_isValid = false
_destroySite = Utils.getCallSite().shortForm
logInfo("Destroying %s (from %s)".format(toString, _destroySite))
logInfo(log"Destroying ${MDC(LogKeys.BROADCAST, toString)} " +
log"(from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, _destroySite)})")
doDestroy(blocking)
}

@@ -71,8 +71,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
if (localDirs.length >= 1) {
new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
} else {
logWarning(s"'spark.local.dir' should be set first when we use db in " +
s"ExternalShuffleService. Note that this only affects standalone mode.")
logWarning("'spark.local.dir' should be set first when we use db in " +
"ExternalShuffleService. Note that this only affects standalone mode.")
null
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.WORKER_URL
import org.apache.spark.rpc._

@@ -64,7 +64,7 @@ private[spark] class WorkerWatcher(
}

override def receive: PartialFunction[Any, Unit] = {
case e => logWarning(s"Received unexpected message: $e")
case e => logWarning(log"Received unexpected message: ${MDC(LogKeys.ERROR, e)}")
}

override def onConnected(remoteAddress: RpcAddress): Unit = {
@@ -28,7 +28,7 @@ import com.google.common.cache.CacheBuilder

import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.NioBufferedFileInputStream
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
@@ -436,8 +436,8 @@ private[spark] class IndexShuffleBlockResolver(
if (checksumTmp.exists()) {
try {
if (!checksumTmp.delete()) {
logError(s"Failed to delete temporary checksum file " +
s"at ${checksumTmp.getAbsolutePath}")
logError(log"Failed to delete temporary checksum file at " +
log"${MDC(LogKeys.PATH, checksumTmp.getAbsolutePath)}")
}
} catch {
case e: Exception =>
@@ -24,7 +24,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

@@ -36,7 +36,7 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
s" an fs (path: $path) with abortable stream support")
}

logInfo(s"Writing atomically to $path based on abortable stream")
logInfo(log"Writing atomically to ${MDC(LogKeys.PATH, path)} based on abortable stream")

class AbortableStreamBasedFSDataOutputStream(
fsDataOutputStream: FSDataOutputStream,
@@ -503,8 +503,8 @@ class LogisticRegression @Since("1.2.0") (
tol, fitIntercept, maxBlockSizeInMB)

if (dataset.storageLevel != StorageLevel.NONE) {
instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
s"then cached during training. Be careful of double caching!")
instr.logWarning("Input instances will be standardized, blockified to blocks, and " +
"then cached during training. Be careful of double caching!")
}

val instances = dataset.select(
@@ -569,8 +569,8 @@

val isConstantLabel = histogram.count(_ != 0.0) == 1
if ($(fitIntercept) && isConstantLabel && !usingBoundConstrainedOptimization) {
instr.logWarning(s"All labels are the same value and fitIntercept=true, so the " +
s"coefficients will be zeros. Training is not needed.")
instr.logWarning("All labels are the same value and fitIntercept=true, so the " +
"coefficients will be zeros. Training is not needed.")
val constantLabelIndex = Vectors.dense(histogram).argmax
val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures,
new Array[Int](numCoefficientSets + 1), Array.emptyIntArray, Array.emptyDoubleArray,
@@ -584,8 +584,8 @@
}

if (!$(fitIntercept) && isConstantLabel) {
instr.logWarning(s"All labels belong to a single class and fitIntercept=false. It's a " +
s"dangerous ground, so the algorithm may not converge.")
instr.logWarning("All labels belong to a single class and fitIntercept=false. It's a " +
"dangerous ground, so the algorithm may not converge.")
}

val featuresMean = summarizer.mean.toArray
@@ -528,8 +528,9 @@ private[spark] class ApplicationMaster(
} catch {
case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
logError(
s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
"Please check earlier log output for errors. Failing the application.")
log"""SparkContext did not initialize after waiting for
|${MDC(LogKeys.TIMEOUT, totalWaitTime)} ms.
| Please check earlier log output for errors. Failing the application.""".stripMargin)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
@@ -690,7 +691,7 @@ private[spark] class ApplicationMaster(
}
} catch {
case ioe: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
logError(log"Failed to cleanup staging dir ${MDC(LogKeys.PATH, stagingDirPath)}", ioe)
}
}

@@ -736,7 +737,8 @@ private[spark] class ApplicationMaster(
override def run(): Unit = {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
logError(s"Could not find static main method in object ${args.userClass}")
logError(log"Could not find static main method in object " +
log"${MDC(LogKeys.CLASS_NAME, args.userClass)}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
mainMethod.invoke(null, userArgs.toArray)
@@ -866,7 +868,8 @@ private[spark] class ApplicationMaster(
finish(FinalApplicationStatus.FAILED, exitCode)
}
} else {
logError(s"Application Master lost connection with driver! Shutting down. $remoteAddress")
logError(log"Application Master lost connection with driver! Shutting down. " +
log"${MDC(LogKeys.REMOTE_ADDRESS, remoteAddress)}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED)
}
}