Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ private void transferAllOutstanding() {
if (numRetries > 0) {
logger.error("Exception while beginning {} of {} outstanding blocks (after {} retries)", e,
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length),
MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, blockIdsToTransfer.length),
MDC.of(LogKeys.NUM_RETRY$.MODULE$, numRetries));
} else {
logger.error("Exception while beginning {} of {} outstanding blocks", e,
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length));
MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, blockIdsToTransfer.length));
}
if (shouldRetry(e) && initiateRetry(e)) {
// successfully initiated a retry
Expand Down Expand Up @@ -219,7 +219,7 @@ synchronized boolean initiateRetry(Throwable e) {
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_RETRY$.MODULE$, retryCount),
MDC.of(LogKeys.MAX_ATTEMPTS$.MODULE$, maxRetries),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, outstandingBlocksIds.size()),
MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, outstandingBlocksIds.size()),
MDC.of(LogKeys.RETRY_WAIT_TIME$.MODULE$, retryWaitTime));

try {
Expand Down
51 changes: 47 additions & 4 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ object LogKeys {
case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
case object CALL_SITE_LONG_FORM extends LogKey
case object CALL_SITE_SHORT_FORM extends LogKey
case object CANCEL_FUTURE_JOBS extends LogKey
case object CATALOG_NAME extends LogKey
case object CATEGORICAL_FEATURES extends LogKey
case object CHECKPOINT_FILE extends LogKey
Expand Down Expand Up @@ -118,10 +119,10 @@ object LogKeys {
case object CONTAINER_ID extends LogKey
case object CONTAINER_STATE extends LogKey
case object CONTEXT extends LogKey
case object CONTEXT_CREATION_SITE extends LogKey
case object COST extends LogKey
case object COUNT extends LogKey
case object CREATED_POOL_NAME extends LogKey
case object CREATION_SITE extends LogKey
case object CREDENTIALS_RENEWAL_INTERVAL_RATIO extends LogKey
case object CROSS_VALIDATION_METRIC extends LogKey
case object CROSS_VALIDATION_METRICS extends LogKey
Expand All @@ -132,8 +133,9 @@ object LogKeys {
case object CSV_SCHEMA_FIELD_NAMES extends LogKey
case object CSV_SOURCE extends LogKey
case object CURRENT_BATCH_ID extends LogKey
case object CURRENT_DISK_SIZE extends LogKey
case object CURRENT_FILE extends LogKey
case object CURRENT_MEMORY_BYTES extends LogKey
case object CURRENT_MEMORY_SIZE extends LogKey
case object CURRENT_PATH extends LogKey
case object CURRENT_TIME extends LogKey
case object DATA extends LogKey
Expand All @@ -146,7 +148,6 @@ object LogKeys {
case object DEFAULT_COMPACT_INTERVAL extends LogKey
case object DEFAULT_ISOLATION_LEVEL extends LogKey
case object DEFAULT_NAME extends LogKey
case object DEFAULT_SCHEDULING_MODE extends LogKey
case object DEFAULT_VALUE extends LogKey
case object DELAY extends LogKey
case object DELEGATE extends LogKey
Expand Down Expand Up @@ -207,6 +208,8 @@ object LogKeys {
case object EXPR extends LogKey
case object EXPR_TERMS extends LogKey
case object EXTENDED_EXPLAIN_GENERATOR extends LogKey
case object FAILED_STAGE extends LogKey
case object FAILED_STAGE_NAME extends LogKey
case object FAILURES extends LogKey
case object FALLBACK_VERSION extends LogKey
case object FEATURE_COLUMN extends LogKey
Expand All @@ -231,6 +234,7 @@ object LogKeys {
case object FINAL_OUTPUT_PATH extends LogKey
case object FINAL_PATH extends LogKey
case object FINISH_TRIGGER_DURATION extends LogKey
case object FREE_MEMORY_SIZE extends LogKey
case object FROM_OFFSET extends LogKey
case object FROM_TIME extends LogKey
case object FUNCTION_NAME extends LogKey
Expand All @@ -250,6 +254,7 @@ object LogKeys {
case object HIVE_OPERATION_STATE extends LogKey
case object HIVE_OPERATION_TYPE extends LogKey
case object HOST extends LogKey
case object HOST_LOCAL_BLOCKS_SIZE extends LogKey
case object HOST_NAMES extends LogKey
case object HOST_PORT extends LogKey
case object HOST_PORT2 extends LogKey
Expand All @@ -265,6 +270,7 @@ object LogKeys {
case object INITIAL_HEARTBEAT_INTERVAL extends LogKey
case object INIT_MODE extends LogKey
case object INPUT extends LogKey
case object INPUT_SPLIT extends LogKey
case object INTERVAL extends LogKey
case object ISOLATION_LEVEL extends LogKey
case object ISSUE_DATE extends LogKey
Expand Down Expand Up @@ -299,11 +305,14 @@ object LogKeys {
case object LOAD_FACTOR extends LogKey
case object LOAD_TIME extends LogKey
case object LOCALE extends LogKey
case object LOCAL_BLOCKS_SIZE extends LogKey
case object LOCAL_SCRATCH_DIR extends LogKey
case object LOCATION extends LogKey
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
case object LOG_KEY_FILE extends LogKey
case object LOG_LEVEL extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
case object LOWER_BOUND extends LogKey
Expand Down Expand Up @@ -351,6 +360,7 @@ object LogKeys {
case object MIN_SIZE extends LogKey
case object MIN_TIME extends LogKey
case object MIN_VERSION_NUM extends LogKey
case object MISSING_PARENT_STAGES extends LogKey
case object MODEL_WEIGHTS extends LogKey
case object MODULE_NAME extends LogKey
case object NAMESPACE extends LogKey
Expand All @@ -368,8 +378,9 @@ object LogKeys {
case object NORM extends LogKey
case object NUM_ADDED_PARTITIONS extends LogKey
case object NUM_APPS extends LogKey
case object NUM_ATTEMPT extends LogKey
case object NUM_BIN extends LogKey
case object NUM_BLOCK_IDS extends LogKey
case object NUM_BLOCKS extends LogKey
case object NUM_BROADCAST_BLOCK extends LogKey
case object NUM_BYTES extends LogKey
case object NUM_BYTES_CURRENT extends LogKey
Expand All @@ -388,12 +399,16 @@ object LogKeys {
case object NUM_CORES extends LogKey
case object NUM_DATA_FILE extends LogKey
case object NUM_DATA_FILES extends LogKey
case object NUM_DECOMMISSIONED extends LogKey
case object NUM_DRIVERS extends LogKey
case object NUM_DROPPED_PARTITIONS extends LogKey
case object NUM_EFFECTIVE_RULE_OF_RUNS extends LogKey
case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey
case object NUM_EVENTS extends LogKey
case object NUM_EXAMPLES extends LogKey
case object NUM_EXECUTORS extends LogKey
case object NUM_EXECUTORS_EXITED extends LogKey
case object NUM_EXECUTORS_KILLED extends LogKey
case object NUM_EXECUTOR_CORES extends LogKey
case object NUM_EXECUTOR_CORES_REMAINING extends LogKey
case object NUM_EXECUTOR_CORES_TOTAL extends LogKey
Expand All @@ -407,6 +422,7 @@ object LogKeys {
case object NUM_FILES_FAILED_TO_DELETE extends LogKey
case object NUM_FILES_REUSED extends LogKey
case object NUM_FREQUENT_ITEMS extends LogKey
case object NUM_HOST_LOCAL_BLOCKS extends LogKey
case object NUM_INDEX_FILE extends LogKey
case object NUM_INDEX_FILES extends LogKey
case object NUM_ITERATIONS extends LogKey
Expand All @@ -415,8 +431,10 @@ object LogKeys {
case object NUM_LEADING_SINGULAR_VALUES extends LogKey
case object NUM_LEFT_PARTITION_VALUES extends LogKey
case object NUM_LOADED_ENTRIES extends LogKey
case object NUM_LOCAL_BLOCKS extends LogKey
case object NUM_LOCAL_DIRS extends LogKey
case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey
case object NUM_MERGERS extends LogKey
case object NUM_MERGER_LOCATIONS extends LogKey
case object NUM_META_FILES extends LogKey
case object NUM_NODES extends LogKey
Expand All @@ -433,7 +451,10 @@ object LogKeys {
case object NUM_POINT extends LogKey
case object NUM_PREFIXES extends LogKey
case object NUM_PRUNED extends LogKey
case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey
case object NUM_RECORDS_READ extends LogKey
case object NUM_REMAINED extends LogKey
case object NUM_REMOTE_BLOCKS extends LogKey
case object NUM_REMOVED_WORKERS extends LogKey
case object NUM_REPLICAS extends LogKey
case object NUM_REQUESTS extends LogKey
Expand All @@ -449,11 +470,14 @@ object LogKeys {
case object NUM_SPILL_INFOS extends LogKey
case object NUM_SPILL_WRITERS extends LogKey
case object NUM_SUB_DIRS extends LogKey
case object NUM_SUCCESSFUL_TASKS extends LogKey
case object NUM_TASKS extends LogKey
case object NUM_TASK_CPUS extends LogKey
case object NUM_TRAIN_WORD extends LogKey
case object NUM_UNFINISHED_DECOMMISSIONED extends LogKey
case object NUM_VERSIONS_RETAIN extends LogKey
case object NUM_WEIGHTED_EXAMPLES extends LogKey
case object NUM_WORKERS extends LogKey
case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey
case object OBJECT_ID extends LogKey
case object OFFSET extends LogKey
Expand All @@ -470,16 +494,20 @@ object LogKeys {
case object OPTIONS extends LogKey
case object OP_ID extends LogKey
case object OP_TYPE extends LogKey
case object ORIGINAL_DISK_SIZE extends LogKey
case object ORIGINAL_MEMORY_SIZE extends LogKey
case object OS_ARCH extends LogKey
case object OS_NAME extends LogKey
case object OS_VERSION extends LogKey
case object OUTPUT extends LogKey
case object OVERHEAD_MEMORY_SIZE extends LogKey
case object PAGE_SIZE extends LogKey
case object PARENT_STAGES extends LogKey
case object PARSE_MODE extends LogKey
case object PARTITIONED_FILE_READER extends LogKey
case object PARTITIONER extends LogKey
case object PARTITION_ID extends LogKey
case object PARTITION_IDS extends LogKey
case object PARTITION_SPECIFICATION extends LogKey
case object PARTITION_SPECS extends LogKey
case object PATH extends LogKey
Expand Down Expand Up @@ -511,12 +539,14 @@ object LogKeys {
case object PROTOCOL_VERSION extends LogKey
case object PROVIDER extends LogKey
case object PUSHED_FILTERS extends LogKey
case object PUSH_MERGED_LOCAL_BLOCKS_SIZE extends LogKey
case object PVC_METADATA_NAME extends LogKey
case object PYTHON_EXEC extends LogKey
case object PYTHON_PACKAGES extends LogKey
case object PYTHON_VERSION extends LogKey
case object PYTHON_WORKER_MODULE extends LogKey
case object PYTHON_WORKER_RESPONSE extends LogKey
case object QUANTILES extends LogKey
case object QUERY_CACHE_VALUE extends LogKey
case object QUERY_HINT extends LogKey
case object QUERY_ID extends LogKey
Expand All @@ -542,11 +572,13 @@ object LogKeys {
case object REDACTED_STATEMENT extends LogKey
case object REDUCE_ID extends LogKey
case object REGISTERED_EXECUTOR_FILE extends LogKey
case object REGISTER_MERGE_RESULTS extends LogKey
case object RELATION_NAME extends LogKey
case object RELATION_OUTPUT extends LogKey
case object RELATIVE_TOLERANCE extends LogKey
case object REMAINING_PARTITIONS extends LogKey
case object REMOTE_ADDRESS extends LogKey
case object REMOTE_BLOCKS_SIZE extends LogKey
case object REMOVE_FROM_MASTER extends LogKey
case object REPORT_DETAILS extends LogKey
case object REQUESTER_SIZE extends LogKey
Expand Down Expand Up @@ -574,6 +606,7 @@ object LogKeys {
case object RUN_ID extends LogKey
case object SCALA_VERSION extends LogKey
case object SCHEDULER_POOL_NAME extends LogKey
case object SCHEDULING_MODE extends LogKey
case object SCHEMA extends LogKey
case object SCHEMA2 extends LogKey
case object SERVER_NAME extends LogKey
Expand Down Expand Up @@ -615,13 +648,17 @@ object LogKeys {
case object SPILL_TIMES extends LogKey
case object SQL_TEXT extends LogKey
case object SRC_PATH extends LogKey
case object STAGE extends LogKey
case object STAGES extends LogKey
case object STAGE_ATTEMPT extends LogKey
case object STAGE_ID extends LogKey
case object STAGE_NAME extends LogKey
case object START_INDEX extends LogKey
case object STATEMENT_ID extends LogKey
case object STATE_STORE_ID extends LogKey
case object STATE_STORE_PROVIDER extends LogKey
case object STATE_STORE_VERSION extends LogKey
case object STATS extends LogKey
case object STATUS extends LogKey
case object STDERR extends LogKey
case object STOP_SITE_SHORT_FORM extends LogKey
Expand All @@ -647,13 +684,18 @@ object LogKeys {
case object TABLE_NAME extends LogKey
case object TABLE_TYPE extends LogKey
case object TABLE_TYPES extends LogKey
case object TAG extends LogKey
case object TARGET_NUM_EXECUTOR extends LogKey
case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
case object TARGET_PATH extends LogKey
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
case object TASK_LOCALITY extends LogKey
case object TASK_NAME extends LogKey
case object TASK_REQUIREMENTS extends LogKey
case object TASK_RESOURCE_ASSIGNMENTS extends LogKey
case object TASK_SET_ID extends LogKey
case object TASK_SET_MANAGER extends LogKey
case object TASK_SET_NAME extends LogKey
case object TASK_STATE extends LogKey
case object TEMP_FILE extends LogKey
Expand Down Expand Up @@ -685,6 +727,7 @@ object LogKeys {
case object TOPIC_PARTITION_OFFSET_RANGE extends LogKey
case object TOTAL extends LogKey
case object TOTAL_EFFECTIVE_TIME extends LogKey
case object TOTAL_SIZE extends LogKey
case object TOTAL_TIME extends LogKey
case object TOTAL_TIME_READ extends LogKey
case object TO_TIME extends LogKey
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2930,7 +2930,7 @@ object SparkContext extends Logging {
log" constructor). This may indicate an error, since only one SparkContext should be" +
log" running in this JVM (see SPARK-2243)." +
log" The other SparkContext was created at:\n" +
log"${MDC(LogKeys.CONTEXT_CREATION_SITE, otherContextCreationSite)}"
log"${MDC(LogKeys.CREATION_SITE, otherContextCreationSite)}"
logWarning(warnMsg)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils

Expand All @@ -51,7 +51,8 @@ class LocalSparkCluster private (
private val workerDirs = ArrayBuffer[String]()

def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
logInfo(log"Starting a local Spark cluster with " +
log"${MDC(LogKeys.NUM_WORKERS, numWorkers)} workers.")

// Disable REST server on Master in this mode unless otherwise specified
val _conf = conf.clone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config.BUFFER_SIZE
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -142,8 +142,9 @@ private[spark] class SparkHadoopUtil extends Logging {
if (!new File(keytabFilename).exists()) {
throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
} else {
logInfo("Attempting to login to Kerberos " +
s"using principal: ${principalName} and keytab: ${keytabFilename}")
logInfo(log"Attempting to login to Kerberos using principal: " +
log"${MDC(LogKeys.PRINCIPAL, principalName)} and keytab: " +
log"${MDC(LogKeys.KEYTAB, keytabFilename)}")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc._
Expand Down Expand Up @@ -105,7 +105,8 @@ private[spark] class StandaloneAppClient(
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
logInfo(
log"Connecting to master ${MDC(LogKeys.MASTER_URL, masterAddress.toSparkURL)}...")
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self))
} catch {
Expand Down Expand Up @@ -175,14 +176,16 @@ private[spark] class StandaloneAppClient(

case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
val fullId = s"$appId/$id"
logInfo("Executor added: %s on %s (%s) with %d core(s)".format(fullId, workerId, hostPort,
cores))
logInfo(log"Executor added: ${MDC(LogKeys.EXECUTOR_ID, fullId)} on " +
log"${MDC(LogKeys.WORKER_ID, workerId)} (${MDC(LogKeys.HOST_PORT, hostPort)}) " +
log"with ${MDC(LogKeys.NUM_CORES, cores)} core(s)")
listener.executorAdded(fullId, workerId, hostPort, cores, memory)

case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>
val fullId = s"$appId/$id"
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
logInfo(log"Executor updated: ${MDC(LogKeys.EXECUTOR_ID, fullId)} is now " +
log"${MDC(LogKeys.EXECUTOR_STATE, state)}${MDC(LogKeys.MESSAGE, messageText)}")
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost)
} else if (state == ExecutorState.DECOMMISSIONED) {
Expand All @@ -191,11 +194,13 @@ private[spark] class StandaloneAppClient(
}

case WorkerRemoved(id, host, message) =>
logInfo("Master removed worker %s: %s".format(id, message))
logInfo(log"Master removed worker ${MDC(LogKeys.WORKER_ID, id)}: " +
log"${MDC(LogKeys.MESSAGE, message)}")
listener.workerRemoved(id, host, message)

case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
logInfo(log"Master has changed, new master is at " +
log"${MDC(LogKeys.MASTER_URL, masterRef.address.toSparkURL)}")
master = Some(masterRef)
alreadyDisconnected = false
masterRef.send(MasterChangeAcknowledged(appId.get))
Expand Down
Loading