modify logkey names
asl3 committed Jun 17, 2024
commit b65d9a675d705e60c9e8af96f1414753a94e7017
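
The hunks below rename and consolidate keys in Spark's structured-logging framework, where each `case object` in `LogKeys` names a typed field that call sites attach to a message through `MDC` and the `log""` interpolator. As a reader's aid, here is a minimal, self-contained sketch of that pattern; it imitates the API visible in this diff (`LogKey`, `MDC`, `log""`) with simplified, assumed signatures, and is not Spark's actual implementation:

```scala
// Toy model of the structured-logging pattern this commit touches.
// Names and signatures are illustrative assumptions, not Spark internals.
object StructuredLoggingSketch {

  // Log keys are case objects, so the key set is a closed, typo-proof vocabulary.
  sealed trait LogKey
  case object NUM_EXECUTORS extends LogKey
  case object RPC_ADDRESS extends LogKey

  // An MDC pairs a key with the value interpolated into the message.
  final case class MDC(key: LogKey, value: Any)

  // A toy log"" interpolator: renders the human-readable message and keeps the
  // key/value pairs, which a real backend could emit as structured JSON fields.
  implicit class LogInterpolator(val sc: StringContext) extends AnyVal {
    def log(args: MDC*): (String, Map[LogKey, Any]) = {
      val rendered = sc.parts.zipAll(args.map(_.value.toString), "", "")
        .map { case (part, arg) => part + arg }
        .mkString
      (rendered, args.map(a => a.key -> a.value).toMap)
    }
  }

  def main(argv: Array[String]): Unit = {
    val targetTotalExecutors = 4
    val (msg, fields) =
      log"Requested total ${MDC(NUM_EXECUTORS, targetTotalExecutors)} executors"
    println(msg)    // Requested total 4 executors
    println(fields) // Map(NUM_EXECUTORS -> 4)
  }
}
```

Because the keys form a sealed, compile-checked vocabulary, renames like the ones in this commit are mechanical: every call site that references a removed key fails to compile until it is updated.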
@@ -65,6 +65,7 @@ object LogKeys {
   case object ADDED_JARS extends LogKey
   case object ADMIN_ACLS extends LogKey
   case object ADMIN_ACL_GROUPS extends LogKey
+  case object ADVISORY_TARGET_SIZE extends LogKey
   case object AGGREGATE_FUNCTIONS extends LogKey
   case object ALIGNED_FROM_TIME extends LogKey
   case object ALIGNED_TO_TIME extends LogKey
@@ -199,7 +200,6 @@ object LogKeys {
   case object DESIRED_NUM_PARTITIONS extends LogKey
   case object DESIRED_TREE_DEPTH extends LogKey
   case object DESTINATION_PATH extends LogKey
-  case object DESTROY_SITE extends LogKey
   case object DFS_FILE extends LogKey
   case object DIFF_DELTA extends LogKey
   case object DIRECTORY extends LogKey
@@ -282,7 +282,6 @@ object LogKeys {
   case object FINAL_OUTPUT_PATH extends LogKey
   case object FINAL_PATH extends LogKey
   case object FINISH_TRIGGER_DURATION extends LogKey
-  case object FORMATTED_CODE extends LogKey
   case object FREE_MEMORY_SIZE extends LogKey
   case object FROM_OFFSET extends LogKey
   case object FROM_TIME extends LogKey
@@ -569,6 +568,7 @@ object LogKeys {
   case object PARTITION_ID extends LogKey
   case object PARTITION_IDS extends LogKey
   case object PARTITION_SPECIFICATION extends LogKey
+  case object PARTITION_SIZE extends LogKey
   case object PARTITION_SPECS extends LogKey
   case object PATH extends LogKey
   case object PATHS extends LogKey
@@ -617,6 +617,7 @@ object LogKeys {
   case object QUERY_PLAN_LENGTH_MAX extends LogKey
   case object QUERY_RUN_ID extends LogKey
   case object RANGE extends LogKey
+  case object RATE_LIMIT extends LogKey
   case object RATIO extends LogKey
   case object RDD_CHECKPOINT_DIR extends LogKey
   case object RDD_DEBUG_STRING extends LogKey
@@ -671,12 +672,10 @@ object LogKeys {
   case object SCALA_VERSION extends LogKey
   case object SCALING_UP_RATIO extends LogKey
   case object SCALING_DOWN_RATIO extends LogKey
-  case object SCALING_INTERVAL_SECS extends LogKey
   case object SCHEDULER_POOL_NAME extends LogKey
   case object SCHEDULING_MODE extends LogKey
   case object SCHEMA extends LogKey
   case object SCHEMA2 extends LogKey
-  case object SENDER_ADDRESS extends LogKey
   case object SERVER_NAME extends LogKey
   case object SERVICE_NAME extends LogKey
   case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey
@@ -754,9 +753,9 @@ object LogKeys {
   case object TABLE_TYPE extends LogKey
   case object TABLE_TYPES extends LogKey
   case object TAG extends LogKey
-  case object TARGET_NUM_EXECUTOR extends LogKey
   case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
   case object TARGET_PATH extends LogKey
+  case object TARGET_SIZE extends LogKey
   case object TASK_ATTEMPT_ID extends LogKey
   case object TASK_ID extends LogKey
   case object TASK_LOCALITY extends LogKey
@@ -785,6 +784,7 @@ object LogKeys {
   case object TIMEOUT extends LogKey
   case object TIMER extends LogKey
   case object TIMESTAMP extends LogKey
+  case object TIME_DURATION extends LogKey
   case object TIME_UNITS extends LogKey
   case object TIP extends LogKey
   case object TOKEN extends LogKey
@@ -821,7 +821,6 @@ object LogKeys {
   case object URL extends LogKey
   case object URL2 extends LogKey
   case object URLS extends LogKey
-  case object USER_CLASS extends LogKey
   case object USER_ID extends LogKey
   case object USER_NAME extends LogKey
   case object UUID extends LogKey
@@ -36,7 +36,7 @@ import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}

 import org.apache.spark.SparkException
-import org.apache.spark.internal.{Logging, LogKeys, MDC}
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.ArrayImplicits._

 /** Provides utility functions to be used inside SparkSubmit. */
@@ -215,9 +215,7 @@ private[spark] object MavenUtils extends Logging {
       if (artifactInfo.getExt == "jar") {
         true
       } else {
-        // scalastyle:off
         logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
-        // scalastyle:on
         false
       }
     }
@@ -517,9 +515,8 @@ private[spark] object MavenUtils extends Logging {
       val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
       if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
         val failedArtifacts = failedReports.map(r => r.getArtifact)
-        logInfo(log"Download failed: ${MDC(LogKeys.FAILED_ARTIFACTS,
-          failedArtifacts.mkString("[", ", ", "]"))}, " +
-          log"attempt to retry while skipping local-m2-cache.")
+        logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
+          s"attempt to retry while skipping local-m2-cache.")
         failedArtifacts.foreach(artifact => {
           clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
         })
@@ -107,7 +107,7 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Lo
     _isValid = false
     _destroySite = Utils.getCallSite().shortForm
     logInfo(log"Destroying ${MDC(LogKeys.BROADCAST, toString)} " +
-      log"(from ${MDC(LogKeys.DESTROY_SITE, _destroySite)})")
+      log"(from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, _destroySite)})")
     doDestroy(blocking)
   }

@@ -58,7 +58,7 @@ object UDTRegistration extends Serializable with Logging {
    */
   def register(userClass: String, udtClass: String): Unit = {
     if (udtMap.contains(userClass)) {
-      logWarning(log"Cannot register UDT for ${MDC(LogKeys.USER_CLASS, userClass)}, " +
+      logWarning(log"Cannot register UDT for ${MDC(LogKeys.CLASS_NAME, userClass)}, " +
         log"which is already registered.")
     } else {
       // When register UDT with class name, we can't check if the UDT class is an UserDefinedType,
@@ -1561,10 +1561,9 @@ object CodeGenerator extends Logging {
   private def logGeneratedCode(code: CodeAndComment): Unit = {
     val maxLines = SQLConf.get.loggingMaxLinesForCodegen
     if (Utils.isTesting) {
-      logError(log"\n${MDC(LogKeys.FORMATTED_CODE,
-        CodeFormatter.format(code, MDC(LogKeys.MAX_LINES, maxLines)))}")
+      logError(s"\n${CodeFormatter.format(code, maxLines)}")
     } else {
-      logInfo(log"\n${MDC(LogKeys.FORMATTED_CODE, CodeFormatter.format(code, maxLines))}")
+      logInfo(s"\n${CodeFormatter.format(code, maxLines)}")
     }
   }

@@ -62,7 +62,7 @@ object ShufflePartitionsUtil extends Logging {

     val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
     logInfo(log"For shuffle(${MDC(LogKeys.SHUFFLE_ID, shuffleIds)}, advisory target size: " +
-      log"${MDC(LogKeys.TARGET_SIZE, advisoryTargetSize)}, actual target size " +
+      log"${MDC(LogKeys.ADVISORY_TARGET_SIZE, advisoryTargetSize)}, actual target size " +
       log"${MDC(LogKeys.TARGET_SIZE, targetSize)}, minimum partition size: " +
       log"${MDC(LogKeys.PARTITION_SIZE, minPartitionSize)}")

@@ -323,7 +323,7 @@ abstract class StreamExecution(
     // Log logical plan at the start of the query to help debug issues related to
     // plan changes.
     logInfo(log"Finish initializing with logical plan:\n" +
-      log"${MDC(LogKeys.LOGICAL_PLAN_LEAVES, logicalPlan)}")
+      log"${MDC(LogKeys.QUERY_PLAN, logicalPlan)}")

     // Unblock `awaitInitialization`
     initializationLatch.countDown()
@@ -31,7 +31,6 @@ import org.apache.hive.service.cli.session.HiveSession
 import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId, TTypeQualifiers, TTypeQualifierValue}

 import org.apache.spark.internal.{Logging, LogKeys, MDC}
-import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
@@ -726,7 +726,7 @@ class StreamingContext private[streaming] (
   private def stopOnShutdown(): Unit = {
     val stopGracefully = conf.get(STOP_GRACEFULLY_ON_SHUTDOWN)
     logInfo(log"Invoking stop(stopGracefully=" +
-      log"${MDC(LogKeys.STOP_SITE_SHORT_FORM, stopGracefully)}) from shutdown hook")
+      log"${MDC(LogKeys.VALUE, stopGracefully)}) from shutdown hook")
     // Do not stop SparkContext, let its own shutdown hook stop it
     stop(stopSparkContext = false, stopGracefully = stopGracefully)
   }
@@ -320,7 +320,7 @@ abstract class DStream[T: ClassTag] (
       logInfo(log"Time ${MDC(LogKeys.TIME, time)} is invalid as zeroTime is " +
         log"${MDC(LogKeys.ZERO_TIME, zeroTime)}, slideDuration is " +
         log"${MDC(LogKeys.SLIDE_DURATION, slideDuration)} and difference is " +
-        log"${MDC(LogKeys.TIME, time - zeroTime)}")
+        log"${MDC(LogKeys.TIME_DURATION, time - zeroTime)}")
       false
     } else {
       logDebug(s"Time $time is valid")
@@ -198,7 +198,7 @@ private[streaming] class ReceiverSupervisorImpl(
     logInfo(log"Deregistering receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
     val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
     trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
-    logInfo(log"Stopped receiver ${MDC(LogKeys.RECEIVER_ID, streamId)}")
+    logInfo(log"Stopped receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
   }

   override def createBlockGenerator(
@@ -78,7 +78,7 @@ private[streaming] class ExecutorAllocationManager(
     logInfo(log"ExecutorAllocationManager started with ratios = " +
       log"[${MDC(LogKeys.SCALING_UP_RATIO, scalingUpRatio)}, " +
       log"${MDC(LogKeys.SCALING_DOWN_RATIO, scalingDownRatio)}] and interval = " +
-      log"${MDC(LogKeys.SCALING_INTERVAL_SECS, scalingIntervalSecs)} sec")
+      log"${MDC(LogKeys.INTERVAL, scalingIntervalSecs)} sec")
   }

   def stop(): Unit = {
@@ -124,7 +124,7 @@ private[streaming] class ExecutorAllocationManager(
       Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> targetTotalExecutors),
       Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0),
       Map.empty)
-    logInfo(log"Requested total ${MDC(LogKeys.TARGET_NUM_EXECUTOR,
+    logInfo(log"Requested total ${MDC(LogKeys.NUM_EXECUTORS,
       targetTotalExecutors)} executors")
   }

@@ -308,7 +308,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       receiverTrackingInfos.put(streamId, receiverTrackingInfo)
       listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
       logInfo(log"Registered receiver for stream ${MDC(LogKeys.STREAM_ID, streamId)} " +
-        log"from ${MDC(LogKeys.SENDER_ADDRESS, senderAddress)}")
+        log"from ${MDC(LogKeys.RPC_ADDRESS, senderAddress)}")
       true
     }
   }
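
Taken together, the call-site changes fold bespoke keys into existing shared ones: DESTROY_SITE into CALL_SITE_SHORT_FORM, USER_CLASS into CLASS_NAME, SENDER_ADDRESS into RPC_ADDRESS, SCALING_INTERVAL_SECS into INTERVAL, TARGET_NUM_EXECUTOR into NUM_EXECUTORS, and RECEIVER_ID into STREAM_ID, among others. A hedged sketch of why that matters to consumers of structured logs, using hypothetical names rather than Spark's real log pipeline:

```scala
// Hypothetical illustration (not Spark code): consumers query structured logs
// by key, so one canonical key per concept (RPC_ADDRESS rather than a one-off
// SENDER_ADDRESS) means one stable query instead of several near-duplicates.
object KeyConsolidationSketch {
  sealed trait LogKey
  case object RPC_ADDRESS extends LogKey
  case object NUM_EXECUTORS extends LogKey

  // A structured log record: rendered message plus its typed key/value fields.
  final case class LogRecord(message: String, fields: Map[LogKey, Any])

  // With one canonical key, a single filter finds every record carrying an
  // RPC address, no matter which component emitted it.
  def withRpcAddress(records: Seq[LogRecord]): Seq[LogRecord] =
    records.filter(_.fields.contains(RPC_ADDRESS))

  def main(argv: Array[String]): Unit = {
    val records = Seq(
      LogRecord("Registered receiver for stream 0 from host-1:7077",
        Map(RPC_ADDRESS -> "host-1:7077")),
      LogRecord("Requested total 4 executors", Map(NUM_EXECUTORS -> 4)))
    withRpcAddress(records).foreach(r => println(r.message))
  }
}
```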