15 changes: 15 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -90,6 +90,7 @@ private[spark] object LogKeys {
case object BARRIER_ID extends LogKey
case object BATCH_ID extends LogKey
case object BATCH_NAME extends LogKey
case object BATCH_TIMES extends LogKey
case object BATCH_TIMESTAMP extends LogKey
case object BATCH_WRITE extends LogKey
case object BIND_ADDRESS extends LogKey
@@ -213,6 +214,7 @@ private[spark] object LogKeys {
case object EFFECTIVE_STORAGE_LEVEL extends LogKey
case object ELAPSED_TIME extends LogKey
case object ENCODING extends LogKey
case object ENDPOINT_NAME extends LogKey
case object END_INDEX extends LogKey
case object END_POINT extends LogKey
case object END_VERSION extends LogKey
@@ -272,6 +274,7 @@ private[spark] object LogKeys {
case object FILE_NAME extends LogKey
case object FILE_NAME2 extends LogKey
case object FILE_NAME3 extends LogKey
case object FILE_NAMES extends LogKey
case object FILE_START_OFFSET extends LogKey
case object FILE_SYSTEM extends LogKey
case object FILE_VERSION extends LogKey
@@ -417,7 +420,10 @@ private[spark] object LogKeys {
case object MIN_VERSION_NUM extends LogKey
case object MISSING_PARENT_STAGES extends LogKey
case object MODEL_WEIGHTS extends LogKey
case object MODIFY_ACLS extends LogKey
case object MODIFY_ACLS_GROUPS extends LogKey
case object MODULE_NAME extends LogKey
case object NAME extends LogKey
case object NAMESPACE extends LogKey
case object NETWORK_IF extends LogKey
case object NEW_FEATURE_COLUMN_NAME extends LogKey
@@ -434,8 +440,10 @@ private[spark] object LogKeys {
case object NUM_ADDED_PARTITIONS extends LogKey
case object NUM_APPS extends LogKey
case object NUM_ATTEMPT extends LogKey
case object NUM_BATCHES extends LogKey
case object NUM_BIN extends LogKey
case object NUM_BLOCKS extends LogKey
case object NUM_BLOCK_IDS extends LogKey
case object NUM_BROADCAST_BLOCK extends LogKey
case object NUM_BYTES extends LogKey
case object NUM_BYTES_CURRENT extends LogKey
@@ -572,6 +580,7 @@ private[spark] object LogKeys {
case object PATH extends LogKey
case object PATHS extends LogKey
case object PEER extends LogKey
case object PENDING_TIMES extends LogKey
case object PERCENT extends LogKey
case object PIPELINE_STAGE_UID extends LogKey
case object PLUGIN_NAME extends LogKey
@@ -656,6 +665,7 @@ private[spark] object LogKeys {
case object RESOURCE_PROFILE_IDS extends LogKey
case object RESOURCE_PROFILE_TO_TOTAL_EXECS extends LogKey
case object RESPONSE_BODY_SIZE extends LogKey
case object RESTART_TIME extends LogKey
case object RESULT extends LogKey
case object RESULT_SIZE_BYTES extends LogKey
case object RESULT_SIZE_BYTES_MAX extends LogKey
@@ -669,6 +679,7 @@ private[spark] object LogKeys {
case object RPC_ADDRESS extends LogKey
case object RPC_ENDPOINT_REF extends LogKey
case object RPC_MESSAGE_CAPACITY extends LogKey
case object RPC_SSL_ENABLED extends LogKey
case object RULE_NAME extends LogKey
case object RUN_ID extends LogKey
case object SCALA_VERSION extends LogKey
@@ -726,6 +737,7 @@ private[spark] object LogKeys {
case object STAGE_ID extends LogKey
case object STAGE_NAME extends LogKey
case object START_INDEX extends LogKey
case object START_TIME extends LogKey
case object STATEMENT_ID extends LogKey
case object STATE_STORE_ID extends LogKey
case object STATE_STORE_PROVIDER extends LogKey
@@ -812,6 +824,7 @@ private[spark] object LogKeys {
case object TRANSFER_TYPE extends LogKey
case object TREE_NODE extends LogKey
case object TRIGGER_INTERVAL extends LogKey
case object UI_ACLS extends LogKey
case object UI_FILTER extends LogKey
case object UI_FILTER_PARAMS extends LogKey
case object UI_PROXY_BASE extends LogKey
@@ -830,6 +843,8 @@ private[spark] object LogKeys {
case object UUID extends LogKey
case object VALUE extends LogKey
case object VERSION_NUM extends LogKey
case object VIEW_ACLS extends LogKey
case object VIEW_ACLS_GROUPS extends LogKey
case object VIRTUAL_CORES extends LogKey
case object VOCAB_SIZE extends LogKey
case object WAIT_RESULT_TIME extends LogKey
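
All of the keys added above follow the same contract as the existing ones: a case object in LogKeys becomes a named field that call sites attach to a message through MDC. Below is a minimal sketch of how one of the new keys (NUM_BLOCK_IDS) is consumed, assuming only the log interpolator and MDC helper that org.apache.spark.internal.Logging already supplies to the call sites in this diff; the surrounding class and message are hypothetical, not part of the change.

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical caller, for illustration only: attaches the new NUM_BLOCK_IDS key
// to a structured log message instead of concatenating the value into a string.
class BlockQueueReporter extends Logging {
  def reportRemaining(remaining: Int): Unit = {
    // MDC(key, value) renders the value in the plain-text message and, when the
    // structured logging backend is enabled, records it under the key's name too.
    logInfo(log"Blocks left to push ${MDC(LogKeys.NUM_BLOCK_IDS, remaining)}")
  }
}

The BlockGenerator change at the end of this diff uses exactly this key, so the sketch mirrors that call site rather than inventing a new logging API.
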
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -276,8 +276,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
if (params.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
logWarning(CUSTOM_GROUP_ID_ERROR_MESSAGE)
if (params.contains(GROUP_ID_PREFIX)) {
logWarning("Option 'groupIdPrefix' will be ignored as " +
s"option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set.")
logWarning(log"Option groupIdPrefix will be ignored as " +
log"option kafka.${MDC(LogKeys.CONFIG, ConsumerConfig.GROUP_ID_CONFIG)} has been set.")
}
}

27 changes: 14 additions & 13 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.launcher.SparkLauncher
@@ -89,17 +89,18 @@ private[spark] class SecurityManager(
private val sslRpcEnabled = sparkConf.getBoolean(
"spark.ssl.rpc.enabled", false)

logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
"; ui acls " + (if (aclsOn) "enabled" else "disabled") +
"; users with view permissions: " +
(if (viewAcls.nonEmpty) viewAcls.mkString(", ") else "EMPTY") +
"; groups with view permissions: " +
(if (viewAclsGroups.nonEmpty) viewAclsGroups.mkString(", ") else "EMPTY") +
"; users with modify permissions: " +
(if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
"; groups with modify permissions: " +
(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") +
"; RPC SSL " + (if (sslRpcEnabled) "enabled" else "disabled"))
logInfo(log"SecurityManager: authentication ${MDC(LogKeys.AUTH_ENABLED,
if (authOn) "enabled" else "disabled")}" +
log"; ui acls ${MDC(LogKeys.UI_ACLS, if (aclsOn) "enabled" else "disabled")}" +
log"; users with view permissions: ${MDC(LogKeys.VIEW_ACLS,
if (viewAcls.nonEmpty) viewAcls.mkString(", ")
else "EMPTY")} groups with view permissions: ${MDC(LogKeys.VIEW_ACLS_GROUPS,
if (viewAclsGroups.nonEmpty) viewAclsGroups.mkString(", ") else "EMPTY")}" +
log"; users with modify permissions: ${MDC(LogKeys.MODIFY_ACLS,
if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY")}" +
log"; groups with modify permissions: ${MDC(LogKeys.MODIFY_ACLS_GROUPS,
if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY")}" +
log"; RPC SSL ${MDC(LogKeys.RPC_SSL_ENABLED, if (sslRpcEnabled) "enabled" else "disabled")}")

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
@@ -213,7 +214,7 @@ private[spark] class SecurityManager(

def setAcls(aclSetting: Boolean): Unit = {
aclsOn = aclSetting
logInfo("Changing acls enabled to: " + aclsOn)
logInfo(log"Changing acls enabled to: ${MDC(LogKeys.AUTH_ENABLED, aclsOn)}")
}

def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey
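
The rewritten SecurityManager banner above also shows how multi-part messages are built: each log"..." fragment produces a message-with-context value, the fragments are joined with +, and every variable slice gets its own key. A condensed sketch of that concatenation style follows, assuming the same Logging mixin; the class, method, and values are made up for illustration and are not part of this PR.

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical illustration of joining several log"..." fragments, in the style
// of the rewritten SecurityManager banner; not code from this PR.
class AclBanner extends Logging {
  def describe(aclsOn: Boolean, viewAcls: Set[String]): Unit = {
    logInfo(log"ui acls ${MDC(LogKeys.UI_ACLS, if (aclsOn) "enabled" else "disabled")}" +
      log"; users with view permissions: " +
      log"${MDC(LogKeys.VIEW_ACLS, if (viewAcls.nonEmpty) viewAcls.mkString(", ") else "EMPTY")}")
  }
}

Keeping each MDC inside its own fragment keeps every key next to the text it describes, which is the pattern the diff applies consistently.
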
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -366,7 +366,7 @@ object SparkEnv extends Logging {
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
logInfo(log"Registering ${MDC(LogKeys.ENDPOINT_NAME, name)}")
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
@@ -26,7 +26,7 @@ import scala.util.Try
import org.apache.hadoop.fs.FileSystem

import org.apache.spark.SparkConf
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.PATH
import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS

@@ -66,7 +66,7 @@ private[spark] object ShutdownHookManager extends Logging {
// shutdownDeletePaths as we are traversing through it.
shutdownDeletePaths.toArray.foreach { dirPath =>
try {
logInfo("Deleting directory " + dirPath)
logInfo(log"Deleting directory ${MDC(LogKeys.PATH, dirPath)}")
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception =>
@@ -92,7 +92,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
logInfo("There is nothing to roll.")
}
case _ =>
logWarning("This plugin expects " +
logWarning(log"This plugin expects " +
log"${MDC(CLASS_NAME, classOf[KubernetesClusterSchedulerBackend].getSimpleName)}.")
}
} catch {
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions._
@@ -913,11 +913,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
val correlatedCols = AttributeSet(subqueryColumns)
val invalidColsLegacy = groupByCols -- correlatedCols
if (!nonEquivalentGroupByCols.isEmpty && invalidColsLegacy.isEmpty) {
logWarning("Using legacy behavior for " +
s"${SQLConf.LEGACY_SCALAR_SUBQUERY_ALLOW_GROUP_BY_NON_EQUALITY_CORRELATED_PREDICATE
.key}. Query would be rejected with non-legacy behavior but is allowed by " +
s"legacy behavior. Query may be invalid and return wrong results if the scalar " +
s"subquery's group-by outputs multiple rows.")
logWarning(log"Using legacy behavior for " +
log"${MDC(LogKeys.CONFIG, SQLConf
.LEGACY_SCALAR_SUBQUERY_ALLOW_GROUP_BY_NON_EQUALITY_CORRELATED_PREDICATE.key)}. " +
log"Query would be rejected with non-legacy behavior but is allowed by " +
log"legacy behavior. Query may be invalid and return wrong results if the scalar " +
log"subquery's group-by outputs multiple rows.")
}
invalidColsLegacy
}
@@ -22,7 +22,7 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable
import scala.collection.parallel.immutable.ParVector

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.util.Utils
@@ -142,19 +142,19 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}

def updateCheckpointData(time: Time): Unit = {
logInfo("Updating checkpoint data for time " + time)
logInfo(log"Updating checkpoint data for time ${MDC(LogKeys.TIME, time)}")
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))
}
logInfo("Updated checkpoint data for time " + time)
logInfo(log"Updated checkpoint data for time ${MDC(LogKeys.TIME, time)}")
}

def clearCheckpointData(time: Time): Unit = {
logInfo("Clearing checkpoint data for time " + time)
logInfo(log"Clearing checkpoint data for time ${MDC(LogKeys.TIME, time)}")
this.synchronized {
outputStreams.foreach(_.clearCheckpointData(time))
}
logInfo("Cleared checkpoint data for time " + time)
logInfo(log"Cleared checkpoint data for time ${MDC(LogKeys.TIME, time)}")
}

def restoreCheckpointData(): Unit = {
@@ -24,7 +24,7 @@ import scala.reflect.ClassTag

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{PATH, TIME}
import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils
@@ -89,7 +89,8 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
}
if (fileSystem.delete(path, true)) {
logInfo("Deleted checkpoint file '" + file + "' for time " + time)
logInfo(log"Deleted checkpoint file ${MDC(LogKeys.FILE_NAME, file)} " +
log"for time ${MDC(LogKeys.TIME, time)}")
} else {
logWarning(log"Error deleting old checkpoint file '${MDC(PATH, file)}' for time " +
log"${MDC(TIME, time)}")
@@ -26,8 +26,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.internal.LogKeys.{ELAPSED_TIME, PATH}
import org.apache.spark.internal.MDC
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamInputInfo
@@ -169,8 +169,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
batchTimeToSelectedFiles --= oldFiles.keys
recentlySelectedFiles --= oldFiles.values.flatten
logInfo("Cleared " + oldFiles.size + " old files that were older than " +
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
logInfo(log"Cleared ${MDC(LogKeys.COUNT, oldFiles.size)} old files that were older " +
log"than ${MDC(LogKeys.TIME, time - rememberDuration)}: " +
log"${MDC(LogKeys.FILES, oldFiles.keys.mkString(", "))}")
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
@@ -341,8 +342,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
case (t, f) =>
// Restore the metadata in both files and generatedRDDs
logInfo("Restoring files for time " + t + " - " +
f.mkString("[", ", ", "]") )
logInfo(log"Restoring files for time ${MDC(LogKeys.TIME, t)} - " +
log"${MDC(LogKeys.FILES, f.mkString("[", ", ", "]"))}")
batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
recentlySelectedFiles ++= f
generatedRDDs += ((t, filesToRDD(f.toImmutableArraySeq)))
@@ -25,7 +25,7 @@ import java.util.concurrent.ArrayBlockingQueue

import scala.reflect.ClassTag

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver
@@ -57,11 +57,11 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)

def onStart(): Unit = {
// Open a socket to the target address and keep reading from it
logInfo("Connecting to " + host + ":" + port)
logInfo(log"Connecting to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT, port)}")
val channel = SocketChannel.open()
channel.configureBlocking(true)
channel.connect(new InetSocketAddress(host, port))
logInfo("Connected to " + host + ":" + port)
logInfo(log"Connected to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT, port)}")

val queue = new ArrayBlockingQueue[ByteBuffer](2)

@@ -22,8 +22,8 @@ import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.STATUS
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.StreamingConf.BLOCK_INTERVAL
import org.apache.spark.streaming.util.RecurringTimer
@@ -276,12 +276,13 @@ private[streaming] class BlockGenerator(
}

// At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
logInfo(log"Pushing out the last " +
log"${MDC(LogKeys.NUM_BLOCK_IDS, blocksForPushing.size())} blocks")
while (!blocksForPushing.isEmpty) {
val block = blocksForPushing.take()
logDebug(s"Pushing block $block")
pushBlock(block)
logInfo("Blocks left to push " + blocksForPushing.size())
logInfo(log"Blocks left to push ${MDC(LogKeys.NUM_BLOCK_IDS, blocksForPushing.size())}")
}
logInfo("Stopped block pushing thread")
} catch {
@@ -299,6 +300,6 @@ private[streaming] class BlockGenerator(

private def pushBlock(block: Block): Unit = {
listener.onPushBlock(block.id, block.buffer)
logInfo("Pushed block " + block.id)
logInfo(log"Pushed block ${MDC(LogKeys.BLOCK_ID, block.id)}")
}
}