[SPARK-48629] Migrate the remaining code to structured logging framework
panbingkun committed Jun 14, 2024
commit d734306ed0c0ee368ef823cda1129b82aef7fbfe
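This commit replaces the remaining plain s-interpolated log calls with the log"" interpolator from Spark's structured logging framework, pairing each interpolated value with a typed LogKey through MDC. A minimal sketch of the before/after pattern, assuming the common/utils module that provides Logging, LogKeys, and MDC is on the classpath; the object name and message below are illustrative, not part of this commit:

import org.apache.spark.internal.{Logging, LogKeys, MDC}

object MigrationSketch extends Logging {
  def skipDependency(artifactId: String): Unit = {
    // Before: plain string interpolation, the value is embedded in the message text only.
    // logInfo(s"Skipping non-jar dependency $artifactId")

    // After: the log"" interpolator binds the value to a typed LogKey, so it is also
    // emitted as a separate MDC field when structured logging is enabled.
    logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID, artifactId)}")
  }
}
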
19 changes: 19 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -62,9 +62,11 @@ object LogKeys {
case object ACL_ENABLED extends LogKey
case object ACTUAL_NUM_FILES extends LogKey
case object ACTUAL_PARTITION_COLUMN extends LogKey
case object ACTUAL_TARGET_SIZE extends LogKey
case object ADDED_JARS extends LogKey
case object ADMIN_ACLS extends LogKey
case object ADMIN_ACL_GROUPS extends LogKey
case object ADVISORY_TARGET_SIZE extends LogKey
case object AGGREGATE_FUNCTIONS extends LogKey
case object ALPHA extends LogKey
case object ANALYSIS_ERROR extends LogKey
@@ -77,6 +79,8 @@ object LogKeys {
case object APP_STATE extends LogKey
case object ARCHIVE_NAME extends LogKey
case object ARGS extends LogKey
case object ARTIFACTS extends LogKey
case object ARTIFACT_ID extends LogKey
case object ATTRIBUTE_MAP extends LogKey
case object AUTH_ENABLED extends LogKey
case object BACKUP_FILE extends LogKey
@@ -271,6 +275,7 @@ object LogKeys {
case object FREE_MEMORY_SIZE extends LogKey
case object FROM_OFFSET extends LogKey
case object FROM_TIME extends LogKey
case object FS_DATA_OUTPUT_STREAM extends LogKey
case object FUNCTION_NAME extends LogKey
case object FUNCTION_PARAM extends LogKey
case object GLOBAL_INIT_FILE extends LogKey
@@ -325,6 +330,7 @@ object LogKeys {
case object KEY2 extends LogKey
case object KEYTAB extends LogKey
case object KEYTAB_FILE extends LogKey
case object KILL_EXECUTORS extends LogKey
case object LABEL_COLUMN extends LogKey
case object LARGEST_CLUSTER_INDEX extends LogKey
case object LAST_ACCESS_TIME extends LogKey
@@ -345,6 +351,7 @@ object LogKeys {
case object LOCAL_BLOCKS_SIZE extends LogKey
case object LOCAL_SCRATCH_DIR extends LogKey
case object LOCATION extends LogKey
case object LOGICAL_PLAN extends LogKey
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
@@ -373,6 +380,7 @@ object LogKeys {
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
case object MAX_SERVICE_NAME_LENGTH extends LogKey
case object MAX_SIZE extends LogKey
case object MAX_SLOTS extends LogKey
case object MAX_SPLIT_BYTES extends LogKey
@@ -383,6 +391,7 @@ object LogKeys {
case object MEMORY_THRESHOLD_SIZE extends LogKey
case object MERGE_DIR_NAME extends LogKey
case object MESSAGE extends LogKey
case object METADATA extends LogKey
case object METADATA_DIRECTORY extends LogKey
case object METADATA_JSON extends LogKey
case object META_FILE extends LogKey
@@ -393,6 +402,7 @@ object LogKeys {
case object MINI_BATCH_FRACTION extends LogKey
case object MIN_COMPACTION_BATCH_ID extends LogKey
case object MIN_NUM_FREQUENT_PATTERN extends LogKey
case object MIN_PARTITION_SIZE extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
case object MIN_SHARE extends LogKey
case object MIN_SIZE extends LogKey
@@ -528,6 +538,7 @@ object LogKeys {
case object OPEN_COST_IN_BYTES extends LogKey
case object OPERATION_HANDLE extends LogKey
case object OPERATION_HANDLE_IDENTIFIER extends LogKey
case object OPERATION_ID extends LogKey
case object OPTIMIZED_PLAN_COLUMNS extends LogKey
case object OPTIMIZER_CLASS_NAME extends LogKey
case object OPTIONS extends LogKey
@@ -568,6 +579,7 @@ object LogKeys {
case object POST_SCAN_FILTERS extends LogKey
case object PREDICATE extends LogKey
case object PREDICATES extends LogKey
case object PREFERRED_SERVICE_NAME extends LogKey
case object PREFIX extends LogKey
case object PRETTY_ID_STRING extends LogKey
case object PRINCIPAL extends LogKey
@@ -623,6 +635,7 @@ object LogKeys {
case object REMOVE_FROM_MASTER extends LogKey
case object REPORT_DETAILS extends LogKey
case object REQUESTER_SIZE extends LogKey
case object REQUEST_EXECUTORS extends LogKey
case object REQUEST_ID extends LogKey
case object RESOURCE extends LogKey
case object RESOURCE_NAME extends LogKey
@@ -659,24 +672,28 @@ object LogKeys {
case object SESSION_KEY extends LogKey
case object SET_CLIENT_INFO_REQUEST extends LogKey
case object SHARD_ID extends LogKey
case object SHORTER_SERVICE_NAME extends LogKey
case object SHORT_USER_NAME extends LogKey
case object SHUFFLE_BLOCK_INFO extends LogKey
case object SHUFFLE_DB_BACKEND_KEY extends LogKey
case object SHUFFLE_DB_BACKEND_NAME extends LogKey
case object SHUFFLE_ID extends LogKey
case object SHUFFLE_IDS extends LogKey
case object SHUFFLE_MERGE_ID extends LogKey
case object SHUFFLE_MERGE_RECOVERY_FILE extends LogKey
case object SHUFFLE_SERVICE_CONF_OVERLAY_URL extends LogKey
case object SHUFFLE_SERVICE_METRICS_NAMESPACE extends LogKey
case object SHUFFLE_SERVICE_NAME extends LogKey
case object SIGMAS_LENGTH extends LogKey
case object SIGNAL extends LogKey
case object SINK extends LogKey
case object SIZE extends LogKey
case object SLEEP_TIME extends LogKey
case object SLIDE_DURATION extends LogKey
case object SMALLEST_CLUSTER_INDEX extends LogKey
case object SNAPSHOT_VERSION extends LogKey
case object SOCKET_ADDRESS extends LogKey
case object SOURCE extends LogKey
case object SOURCE_PATH extends LogKey
case object SPARK_BRANCH extends LogKey
case object SPARK_BUILD_DATE extends LogKey
@@ -772,6 +789,7 @@ object LogKeys {
case object TOTAL_SIZE extends LogKey
case object TOTAL_TIME extends LogKey
case object TOTAL_TIME_READ extends LogKey
case object TOTAL_WAIT_TIME extends LogKey
case object TO_TIME extends LogKey
case object TRAINING_SIZE extends LogKey
case object TRAIN_VALIDATION_SPLIT_METRIC extends LogKey
@@ -792,6 +810,7 @@ object LogKeys {
case object URL extends LogKey
case object URL2 extends LogKey
case object URLS extends LogKey
case object USER_CLASS extends LogKey
case object USER_ID extends LogKey
case object USER_NAME extends LogKey
case object UUID extends LogKey
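The keys added above (for example ACTUAL_TARGET_SIZE, ADVISORY_TARGET_SIZE, ARTIFACT_ID, and FS_DATA_OUTPUT_STREAM) back the MDC references introduced in the rest of this commit; a key is nothing more than another case object in this registry. A hedged sketch of referencing two of the new keys from a call site; the method and message are illustrative, not taken from this commit:

import org.apache.spark.internal.{Logging, LogKeys, MDC}

object LogKeyUsageSketch extends Logging {
  // A key is declared once in LogKey.scala, e.g.
  //   case object ACTUAL_TARGET_SIZE extends LogKey
  // and is then referenced through the LogKeys object at every call site.
  def logCoalesceTarget(actualSize: Long, advisorySize: Long): Unit = {
    logInfo(log"Coalescing to ${MDC(LogKeys.ACTUAL_TARGET_SIZE, actualSize)} bytes " +
      log"(advisory target ${MDC(LogKeys.ADVISORY_TARGET_SIZE, advisorySize)} bytes)")
  }
}
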
@@ -36,7 +36,7 @@ import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.util.ArrayImplicits._

/** Provides utility functions to be used inside SparkSubmit. */
@@ -215,7 +215,7 @@ private[spark] object MavenUtils extends Logging {
if (artifactInfo.getExt == "jar") {
true
} else {
logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID, artifactInfo.getId)}")
false
}
}
@@ -515,8 +515,9 @@ private[spark] object MavenUtils extends Logging {
val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
val failedArtifacts = failedReports.map(r => r.getArtifact)
logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
s"attempt to retry while skipping local-m2-cache.")
logInfo(log"Download failed: " +
log"${MDC(LogKeys.ARTIFACTS, failedArtifacts.mkString("[", ", ", "]"))}, " +
log"attempt to retry while skipping local-m2-cache.")
failedArtifacts.foreach(artifact => {
clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
})
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets.UTF_8

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}

private[spark] trait SparkErrorUtils extends Logging {
/**
@@ -74,7 +74,8 @@ private[spark] trait SparkErrorUtils extends Logging {
} catch {
case t: Throwable if (originalThrowable != null && originalThrowable != t) =>
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: ${t.getMessage}", t)
logWarning(
log"Suppressing exception in finally: ${MDC(LogKeys.MESSAGE, t.getMessage)}", t)
throw originalThrowable
}
}
@@ -20,7 +20,7 @@ import java.io.File
import java.net.{URI, URISyntaxException}
import java.nio.file.Files

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.network.util.JavaUtils

private[spark] trait SparkFileUtils extends Logging {
@@ -77,12 +77,12 @@ private[spark] trait SparkFileUtils extends Logging {
// remove the check when we're sure that Files.createDirectories() would never fail silently.
Files.createDirectories(dir.toPath)
if ( !dir.exists() || !dir.isDirectory) {
logError(s"Failed to create directory " + dir)
logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}")
}
dir.isDirectory
} catch {
case e: Exception =>
logError(s"Failed to create directory " + dir, e)
logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}", e)
false
}
}
@@ -22,7 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList
import scala.jdk.CollectionConverters._

import org.apache.spark.connect.proto.{Command, ExecutePlanResponse, Plan, StreamingQueryEventType}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.CloseableIterator
import org.apache.spark.sql.streaming.StreamingQueryListener.{Event, QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
@@ -115,7 +115,7 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
case StreamingQueryEventType.QUERY_TERMINATED_EVENT =>
postToAll(QueryTerminatedEvent.fromJson(event.getEventJson))
case _ =>
logWarning(s"Unknown StreamingQueryListener event: $event")
logWarning(log"Unknown StreamingQueryListener event: ${MDC(LogKeys.EVENT, event)}")
}
})
}
@@ -144,11 +144,12 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
listener.onQueryIdle(t)
case t: QueryTerminatedEvent =>
listener.onQueryTerminated(t)
case _ => logWarning(s"Unknown StreamingQueryListener event: $event")
case _ =>
logWarning(log"Unknown StreamingQueryListener event: ${MDC(LogKeys.EVENT, event)}")
Review comment from panbingkun (author): Make our code format look pretty
}
} catch {
case e: Exception =>
logWarning(s"Listener $listener threw an exception", e)
logWarning(log"Listener ${MDC(LogKeys.LISTENER, listener)} threw an exception", e)
})
}
}
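As the listener warning above shows, the logging overloads that take a Throwable keep working with the structured form: the log"" message comes first and the exception second, so the stack trace is preserved alongside the MDC fields. A minimal standalone sketch; the wrapper method below is illustrative:

import org.apache.spark.internal.{Logging, LogKeys, MDC}

object ListenerErrorSketch extends Logging {
  def notifySafely(listenerName: String)(body: => Unit): Unit = {
    try body
    catch {
      case e: Exception =>
        // The listener is carried as an MDC field; the throwable is still passed
        // separately so the stack trace is logged as before.
        logWarning(log"Listener ${MDC(LogKeys.LISTENER, listenerName)} threw an exception", e)
    }
  }
}
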
@@ -24,7 +24,7 @@ import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.connect.service.ExecuteHolder

/**
@@ -57,9 +57,10 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
case StreamingQueryListenerBusCommand.CommandCase.ADD_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
logWarning(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Redundant server side listener added. Exiting.")
logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Redundant server side listener added. Exiting.")
return
case false =>
// This transfers sending back the response to the client until
@@ -82,30 +83,35 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
.build())
} catch {
case NonFatal(e) =>
logError(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Error sending listener added response.",
e)
logError(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Error sending listener added response.", e)
listenerHolder.cleanUp()
return
}
}
logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Server side listener added. Now blocking until " +
"all client side listeners are removed or there is error transmitting the event back.")
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Server side listener added. Now blocking until " +
log"all client side listeners are removed or there is error transmitting the event back.")
// Block the handling thread, and have serverListener continuously send back new events
listenerHolder.streamingQueryListenerLatch.await()
logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Server side listener long-running handling thread ended.")
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"Server side listener long-running handling thread ended.")
case StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
sessionHolder.streamingServersideListenerHolder.cleanUp()
case false =>
logWarning(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] No active server side listener bus listener " +
s"but received remove listener call. Exiting.")
logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_ID, executeHolder.operationId)}] " +
log"No active server side listener bus listener but received remove listener call. " +
log"Exiting.")
return
}
case StreamingQueryListenerBusCommand.CommandCase.COMMAND_NOT_SET =>
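Longer messages, such as the session/operation prefixes in the handler above, are built by concatenating several log"" fragments; each fragment contributes both its text and its MDC fields. A standalone sketch of that multi-part form; the method name and identifiers are placeholders:

import org.apache.spark.internal.{Logging, LogKeys, MDC}

object MultiPartMessageSketch extends Logging {
  def warnRedundantListener(sessionId: String, userId: String, operationId: String): Unit = {
    logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
      log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
      log"[operationId: ${MDC(LogKeys.OPERATION_ID, operationId)}] " +
      log"Redundant server side listener added. Exiting.")
  }
}
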
@@ -103,7 +103,7 @@ private[kinesis] class KinesisCheckpointer(
}
} catch {
case NonFatal(e) =>
logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
logWarning(log"Failed to checkpoint shardId ${MDC(SHARD_ID, shardId)} to DynamoDB.", e)
}
}

@@ -24,7 +24,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

@@ -36,7 +36,7 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
s" an fs (path: $path) with abortable stream support")
}

logInfo(s"Writing atomically to $path based on abortable stream")
logInfo(log"Writing atomically to ${MDC(LogKeys.PATH, path)} based on abortable stream")

class AbortableStreamBasedFSDataOutputStream(
fsDataOutputStream: FSDataOutputStream,
@@ -53,7 +53,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
fsDataOutputStream.close()
} catch {
case NonFatal(e) =>
logWarning(s"Error cancelling write to $path (stream: $fsDataOutputStream)", e)
logWarning(log"Error cancelling write to ${MDC(LogKeys.PATH, path)} " +
log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM, fsDataOutputStream)})", e)
} finally {
terminated = true
}
@@ -71,7 +72,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
fsDataOutputStream.close()
} catch {
case NonFatal(e) =>
logWarning(s"Error closing $path (stream: $fsDataOutputStream)", e)
logWarning(log"Error closing ${MDC(LogKeys.PATH, path)} " +
log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM, fsDataOutputStream)})", e)
} finally {
terminated = true
}