
Commit 75709f5

asl3 authored and gengliangwang committed
[SPARK-48623][CORE] Structured logging migrations [Part 2]
### What changes were proposed in this pull request?
This PR makes additional Scala logging migrations to comply with the scala style changes in #46947.

### Why are the changes needed?
This makes development and PR review of the structured logging migration easier.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Tested by ensuring dev/scalastyle checks pass

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47256 from asl3/morestructuredloggingmigrations.

Authored-by: Amanda Liu <amanda.liu@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent 97ce5b8 · commit 75709f5
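For reviewers, every hunk below applies the same migration pattern: a message built by string concatenation is rewritten with the log"..." interpolator, and each dynamic value is wrapped in an MDC tagged with a LogKey so it is preserved as a structured field. A minimal sketch of the before/after shape, mirroring the ShutdownHookManager hunk below (the object itself is hypothetical, and since Logging, LogKeys, and MDC are Spark-internal APIs, a compilable version of this sketch would have to live under the org.apache.spark package):

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical object, used only to illustrate the migration pattern.
object MigrationSketch extends Logging {
  def deleteDir(dirPath: String): Unit = {
    // Before: the value is flattened into the message string.
    // logInfo("Deleting directory " + dirPath)

    // After: the log"..." interpolator plus MDC records the value under a
    // structured key (LogKeys.PATH) alongside the rendered message.
    logInfo(log"Deleting directory ${MDC(LogKeys.PATH, dirPath)}")
  }
}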

File tree

16 files changed, +93 -67 lines changed

common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 15 additions & 0 deletions
@@ -90,6 +90,7 @@ private[spark] object LogKeys {
   case object BARRIER_ID extends LogKey
   case object BATCH_ID extends LogKey
   case object BATCH_NAME extends LogKey
+  case object BATCH_TIMES extends LogKey
   case object BATCH_TIMESTAMP extends LogKey
   case object BATCH_WRITE extends LogKey
   case object BIND_ADDRESS extends LogKey
@@ -213,6 +214,7 @@ private[spark] object LogKeys {
   case object EFFECTIVE_STORAGE_LEVEL extends LogKey
   case object ELAPSED_TIME extends LogKey
   case object ENCODING extends LogKey
+  case object ENDPOINT_NAME extends LogKey
   case object END_INDEX extends LogKey
   case object END_POINT extends LogKey
   case object END_VERSION extends LogKey
@@ -272,6 +274,7 @@ private[spark] object LogKeys {
   case object FILE_NAME extends LogKey
   case object FILE_NAME2 extends LogKey
   case object FILE_NAME3 extends LogKey
+  case object FILE_NAMES extends LogKey
   case object FILE_START_OFFSET extends LogKey
   case object FILE_SYSTEM extends LogKey
   case object FILE_VERSION extends LogKey
@@ -417,7 +420,10 @@ private[spark] object LogKeys {
   case object MIN_VERSION_NUM extends LogKey
   case object MISSING_PARENT_STAGES extends LogKey
   case object MODEL_WEIGHTS extends LogKey
+  case object MODIFY_ACLS extends LogKey
+  case object MODIFY_ACLS_GROUPS extends LogKey
   case object MODULE_NAME extends LogKey
+  case object NAME extends LogKey
   case object NAMESPACE extends LogKey
   case object NETWORK_IF extends LogKey
   case object NEW_FEATURE_COLUMN_NAME extends LogKey
@@ -434,8 +440,10 @@ private[spark] object LogKeys {
   case object NUM_ADDED_PARTITIONS extends LogKey
   case object NUM_APPS extends LogKey
   case object NUM_ATTEMPT extends LogKey
+  case object NUM_BATCHES extends LogKey
   case object NUM_BIN extends LogKey
   case object NUM_BLOCKS extends LogKey
+  case object NUM_BLOCK_IDS extends LogKey
   case object NUM_BROADCAST_BLOCK extends LogKey
   case object NUM_BYTES extends LogKey
   case object NUM_BYTES_CURRENT extends LogKey
@@ -572,6 +580,7 @@ private[spark] object LogKeys {
   case object PATH extends LogKey
   case object PATHS extends LogKey
   case object PEER extends LogKey
+  case object PENDING_TIMES extends LogKey
   case object PERCENT extends LogKey
   case object PIPELINE_STAGE_UID extends LogKey
   case object PLUGIN_NAME extends LogKey
@@ -656,6 +665,7 @@ private[spark] object LogKeys {
   case object RESOURCE_PROFILE_IDS extends LogKey
   case object RESOURCE_PROFILE_TO_TOTAL_EXECS extends LogKey
   case object RESPONSE_BODY_SIZE extends LogKey
+  case object RESTART_TIME extends LogKey
   case object RESULT extends LogKey
   case object RESULT_SIZE_BYTES extends LogKey
   case object RESULT_SIZE_BYTES_MAX extends LogKey
@@ -669,6 +679,7 @@ private[spark] object LogKeys {
   case object RPC_ADDRESS extends LogKey
   case object RPC_ENDPOINT_REF extends LogKey
   case object RPC_MESSAGE_CAPACITY extends LogKey
+  case object RPC_SSL_ENABLED extends LogKey
   case object RULE_NAME extends LogKey
   case object RUN_ID extends LogKey
   case object SCALA_VERSION extends LogKey
@@ -726,6 +737,7 @@ private[spark] object LogKeys {
   case object STAGE_ID extends LogKey
   case object STAGE_NAME extends LogKey
   case object START_INDEX extends LogKey
+  case object START_TIME extends LogKey
   case object STATEMENT_ID extends LogKey
   case object STATE_STORE_ID extends LogKey
   case object STATE_STORE_PROVIDER extends LogKey
@@ -812,6 +824,7 @@ private[spark] object LogKeys {
   case object TRANSFER_TYPE extends LogKey
   case object TREE_NODE extends LogKey
   case object TRIGGER_INTERVAL extends LogKey
+  case object UI_ACLS extends LogKey
   case object UI_FILTER extends LogKey
   case object UI_FILTER_PARAMS extends LogKey
   case object UI_PROXY_BASE extends LogKey
@@ -830,6 +843,8 @@ private[spark] object LogKeys {
   case object UUID extends LogKey
   case object VALUE extends LogKey
   case object VERSION_NUM extends LogKey
+  case object VIEW_ACLS extends LogKey
+  case object VIEW_ACLS_GROUPS extends LogKey
   case object VIRTUAL_CORES extends LogKey
   case object VOCAB_SIZE extends LogKey
   case object WAIT_RESULT_TIME extends LogKey
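The entries above only declare key names; each one is consumed by a migrated call site, either in the files shown below (for example ENDPOINT_NAME, UI_ACLS, RPC_SSL_ENABLED) or in files outside this excerpt (for example BATCH_TIMES, NUM_BATCHES, PENDING_TIMES). As a purely hypothetical fragment showing how such a key is referenced once declared (it assumes a class that mixes in Logging, and batches and elapsedMs are stand-in values):

// Hypothetical call site, not part of this commit's diff.
logInfo(log"Generated ${MDC(LogKeys.NUM_BATCHES, batches.size)} batches in " +
  log"${MDC(LogKeys.ELAPSED_TIME, elapsedMs)} ms")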

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 3 additions & 3 deletions
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -276,8 +276,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     if (params.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
       logWarning(CUSTOM_GROUP_ID_ERROR_MESSAGE)
       if (params.contains(GROUP_ID_PREFIX)) {
-        logWarning("Option 'groupIdPrefix' will be ignored as " +
-          s"option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set.")
+        logWarning(log"Option groupIdPrefix will be ignored as " +
+          log"option kafka.${MDC(LogKeys.CONFIG, ConsumerConfig.GROUP_ID_CONFIG)} has been set.")
       }
     }

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 14 additions & 13 deletions
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.launcher.SparkLauncher
@@ -89,17 +89,18 @@ private[spark] class SecurityManager(
   private val sslRpcEnabled = sparkConf.getBoolean(
     "spark.ssl.rpc.enabled", false)
 
-  logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
-    "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
-    "; users with view permissions: " +
-    (if (viewAcls.nonEmpty) viewAcls.mkString(", ") else "EMPTY") +
-    "; groups with view permissions: " +
-    (if (viewAclsGroups.nonEmpty) viewAclsGroups.mkString(", ") else "EMPTY") +
-    "; users with modify permissions: " +
-    (if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
-    "; groups with modify permissions: " +
-    (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY") +
-    "; RPC SSL " + (if (sslRpcEnabled) "enabled" else "disabled"))
+  logInfo(log"SecurityManager: authentication ${MDC(LogKeys.AUTH_ENABLED,
+    if (authOn) "enabled" else "disabled")}" +
+    log"; ui acls ${MDC(LogKeys.UI_ACLS, if (aclsOn) "enabled" else "disabled")}" +
+    log"; users with view permissions: ${MDC(LogKeys.VIEW_ACLS,
+    if (viewAcls.nonEmpty) viewAcls.mkString(", ")
+    else "EMPTY")} groups with view permissions: ${MDC(LogKeys.VIEW_ACLS_GROUPS,
+    if (viewAclsGroups.nonEmpty) viewAclsGroups.mkString(", ") else "EMPTY")}" +
+    log"; users with modify permissions: ${MDC(LogKeys.MODIFY_ACLS,
+    if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY")}" +
+    log"; groups with modify permissions: ${MDC(LogKeys.MODIFY_ACLS_GROUPS,
+    if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY")}" +
+    log"; RPC SSL ${MDC(LogKeys.RPC_SSL_ENABLED, if (sslRpcEnabled) "enabled" else "disabled")}")
 
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
   // the default SSL configuration - it will be used by all communication layers unless overwritten
@@ -213,7 +214,7 @@ private[spark] class SecurityManager(
 
   def setAcls(aclSetting: Boolean): Unit = {
     aclsOn = aclSetting
-    logInfo("Changing acls enabled to: " + aclsOn)
+    logInfo(log"Changing acls enabled to: ${MDC(LogKeys.AUTH_ENABLED, aclsOn)}")
   }
 
   def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey
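One detail the hunk above relies on: each log"..." fragment evaluates to a structured message value that can be joined with +, so long messages are assembled from several fragments, and conditional expressions move inside the MDC(...) call rather than around it. A reduced sketch of the same shape (the class and its two Boolean parameters are stand-ins for the real SecurityManager fields, and the same Spark-internal package caveat applies):

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical reporter illustrating fragment concatenation with `+` and
// conditionals placed inside MDC so the chosen string is what gets recorded.
class AclsReporter(aclsOn: Boolean, sslRpcEnabled: Boolean) extends Logging {
  def report(): Unit = {
    logInfo(log"ui acls ${MDC(LogKeys.UI_ACLS, if (aclsOn) "enabled" else "disabled")}" +
      log"; RPC SSL ${MDC(LogKeys.RPC_SSL_ENABLED, if (sslRpcEnabled) "enabled" else "disabled")}")
  }
}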

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -366,7 +366,7 @@ object SparkEnv extends Logging {
       name: String, endpointCreator: => RpcEndpoint):
     RpcEndpointRef = {
     if (isDriver) {
-      logInfo("Registering " + name)
+      logInfo(log"Registering ${MDC(LogKeys.ENDPOINT_NAME, name)}")
       rpcEnv.setupEndpoint(name, endpointCreator)
     } else {
       RpcUtils.makeDriverRef(name, conf, rpcEnv)

core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,7 @@ import scala.util.Try
 import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.PATH
 import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
 
@@ -66,7 +66,7 @@ private[spark] object ShutdownHookManager extends Logging {
     // shutdownDeletePaths as we are traversing through it.
     shutdownDeletePaths.toArray.foreach { dirPath =>
       try {
-        logInfo("Deleting directory " + dirPath)
+        logInfo(log"Deleting directory ${MDC(LogKeys.PATH, dirPath)}")
         Utils.deleteRecursively(new File(dirPath))
       } catch {
         case e: Exception =>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
           logInfo("There is nothing to roll.")
         }
       case _ =>
-        logWarning("This plugin expects " +
+        logWarning(log"This plugin expects " +
           log"${MDC(CLASS_NAME, classOf[KubernetesClusterSchedulerBackend].getSimpleName)}.")
     }
   } catch {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 7 additions & 6 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import org.apache.spark.sql.catalyst.expressions._
@@ -913,11 +913,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
         val correlatedCols = AttributeSet(subqueryColumns)
         val invalidColsLegacy = groupByCols -- correlatedCols
         if (!nonEquivalentGroupByCols.isEmpty && invalidColsLegacy.isEmpty) {
-          logWarning("Using legacy behavior for " +
-            s"${SQLConf.LEGACY_SCALAR_SUBQUERY_ALLOW_GROUP_BY_NON_EQUALITY_CORRELATED_PREDICATE
-            .key}. Query would be rejected with non-legacy behavior but is allowed by " +
-            s"legacy behavior. Query may be invalid and return wrong results if the scalar " +
-            s"subquery's group-by outputs multiple rows.")
+          logWarning(log"Using legacy behavior for " +
+            log"${MDC(LogKeys.CONFIG, SQLConf
+              .LEGACY_SCALAR_SUBQUERY_ALLOW_GROUP_BY_NON_EQUALITY_CORRELATED_PREDICATE.key)}. " +
+            log"Query would be rejected with non-legacy behavior but is allowed by " +
+            log"legacy behavior. Query may be invalid and return wrong results if the scalar " +
+            log"subquery's group-by outputs multiple rows.")
         }
         invalidColsLegacy
       }

streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala

Lines changed: 5 additions & 5 deletions
@@ -22,7 +22,7 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 import scala.collection.mutable
 import scala.collection.parallel.immutable.ParVector
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.util.Utils
@@ -142,19 +142,19 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   }
 
   def updateCheckpointData(time: Time): Unit = {
-    logInfo("Updating checkpoint data for time " + time)
+    logInfo(log"Updating checkpoint data for time ${MDC(LogKeys.TIME, time)}")
     this.synchronized {
       outputStreams.foreach(_.updateCheckpointData(time))
     }
-    logInfo("Updated checkpoint data for time " + time)
+    logInfo(log"Updated checkpoint data for time ${MDC(LogKeys.TIME, time)}")
   }
 
   def clearCheckpointData(time: Time): Unit = {
-    logInfo("Clearing checkpoint data for time " + time)
+    logInfo(log"Clearing checkpoint data for time ${MDC(LogKeys.TIME, time)}")
     this.synchronized {
       outputStreams.foreach(_.clearCheckpointData(time))
     }
-    logInfo("Cleared checkpoint data for time " + time)
+    logInfo(log"Cleared checkpoint data for time ${MDC(LogKeys.TIME, time)}")
   }
 
   def restoreCheckpointData(): Unit = {

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala

Lines changed: 3 additions & 2 deletions
@@ -24,7 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{PATH, TIME}
 import org.apache.spark.streaming.Time
 import org.apache.spark.util.Utils
@@ -89,7 +89,8 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
         fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
       }
       if (fileSystem.delete(path, true)) {
-        logInfo("Deleted checkpoint file '" + file + "' for time " + time)
+        logInfo(log"Deleted checkpoint file ${MDC(LogKeys.FILE_NAME, file)} " +
+          log"for time ${MDC(LogKeys.TIME, time)}")
       } else {
         logWarning(log"Error deleting old checkpoint file '${MDC(PATH, file)}' for time " +
           log"${MDC(TIME, time)}")

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 7 additions & 6 deletions
@@ -26,8 +26,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
-import org.apache.spark.internal.LogKeys.{ELAPSED_TIME, PATH}
-import org.apache.spark.internal.MDC
+import org.apache.spark.internal.{LogKeys, MDC}
+import org.apache.spark.internal.LogKeys._
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamInputInfo
@@ -169,8 +169,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
     batchTimeToSelectedFiles --= oldFiles.keys
     recentlySelectedFiles --= oldFiles.values.flatten
-    logInfo("Cleared " + oldFiles.size + " old files that were older than " +
-      (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+    logInfo(log"Cleared ${MDC(LogKeys.COUNT, oldFiles.size)} old files that were older " +
+      log"than ${MDC(LogKeys.TIME, time - rememberDuration)}: " +
+      log"${MDC(LogKeys.FILES, oldFiles.keys.mkString(", "))}")
     logDebug("Cleared files are:\n" +
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
   }
@@ -341,8 +342,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
       case (t, f) =>
         // Restore the metadata in both files and generatedRDDs
-        logInfo("Restoring files for time " + t + " - " +
-          f.mkString("[", ", ", "]") )
+        logInfo(log"Restoring files for time ${MDC(LogKeys.TIME, t)} - " +
+          log"${MDC(LogKeys.FILES, f.mkString("[", ", ", "]"))}")
        batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
        recentlySelectedFiles ++= f
        generatedRDDs += ((t, filesToRDD(f.toImmutableArraySeq)))
