common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala (29 additions, 0 deletions)
@@ -66,6 +66,8 @@ object LogKeys {
case object ADMIN_ACLS extends LogKey
case object ADMIN_ACL_GROUPS extends LogKey
case object AGGREGATE_FUNCTIONS extends LogKey
case object ALIGNED_FROM_TIME extends LogKey
case object ALIGNED_TO_TIME extends LogKey
case object ALPHA extends LogKey
case object ANALYSIS_ERROR extends LogKey
case object APP_ATTEMPT_ID extends LogKey
@@ -77,8 +79,10 @@ object LogKeys {
case object APP_STATE extends LogKey
case object ARCHIVE_NAME extends LogKey
case object ARGS extends LogKey
case object ARTIFACT_ID extends LogKey
case object ATTRIBUTE_MAP extends LogKey
case object AUTH_ENABLED extends LogKey
case object AVG_BATCH_PROC_TIME extends LogKey
case object BACKUP_FILE extends LogKey
case object BARRIER_EPOCH extends LogKey
case object BARRIER_ID extends LogKey
@@ -98,6 +102,8 @@ object LogKeys {
case object BROADCAST_ID extends LogKey
case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object BUCKET extends LogKey
case object BYTE_BUFFER extends LogKey
case object BYTE_SIZE extends LogKey
case object BYTECODE_SIZE extends LogKey
case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
case object CACHE_AUTO_REMOVED_SIZE extends LogKey
@@ -108,7 +114,9 @@ object LogKeys {
case object CANCEL_FUTURE_JOBS extends LogKey
case object CATALOG_NAME extends LogKey
case object CATEGORICAL_FEATURES extends LogKey
case object CHECKPOINT_DIR extends LogKey
case object CHECKPOINT_FILE extends LogKey
case object CHECKPOINT_INTERVAL extends LogKey
case object CHECKPOINT_LOCATION extends LogKey
case object CHECKPOINT_PATH extends LogKey
case object CHECKPOINT_ROOT extends LogKey
@@ -186,17 +194,20 @@ object LogKeys {
case object DELEGATE extends LogKey
case object DELTA extends LogKey
case object DEPRECATED_KEY extends LogKey
case object DERIVATIVE extends LogKey
case object DESCRIPTION extends LogKey
case object DESIRED_NUM_PARTITIONS extends LogKey
case object DESIRED_TREE_DEPTH extends LogKey
case object DESTINATION_PATH extends LogKey
case object DFS_FILE extends LogKey
case object DIFF_DELTA extends LogKey
case object DIRECTORY extends LogKey
case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey
case object DRIVER_ID extends LogKey
case object DRIVER_MEMORY_SIZE extends LogKey
case object DRIVER_STATE extends LogKey
case object DROPPED_PARTITIONS extends LogKey
case object DSTREAM extends LogKey
case object DURATION extends LogKey
case object EARLIEST_LOADED_VERSION extends LogKey
case object EFFECTIVE_STORAGE_LEVEL extends LogKey
@@ -242,6 +253,7 @@ object LogKeys {
case object EXPR extends LogKey
case object EXPR_TERMS extends LogKey
case object EXTENDED_EXPLAIN_GENERATOR extends LogKey
case object FAILED_ARTIFACTS extends LogKey
case object FAILED_STAGE extends LogKey
case object FAILED_STAGE_NAME extends LogKey
case object FAILURES extends LogKey
@@ -251,6 +263,7 @@ object LogKeys {
case object FEATURE_NAME extends LogKey
case object FETCH_SIZE extends LogKey
case object FIELD_NAME extends LogKey
case object FILES extends LogKey
case object FILE_ABSOLUTE_PATH extends LogKey
case object FILE_END_OFFSET extends LogKey
case object FILE_FORMAT extends LogKey
@@ -307,6 +320,7 @@ object LogKeys {
case object INIT_MODE extends LogKey
case object INPUT extends LogKey
case object INPUT_SPLIT extends LogKey
case object INTEGRAL extends LogKey
case object INTERVAL extends LogKey
case object ISOLATION_LEVEL extends LogKey
case object ISSUE_DATE extends LogKey
@@ -347,6 +361,8 @@ object LogKeys {
case object LOCATION extends LogKey
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_DIRECTORY extends LogKey
case object LOG_FILES extends LogKey
case object LOG_ID extends LogKey
case object LOG_KEY_FILE extends LogKey
case object LOG_LEVEL extends LogKey
@@ -394,6 +410,7 @@ object LogKeys {
case object MIN_COMPACTION_BATCH_ID extends LogKey
case object MIN_NUM_FREQUENT_PATTERN extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
case object MIN_RATE extends LogKey
case object MIN_SHARE extends LogKey
case object MIN_SIZE extends LogKey
case object MIN_TIME extends LogKey
@@ -490,6 +507,7 @@ object LogKeys {
case object NUM_PREFIXES extends LogKey
case object NUM_PRUNED extends LogKey
case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey
case object NUM_RECEIVERS extends LogKey
case object NUM_RECORDS_READ extends LogKey
case object NUM_RELEASED_LOCKS extends LogKey
case object NUM_REMAINED extends LogKey
@@ -575,6 +593,7 @@ object LogKeys {
case object PROCESSING_TIME extends LogKey
case object PRODUCER_ID extends LogKey
case object PROPERTY_NAME extends LogKey
case object PROPORTIONAL extends LogKey
case object PROTOCOL_VERSION extends LogKey
case object PROVIDER extends LogKey
case object PUSHED_FILTERS extends LogKey
@@ -595,6 +614,7 @@ object LogKeys {
case object QUERY_PLAN_LENGTH_MAX extends LogKey
case object QUERY_RUN_ID extends LogKey
case object RANGE extends LogKey
case object RATIO extends LogKey
case object RDD_CHECKPOINT_DIR extends LogKey
case object RDD_DEBUG_STRING extends LogKey
case object RDD_DESCRIPTION extends LogKey
@@ -646,10 +666,14 @@ object LogKeys {
case object RULE_NAME extends LogKey
case object RUN_ID extends LogKey
case object SCALA_VERSION extends LogKey
case object SCALING_DOWN_RATIO extends LogKey
case object SCALING_INTERVAL_SECS extends LogKey
case object SCALING_UP_RATIO extends LogKey
case object SCHEDULER_POOL_NAME extends LogKey
case object SCHEDULING_MODE extends LogKey
case object SCHEMA extends LogKey
case object SCHEMA2 extends LogKey
case object SENDER_ADDRESS extends LogKey
case object SERVER_NAME extends LogKey
case object SERVICE_NAME extends LogKey
case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey
@@ -677,6 +701,7 @@ object LogKeys {
case object SMALLEST_CLUSTER_INDEX extends LogKey
case object SNAPSHOT_VERSION extends LogKey
case object SOCKET_ADDRESS extends LogKey
case object SOURCE extends LogKey
case object SOURCE_PATH extends LogKey
case object SPARK_BRANCH extends LogKey
case object SPARK_BUILD_DATE extends LogKey
@@ -751,6 +776,7 @@ object LogKeys {
case object THREAD_POOL_SIZE extends LogKey
case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey
case object THRESHOLD extends LogKey
case object THRESH_TIME extends LogKey
case object TID extends LogKey
case object TIME extends LogKey
case object TIMEOUT extends LogKey
@@ -792,6 +818,7 @@ object LogKeys {
case object URL extends LogKey
case object URL2 extends LogKey
case object URLS extends LogKey
case object USER_CLASS extends LogKey
case object USER_ID extends LogKey
case object USER_NAME extends LogKey
case object UUID extends LogKey
@@ -801,6 +828,7 @@ object LogKeys {
case object VOCAB_SIZE extends LogKey
case object WAIT_RESULT_TIME extends LogKey
case object WAIT_SEND_TIME extends LogKey
case object WAIT_TIME extends LogKey
case object WATERMARK_CONSTRAINT extends LogKey
case object WEB_URL extends LogKey
case object WEIGHT extends LogKey
@@ -814,4 +842,5 @@ object LogKeys {
case object XML_SCHEDULING_MODE extends LogKey
case object XSD_PATH extends LogKey
case object YOUNG_GENERATION_GC extends LogKey
case object ZERO_TIME extends LogKey
}
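Most of the new keys above are consumed the same way as the migrated call sites in the files below: the value is wrapped in MDC together with a LogKey inside a log-interpolated string, so it is emitted as a structured field rather than baked into the message text. A minimal sketch of that pattern, assuming a hypothetical reporter class placed inside the Spark source tree (where the internal Logging trait and the log interpolator are visible); the class and method names are illustrative only and not part of this PR:

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical class used only to illustrate the structured-logging pattern.
private[spark] class BatchStatsReporter extends Logging {
  def report(avgProcTimeMs: Long, numReceivers: Int): Unit = {
    // Each value is wrapped in MDC with a LogKey, so it becomes a structured
    // field instead of plain text inside the message.
    logInfo(log"Average batch processing time: " +
      log"${MDC(LogKeys.AVG_BATCH_PROC_TIME, avgProcTimeMs)} ms across " +
      log"${MDC(LogKeys.NUM_RECEIVERS, numReceivers)} receivers")
  }
}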
@@ -36,7 +36,7 @@ import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.util.ArrayImplicits._

/** Provides utility functions to be used inside SparkSubmit. */
@@ -215,7 +215,7 @@ private[spark] object MavenUtils extends Logging {
if (artifactInfo.getExt == "jar") {
true
} else {
logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID, artifactInfo.getId)}")
false
}
}
@@ -515,8 +515,9 @@
val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
val failedArtifacts = failedReports.map(r => r.getArtifact)
logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
s"attempt to retry while skipping local-m2-cache.")
logInfo(log"Download failed: ${MDC(LogKeys.FAILED_ARTIFACTS,
failedArtifacts.mkString("[", ", ", "]"))}, " +
log"attempt to retry while skipping local-m2-cache.")
failedArtifacts.foreach(artifact => {
clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
})
@@ -22,7 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList
import scala.jdk.CollectionConverters._

import org.apache.spark.connect.proto.{Command, ExecutePlanResponse, Plan, StreamingQueryEventType}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.CloseableIterator
import org.apache.spark.sql.streaming.StreamingQueryListener.{Event, QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
@@ -115,7 +115,7 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
case StreamingQueryEventType.QUERY_TERMINATED_EVENT =>
postToAll(QueryTerminatedEvent.fromJson(event.getEventJson))
case _ =>
logWarning(s"Unknown StreamingQueryListener event: $event")
logWarning(log"Unknown StreamingQueryListener event: ${MDC(LogKeys.EVENT, event)}")
}
})
}
@@ -144,7 +144,8 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
listener.onQueryIdle(t)
case t: QueryTerminatedEvent =>
listener.onQueryTerminated(t)
case _ => logWarning(s"Unknown StreamingQueryListener event: $event")
case _ => logWarning(log"Unknown StreamingQueryListener event: " +
log"${MDC(LogKeys.EVENT, event)}")
}
} catch {
case e: Exception =>
@@ -24,7 +24,7 @@ import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.connect.service.ExecuteHolder

/**
@@ -83,20 +83,27 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
} catch {
case NonFatal(e) =>
logError(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Error sending listener added response.",
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
log"Error sending listener added response.",
e)
listenerHolder.cleanUp()
return
}
}
logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Server side listener added. Now blocking until " +
"all client side listeners are removed or there is error transmitting the event back.")
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
log"${MDC(LogKeys.USER_ID, userId)}][operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
log"Server side listener added. Now blocking until all client side listeners are " +
log"removed or there is error transmitting the event back.")
// Block the handling thread, and have serverListener continuously send back new events
listenerHolder.streamingQueryListenerLatch.await()
logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Server side listener long-running handling thread ended.")
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
log"${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE, executeHolder.operationId)}] " +
log"Server side listener long-running handling thread ended.")
case StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
@@ -22,7 +22,7 @@ import java.io.Serializable
import scala.reflect.ClassTag

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.util.Utils

/**
@@ -106,7 +106,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Lo
assertValid()
_isValid = false
_destroySite = Utils.getCallSite().shortForm
logInfo("Destroying %s (from %s)".format(toString, _destroySite))
logInfo(log"Destroying ${MDC(LogKeys.TO_STRING, toString)} " +
log"(from ${MDC(LogKeys.DESTROY_SITE, _destroySite)})")
doDestroy(blocking)
}

@@ -71,8 +71,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
if (localDirs.length >= 1) {
new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
} else {
logWarning(s"'spark.local.dir' should be set first when we use db in " +
s"ExternalShuffleService. Note that this only affects standalone mode.")
logWarning(log"'spark.local.dir' should be set first when we use db in " +
log"ExternalShuffleService. Note that this only affects standalone mode.")
null
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.WORKER_URL
import org.apache.spark.rpc._

@@ -64,7 +64,7 @@ private[spark] class WorkerWatcher(
}

override def receive: PartialFunction[Any, Unit] = {
case e => logWarning(s"Received unexpected message: $e")
case e => logWarning(log"Received unexpected message: ${MDC(LogKeys.ERROR, e)}")
}

override def onConnected(remoteAddress: RpcAddress): Unit = {
@@ -28,7 +28,7 @@ import com.google.common.cache.CacheBuilder

import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.NioBufferedFileInputStream
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
@@ -436,8 +436,8 @@ private[spark] class IndexShuffleBlockResolver(
if (checksumTmp.exists()) {
try {
if (!checksumTmp.delete()) {
logError(s"Failed to delete temporary checksum file " +
s"at ${checksumTmp.getAbsolutePath}")
logError(log"Failed to delete temporary checksum file at " +
log"${MDC(LogKeys.FILE_ABSOLUTE_PATH, checksumTmp.getAbsolutePath)}")
Member: let's just use PATH

Contributor Author: I see LogKey.scala has this key, which is used in other files: case object FILE_ABSOLUTE_PATH extends LogKey. Should we migrate these other occurrences to PATH? @gengliangwang

}
} catch {
case e: Exception =>
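On the FILE_ABSOLUTE_PATH vs. PATH question raised in the review comment above: the two options differ only in which LogKey the MDC wrapper carries, so migrating a call site is a one-token change. A rough sketch contrasting the two variants, using a hypothetical helper object purely for illustration (LogKeys.PATH already exists and is used elsewhere in this PR, e.g. in the checkpoint file manager change below):

import java.io.File

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical helper, shown only to contrast the two key choices.
private[spark] object ChecksumCleanupLogging extends Logging {
  def warnUndeleted(checksumTmp: File): Unit = {
    // As written in this PR: the value is tagged with FILE_ABSOLUTE_PATH.
    logError(log"Failed to delete temporary checksum file at " +
      log"${MDC(LogKeys.FILE_ABSOLUTE_PATH, checksumTmp.getAbsolutePath)}")
    // The reviewer's suggestion: tag the same value with the generic PATH key.
    logError(log"Failed to delete temporary checksum file at " +
      log"${MDC(LogKeys.PATH, checksumTmp.getAbsolutePath)}")
  }
}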
@@ -24,7 +24,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

@@ -36,7 +36,7 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
s" an fs (path: $path) with abortable stream support")
}

logInfo(s"Writing atomically to $path based on abortable stream")
logInfo(log"Writing atomically to ${MDC(LogKeys.PATH, path)} based on abortable stream")

class AbortableStreamBasedFSDataOutputStream(
fsDataOutputStream: FSDataOutputStream,
@@ -503,8 +503,8 @@ class LogisticRegression @Since("1.2.0") (
tol, fitIntercept, maxBlockSizeInMB)

if (dataset.storageLevel != StorageLevel.NONE) {
instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
s"then cached during training. Be careful of double caching!")
instr.logWarning(log"Input instances will be standardized, blockified to blocks, and " +
log"then cached during training. Be careful of double caching!")
}

val instances = dataset.select(
@@ -569,8 +569,8 @@

val isConstantLabel = histogram.count(_ != 0.0) == 1
if ($(fitIntercept) && isConstantLabel && !usingBoundConstrainedOptimization) {
instr.logWarning(s"All labels are the same value and fitIntercept=true, so the " +
s"coefficients will be zeros. Training is not needed.")
instr.logWarning(log"All labels are the same value and fitIntercept=true, so the " +
log"coefficients will be zeros. Training is not needed.")
val constantLabelIndex = Vectors.dense(histogram).argmax
val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures,
new Array[Int](numCoefficientSets + 1), Array.emptyIntArray, Array.emptyDoubleArray,
@@ -584,8 +584,8 @@
}

if (!$(fitIntercept) && isConstantLabel) {
instr.logWarning(s"All labels belong to a single class and fitIntercept=false. It's a " +
s"dangerous ground, so the algorithm may not converge.")
instr.logWarning(log"All labels belong to a single class and fitIntercept=false. It's a " +
log"dangerous ground, so the algorithm may not converge.")
}

val featuresMean = summarizer.mean.toArray