Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ case class MessageWithContext(message: String, context: java.util.HashMap[String
resultMap.putAll(mdc.context)
MessageWithContext(message + mdc.message, resultMap)
}

def ++(s: String): MessageWithContext = {
MessageWithContext(message + s, context)
}
}

object MDC {

implicit class StringImprovements(val s: String) {
def ++(mdc: MessageWithContext): MessageWithContext = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name cannot be called + and does not take effect, so it is called ++

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition, in String ++ String, ++ is an alias for contact, which sounds reasonable.
image

MessageWithContext(s + mdc.message, mdc.context)
}
}
}

/**
Expand Down Expand Up @@ -96,17 +109,21 @@ trait Logging {
}

implicit class LogStringContext(val sc: StringContext) {
def log(args: MDC*): MessageWithContext = {
def log(args: Any*): MessageWithContext = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The args here is intended to MDC. So that we can enforce all the variables to be MDC.

val processedParts = sc.parts.iterator
val sb = new StringBuilder(processedParts.next())
val context = new java.util.HashMap[String, String]()

args.foreach { mdc =>
sb.append(mdc.value)
if (Logging.isStructuredLoggingEnabled) {
context.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value)
args.foreach { arg =>
arg match {
case MDC(k, v) =>
sb.append(v)
if (Logging.isStructuredLoggingEnabled) {
context.put(k.toString.toLowerCase(Locale.ROOT), v)
}
case v: Any =>
sb.append(v.toString)
}

if (processedParts.hasNext) {
sb.append(processedParts.next())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ class PatternLoggingSuite extends LoggingSuiteBase with BeforeAndAfterAll {
override def expectedPatternForMsgWithMDCAndException(level: Level): String =
s""".*$level $className: Error in executor 1.\njava.lang.RuntimeException: OOM\n.*"""

override def expectedPatternForVariableAndMDC(level: Level): String = {
s""".*$level $className: Hello This is a log message, Lost executor 1.\n"""
}

override def expectedPatternForConcatStringAndMDC(level: Level): String = {
s""".*$level $className: Hello This is a log message, Lost executor 1.\n"""
}

override def expectedPatternForConcatMDCAndString(level: Level): String = {
s""".*$level $className: Lost executor 1. Hello This is a log message.\n"""
}

override def verifyMsgWithConcat(level: Level, logOutput: String): Unit = {
val pattern =
s""".*$level $className: Min Size: 2, Max Size: 4. Please double check.\n"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite

import org.apache.spark.internal.{LogEntry, Logging, MDC}
import org.apache.spark.internal.LogKey.{EXECUTOR_ID, MAX_SIZE, MIN_SIZE}
import org.apache.spark.internal.MDC._

trait LoggingSuiteBase
extends AnyFunSuite // scalastyle:ignore funsuite
Expand Down Expand Up @@ -63,12 +64,27 @@ trait LoggingSuiteBase
log"Max Size: ${MDC(MAX_SIZE, "4")}. " +
log"Please double check."

def logVariableAndMDC: LogEntry =
log"Hello $basicMsg, Lost executor ${MDC(EXECUTOR_ID, "1")}."

def concatMDCAndString: LogEntry =
(log"Lost executor ${MDC(EXECUTOR_ID, "1")}." ++ s" Hello $basicMsg.")

def concatStringAndMDC: LogEntry =
(s"Hello $basicMsg, " ++ log"Lost executor ${MDC(EXECUTOR_ID, "1")}.")

def expectedPatternForMsgWithMDC(level: Level): String

def expectedPatternForMsgWithMDCAndException(level: Level): String

def verifyMsgWithConcat(level: Level, logOutput: String): Unit

def expectedPatternForVariableAndMDC(level: Level): String

def expectedPatternForConcatStringAndMDC(level: Level): String

def expectedPatternForConcatMDCAndString(level: Level): String

test("Basic logging") {
Seq(
(Level.ERROR, () => logError(basicMsg)),
Expand Down Expand Up @@ -112,6 +128,39 @@ trait LoggingSuiteBase
verifyMsgWithConcat(level, logOutput)
}
}

test("Logging variable and MDC") {
Seq(
(Level.ERROR, () => logError(logVariableAndMDC)),
(Level.WARN, () => logWarning(logVariableAndMDC)),
(Level.INFO, () => logInfo(logVariableAndMDC))).foreach {
case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
assert(expectedPatternForVariableAndMDC(level).r.matches(logOutput))
}
}

test("Logging concat string and MDC") {
Seq(
(Level.ERROR, () => logError(concatStringAndMDC)),
(Level.WARN, () => logWarning(concatStringAndMDC)),
(Level.INFO, () => logInfo(concatStringAndMDC))).foreach {
case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
assert(expectedPatternForConcatStringAndMDC(level).r.matches(logOutput))
}
}

test("Logging concat MDC and string") {
Seq(
(Level.ERROR, () => logError(concatMDCAndString)),
(Level.WARN, () => logWarning(concatMDCAndString)),
(Level.INFO, () => logInfo(concatMDCAndString))).foreach {
case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
assert(expectedPatternForConcatMDCAndString(level).r.matches(logOutput))
}
}
}

class StructuredLoggingSuite extends LoggingSuiteBase {
Expand Down Expand Up @@ -170,6 +219,48 @@ class StructuredLoggingSuite extends LoggingSuiteBase {
}""")
}

override def expectedPatternForVariableAndMDC(level: Level): String = {
compactAndToRegexPattern(
s"""
{
"ts": "<timestamp>",
"level": "$level",
"msg": "Hello This is a log message, Lost executor 1.",
"context": {
"executor_id": "1"
},
"logger": "$className"
}""")
}

override def expectedPatternForConcatStringAndMDC(level: Level): String = {
compactAndToRegexPattern(
s"""
{
"ts": "<timestamp>",
"level": "$level",
"msg": "Hello This is a log message, Lost executor 1.",
"context": {
"executor_id": "1"
},
"logger": "$className"
}""")
}

override def expectedPatternForConcatMDCAndString(level: Level): String = {
compactAndToRegexPattern(
s"""
{
"ts": "<timestamp>",
"level": "$level",
"msg": "Lost executor 1. Hello This is a log message.",
"context": {
"executor_id": "1"
},
"logger": "$className"
}""")
}

override def verifyMsgWithConcat(level: Level, logOutput: String): Unit = {
val pattern1 = compactAndToRegexPattern(
s"""
Expand Down